diff --git a/README.md b/README.md index 35a78ec..726ee77 100644 --- a/README.md +++ b/README.md @@ -51,13 +51,13 @@ + [x] [装饰器模式(Decorator)](./structure/06_decorator) + [x] [代理模式(Proxy)](./structure/07_proxy) -## 更多模式 Go More Patterns +## 更多模式(同步/并发/安全访问/并行) Go More Patterns(Concurrency/Parallelism/Sync) + [x] [发布订阅模式(Pub-Sub)](./gomore/01_messages) + [x] [时差模式(Time Profile)](./gomore/02_profiles) + [x] [上下文模式(Context)](./gomore/03_context) + [x] [扇入模式(Fan-In)](./gomore/04_fan_in) -+ [ ] [WIP][扇出模式(Fan-Out)](./gomore/05_fan_out) ++ [x] [扇出模式(Fan-Out)](./gomore/05_fan_out) + [ ] [WIP][熔断模式(circuit breaker)](./gomore/06_circuit_breaker) + [x] [限流模式(rate limiting))](./gomore/07_rate_limiting) + [ ] [WIP][信号量模式(Semaphore)](./gomore/08_semaphore) diff --git a/gomore/05_fan_out/fan_out_complex_test.go b/gomore/05_fan_out/fan_out_complex_test.go index 06d13f7..365a63c 100644 --- a/gomore/05_fan_out/fan_out_complex_test.go +++ b/gomore/05_fan_out/fan_out_complex_test.go @@ -2,6 +2,7 @@ package fanout import ( "context" + "fmt" "testing" "google.golang.org/grpc" @@ -13,20 +14,25 @@ type taggingDispatcher struct { // stream proto.StreamClient conn *grpc.ClientConn } -type messageContent struct{} +type messageContent struct { + content string + priority int +} func TestComplexStreamingFanOut(t *testing.T) { builder := func() IDispatcher { - return &taggingDispatcher{Address: "SH"} + return &taggingDispatcher{Address: "127.0.0.2"} } tagging := &Tagging{ topic: "new topic", pipeline: NewPipeline(builder, 2, true), } - tagging.pipeline.Dispatch(messageContent{}) tagging.pipeline.Start(context.Background()) + + tagging.pipeline.Dispatch(messageContent{"all,please stay home", 1000}) + } type Tagging struct { @@ -35,6 +41,9 @@ type Tagging struct { } func (d *taggingDispatcher) Before(ctx context.Context) error { + + fmt.Println("i'm doing somthing before processing") + conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) if err != nil { return err @@ -46,6 +55,7 @@ func (d *taggingDispatcher) Before(ctx context.Context) error { // // return err // // } // // d.stream = stream + return nil } @@ -56,10 +66,13 @@ func (d *taggingDispatcher) After() error { // log.Error("close connection error", field.Error(e)) // } //return err + fmt.Println("i'm doing somthing After processing") return nil } func (d *taggingDispatcher) Process(msg interface{}) error { - //return d.stream.Send(msg.(*proto.Tagging)) + + content := msg.(messageContent) + fmt.Println("i'm doing processing,with conten", content) return nil }