mirror of
https://github.com/fumiama/orbyte.git
synced 2026-06-10 21:24:50 +08:00
feat: add i/o limit
This commit is contained in:
@@ -67,7 +67,7 @@ func (bufpooler[USRDAT]) Parse(obj any, pooled UserBuffer[USRDAT]) UserBuffer[US
|
|||||||
func (bufpooler[USRDAT]) Reset(item *UserBuffer[USRDAT]) {
|
func (bufpooler[USRDAT]) Reset(item *UserBuffer[USRDAT]) {
|
||||||
// See https://golang.org/issue/23199
|
// See https://golang.org/issue/23199
|
||||||
const maxSize = 1 << 16
|
const maxSize = 1 << 16
|
||||||
if item.Cap() > maxSize { // drop large buffer
|
if item.Cap() >= maxSize { // drop large buffer
|
||||||
*item = UserBuffer[USRDAT]{}
|
*item = UserBuffer[USRDAT]{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
44
pool.go
44
pool.go
@@ -11,8 +11,15 @@ import (
|
|||||||
type Pool[T any] struct {
|
type Pool[T any] struct {
|
||||||
countin int32
|
countin int32
|
||||||
countout int32
|
countout int32
|
||||||
pooler Pooler[T]
|
// 64 bit align
|
||||||
pool sync.Pool
|
|
||||||
|
outlim int32
|
||||||
|
inlim int32
|
||||||
|
// 64 bit align
|
||||||
|
|
||||||
|
pool sync.Pool
|
||||||
|
pooler Pooler[T]
|
||||||
|
|
||||||
noputbak bool
|
noputbak bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -23,6 +30,9 @@ func NewPool[T any](pooler Pooler[T]) *Pool[T] {
|
|||||||
p.pool.New = func() any {
|
p.pool.New = func() any {
|
||||||
return &Item[T]{pool: p}
|
return &Item[T]{pool: p}
|
||||||
}
|
}
|
||||||
|
// default limit
|
||||||
|
p.outlim = 4096
|
||||||
|
p.inlim = 4096
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,6 +43,24 @@ func (pool *Pool[T]) SetNoPutBack(on bool) {
|
|||||||
pool.noputbak = on
|
pool.noputbak = on
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LimitOutput will automatically set new item no-autodestroy
|
||||||
|
// if countout > outlim.
|
||||||
|
func (pool *Pool[T]) LimitOutput(n int32) {
|
||||||
|
if n <= 0 {
|
||||||
|
panic("n must > 0")
|
||||||
|
}
|
||||||
|
pool.outlim = n
|
||||||
|
}
|
||||||
|
|
||||||
|
// LimitInputwill automatically set new item no-autodestroy
|
||||||
|
// if countout > inlim.
|
||||||
|
func (pool *Pool[T]) LimitInput(n int32) {
|
||||||
|
if n <= 0 {
|
||||||
|
panic("n must > 0")
|
||||||
|
}
|
||||||
|
pool.inlim = n
|
||||||
|
}
|
||||||
|
|
||||||
func (pool *Pool[T]) incin() {
|
func (pool *Pool[T]) incin() {
|
||||||
atomic.AddInt32(&pool.countin, 1)
|
atomic.AddInt32(&pool.countin, 1)
|
||||||
}
|
}
|
||||||
@@ -51,10 +79,17 @@ func (pool *Pool[T]) decout() {
|
|||||||
|
|
||||||
func (pool *Pool[T]) newempty() *Item[T] {
|
func (pool *Pool[T]) newempty() *Item[T] {
|
||||||
item := pool.pool.Get().(*Item[T])
|
item := pool.pool.Get().(*Item[T])
|
||||||
if item.stat.hasdestroyed() { // is recycled
|
isrecycled := item.stat.hasdestroyed()
|
||||||
|
if isrecycled {
|
||||||
pool.decin()
|
pool.decin()
|
||||||
}
|
}
|
||||||
item.stat = status(0)
|
item.stat = status(0)
|
||||||
|
isfull := atomic.LoadInt32(&pool.countin) > pool.inlim ||
|
||||||
|
atomic.LoadInt32(&pool.countout) > pool.outlim
|
||||||
|
if isfull {
|
||||||
|
// no out log, no reuse
|
||||||
|
return item
|
||||||
|
}
|
||||||
pool.incout()
|
pool.incout()
|
||||||
return item.setautodestroy()
|
return item.setautodestroy()
|
||||||
}
|
}
|
||||||
@@ -66,7 +101,8 @@ func (pool *Pool[T]) put(item *Item[T]) {
|
|||||||
|
|
||||||
item.stat.setdestroyed(true)
|
item.stat.setdestroyed(true)
|
||||||
|
|
||||||
if pool.noputbak {
|
if pool.noputbak ||
|
||||||
|
atomic.LoadInt32(&pool.countin) > pool.inlim {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pool.pool.Put(item)
|
pool.pool.Put(item)
|
||||||
|
|||||||
Reference in New Issue
Block a user