diff --git a/go.mod b/go.mod index f9d5946..efd871a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.17.0 go.uber.org/zap v1.26.0 + golang.design/x/lockfree v0.0.1 golang.org/x/crypto v0.13.0 google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 @@ -18,6 +19,7 @@ require ( require ( github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/changkun/lockfree v0.0.1 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect diff --git a/go.sum b/go.sum index c6e67c8..4b6f054 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/changkun/lockfree v0.0.1 h1:5WefVJLglY4IHRqOQmh6Ao6wkJYaJkarshKU8VUtId4= +github.com/changkun/lockfree v0.0.1/go.mod h1:3bKiaXn/iNzIPlSvSOMSVbRQUQtAp8qUAyBUtzU11s4= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -242,6 +244,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.design/x/lockfree v0.0.1 h1:IHFNwZgM5bnZYWkEbzn5lWHMYr8WsRBdCJ/RBVY0xMM= +golang.design/x/lockfree v0.0.1/go.mod h1:iaZUx6UgZaOdePjzI6wFd+seYMl1i0rsG8+xKvA8c4I= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/internal/plugin/server.go b/internal/plugin/server.go index 634e0c1..a61028e 100644 --- a/internal/plugin/server.go +++ b/internal/plugin/server.go @@ -23,7 +23,7 @@ type pluginAPI struct { pb.UnimplementedPluginServiceServer } -func (p *pluginAPI) Event(stream pb.PluginService_EventServer) error { +func (p *pluginAPI) Event(_ *emptypb.Empty, stream pb.PluginService_EventServer) error { pl := p.receiverForContext(stream.Context()) if pl == nil { return ErrUnauthorized diff --git a/internal/plugin/stream.go b/internal/plugin/stream.go index 5267b0e..9ca7f02 100644 --- a/internal/plugin/stream.go +++ b/internal/plugin/stream.go @@ -11,11 +11,6 @@ type Stream struct { stream pb.PluginService_EventServer } -func (s *Stream) Recv() error { - _, err := s.stream.Recv() - return err -} - func (s *Stream) Send(event model.Event) error { return s.stream.Send(s.eventToMessage(event)) } diff --git a/internal/server/driver/plugin/driver.go b/internal/server/driver/plugin/driver.go index 664b239..8d61248 100644 --- a/internal/server/driver/plugin/driver.go +++ b/internal/server/driver/plugin/driver.go @@ -17,19 +17,18 @@ type Plugin struct { ctx context.Context name string params Params - send chan model.Event + send *Queue[model.Event] } type EventStream interface { Send(event model.Event) error - Recv() error } func New(ctx context.Context, name string, params config.DriverParams) (iface.Driver, error) { drv := &Plugin{ name: name, ctx: ctx, - send: make(chan model.Event), + send: NewQueue[model.Event](), } if err := util.UnmarshalParams(params, &drv.params); err != nil { return nil, err @@ -41,7 +40,7 @@ func (d *Plugin) Handle(event model.Event) error { if d.isDone() { return nil } - d.send <- event + d.send.Enqueue(event) return nil } @@ -63,21 +62,10 @@ func (d *Plugin) Listen(ctx context.Context, stream EventStream) error { case <-ctx.Done(): return nil default: - } - - err := stream.Recv() - if errors.Is(err, io.EOF) { - return nil - } - if err != nil { - logger.Sugar.Errorw("error reading poll event from plugin", - "server", d.name, "error", err) - return err - } - select { - case <-ctx.Done(): - return nil - case event := <-d.send: + event, exists := d.send.Dequeue() + if !exists { + continue + } err := stream.Send(event) if errors.Is(err, io.EOF) { return nil @@ -98,7 +86,6 @@ func (d *Plugin) HandleStatus(event model.EventRequest) { func (d *Plugin) isDone() bool { select { case <-d.ctx.Done(): - close(d.send) return true default: return false diff --git a/internal/server/driver/plugin/queue.go b/internal/server/driver/plugin/queue.go new file mode 100644 index 0000000..3e0ae82 --- /dev/null +++ b/internal/server/driver/plugin/queue.go @@ -0,0 +1,28 @@ +package plugin + +import "golang.design/x/lockfree" + +type Queue[T any] struct { + queue *lockfree.Queue +} + +func NewQueue[T any]() *Queue[T] { + return &Queue[T]{queue: lockfree.NewQueue()} +} + +func (q *Queue[T]) Enqueue(val T) { + q.queue.Enqueue(val) +} + +func (q *Queue[T]) Dequeue() (result T, ok bool) { + item := q.queue.Dequeue() + if item == nil { + ok = false + return + } + return item.(T), true +} + +func (q *Queue[T]) Len() uint64 { + return q.queue.Length() +} diff --git a/pkg/plugin/pb.proto b/pkg/plugin/pb.proto index 7047695..02dae22 100644 --- a/pkg/plugin/pb.proto +++ b/pkg/plugin/pb.proto @@ -5,7 +5,7 @@ option go_package = "github.com/Neur0toxine/sshpoke/pkg/plugin"; option java_multiple_files = true; service PluginService { - rpc Event (stream google.protobuf.Empty) returns (stream EventMessage); + rpc Event (google.protobuf.Empty) returns (stream EventMessage); rpc EventStatus (EventStatusMessage) returns (google.protobuf.Empty); rpc Shutdown (stream google.protobuf.Empty) returns (google.protobuf.Empty); }