package plugin import ( "context" "errors" "io" "sync/atomic" "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server/driver/iface" "github.com/Neur0toxine/sshpoke/internal/server/driver/util" ) var ErrAlreadyConnected = errors.New("already connected") // Plugin driver uses RPC to communicate with external plugin. type Plugin struct { ctx context.Context name string params Params send *Queue[model.Event] listening atomic.Bool } type EventStream interface { Send(event model.Event) error } func New(ctx context.Context, name string, params config.DriverParams) (iface.Driver, error) { drv := &Plugin{ name: name, ctx: ctx, send: NewQueue[model.Event](), } if err := util.UnmarshalParams(params, &drv.params); err != nil { return nil, err } return drv, nil } func (d *Plugin) Handle(event model.Event) error { if d.isDone() { return nil } d.send.Enqueue(event) return nil } func (d *Plugin) Name() string { return d.name } func (d *Plugin) Driver() config.DriverType { return config.DriverPlugin } func (d *Plugin) Token() string { return d.params.Token } func (d *Plugin) Listen(ctx context.Context, stream EventStream) error { if d.listening.Load() { return ErrAlreadyConnected } d.listening.Store(true) defer d.listening.Store(false) for { select { case <-ctx.Done(): return nil default: event, exists := d.send.Dequeue() if !exists { continue } err := stream.Send(event) if errors.Is(err, io.EOF) { return nil } if err != nil { logger.Sugar.Errorw("error writing event to plugin", "server", d.name, "error", err) return err } } } } func (d *Plugin) HandleStatus(event model.EventRequest) { logger.Sugar.Errorw("plugin error", "serverName", d.name, "id", event.ID, "error", event.Error) } func (d *Plugin) isDone() bool { select { case <-d.ctx.Done(): return true default: return false } } func (d *Plugin) WaitForShutdown() { <-d.ctx.Done() return }