diff --git a/internal/app/app.go b/internal/app/app.go index 54c4cef..07095b6 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -8,12 +8,10 @@ import ( "gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/handler/iface" "gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/locale" "gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/logger" - "gitea.neur0tx.site/Neur0toxine/vegapokerbot/pkg/types" "github.com/mymmrac/telego" "go.uber.org/zap" "gorm.io/gorm" "net/url" - "time" ) type App struct { @@ -23,7 +21,6 @@ type App struct { Config *config.Config Repositories *db.Repositories Handlers map[handler.Type]handler.Handler - updates types.Queue[*telego.Update] db *gorm.DB } @@ -117,7 +114,6 @@ func (a *App) initTelegram() error { } func (a *App) initHandlers() { - a.updates = types.NewQueue[*telego.Update]() fsmwizard.PopulateStates(a) a.Handlers = map[handler.Type]handler.Handler{ handler.Noop: handler.NewNoopHandler(a.Logger, a.Config.Debug), @@ -144,17 +140,6 @@ func (a *App) handler(update telego.Update) handler.Handler { return a.Handlers[handler.Noop] } -func (a *App) processUpdates() { - for { - item := a.updates.Dequeue() - if item == nil { - time.Sleep(time.Nanosecond * 200) - continue - } - a.processUpdate(*item) - } -} - func (a *App) processUpdate(update telego.Update) { defer func() { if r := recover(); r != nil { @@ -172,11 +157,9 @@ func (a *App) longPoll() error { if err != nil { return err } - go a.processUpdates() defer a.Telegram.StopLongPolling() for update := range updates { - update := update - a.updates.Enqueue(&update) + go a.processUpdate(update) } return nil } @@ -201,10 +184,8 @@ func (a *App) listenWebhook() error { defer func() { _ = a.Telegram.StopWebhook() }() - go a.processUpdates() for update := range updates { - update := update - a.updates.Enqueue(&update) + go a.processUpdate(update) } return nil } diff --git a/internal/handler/chat_member_updated_handler.go b/internal/handler/chat_member_updated_handler.go index 3093015..b632080 100644 --- a/internal/handler/chat_member_updated_handler.go +++ b/internal/handler/chat_member_updated_handler.go @@ -22,13 +22,14 @@ func NewChatMemberUpdatedHandler(app iface.App) *ChatMemberUpdatedHandler { } func (h *ChatMemberUpdatedHandler) Handle(wh telego.Update) error { - return fsmwizard.Get(wh.MyChatMember.From.ID).Handle(func(w *fsmwizard.Wizard) *fsmwizard.Wizard { + fsmwizard.Get(wh.MyChatMember.From.ID).Enqueue(func(w *fsmwizard.Wizard) *fsmwizard.Wizard { if w == nil { return &fsmwizard.Wizard{Data: wh} } w.Data = wh return w }) + return nil // cm := wh.MyChatMember // if !cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == h.App.TGProfile().ID && // cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == h.App.TGProfile().ID { diff --git a/internal/handler/fsmwizard/base.go b/internal/handler/fsmwizard/base.go index 77c2c20..fe24889 100644 --- a/internal/handler/fsmwizard/base.go +++ b/internal/handler/fsmwizard/base.go @@ -83,6 +83,6 @@ func (s State) Move(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wiz s.LogError(mc.Move(stateID, pl)) } -func (s State) MoveForHandle(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wizard) { - s.LogError(mc.MoveForHandle(stateID, pl)) +func (s State) MoveForHandling(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wizard) { + s.LogError(mc.MoveForHandling(stateID, pl)) } diff --git a/internal/handler/fsmwizard/waiting_for_member_webhook_state.go b/internal/handler/fsmwizard/waiting_for_member_webhook_state.go index df5632b..09e7001 100644 --- a/internal/handler/fsmwizard/waiting_for_member_webhook_state.go +++ b/internal/handler/fsmwizard/waiting_for_member_webhook_state.go @@ -24,12 +24,12 @@ func (s *WaitingForMemberWebhookState) Enter(pl *Wizard, mc fsm.MachineControls[ } if !cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == s.App.TGProfile().ID && cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == s.App.TGProfile().ID { - s.MoveForHandle(mc, AddChatMemberStateID, pl) + s.MoveForHandling(mc, AddChatMemberStateID, pl) return nil } if cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == s.App.TGProfile().ID && !cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == s.App.TGProfile().ID { - s.MoveForHandle(mc, RemoveChatMemberStateID, pl) + s.MoveForHandling(mc, RemoveChatMemberStateID, pl) return nil } diff --git a/internal/handler/new_message_handler.go b/internal/handler/new_message_handler.go index cdd7ae7..526c418 100644 --- a/internal/handler/new_message_handler.go +++ b/internal/handler/new_message_handler.go @@ -19,13 +19,14 @@ func (h *MessageHandler) Handle(wh telego.Update) error { return nil } - return fsmwizard.Get(wh.Message.From.ID).Handle(func(w *fsmwizard.Wizard) *fsmwizard.Wizard { + fsmwizard.Get(wh.Message.From.ID).Enqueue(func(w *fsmwizard.Wizard) *fsmwizard.Wizard { if w == nil { return &fsmwizard.Wizard{Data: wh} } w.Data = wh return w }) + return nil // if util.MatchCommand("start", wh.Message) { // return wizard.NewRegister(h.App, wh.Message.From.ID, wh.Message.Chat.ID).Handle(wh) diff --git a/pkg/fsm/machine.go b/pkg/fsm/machine.go index 13bdb0d..4327110 100644 --- a/pkg/fsm/machine.go +++ b/pkg/fsm/machine.go @@ -1,9 +1,12 @@ package fsm import ( + "context" "errors" "fmt" "gitea.neur0tx.site/Neur0toxine/vegapokerbot/pkg/types" + "runtime" + "time" ) var ( @@ -29,15 +32,18 @@ type MachineStateProvider[T any] func(*T) *T // - Reset the machine. type IMachine[T any] interface { MachineControlsWithState[T] - // Handle the state input. Handle func will accept the current payload and modify it based on user input. - Handle(MachineStateProvider[T]) error + // Enqueue the state input. Handle func will accept the current payload and modify it based on user input. + Enqueue(MachineStateProvider[T]) // Reset the machine to its initial state. Reset() } // Machine is a finite-state machine implementation. type Machine[T any] struct { + ctx context.Context + cancel context.CancelFunc payload *T + transitions types.Queue[MachineStateProvider[T]] stateRouter MachineStateRouter[T] state StateID initialState StateID @@ -52,13 +58,22 @@ func New[T any](initialState StateID, router MachineStateRouter[T], states []ISt for _, state := range states { stateMap.Set(state.ID(), state) } - return &Machine[T]{ + ctx, cancel := context.WithCancel(context.Background()) + m := &Machine[T]{ + ctx: ctx, + cancel: cancel, state: initialState, + transitions: types.NewQueue[MachineStateProvider[T]](), stateRouter: router, initialState: initialState, states: stateMap, errHandler: errHandler, } + go m.handleQueue() + runtime.SetFinalizer(m, func(m *Machine[T]) { + m.cancel() + }) + return m } // Move to another state. @@ -90,13 +105,37 @@ func (m *Machine[T]) Move(id StateID, payload *T) error { return nil } -func (m *Machine[T]) MoveForHandle(id StateID, payload *T) error { +func (m *Machine[T]) MoveForHandling(id StateID, payload *T) error { m.handleNow = true return m.Move(id, payload) } +func (m *Machine[T]) Enqueue(provider MachineStateProvider[T]) { + m.transitions.Enqueue(provider) +} + +func (m *Machine[T]) handleQueue() { + defer func() { + if r := recover(); r != nil { + go m.handleQueue() + } + }() + for { + select { + case <-m.ctx.Done(): + return + case <-time.After(time.Nanosecond * 10): + } + transition := m.transitions.Dequeue() + if transition == nil { + continue + } + m.handle(transition) + } +} + // Handle the input. -func (m *Machine[T]) Handle(provider MachineStateProvider[T]) error { +func (m *Machine[T]) handle(provider MachineStateProvider[T]) { if provider != nil { m.payload = provider(m.payload) } @@ -105,15 +144,15 @@ func (m *Machine[T]) Handle(provider MachineStateProvider[T]) error { } st, err := m.loadState(m.state, m.payload) if st == nil || err != nil { - return err + return } for { st.Handle(m.payload, m) - if m.handleNow { // MoveForHandle was called, trying to handle again. + if m.handleNow { // MoveForHandling was called, trying to handle again. m.handleNow = false continue } - return nil + return } } diff --git a/pkg/fsm/state.go b/pkg/fsm/state.go index 746304e..7671ad0 100644 --- a/pkg/fsm/state.go +++ b/pkg/fsm/state.go @@ -10,8 +10,8 @@ const NilStateID = StateID("") // It may fail with an error which should be handled by the IState implementation. type MachineControls[T any] interface { Move(StateID, *T) error - // MoveForHandle is the same as Move but it also triggers Handle immediately for the next state. - MoveForHandle(StateID, *T) error + // MoveForHandling is the same as Move but it also triggers Handle immediately for the next state. + MoveForHandling(StateID, *T) error Reset() }