From 7c443887dc6d77655b0145d9c94f7dded6e99d13 Mon Sep 17 00:00:00 2001 From: Neur0toxine Date: Sat, 18 Nov 2023 12:36:17 +0300 Subject: [PATCH] simplify plugin api, better settings validation --- cmd/root.go | 10 +-- internal/{plugin => api}/server.go | 15 +++-- internal/api/stream.go | 15 +++++ internal/docker/api.go | 21 +++--- internal/docker/convert.go | 8 +-- internal/plugin/stream.go | 65 ------------------- internal/server/driver/iface/driver.go | 5 +- internal/server/driver/null/driver.go | 8 ++- internal/server/driver/plugin/driver.go | 38 ++++++----- internal/server/driver/ssh/driver.go | 10 ++- internal/server/manager.go | 37 ++++++----- internal/server/proto/sshtun/tunnel.go | 2 +- pkg/convert/convert.go | 65 +++++++++++++++++++ internal/model/event.go => pkg/dto/models.go | 10 +-- pkg/plugin/client.go | 68 ++++++++++++++++++++ pkg/plugin/pb.go | 2 +- pkg/plugin/{ => pb}/pb.proto | 7 +- pkg/plugin/port.go | 3 + pkg/plugin/stream.go | 19 ++++++ 19 files changed, 269 insertions(+), 139 deletions(-) rename internal/{plugin => api}/server.go (86%) create mode 100644 internal/api/stream.go delete mode 100644 internal/plugin/stream.go create mode 100644 pkg/convert/convert.go rename internal/model/event.go => pkg/dto/models.go (82%) create mode 100644 pkg/plugin/client.go rename pkg/plugin/{ => pb}/pb.proto (78%) create mode 100644 pkg/plugin/port.go create mode 100644 pkg/plugin/stream.go diff --git a/cmd/root.go b/cmd/root.go index c6c70e4..5982227 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,12 +6,12 @@ import ( "os/signal" "syscall" + "github.com/Neur0toxine/sshpoke/internal/api" "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/docker" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" - "github.com/Neur0toxine/sshpoke/internal/plugin" "github.com/Neur0toxine/sshpoke/internal/server" + "github.com/Neur0toxine/sshpoke/pkg/dto" "github.com/go-playground/validator/v10" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -24,7 +24,7 @@ var rootCmd = &cobra.Command{ Short: "Expose your Docker services to the Internet via SSH.", Long: `sshpoke is a CLI application that listens to the docker socket and automatically exposes relevant services to the Internet.`, Run: func(cmd *cobra.Command, args []string) { - go plugin.StartAPIServer() + go api.StartPluginAPI() var err error ctx, cancel := context.WithCancel(context.Background()) server.DefaultManager = server.NewManager(ctx, config.Default.Servers, config.Default.DefaultServer) @@ -34,8 +34,8 @@ var rootCmd = &cobra.Command{ } for id, item := range docker.Default.Containers() { - err := server.DefaultManager.ProcessEvent(model.Event{ - Type: model.EventStart, + err := server.DefaultManager.ProcessEvent(dto.Event{ + Type: dto.EventStart, ID: id, Container: item, }) diff --git a/internal/plugin/server.go b/internal/api/server.go similarity index 86% rename from internal/plugin/server.go rename to internal/api/server.go index a61028e..941dfdc 100644 --- a/internal/plugin/server.go +++ b/internal/api/server.go @@ -1,4 +1,4 @@ -package plugin +package api import ( "context" @@ -8,10 +8,11 @@ import ( "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server" "github.com/Neur0toxine/sshpoke/internal/server/driver/plugin" - pb "github.com/Neur0toxine/sshpoke/pkg/plugin" + "github.com/Neur0toxine/sshpoke/pkg/dto" + plugin2 "github.com/Neur0toxine/sshpoke/pkg/plugin" + "github.com/Neur0toxine/sshpoke/pkg/plugin/pb" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/emptypb" @@ -43,14 +44,14 @@ func (p *pluginAPI) EventStatus(ctx context.Context, msg *pb.EventStatusMessage) if pl == nil { return nil, ErrUnauthorized } - pl.HandleStatus(model.EventRequest{ + pl.HandleStatus(dto.EventStatus{ ID: msg.Id, Error: msg.Error, }) return &emptypb.Empty{}, nil } -func (p *pluginAPI) receiverForContext(ctx context.Context) *plugin.Plugin { +func (p *pluginAPI) receiverForContext(ctx context.Context) plugin.Plugin { md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil @@ -62,10 +63,10 @@ func (p *pluginAPI) receiverForContext(ctx context.Context) *plugin.Plugin { return server.DefaultManager.PluginByToken(tokens[0]) } -func StartAPIServer() { +func StartPluginAPI() { port := config.Default.PluginAPIPort if port == 0 { - port = 3000 + port = plugin2.DefaultPort } socket, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { diff --git a/internal/api/stream.go b/internal/api/stream.go new file mode 100644 index 0000000..249a6c6 --- /dev/null +++ b/internal/api/stream.go @@ -0,0 +1,15 @@ +package api + +import ( + "github.com/Neur0toxine/sshpoke/pkg/convert" + "github.com/Neur0toxine/sshpoke/pkg/dto" + "github.com/Neur0toxine/sshpoke/pkg/plugin/pb" +) + +type Stream struct { + stream pb.PluginService_EventServer +} + +func (s *Stream) Send(event dto.Event) error { + return s.stream.Send(convert.AppEventToMessage(event)) +} diff --git a/internal/docker/api.go b/internal/docker/api.go index 027422e..bf6f47b 100644 --- a/internal/docker/api.go +++ b/internal/docker/api.go @@ -7,7 +7,7 @@ import ( "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" + "github.com/Neur0toxine/sshpoke/pkg/dto" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" @@ -31,7 +31,7 @@ func New(ctx context.Context) (*Docker, error) { }, nil } -func (d *Docker) Containers() map[string]model.Container { +func (d *Docker) Containers() map[string]dto.Container { items, err := d.cli.ContainerList(d.ctx, types.ContainerListOptions{ Filters: filters.NewArgs(filters.Arg("status", "running")), }) @@ -39,7 +39,7 @@ func (d *Docker) Containers() map[string]model.Container { logger.Sugar.Errorf("cannot get containers list: %s", err) return nil } - containers := map[string]model.Container{} + containers := map[string]dto.Container{} for _, item := range items { container, ok := dockerContainerToInternal(item) if !ok { @@ -50,13 +50,13 @@ func (d *Docker) Containers() map[string]model.Container { return containers } -func (d *Docker) Listen() (chan model.Event, error) { +func (d *Docker) Listen() (chan dto.Event, error) { cli, err := client.NewClientWithOpts(config.Default.Docker.Opts) if err != nil { return nil, err } - output := make(chan model.Event) + output := make(chan dto.Event) go func() { for { eventSource, errSource := cli.Events(d.ctx, types.EventsOptions{ @@ -64,8 +64,8 @@ func (d *Docker) Listen() (chan model.Event, error) { }) select { case event := <-eventSource: - eventType := model.TypeFromAction(event.Action) - if (eventType != model.EventStart && eventType != model.EventStop) || !actorEnabled(event.Actor) { + eventType := dto.TypeFromAction(event.Action) + if (eventType != dto.EventStart && eventType != dto.EventStop) || !actorEnabled(event.Actor) { continue } container, err := d.cli.ContainerList(d.ctx, types.ContainerListOptions{ @@ -81,13 +81,13 @@ func (d *Docker) Listen() (chan model.Event, error) { if !ok { continue } - newEvent := model.Event{ + newEvent := dto.Event{ Type: eventType, ID: event.Actor.ID, Container: converted, } msg := "exposing container" - if eventType == model.EventStop { + if eventType == dto.EventStop { msg = "stopping container" } logger.Sugar.Debugw(msg, @@ -96,8 +96,7 @@ func (d *Docker) Listen() (chan model.Event, error) { "container.ip", converted.IP.String(), "container.port", converted.Port, "container.server", converted.Server, - "container.prefix", converted.Prefix, - "container.domain", converted.Domain) + "container.prefix", converted.Prefix) output <- newEvent case err := <-errSource: if errors.Is(err, context.Canceled) { diff --git a/internal/docker/convert.go b/internal/docker/convert.go index 19be83f..fde0b1d 100644 --- a/internal/docker/convert.go +++ b/internal/docker/convert.go @@ -5,7 +5,7 @@ import ( "strconv" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" + "github.com/Neur0toxine/sshpoke/pkg/dto" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/network" @@ -18,7 +18,6 @@ type labelsConfig struct { Server string `mapstructure:"sshpoke.server"` Port string `mapstructure:"sshpoke.port"` Prefix string `mapstructure:"sshpoke.prefix"` - Domain string `mapstructure:"sshpoke.domain"` } type boolStr string @@ -35,7 +34,7 @@ func actorEnabled(actor events.Actor) bool { return boolStr(label).Bool() } -func dockerContainerToInternal(container types.Container) (result model.Container, ok bool) { +func dockerContainerToInternal(container types.Container) (result dto.Container, ok bool) { var labels labelsConfig if err := mapstructure.Decode(container.Labels, &labels); err != nil { logger.Sugar.Debugf("skipping container %s because configuration is invalid: %s", container.ID, err) @@ -77,12 +76,11 @@ func dockerContainerToInternal(container types.Container) (result model.Containe return result, false } - return model.Container{ + return dto.Container{ IP: ip, Port: uint16(port), Server: labels.Server, Prefix: labels.Prefix, - Domain: labels.Domain, }, true } diff --git a/internal/plugin/stream.go b/internal/plugin/stream.go deleted file mode 100644 index 9ca7f02..0000000 --- a/internal/plugin/stream.go +++ /dev/null @@ -1,65 +0,0 @@ -package plugin - -import ( - "net" - - "github.com/Neur0toxine/sshpoke/internal/model" - pb "github.com/Neur0toxine/sshpoke/pkg/plugin" -) - -type Stream struct { - stream pb.PluginService_EventServer -} - -func (s *Stream) Send(event model.Event) error { - return s.stream.Send(s.eventToMessage(event)) -} - -func (s *Stream) messageToEvent(event *pb.EventMessage) model.Event { - return model.Event{ - Type: s.pbEventTypeToApp(event.Type), - ID: event.Id, - Container: model.Container{ - IP: net.ParseIP(event.Container.Ip), - Port: uint16(event.Container.Port), - Server: event.Container.Server, - Prefix: event.Container.Prefix, - Domain: event.Container.Domain, - }, - } -} - -func (s *Stream) eventToMessage(event model.Event) *pb.EventMessage { - return &pb.EventMessage{ - Type: s.appEventTypeToPB(event.Type), - Id: event.ID, - Container: &pb.Container{ - Ip: event.Container.IP.String(), - Port: uint32(event.Container.Port), - Server: event.Container.Server, - Prefix: event.Container.Prefix, - Domain: event.Container.Domain, - }, - } -} - -func (s *Stream) pbEventTypeToApp(typ pb.EventType) model.EventType { - val := model.EventType(typ.Number()) - if val > model.EventStart { - return model.EventUnknown - } - return val -} - -func (s *Stream) appEventTypeToPB(typ model.EventType) pb.EventType { - switch typ { - case 0: - return pb.EventType_EVENT_START - case 1: - return pb.EventType_EVENT_STOP - case 2: - fallthrough - default: - return pb.EventType_EVENT_UNKNOWN - } -} diff --git a/internal/server/driver/iface/driver.go b/internal/server/driver/iface/driver.go index 0af39a6..d4ff797 100644 --- a/internal/server/driver/iface/driver.go +++ b/internal/server/driver/iface/driver.go @@ -4,13 +4,14 @@ import ( "context" "github.com/Neur0toxine/sshpoke/internal/config" - "github.com/Neur0toxine/sshpoke/internal/model" + "github.com/Neur0toxine/sshpoke/pkg/dto" ) type DriverConstructor func(ctx context.Context, name string, params config.DriverParams) (Driver, error) type Driver interface { - Handle(event model.Event) error + Name() string + Handle(event dto.Event) error Driver() config.DriverType WaitForShutdown() } diff --git a/internal/server/driver/null/driver.go b/internal/server/driver/null/driver.go index b1d0979..30eb200 100644 --- a/internal/server/driver/null/driver.go +++ b/internal/server/driver/null/driver.go @@ -5,8 +5,8 @@ import ( "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server/driver/iface" + "github.com/Neur0toxine/sshpoke/pkg/dto" ) // Null driver only logs container events to debug log. It is used when user provides invalid driver type. @@ -19,11 +19,15 @@ func New(ctx context.Context, name string, params config.DriverParams) (iface.Dr return &Null{name: name}, nil } -func (d *Null) Handle(event model.Event) error { +func (d *Null) Handle(event dto.Event) error { logger.Sugar.Debugw("handling event with null driver", "serverName", d.name, "event", event) return nil } +func (d *Null) Name() string { + return d.name +} + func (d *Null) Driver() config.DriverType { return config.DriverNull } diff --git a/internal/server/driver/plugin/driver.go b/internal/server/driver/plugin/driver.go index 8ce7923..6a5cc83 100644 --- a/internal/server/driver/plugin/driver.go +++ b/internal/server/driver/plugin/driver.go @@ -8,31 +8,38 @@ import ( "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server/driver/iface" "github.com/Neur0toxine/sshpoke/internal/server/driver/util" + "github.com/Neur0toxine/sshpoke/pkg/dto" ) var ErrAlreadyConnected = errors.New("already connected") -// Plugin driver uses RPC to communicate with external plugin. -type Plugin struct { +// Driver plugin uses RPC to communicate with external plugin. +type Driver struct { ctx context.Context name string params Params - send *Queue[model.Event] + send *Queue[dto.Event] listening atomic.Bool } type EventStream interface { - Send(event model.Event) error + Send(event dto.Event) error +} + +type Plugin interface { + iface.Driver + Token() string + Listen(ctx context.Context, stream EventStream) error + HandleStatus(event dto.EventStatus) } func New(ctx context.Context, name string, params config.DriverParams) (iface.Driver, error) { - drv := &Plugin{ + drv := &Driver{ name: name, ctx: ctx, - send: NewQueue[model.Event](), + send: NewQueue[dto.Event](), } if err := util.UnmarshalParams(params, &drv.params); err != nil { return nil, err @@ -40,27 +47,28 @@ func New(ctx context.Context, name string, params config.DriverParams) (iface.Dr return drv, nil } -func (d *Plugin) Handle(event model.Event) error { +func (d *Driver) Handle(event dto.Event) error { if d.isDone() { + d.send.Enqueue(dto.Event{Type: dto.EventShutdown}) return nil } d.send.Enqueue(event) return nil } -func (d *Plugin) Name() string { +func (d *Driver) Name() string { return d.name } -func (d *Plugin) Driver() config.DriverType { +func (d *Driver) Driver() config.DriverType { return config.DriverPlugin } -func (d *Plugin) Token() string { +func (d *Driver) Token() string { return d.params.Token } -func (d *Plugin) Listen(ctx context.Context, stream EventStream) error { +func (d *Driver) Listen(ctx context.Context, stream EventStream) error { if d.listening.Load() { return ErrAlreadyConnected } @@ -88,11 +96,11 @@ func (d *Plugin) Listen(ctx context.Context, stream EventStream) error { } } -func (d *Plugin) HandleStatus(event model.EventRequest) { +func (d *Driver) HandleStatus(event dto.EventStatus) { logger.Sugar.Errorw("plugin error", "serverName", d.name, "id", event.ID, "error", event.Error) } -func (d *Plugin) isDone() bool { +func (d *Driver) isDone() bool { select { case <-d.ctx.Done(): return true @@ -101,7 +109,7 @@ func (d *Plugin) isDone() bool { } } -func (d *Plugin) WaitForShutdown() { +func (d *Driver) WaitForShutdown() { <-d.ctx.Done() return } diff --git a/internal/server/driver/ssh/driver.go b/internal/server/driver/ssh/driver.go index 09a0eca..2aef4db 100644 --- a/internal/server/driver/ssh/driver.go +++ b/internal/server/driver/ssh/driver.go @@ -6,10 +6,10 @@ import ( "sync" "github.com/Neur0toxine/sshpoke/internal/config" - "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server/driver/iface" "github.com/Neur0toxine/sshpoke/internal/server/driver/util" "github.com/Neur0toxine/sshpoke/internal/server/proto/sshtun" + "github.com/Neur0toxine/sshpoke/pkg/dto" ) type SSH struct { @@ -21,7 +21,7 @@ type SSH struct { } type conn struct { - container model.Container + container dto.Container tun *sshtun.Tunnel } @@ -33,11 +33,15 @@ func New(ctx context.Context, name string, params config.DriverParams) (iface.Dr return drv, nil } -func (d *SSH) Handle(event model.Event) error { +func (d *SSH) Handle(event dto.Event) error { // TODO: Implement event handling & connections management. return errors.New(d.name + " server handler is not implemented yet") } +func (d *SSH) Name() string { + return d.name +} + func (d *SSH) Driver() config.DriverType { return config.DriverSSH } diff --git a/internal/server/manager.go b/internal/server/manager.go index d789d26..1d2e3a4 100644 --- a/internal/server/manager.go +++ b/internal/server/manager.go @@ -7,15 +7,16 @@ import ( "github.com/Neur0toxine/sshpoke/internal/config" "github.com/Neur0toxine/sshpoke/internal/logger" - "github.com/Neur0toxine/sshpoke/internal/model" "github.com/Neur0toxine/sshpoke/internal/server/driver" "github.com/Neur0toxine/sshpoke/internal/server/driver/iface" "github.com/Neur0toxine/sshpoke/internal/server/driver/plugin" + "github.com/Neur0toxine/sshpoke/pkg/dto" ) type Manager struct { rw sync.RWMutex servers map[string]iface.Driver + plugins map[string]plugin.Plugin defaultServer string } @@ -28,6 +29,7 @@ var ( func NewManager(ctx context.Context, servers []config.Server, defaultServer string) *Manager { m := &Manager{ servers: make(map[string]iface.Driver), + plugins: make(map[string]plugin.Plugin), defaultServer: defaultServer, } for _, serverConfig := range servers { @@ -36,12 +38,25 @@ func NewManager(ctx context.Context, servers []config.Server, defaultServer stri logger.Sugar.Errorf("cannot initialize server '%s': %s", serverConfig.Name, err) continue } + if server.Driver() == config.DriverPlugin { + pl := server.(plugin.Plugin) + if pl.Token() == "" { + logger.Sugar.Warnf("server '%s' will not work because it doesn't have a token", pl.Name()) + continue + } + existing, found := m.plugins[pl.Token()] + if found { + logger.Sugar.Fatalw("two plugins cannot have the same token", + "plugin1", existing.Name(), "plugin2", pl.Name(), "token", pl.Token()) + } + m.plugins[pl.Token()] = pl + } m.servers[serverConfig.Name] = server } return m } -func (m *Manager) ProcessEvent(event model.Event) error { +func (m *Manager) ProcessEvent(event dto.Event) error { serverName := event.Container.Server if serverName == "" { serverName = m.defaultServer @@ -58,20 +73,12 @@ func (m *Manager) ProcessEvent(event model.Event) error { return srv.Handle(event) } -func (m *Manager) PluginByToken(token string) *plugin.Plugin { - defer m.rw.RUnlock() - m.rw.RLock() - for _, srv := range m.servers { - if srv.Driver() != config.DriverPlugin { - continue - } - pl := srv.(*plugin.Plugin) - if pl.Token() != token { - continue - } - return pl +func (m *Manager) PluginByToken(token string) plugin.Plugin { + server, ok := m.plugins[token] + if !ok { + return nil } - return nil + return server } func (m *Manager) WaitForShutdown() { diff --git a/internal/server/proto/sshtun/tunnel.go b/internal/server/proto/sshtun/tunnel.go index 9addddc..a3b169f 100644 --- a/internal/server/proto/sshtun/tunnel.go +++ b/internal/server/proto/sshtun/tunnel.go @@ -121,7 +121,7 @@ func (t Tunnel) Bind(ctx context.Context, wg *sync.WaitGroup) { ln.Close() }() - t.Logger.Printf("(%v) binded Tunnel", t) + t.Logger.Printf("(%v) bound Tunnel", t) defer t.Logger.Printf("(%v) collapsed Tunnel", t) // Accept all incoming connections. diff --git a/pkg/convert/convert.go b/pkg/convert/convert.go new file mode 100644 index 0000000..54bfa73 --- /dev/null +++ b/pkg/convert/convert.go @@ -0,0 +1,65 @@ +package convert + +import ( + "net" + + "github.com/Neur0toxine/sshpoke/pkg/dto" + "github.com/Neur0toxine/sshpoke/pkg/plugin/pb" +) + +func MessageToAppEvent(event *pb.EventMessage) dto.Event { + return dto.Event{ + Type: MessageEventTypeToApp(event.Type), + ID: event.Id, + Container: dto.Container{ + IP: net.ParseIP(event.Container.Ip), + Port: uint16(event.Container.Port), + Server: event.Container.Server, + Prefix: event.Container.Prefix, + Domain: event.Container.Domain, + }, + } +} + +func AppEventToMessage(event dto.Event) *pb.EventMessage { + return &pb.EventMessage{ + Type: AppEventTypeToMessage(event.Type), + Id: event.ID, + Container: &pb.Container{ + Ip: event.Container.IP.String(), + Port: uint32(event.Container.Port), + Server: event.Container.Server, + Prefix: event.Container.Prefix, + Domain: event.Container.Domain, + }, + } +} + +func AppEventStatusToMessage(status dto.EventStatus) *pb.EventStatusMessage { + return &pb.EventStatusMessage{ + Id: status.ID, + Error: status.Error, + Domain: status.Domain, + } +} + +func MessageEventTypeToApp(typ pb.EventType) dto.EventType { + val := dto.EventType(typ.Number()) + if val < dto.EventStart || val > dto.EventUnknown { + return dto.EventUnknown + } + return val +} + +func AppEventTypeToMessage(typ dto.EventType) pb.EventType { + switch typ { + case 0: + return pb.EventType_EVENT_START + case 1: + return pb.EventType_EVENT_STOP + case 2: + fallthrough + default: + return pb.EventType_EVENT_UNKNOWN + } +} diff --git a/internal/model/event.go b/pkg/dto/models.go similarity index 82% rename from internal/model/event.go rename to pkg/dto/models.go index 9e1b4b7..7029844 100644 --- a/internal/model/event.go +++ b/pkg/dto/models.go @@ -1,4 +1,4 @@ -package model +package dto import "net" @@ -7,6 +7,7 @@ type EventType uint8 const ( EventStart EventType = iota EventStop + EventShutdown EventUnknown ) @@ -27,9 +28,10 @@ type Event struct { Container Container } -type EventRequest struct { - ID string - Error string +type EventStatus struct { + ID string + Error string + Domain string } type Container struct { diff --git a/pkg/plugin/client.go b/pkg/plugin/client.go new file mode 100644 index 0000000..d24be02 --- /dev/null +++ b/pkg/plugin/client.go @@ -0,0 +1,68 @@ +package plugin + +import ( + "context" + "net" + "runtime" + "strconv" + "strings" + + "github.com/Neur0toxine/sshpoke/pkg/convert" + "github.com/Neur0toxine/sshpoke/pkg/dto" + "github.com/Neur0toxine/sshpoke/pkg/plugin/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/emptypb" +) + +type Client struct { + parent pb.PluginServiceClient + token string + close func() error +} + +func NewClient(addr, token string) (*Client, error) { + conn, err := grpc.Dial(normalizeAddr(addr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + c := &Client{ + parent: pb.NewPluginServiceClient(conn), + token: token, + close: conn.Close, + } + runtime.SetFinalizer(c, connCloser) + return c, nil +} + +func (c *Client) Event(ctx context.Context) (*Stream, error) { + stream, err := c.parent.Event(ctx, &emptypb.Empty{}) + if err != nil { + return nil, err + } + return &Stream{stream: stream}, nil +} + +func (c *Client) EventStatus(ctx context.Context, status dto.EventStatus) error { + _, err := c.parent.EventStatus(ctx, convert.AppEventStatusToMessage(status)) + return err +} + +func connCloser(c *Client) { + _ = c.close() +} + +func normalizeAddr(addr string) string { + addr = strings.TrimSpace(addr) + if strings.HasPrefix(addr, "grpc://") { + addr = addr[7:] + } + host, port, err := net.SplitHostPort(addr) + if err != nil && err.Error() == "missing port in address" { + host, port, err = net.SplitHostPort(addr + ":" + strconv.Itoa(DefaultPort)) + } + if err != nil { + return "" + } + return host + ":" + port +} diff --git a/pkg/plugin/pb.go b/pkg/plugin/pb.go index 6d718f7..274daca 100644 --- a/pkg/plugin/pb.go +++ b/pkg/plugin/pb.go @@ -1,2 +1,2 @@ -//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/pb.proto package plugin diff --git a/pkg/plugin/pb.proto b/pkg/plugin/pb/pb.proto similarity index 78% rename from pkg/plugin/pb.proto rename to pkg/plugin/pb/pb.proto index 02dae22..308607c 100644 --- a/pkg/plugin/pb.proto +++ b/pkg/plugin/pb/pb.proto @@ -1,19 +1,19 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; -option go_package = "github.com/Neur0toxine/sshpoke/pkg/plugin"; +option go_package = "github.com/Neur0toxine/sshpoke/pkg/plugin/pb"; option java_multiple_files = true; service PluginService { rpc Event (google.protobuf.Empty) returns (stream EventMessage); rpc EventStatus (EventStatusMessage) returns (google.protobuf.Empty); - rpc Shutdown (stream google.protobuf.Empty) returns (google.protobuf.Empty); } enum EventType { EVENT_START = 0; EVENT_STOP = 1; - EVENT_UNKNOWN = 2; + EVENT_SHUTDOWN = 2; + EVENT_UNKNOWN = 3; } message Container { @@ -33,4 +33,5 @@ message EventMessage { message EventStatusMessage { string id = 1; string error = 2; + string domain = 3; } diff --git a/pkg/plugin/port.go b/pkg/plugin/port.go new file mode 100644 index 0000000..861c05f --- /dev/null +++ b/pkg/plugin/port.go @@ -0,0 +1,3 @@ +package plugin + +const DefaultPort = 25681 diff --git a/pkg/plugin/stream.go b/pkg/plugin/stream.go new file mode 100644 index 0000000..694fa35 --- /dev/null +++ b/pkg/plugin/stream.go @@ -0,0 +1,19 @@ +package plugin + +import ( + "github.com/Neur0toxine/sshpoke/pkg/convert" + "github.com/Neur0toxine/sshpoke/pkg/dto" + "github.com/Neur0toxine/sshpoke/pkg/plugin/pb" +) + +type Stream struct { + stream pb.PluginService_EventClient +} + +func (s *Stream) Receive() (dto.Event, error) { + data, err := s.stream.Recv() + if err != nil { + return dto.Event{}, err + } + return convert.MessageToAppEvent(data), nil +}