vk-webhook-flooder-mock/teststore.go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"math/rand"
	"runtime"
	"strconv"
	"time"

	"github.com/SevereCloud/vksdk/v2/object"
	"github.com/allegro/bigcache/v3"
	"go.uber.org/atomic"
)
// UsersUser is the minimal stub record the mock stores for every fetched user.
type UsersUser struct {
	ID int64
}

var UserUpdateInterval = 24

const (
	maxUsersPerChannel      = 50
	fetcherInterval         = time.Millisecond
	fetcherBatchingInterval = time.Millisecond * 20
)
// Fetcher enqueues user IDs for background fetching and lets callers poll for
// the fetched result.
type Fetcher interface {
	Enqueue(userID int64)
	WaitFor(userID int64, sleepBetweenAttempts time.Duration, attemptsCount int) object.UsersUser
}
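// A minimal usage sketch (the token and user ID below are hypothetical and not
// part of this file):
//
//	f := newFetcher("hypothetical-token")
//	f.Enqueue(42)
//	user := f.WaitFor(42, 50*time.Millisecond, 10)
//	_ = user.ID // zero value if the mock did not produce the user in time
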
// fetcher is the mock implementation of Fetcher.
type fetcher struct {
	rnd     *rand.Rand
	input   chan int64         // direct queue, bounded by maxUsersPerChannel
	offload *bigcache.BigCache // spill-over storage used when input is full
	output  chan []int64       // assembled ID batches consumed by doWork
	result  *bigcache.BigCache // stub users, keyed by decimal user ID
	stop    atomic.Bool
	token   string // access token; not used by the mock work loop
}
func newFetcher(token string) *fetcher {
	// bigcache.New only fails on an invalid config, and DefaultConfig is valid,
	// so the errors are ignored here.
	input, _ := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute*10))
	output, _ := bigcache.New(context.Background(), bigcache.DefaultConfig(time.Minute))
	uf := &fetcher{
		rnd:     rand.New(rand.NewSource(time.Now().UnixNano())),
		input:   make(chan int64, maxUsersPerChannel),
		output:  make(chan []int64),
		offload: input,
		result:  output,
		token:   token,
	}
	// Best-effort shutdown: the finalizer flips the stop flag once uf becomes
	// unreachable. The goroutines below hold a reference to uf via their
	// receiver, so in practice they run for the lifetime of the process.
	runtime.SetFinalizer(uf, usersFetcherDestructor)

	go uf.consumeOffload()
	go uf.assembleBatches()
	go uf.doWork()

	return uf
}
// usersFetcherDestructor signals the background goroutines to stop.
func usersFetcherDestructor(uf *fetcher) {
	uf.stop.Store(true)
}
// Enqueue schedules a user ID for fetching. If the input channel is full, the
// ID is parked in the offload cache and picked up later by consumeOffload.
func (u *fetcher) Enqueue(userID int64) {
	if u.stop.Load() {
		return
	}
	if userID == 0 {
		return
	}
	if len(u.input) < maxUsersPerChannel {
		u.input <- userID
		return
	}
	_ = u.offload.Set(strconv.FormatInt(userID, 10), []byte{})
}
// WaitFor polls the result cache until the user appears or attemptsCount
// retries are exhausted. It returns a zero value on timeout or any error.
func (u *fetcher) WaitFor(userID int64, sleepBetweenAttempts time.Duration, attemptsCount int) object.UsersUser {
	uid := strconv.FormatInt(userID, 10)
	for attempt := 0; ; attempt++ {
		val, err := u.result.Get(uid)
		if err != nil {
			if errors.Is(err, bigcache.ErrEntryNotFound) && attempt < attemptsCount {
				time.Sleep(sleepBetweenAttempts)
				continue
			}
			return object.UsersUser{}
		}

		var user object.UsersUser
		if err := json.Unmarshal(val, &user); err != nil {
			return object.UsersUser{}
		}
		return user
	}
}
// consumeOffload drains IDs that Enqueue parked in the offload cache and feeds
// them back into the input channel.
func (u *fetcher) consumeOffload() {
	for {
		if u.stop.Load() {
			break
		}
		it := u.offload.Iterator()
		for it.SetNext() {
			entry, err := it.Value()
			if err != nil {
				break
			}
			uid, _ := strconv.ParseInt(entry.Key(), 10, 64)
			if uid == 0 {
				_ = u.offload.Delete(entry.Key())
				continue
			}
			u.input <- uid
			_ = u.offload.Delete(entry.Key())
			time.Sleep(fetcherInterval)
		}
		// Sleep between passes so an empty offload cache does not busy-spin.
		time.Sleep(fetcherInterval)
	}
}
// assembleBatches groups incoming IDs into batches of up to maxUsersPerChannel
// and flushes a partial batch when no new ID arrives within
// fetcherBatchingInterval.
func (u *fetcher) assembleBatches() {
	items := make([]int64, 0, maxUsersPerChannel)
	for {
		if u.stop.Load() {
			break
		}
		select {
		case item := <-u.input:
			items = append(items, item)
			if len(items) == maxUsersPerChannel {
				u.output <- items
				items = make([]int64, 0, maxUsersPerChannel)
			}
		case <-time.After(fetcherBatchingInterval):
			if len(items) > 0 {
				u.output <- items
				items = make([]int64, 0, maxUsersPerChannel)
			}
		}
	}
}
// doWork simulates the remote fetch: it sleeps for a randomized delay per
// batch, then stores a stub record for every user in the result cache. On
// panic the loop restarts itself after a short pause.
func (u *fetcher) doWork() {
	defer func() {
		if r := recover(); r != nil {
			time.Sleep(fetcherInterval)
			go u.doWork()
		}
	}()
	for batch := range u.output {
		// Simulated latency before the batch "arrives".
		time.Sleep(time.Millisecond * time.Duration(u.rnd.Intn(150)+200))
		for _, user := range batch {
			data, err := json.Marshal(UsersUser{ID: user})
			if err != nil {
				continue
			}
			_ = u.result.Set(strconv.FormatInt(user, 10), data)
		}
	}
}