sshpoke/internal/server/driver/plugin/driver.go

104 lines
2.0 KiB
Go

package plugin
import (
"context"
"errors"
"io"
"sync/atomic"
"github.com/Neur0toxine/sshpoke/internal/config"
"github.com/Neur0toxine/sshpoke/internal/server/driver/base"
"github.com/Neur0toxine/sshpoke/internal/server/driver/util"
"github.com/Neur0toxine/sshpoke/pkg/dto"
)
var ErrAlreadyConnected = errors.New("already connected")
// Driver plugin uses RPC to communicate with external plugin.
type Driver struct {
base.Base
params Params
send *Queue[dto.Event]
listening atomic.Bool
}
type EventStream interface {
Send(event dto.Event) error
}
type Plugin interface {
base.Driver
Token() string
Listen(ctx context.Context, stream EventStream) error
}
func New(ctx context.Context, name string, params config.DriverParams) (base.Driver, error) {
drv := &Driver{
Base: base.New(ctx, name),
send: NewQueue[dto.Event](),
}
if err := util.UnmarshalParams(params, &drv.params); err != nil {
return nil, err
}
return drv, nil
}
func (d *Driver) Handle(event dto.Event) error {
if d.isDone() {
d.send.Enqueue(dto.Event{Type: dto.EventShutdown})
return nil
}
d.send.Enqueue(event)
return nil
}
func (d *Driver) Driver() config.DriverType {
return config.DriverPlugin
}
func (d *Driver) Token() string {
return d.params.Token
}
func (d *Driver) Listen(ctx context.Context, stream EventStream) error {
if d.listening.Load() {
return ErrAlreadyConnected
}
d.listening.Store(true)
defer d.listening.Store(false)
for {
select {
case <-ctx.Done():
return nil
default:
event, exists := d.send.Dequeue()
if !exists {
continue
}
err := stream.Send(event)
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
d.Log().Errorw("error writing event to plugin",
"server", d.Name(), "error", err)
return err
}
}
}
}
func (d *Driver) isDone() bool {
select {
case <-d.Context().Done():
return true
default:
return false
}
}
func (d *Driver) WaitForShutdown() {
<-d.Context().Done()
return
}