sshpoke/internal/server/driver/plugin/driver.go

116 lines
2.3 KiB
Go
Raw Normal View History

2023-11-17 20:39:00 +03:00
package plugin
import (
"context"
"errors"
"io"
2023-11-17 20:58:55 +03:00
"sync/atomic"
2023-11-17 20:39:00 +03:00
"github.com/Neur0toxine/sshpoke/internal/config"
"github.com/Neur0toxine/sshpoke/internal/logger"
"github.com/Neur0toxine/sshpoke/internal/server/driver/iface"
"github.com/Neur0toxine/sshpoke/internal/server/driver/util"
"github.com/Neur0toxine/sshpoke/pkg/dto"
2023-11-17 20:39:00 +03:00
)
2023-11-17 20:58:55 +03:00
// ErrAlreadyConnected is returned by Driver.Listen when another listener is
// already active on the same driver.
var ErrAlreadyConnected = errors.New("already connected")
// Driver plugin uses RPC to communicate with external plugin.
type Driver struct {
2023-11-17 20:58:55 +03:00
ctx context.Context
name string
params Params
send *Queue[dto.Event]
2023-11-17 20:58:55 +03:00
listening atomic.Bool
2023-11-17 20:39:00 +03:00
}
// EventStream is the sink Listen writes queued events to.
type EventStream interface {
	// Send delivers a single event to the stream.
	Send(event dto.Event) error
}
type Plugin interface {
iface.Driver
Token() string
Listen(ctx context.Context, stream EventStream) error
HandleStatus(event dto.EventStatus)
2023-11-17 20:39:00 +03:00
}
func New(ctx context.Context, name string, params config.DriverParams) (iface.Driver, error) {
drv := &Driver{
2023-11-17 20:39:00 +03:00
name: name,
ctx: ctx,
send: NewQueue[dto.Event](),
2023-11-17 20:39:00 +03:00
}
if err := util.UnmarshalParams(params, &drv.params); err != nil {
return nil, err
}
return drv, nil
}
func (d *Driver) Handle(event dto.Event) error {
2023-11-17 20:39:00 +03:00
if d.isDone() {
d.send.Enqueue(dto.Event{Type: dto.EventShutdown})
2023-11-17 20:39:00 +03:00
return nil
}
2023-11-17 20:53:52 +03:00
d.send.Enqueue(event)
2023-11-17 20:39:00 +03:00
return nil
}
func (d *Driver) Name() string {
2023-11-17 20:39:00 +03:00
return d.name
}
func (d *Driver) Driver() config.DriverType {
2023-11-17 20:39:00 +03:00
return config.DriverPlugin
}
func (d *Driver) Token() string {
2023-11-17 20:39:00 +03:00
return d.params.Token
}
func (d *Driver) Listen(ctx context.Context, stream EventStream) error {
2023-11-17 20:58:55 +03:00
if d.listening.Load() {
return ErrAlreadyConnected
}
d.listening.Store(true)
defer d.listening.Store(false)
2023-11-17 20:39:00 +03:00
for {
select {
case <-ctx.Done():
return nil
default:
2023-11-17 20:53:52 +03:00
event, exists := d.send.Dequeue()
if !exists {
continue
}
2023-11-17 20:39:00 +03:00
err := stream.Send(event)
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
logger.Sugar.Errorw("error writing event to plugin",
"server", d.name, "error", err)
return err
}
}
}
}
func (d *Driver) HandleStatus(event dto.EventStatus) {
2023-11-17 20:39:00 +03:00
logger.Sugar.Errorw("plugin error", "serverName", d.name, "id", event.ID, "error", event.Error)
}
func (d *Driver) isDone() bool {
2023-11-17 20:39:00 +03:00
select {
case <-d.ctx.Done():
return true
default:
return false
}
}
func (d *Driver) WaitForShutdown() {
2023-11-17 20:39:00 +03:00
<-d.ctx.Done()
return
}