From 131fd9727c50a67e6c94df623571ce82dd48b199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Thu, 2 Jun 2022 19:33:20 +0800 Subject: [PATCH] add more --- engine.go | 6 +++++ example/echo/main.go | 2 +- example/main.go | 2 +- future.go | 6 ++--- go.mod | 2 ++ go.sum | 4 +++ matcher.go | 51 +++++++++++++++--------------------- single.go | 61 ++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 98 insertions(+), 36 deletions(-) create mode 100644 single.go diff --git a/engine.go b/engine.go index 914bf98..bb28e8e 100644 --- a/engine.go +++ b/engine.go @@ -53,6 +53,12 @@ func (e *Engine) UsePostHandler(handler ...Process) { e.postHandler = append(e.postHandler, handler...) } +// ApplySingle 应用反并发 +func (e *Engine) ApplySingle(s *Single[int64]) *Engine { + s.Apply(e) + return e +} + // On 添加新的指定消息类型的匹配器(默认Engine) func On(typ string, rules ...Rule) *Matcher { return defaultEngine.On(typ, rules...) } diff --git a/example/echo/main.go b/example/echo/main.go index 51abdac..3296002 100644 --- a/example/echo/main.go +++ b/example/echo/main.go @@ -6,7 +6,7 @@ import ( ) func init() { - rei.OnMessagePrefix("echo").SetBlock(true).SecondPriority(). + rei.OnMessagePrefix("echo").SetBlock(true). Handle(func(ctx *rei.Ctx) { args := ctx.State["args"].(string) if args == "" { diff --git a/example/main.go b/example/main.go index a77b0ef..07d991f 100644 --- a/example/main.go +++ b/example/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - rei.OnMessageFullMatch("help").SetBlock(true).SecondPriority(). + rei.OnMessageFullMatch("help").SetBlock(true). Handle(func(ctx *rei.Ctx) { msg := ctx.Value.(*tgba.Message) _, _ = ctx.Caller.Send(tgba.NewMessage(msg.Chat.ID, "echo string")) diff --git a/future.go b/future.go index e23a316..3571c86 100644 --- a/future.go +++ b/future.go @@ -22,7 +22,7 @@ func NewFutureEvent(Type string, Priority int, Block bool, rule ...Rule) *Future func (m *Matcher) FutureEvent(Type string, rule ...Rule) *FutureEvent { return &FutureEvent{ Type: Type, - Priority: m.Priority, + Priority: m.priority, Block: m.Block, Rule: rule, } @@ -36,7 +36,7 @@ func (n *FutureEvent) Next() <-chan *Ctx { StoreTempMatcher(&Matcher{ Type: n.Type, Block: n.Block, - Priority: n.Priority, + priority: n.Priority, Rules: n.Rule, Engine: defaultEngine, Process: func(ctx *Ctx) { @@ -58,7 +58,7 @@ func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { matcher := StoreMatcher(&Matcher{ Type: n.Type, Block: n.Block, - Priority: n.Priority, + priority: n.Priority, Rules: n.Rule, Engine: defaultEngine, Process: func(ctx *Ctx) { diff --git a/go.mod b/go.mod index 1bd5c7f..4fef86a 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/fumiama/ReiBot go 1.18 require ( + github.com/RomiChan/syncx v0.0.0-20220404072119-d7ea0ae15a4c github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 + github.com/wdvxdr1123/ZeroBot v1.5.1 ) require ( diff --git a/go.sum b/go.sum index 42ea528..f42c3c9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/RomiChan/syncx v0.0.0-20220404072119-d7ea0ae15a4c h1:cNPOdTNiVwxLpROLjXCgbIPvdkE+BwvxDvgmdYmWx6Q= +github.com/RomiChan/syncx v0.0.0-20220404072119-d7ea0ae15a4c/go.mod h1:KqZzu7slNKROh3TSYEH/IUMG6f4M+1qubZ5e52QypsE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -11,6 +13,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/wdvxdr1123/ZeroBot v1.5.1 h1:riSAWc0kTy+ILgf+YnZMp+FfhVRGDrOMxK1e2wwWdus= +github.com/wdvxdr1123/ZeroBot v1.5.1/go.mod h1:K2vu0mslV8s4qhIAu/a03Z7YW24qjM0j3imIR+k21KI= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/matcher.go b/matcher.go index d6a2d52..c359b76 100644 --- a/matcher.go +++ b/matcher.go @@ -3,6 +3,8 @@ package rei import ( "sort" "sync" + + "github.com/wdvxdr1123/ZeroBot/extension/rate" ) type ( @@ -18,8 +20,8 @@ type Matcher struct { Temp bool // Block 是否阻断后续 Matcher,为 true 时当前Matcher匹配成功后,后续Matcher不参与匹配 Block bool - // Priority 优先级,越小优先级越高 - Priority int + // priority 优先级,越小优先级越高 + priority int // Event 当前匹配到的事件 Event *Event // Type 匹配的事件类型 @@ -44,7 +46,7 @@ type State map[string]interface{} func sortMatcher(typ string) { sort.Slice(matcherMap[typ], func(i, j int) bool { // 按优先级排序 - return matcherMap[typ][i].Priority < matcherMap[typ][j].Priority + return matcherMap[typ][i].priority < matcherMap[typ][j].priority }) } @@ -54,33 +56,20 @@ func (m *Matcher) SetBlock(block bool) *Matcher { return m } -// SetPriority 设置当前 Matcher 优先级 -func (m *Matcher) SetPriority(priority int) *Matcher { - matcherLock.Lock() - defer matcherLock.Unlock() - m.Priority = priority - sortMatcher(m.Type) - return m -} - -// FirstPriority 设置当前 Matcher 优先级 - 0 -func (m *Matcher) FirstPriority() *Matcher { - return m.SetPriority(0) -} - -// SecondPriority 设置当前 Matcher 优先级 - 1 -func (m *Matcher) SecondPriority() *Matcher { - return m.SetPriority(1) -} - -// ThirdPriority 设置当前 Matcher 优先级 - 2 -func (m *Matcher) ThirdPriority() *Matcher { - return m.SetPriority(2) -} - -// BindEngine bind the matcher to a engine -func (m *Matcher) BindEngine(e *Engine) *Matcher { - m.Engine = e +// Limit 限速器 +// postfn 当请求被拒绝时的操作 +func (m *Matcher) Limit(limiterfn func(*Ctx) *rate.Limiter, postfn ...func(*Ctx)) *Matcher { + m.Rules = append(m.Rules, func(ctx *Ctx) bool { + if limiterfn(ctx).Acquire() { + return true + } + if len(postfn) > 0 { + for _, fn := range postfn { + fn(ctx) + } + } + return false + }) return m } @@ -116,7 +105,7 @@ func (m *Matcher) copy() *Matcher { Type: m.Type, Rules: m.Rules, Block: m.Block, - Priority: m.Priority, + priority: m.priority, Process: m.Process, Temp: m.Temp, Engine: m.Engine, diff --git a/single.go b/single.go new file mode 100644 index 0000000..a4273ce --- /dev/null +++ b/single.go @@ -0,0 +1,61 @@ +package rei + +import ( + "github.com/RomiChan/syncx" +) + +// Option 配置项 +type Option[K comparable] func(*Single[K]) + +// Single 反并发 +type Single[K comparable] struct { + group syncx.Map[K, struct{}] + key func(ctx *Ctx) K + post func(ctx *Ctx) +} + +// WithKeyFn 指定反并发的 Key +func WithKeyFn[K comparable](fn func(ctx *Ctx) K) Option[K] { + return func(s *Single[K]) { + s.key = fn + } +} + +// WithPostFn 指定反并发拦截后的操作 +func WithPostFn[K comparable](fn func(ctx *Ctx)) Option[K] { + return func(s *Single[K]) { + s.post = fn + } +} + +// New 创建反并发中间件 +func New[K comparable](op ...Option[K]) *Single[K] { + s := Single[K]{} + for _, option := range op { + option(&s) + } + return &s +} + +// Apply 为指定 Engine 添加反并发功能 +func (s *Single[K]) Apply(engine *Engine) { + engine.UseMidHandler(func(ctx *Ctx) bool { + if s.key == nil { + return true + } + key := s.key(ctx) + if _, ok := s.group.Load(key); ok { + if s.post != nil { + defer s.post(ctx) + } + return false + } + s.group.Store(key, struct{}{}) + ctx.State["__single-key__"] = key + return true + }) + + engine.UsePostHandler(func(ctx *Ctx) { + s.group.Delete(ctx.State["__single-key__"].(K)) + }) +}