individual queue for every machine

This commit is contained in:
Pavel 2024-05-14 15:39:55 +03:00
parent b13cd05b45
commit 876950f4ce
7 changed files with 59 additions and 37 deletions

View File

@ -8,12 +8,10 @@ import (
"gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/handler/iface"
"gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/locale"
"gitea.neur0tx.site/Neur0toxine/vegapokerbot/internal/logger"
"gitea.neur0tx.site/Neur0toxine/vegapokerbot/pkg/types"
"github.com/mymmrac/telego"
"go.uber.org/zap"
"gorm.io/gorm"
"net/url"
"time"
)
type App struct {
@ -23,7 +21,6 @@ type App struct {
Config *config.Config
Repositories *db.Repositories
Handlers map[handler.Type]handler.Handler
updates types.Queue[*telego.Update]
db *gorm.DB
}
@ -117,7 +114,6 @@ func (a *App) initTelegram() error {
}
func (a *App) initHandlers() {
a.updates = types.NewQueue[*telego.Update]()
fsmwizard.PopulateStates(a)
a.Handlers = map[handler.Type]handler.Handler{
handler.Noop: handler.NewNoopHandler(a.Logger, a.Config.Debug),
@ -144,17 +140,6 @@ func (a *App) handler(update telego.Update) handler.Handler {
return a.Handlers[handler.Noop]
}
func (a *App) processUpdates() {
for {
item := a.updates.Dequeue()
if item == nil {
time.Sleep(time.Nanosecond * 200)
continue
}
a.processUpdate(*item)
}
}
func (a *App) processUpdate(update telego.Update) {
defer func() {
if r := recover(); r != nil {
@ -172,11 +157,9 @@ func (a *App) longPoll() error {
if err != nil {
return err
}
go a.processUpdates()
defer a.Telegram.StopLongPolling()
for update := range updates {
update := update
a.updates.Enqueue(&update)
go a.processUpdate(update)
}
return nil
}
@ -201,10 +184,8 @@ func (a *App) listenWebhook() error {
defer func() {
_ = a.Telegram.StopWebhook()
}()
go a.processUpdates()
for update := range updates {
update := update
a.updates.Enqueue(&update)
go a.processUpdate(update)
}
return nil
}

View File

@ -22,13 +22,14 @@ func NewChatMemberUpdatedHandler(app iface.App) *ChatMemberUpdatedHandler {
}
func (h *ChatMemberUpdatedHandler) Handle(wh telego.Update) error {
return fsmwizard.Get(wh.MyChatMember.From.ID).Handle(func(w *fsmwizard.Wizard) *fsmwizard.Wizard {
fsmwizard.Get(wh.MyChatMember.From.ID).Enqueue(func(w *fsmwizard.Wizard) *fsmwizard.Wizard {
if w == nil {
return &fsmwizard.Wizard{Data: wh}
}
w.Data = wh
return w
})
return nil
// cm := wh.MyChatMember
// if !cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == h.App.TGProfile().ID &&
// cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == h.App.TGProfile().ID {

View File

@ -83,6 +83,6 @@ func (s State) Move(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wiz
s.LogError(mc.Move(stateID, pl))
}
func (s State) MoveForHandle(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wizard) {
s.LogError(mc.MoveForHandle(stateID, pl))
func (s State) MoveForHandling(mc fsm.MachineControls[Wizard], stateID fsm.StateID, pl *Wizard) {
s.LogError(mc.MoveForHandling(stateID, pl))
}

View File

@ -24,12 +24,12 @@ func (s *WaitingForMemberWebhookState) Enter(pl *Wizard, mc fsm.MachineControls[
}
if !cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == s.App.TGProfile().ID &&
cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == s.App.TGProfile().ID {
s.MoveForHandle(mc, AddChatMemberStateID, pl)
s.MoveForHandling(mc, AddChatMemberStateID, pl)
return nil
}
if cm.OldChatMember.MemberIsMember() && cm.OldChatMember.MemberUser().ID == s.App.TGProfile().ID &&
!cm.NewChatMember.MemberIsMember() && cm.NewChatMember.MemberUser().ID == s.App.TGProfile().ID {
s.MoveForHandle(mc, RemoveChatMemberStateID, pl)
s.MoveForHandling(mc, RemoveChatMemberStateID, pl)
return nil
}

View File

@ -19,13 +19,14 @@ func (h *MessageHandler) Handle(wh telego.Update) error {
return nil
}
return fsmwizard.Get(wh.Message.From.ID).Handle(func(w *fsmwizard.Wizard) *fsmwizard.Wizard {
fsmwizard.Get(wh.Message.From.ID).Enqueue(func(w *fsmwizard.Wizard) *fsmwizard.Wizard {
if w == nil {
return &fsmwizard.Wizard{Data: wh}
}
w.Data = wh
return w
})
return nil
// if util.MatchCommand("start", wh.Message) {
// return wizard.NewRegister(h.App, wh.Message.From.ID, wh.Message.Chat.ID).Handle(wh)

View File

@ -1,9 +1,12 @@
package fsm
import (
"context"
"errors"
"fmt"
"gitea.neur0tx.site/Neur0toxine/vegapokerbot/pkg/types"
"runtime"
"time"
)
var (
@ -29,15 +32,18 @@ type MachineStateProvider[T any] func(*T) *T
// - Reset the machine.
type IMachine[T any] interface {
MachineControlsWithState[T]
// Handle the state input. Handle func will accept the current payload and modify it based on user input.
Handle(MachineStateProvider[T]) error
// Enqueue the state input. Handle func will accept the current payload and modify it based on user input.
Enqueue(MachineStateProvider[T])
// Reset the machine to its initial state.
Reset()
}
// Machine is a finite-state machine implementation.
type Machine[T any] struct {
ctx context.Context
cancel context.CancelFunc
payload *T
transitions types.Queue[MachineStateProvider[T]]
stateRouter MachineStateRouter[T]
state StateID
initialState StateID
@ -52,13 +58,22 @@ func New[T any](initialState StateID, router MachineStateRouter[T], states []ISt
for _, state := range states {
stateMap.Set(state.ID(), state)
}
return &Machine[T]{
ctx, cancel := context.WithCancel(context.Background())
m := &Machine[T]{
ctx: ctx,
cancel: cancel,
state: initialState,
transitions: types.NewQueue[MachineStateProvider[T]](),
stateRouter: router,
initialState: initialState,
states: stateMap,
errHandler: errHandler,
}
go m.handleQueue()
runtime.SetFinalizer(m, func(m *Machine[T]) {
m.cancel()
})
return m
}
// Move to another state.
@ -90,13 +105,37 @@ func (m *Machine[T]) Move(id StateID, payload *T) error {
return nil
}
func (m *Machine[T]) MoveForHandle(id StateID, payload *T) error {
func (m *Machine[T]) MoveForHandling(id StateID, payload *T) error {
m.handleNow = true
return m.Move(id, payload)
}
func (m *Machine[T]) Enqueue(provider MachineStateProvider[T]) {
m.transitions.Enqueue(provider)
}
func (m *Machine[T]) handleQueue() {
defer func() {
if r := recover(); r != nil {
go m.handleQueue()
}
}()
for {
select {
case <-m.ctx.Done():
return
case <-time.After(time.Nanosecond * 10):
}
transition := m.transitions.Dequeue()
if transition == nil {
continue
}
m.handle(transition)
}
}
// Handle the input.
func (m *Machine[T]) Handle(provider MachineStateProvider[T]) error {
func (m *Machine[T]) handle(provider MachineStateProvider[T]) {
if provider != nil {
m.payload = provider(m.payload)
}
@ -105,15 +144,15 @@ func (m *Machine[T]) Handle(provider MachineStateProvider[T]) error {
}
st, err := m.loadState(m.state, m.payload)
if st == nil || err != nil {
return err
return
}
for {
st.Handle(m.payload, m)
if m.handleNow { // MoveForHandle was called, trying to handle again.
if m.handleNow { // MoveForHandling was called, trying to handle again.
m.handleNow = false
continue
}
return nil
return
}
}

View File

@ -10,8 +10,8 @@ const NilStateID = StateID("")
// It may fail with an error which should be handled by the IState implementation.
type MachineControls[T any] interface {
Move(StateID, *T) error
// MoveForHandle is the same as Move but it also triggers Handle immediately for the next state.
MoveForHandle(StateID, *T) error
// MoveForHandling is the same as Move but it also triggers Handle immediately for the next state.
MoveForHandling(StateID, *T) error
Reset()
}