From ac9d26dfb9034a7496859f1db32c548e5eccf623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Fri, 4 Apr 2025 22:01:47 +0900 Subject: [PATCH] feat: add i/o limit --- pbuf/pooler.go | 2 +- pool.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/pbuf/pooler.go b/pbuf/pooler.go index dd02ecc..c2faaa4 100644 --- a/pbuf/pooler.go +++ b/pbuf/pooler.go @@ -67,7 +67,7 @@ func (bufpooler[USRDAT]) Parse(obj any, pooled UserBuffer[USRDAT]) UserBuffer[US func (bufpooler[USRDAT]) Reset(item *UserBuffer[USRDAT]) { // See https://golang.org/issue/23199 const maxSize = 1 << 16 - if item.Cap() > maxSize { // drop large buffer + if item.Cap() >= maxSize { // drop large buffer *item = UserBuffer[USRDAT]{} return } diff --git a/pool.go b/pool.go index 8d8cb4a..d373635 100644 --- a/pool.go +++ b/pool.go @@ -11,8 +11,15 @@ import ( type Pool[T any] struct { countin int32 countout int32 - pooler Pooler[T] - pool sync.Pool + // 64 bit align + + outlim int32 + inlim int32 + // 64 bit align + + pool sync.Pool + pooler Pooler[T] + noputbak bool } @@ -23,6 +30,9 @@ func NewPool[T any](pooler Pooler[T]) *Pool[T] { p.pool.New = func() any { return &Item[T]{pool: p} } + // default limit + p.outlim = 4096 + p.inlim = 4096 return p } @@ -33,6 +43,24 @@ func (pool *Pool[T]) SetNoPutBack(on bool) { pool.noputbak = on } +// LimitOutput will automatically set new item no-autodestroy +// if countout > outlim. +func (pool *Pool[T]) LimitOutput(n int32) { + if n <= 0 { + panic("n must > 0") + } + pool.outlim = n +} + +// LimitInputwill automatically set new item no-autodestroy +// if countout > inlim. +func (pool *Pool[T]) LimitInput(n int32) { + if n <= 0 { + panic("n must > 0") + } + pool.inlim = n +} + func (pool *Pool[T]) incin() { atomic.AddInt32(&pool.countin, 1) } @@ -51,10 +79,17 @@ func (pool *Pool[T]) decout() { func (pool *Pool[T]) newempty() *Item[T] { item := pool.pool.Get().(*Item[T]) - if item.stat.hasdestroyed() { // is recycled + isrecycled := item.stat.hasdestroyed() + if isrecycled { pool.decin() } item.stat = status(0) + isfull := atomic.LoadInt32(&pool.countin) > pool.inlim || + atomic.LoadInt32(&pool.countout) > pool.outlim + if isfull { + // no out log, no reuse + return item + } pool.incout() return item.setautodestroy() } @@ -66,7 +101,8 @@ func (pool *Pool[T]) put(item *Item[T]) { item.stat.setdestroyed(true) - if pool.noputbak { + if pool.noputbak || + atomic.LoadInt32(&pool.countin) > pool.inlim { return } pool.pool.Put(item)