package plugin

import (
	"context"
	"errors"
	"io"
	"sync/atomic"

	"github.com/Neur0toxine/sshpoke/internal/config"
	"github.com/Neur0toxine/sshpoke/internal/server/driver/base"
	"github.com/Neur0toxine/sshpoke/internal/server/driver/util"
	"github.com/Neur0toxine/sshpoke/pkg/dto"
)

// ErrAlreadyConnected is returned by Driver.Listen when another Listen call
// is already active on the same driver.
var ErrAlreadyConnected = errors.New("already connected")

// Driver is the plugin driver. It uses RPC to communicate with an external
// plugin: events passed to Handle are queued and later forwarded to the
// stream given to Listen.
type Driver struct {
	base.Base
	params    Params
	send      *Queue[dto.Event]
	listening atomic.Bool // true while a Listen call is in progress
}

// EventStream is the sink that Listen writes queued events to.
type EventStream interface {
	Send(event dto.Event) error
}

// Plugin is a driver that additionally exposes a token and can stream its
// events to an EventStream.
type Plugin interface {
	base.Driver
	Token() string
	Listen(ctx context.Context, stream EventStream) error
}

// New creates a plugin driver with the given name and decodes params into the
// driver's own parameter struct. It returns an error if the parameters cannot
// be unmarshaled.
func New(ctx context.Context, name string, params config.DriverParams) (base.Driver, error) {
	drv := &Driver{
		Base: base.New(ctx, name),
		send: NewQueue[dto.Event](),
	}
	if err := util.UnmarshalParams(params, &drv.params); err != nil {
		return nil, err
	}
	return drv, nil
}

// Handle queues event for delivery by Listen. Once the driver's context is
// done, the given event is dropped and a shutdown event is queued instead.
// Handle always returns nil.
func (d *Driver) Handle(event dto.Event) error {
	if d.isDone() {
		d.send.Enqueue(dto.Event{Type: dto.EventShutdown})
		return nil
	}
	d.send.Enqueue(event)
	return nil
}

// Driver returns the driver type, which is always config.DriverPlugin.
func (d *Driver) Driver() config.DriverType {
	return config.DriverPlugin
}

// Token returns the token from the driver's parameters.
func (d *Driver) Token() string {
	return d.params.Token
}

// Listen forwards queued events to stream until ctx is cancelled or sending
// fails.
//
// Only one Listen call may be active at a time; a concurrent call returns
// ErrAlreadyConnected. Listen returns nil when ctx is done or when the stream
// reports io.EOF, and returns any other send error after logging it.
func (d *Driver) Listen(ctx context.Context, stream EventStream) error {
	// Claim the flag in a single atomic step. A separate Load followed by
	// Store would let two concurrent callers both observe "not listening"
	// and both start draining the queue.
	if !d.listening.CompareAndSwap(false, true) {
		return ErrAlreadyConnected
	}
	defer d.listening.Store(false)

	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			// NOTE(review): if Dequeue does not block on an empty queue,
			// this loop spins until an event arrives or ctx is cancelled -
			// confirm against the Queue implementation.
			event, exists := d.send.Dequeue()
			if !exists {
				continue
			}
			err := stream.Send(event)
			if errors.Is(err, io.EOF) {
				return nil
			}
			if err != nil {
				d.Log().Errorw("error writing event to plugin",
					"server", d.Name(), "error", err)
				return err
			}
		}
	}
}

// isDone reports, without blocking, whether the driver's context is done.
func (d *Driver) isDone() bool {
	select {
	case <-d.Context().Done():
		return true
	default:
		return false
	}
}

// WaitForShutdown blocks until the driver's context is done.
func (d *Driver) WaitForShutdown() {
	<-d.Context().Done()
}