remove useless codes

This commit is contained in:
Edward 2020-05-07 12:20:04 +08:00
parent 62bdf96dcd
commit 80cb3ddd5e
3 changed files with 134 additions and 184 deletions

View File

@ -56,7 +56,7 @@
+ [x] [发布订阅模式(Pub-Sub)](./gomore/01_messages)
+ [x] [时差模式(Time Profile)](./gomore/02_profiles)
+ [x] [上下文模式(Context)](./gomore/03_context)
+ [ ] [WIP][淡入模式(Fan-In)](./gomore/04_fan_in)
+ [x] [淡入模式(Fan-In)](./gomore/04_fan_in)
+ [ ] [WIP][淡出模式(Fan-Out)](./gomore/05_fan_out)
+ [ ] [WIP][熔断模式(circuit breaker)](./gomore/06_circuit_breaker)
+ [x] [限流模式(rate limiting))](./gomore/07_rate_limiting)

View File

@ -1,52 +1 @@
package fanout
import "concurrency"
type taggingDispatcher struct {
Address string
stream proto.Havilah_StreamMetricClient
conn *grpc.ClientConn
}
func (d *taggingDispatcher) Before(ctx context.Context) error {
conn, err := grpc.Dial(d.Address, grpc.WithInsecure())
if err != nil {
return err
}
d.conn = conn
client := proto.NewHavilahClient(conn)
stream, err := client.StreamMetric(ctx)
if err != nil {
return err
}
d.stream = stream
return nil
}
func (d *taggingDispatcher) After() error {
_, err := d.stream.CloseAndRecv()
e := d.conn.Close()
if e != nil {
log.Error("close havilah connection error", field.Error(e))
}
return err
}
func (d *taggingDispatcher) Process(msg interface{}) error {
return d.stream.Send(msg.(*proto.Tagging))
}
tagging := &Tagging{
topic: topic,
pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher {
return &taggingDispatcher{Address: address}
}, ch, idle, debug),
}
tagging.pipeline.Start()
func main(){
tagging.pipeline.Dispatch(youStruct{})
}

View File

@ -1,4 +1,5 @@
## Implementation
# Implementation
We can activate worker base on traffic of parent channel
```go