package main import ( "context" "encoding/json" "errors" "math/rand" "runtime" "strconv" "time" "github.com/SevereCloud/vksdk/v2/object" "github.com/allegro/bigcache/v3" "go.uber.org/atomic" ) type UsersUser struct { ID int64 } var UserUpdateInterval = 24 const ( maxUsersPerChannel = 50 fetcherInterval = time.Millisecond fetcherBatchingInterval = time.Millisecond * 20 ) type Fetcher interface { Enqueue(userID int64) WaitFor(userID int64, sleepBetweenAttempts time.Duration, attemptsCount int) UsersUser } type fetcher struct { rnd *rand.Rand input chan int64 offload *bigcache.BigCache output chan []int64 result *bigcache.BigCache stop atomic.Bool token string } func newFetcher(token string) *fetcher { input, _ := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute*10)) output, _ := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute)) uf := &fetcher{ rnd: rand.New(rand.NewSource(time.Now().UnixNano())), input: make(chan int64, maxUsersPerChannel), output: make(chan []int64), offload: input, result: output, token: token, } runtime.SetFinalizer(uf, usersFetcherDestructor) go uf.consumeOffload() go uf.assembleBatches() go uf.doWork() return uf } func usersFetcherDestructor(uf *fetcher) { uf.stop.Store(true) } func (u *fetcher) Enqueue(userID int64) { if u.stop.Load() { return } if userID == 0 { return } if len(u.input) < 50 { u.input <- userID return } _ = u.offload.Set(strconv.FormatInt(userID, 10), []byte{}) } func (u *fetcher) WaitFor(userID int64, sleepBetweenAttempts time.Duration, attemptsCount int) object.UsersUser { attempt := 0 uid := strconv.FormatInt(userID, 10) retryFetching: val, err := u.result.Get(uid) if err != nil { if errors.Is(err, bigcache.ErrEntryNotFound) { if attempt == attemptsCount { return object.UsersUser{} } time.Sleep(sleepBetweenAttempts) attempt++ goto retryFetching } return object.UsersUser{} } var user object.UsersUser if err := json.Unmarshal(val, &user); err != nil { return object.UsersUser{} } return user } func (u *fetcher) consumeOffload() { for { if u.stop.Load() { break } it := u.offload.Iterator() for it.SetNext() { entry, err := it.Value() if err != nil { break } uid, _ := strconv.ParseInt(entry.Key(), 10, 64) if uid == 0 { u.offload.Delete(entry.Key()) continue } u.input <- uid u.offload.Delete(entry.Key()) time.Sleep(fetcherInterval) } } } func (u *fetcher) assembleBatches() { cnt := 0 items := make([]int64, 50) for { if u.stop.Load() { break } select { case item := <-u.input: items[cnt] = item cnt++ if cnt == 50 { cnt = 0 u.output <- items items = make([]int64, 50) continue } case <-time.After(fetcherBatchingInterval): if len(items) > 0 && items[0] != 0 { cnt = 0 u.output <- items items = make([]int64, 50) } } } } func (u *fetcher) doWork() { defer func() { if r := recover(); r != nil { time.Sleep(fetcherInterval) go u.doWork() } }() for batch := range u.output { time.Sleep(time.Millisecond * time.Duration(u.rnd.Intn(150)+200)) for _, user := range batch { data, err := json.Marshal(UsersUser{ID: user}) if err != nil { continue } _ = u.result.Set(strconv.FormatInt(user, 10), data) } } close(u.output) close(u.input) }