sshpoke/internal/server/driver/plugin/driver.go
2023-11-17 20:53:52 +03:00

99 lines
1.9 KiB
Go

package plugin
import (
"context"
"errors"
"io"
"github.com/Neur0toxine/sshpoke/internal/config"
"github.com/Neur0toxine/sshpoke/internal/logger"
"github.com/Neur0toxine/sshpoke/internal/model"
"github.com/Neur0toxine/sshpoke/internal/server/driver/iface"
"github.com/Neur0toxine/sshpoke/internal/server/driver/util"
)
// Plugin driver uses RPC to communicate with external plugin.
type Plugin struct {
ctx context.Context
name string
params Params
send *Queue[model.Event]
}
type EventStream interface {
Send(event model.Event) error
}
func New(ctx context.Context, name string, params config.DriverParams) (iface.Driver, error) {
drv := &Plugin{
name: name,
ctx: ctx,
send: NewQueue[model.Event](),
}
if err := util.UnmarshalParams(params, &drv.params); err != nil {
return nil, err
}
return drv, nil
}
func (d *Plugin) Handle(event model.Event) error {
if d.isDone() {
return nil
}
d.send.Enqueue(event)
return nil
}
func (d *Plugin) Name() string {
return d.name
}
func (d *Plugin) Driver() config.DriverType {
return config.DriverPlugin
}
func (d *Plugin) Token() string {
return d.params.Token
}
func (d *Plugin) Listen(ctx context.Context, stream EventStream) error {
for {
select {
case <-ctx.Done():
return nil
default:
event, exists := d.send.Dequeue()
if !exists {
continue
}
err := stream.Send(event)
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
logger.Sugar.Errorw("error writing event to plugin",
"server", d.name, "error", err)
return err
}
}
}
}
func (d *Plugin) HandleStatus(event model.EventRequest) {
logger.Sugar.Errorw("plugin error", "serverName", d.name, "id", event.ID, "error", event.Error)
}
func (d *Plugin) isDone() bool {
select {
case <-d.ctx.Done():
return true
default:
return false
}
}
func (d *Plugin) WaitForShutdown() {
<-d.ctx.Done()
return
}