diff --git a/item.go b/item.go index c2c2f7c..959c40a 100644 --- a/item.go +++ b/item.go @@ -35,6 +35,9 @@ func (b *Item[T]) Trans() T { if b.stat.hasdestroyed() { panic("use after destroy") } + if b.pool.issync { + b.stat.setinsyncop(true) + } val := b.val atomic.StoreUintptr( (*uintptr)(&b.stat), uintptr(destroyedstatus), @@ -47,6 +50,10 @@ func (b *Item[T]) Trans() T { // HasInvolved whether this item is buffered // and will be Reset on putting back. func (b *Item[T]) HasInvolved() bool { + if b.pool.issync { + b.stat.setinsyncop(true) + defer b.stat.setinsyncop(false) + } return b.stat.isbuffered() } @@ -57,6 +64,10 @@ func (b *Item[T]) V(f func(T)) { if b.stat.hasdestroyed() { panic("use after destroy") } + if b.pool.issync { + b.stat.setinsyncop(true) + defer b.stat.setinsyncop(false) + } f(b.val) runtime.KeepAlive(b) } @@ -68,6 +79,10 @@ func (b *Item[T]) P(f func(*T)) { if b.stat.hasdestroyed() { panic("use after destroy") } + if b.pool.issync { + b.stat.setinsyncop(true) + defer b.stat.setinsyncop(false) + } f(&b.val) runtime.KeepAlive(b) } @@ -77,6 +92,10 @@ func (b *Item[T]) Copy() (cb *Item[T]) { if b.stat.hasdestroyed() { panic("use after destroy") } + if b.pool.issync { + b.stat.setinsyncop(true) + defer b.stat.setinsyncop(false) + } cb = b.pool.New(b.cfg) b.pool.pooler.Copy(&cb.val, &b.val) return @@ -100,6 +119,9 @@ func (b *Item[T]) destroybystat(stat status) { // Calling this method only when you're sure that // no one will use it, or it will cause a panic. func (b *Item[T]) ManualDestroy() { + if b.pool.issync { + b.stat.setinsyncop(true) + } runtime.SetFinalizer(b, nil) b.destroybystat(status(atomic.SwapUintptr( (*uintptr)(&b.stat), uintptr(destroyedstatus), diff --git a/pbuf/buffer.go b/pbuf/buffer.go index d449c7b..e25328f 100644 --- a/pbuf/buffer.go +++ b/pbuf/buffer.go @@ -10,23 +10,19 @@ import ( func (bufferPool BufferPool[USRDAT]) NewBuffer( buf []byte, ) *orbyte.Item[UserBuffer[USRDAT]] { - return bufferPool.p.New(buf) + return bufferPool.New(buf) } // InvolveBuffer involve external *bytes.Buffer into Item. func (bufferPool BufferPool[USRDAT]) InvolveBuffer( buf *bytes.Buffer, ) *orbyte.Item[UserBuffer[USRDAT]] { - return bufferPool.p.Involve(buf.Len(), buf) + return bufferPool.Involve(buf.Len(), buf) } // ParseBuffer convert external *bytes.Buffer into Item. func (bufferPool BufferPool[USRDAT]) ParseBuffer( buf *bytes.Buffer, ) *orbyte.Item[UserBuffer[USRDAT]] { - return bufferPool.p.Parse(buf.Len(), buf) -} - -func (bufferPool BufferPool[USRDAT]) CountItems() (outside int32, inside int32) { - return bufferPool.p.CountItems() + return bufferPool.Parse(buf.Len(), buf) } diff --git a/pbuf/buffer_test.go b/pbuf/buffer_test.go index d6dd479..2df0f27 100644 --- a/pbuf/buffer_test.go +++ b/pbuf/buffer_test.go @@ -59,7 +59,7 @@ func testBuffer(buf *OBuffer, t *testing.T) { runtime.Gosched() runtime.GC() - out, in := bufferPool.p.CountItems() + out, in := bufferPool.CountItems() t.Log(out, in) if out != 0 { t.Fail() diff --git a/pbuf/bytes.go b/pbuf/bytes.go index d7484bd..2222d8d 100644 --- a/pbuf/bytes.go +++ b/pbuf/bytes.go @@ -36,7 +36,7 @@ func (b UserBytes[USRDAT]) B(f func([]byte, *USRDAT)) { // NewBytes alloc sz bytes. func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) { - buf := bufferPool.p.New(sz) + buf := bufferPool.New(sz) b.buf = buf buf.P(func(buf *UserBuffer[USRDAT]) { b.b = buf.Len() @@ -46,7 +46,7 @@ func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) { // InvolveBytes involve outside buf into pool. func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT]) { - buf := bufferPool.p.Involve(len(p), bytes.NewBuffer(p)) + buf := bufferPool.Involve(len(p), bytes.NewBuffer(p)) b.buf = buf buf.P(func(buf *UserBuffer[USRDAT]) { b.b = buf.Len() @@ -57,7 +57,7 @@ func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT // ParseBytes convert outside bytes to Bytes safely // without adding it into pool. func (bufferPool BufferPool[USRDAT]) ParseBytes(p ...byte) (b UserBytes[USRDAT]) { - buf := bufferPool.p.Parse(len(p), bytes.NewBuffer(p)) + buf := bufferPool.Parse(len(p), bytes.NewBuffer(p)) b.buf = buf buf.P(func(buf *UserBuffer[USRDAT]) { b.b = buf.Len() diff --git a/pbuf/bytes_test.go b/pbuf/bytes_test.go index 125273c..58324a2 100644 --- a/pbuf/bytes_test.go +++ b/pbuf/bytes_test.go @@ -37,7 +37,7 @@ func TestBytesSlice(t *testing.T) { runtime.GC() runtime.Gosched() runtime.GC() - out, in := bufferPool.p.CountItems() + out, in := bufferPool.CountItems() t.Log(out, in) if out != 0 { t.Fail() @@ -60,7 +60,7 @@ func TestBytesInvolve(t *testing.T) { } } runtime.GC() - out, in := bufferPool.p.CountItems() + out, in := bufferPool.CountItems() t.Log(out, in) if out != 0 { t.Fail() @@ -80,7 +80,7 @@ func TestBytesParse(t *testing.T) { } } runtime.GC() - out, in := bufferPool.p.CountItems() + out, in := bufferPool.CountItems() t.Log(out, in) if out != 0 { t.Fail() @@ -107,7 +107,7 @@ func TestBytesCopy(t *testing.T) { runtime.GC() runtime.Gosched() runtime.GC() - out, in := bufferPool.p.CountItems() + out, in := bufferPool.CountItems() t.Log(out, in) if out != 0 { t.Fail() diff --git a/pbuf/pbuf.go b/pbuf/pbuf.go index f4dc15d..4661426 100644 --- a/pbuf/pbuf.go +++ b/pbuf/pbuf.go @@ -16,12 +16,12 @@ type ( ) type BufferPool[USRDAT any] struct { - p *orbyte.Pool[UserBuffer[USRDAT]] + *orbyte.Pool[UserBuffer[USRDAT]] } func NewBufferPool[USRDAT any]() BufferPool[USRDAT] { return BufferPool[USRDAT]{ - p: orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}), + orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}), } } @@ -60,3 +60,23 @@ func ParseBytes(b ...byte) Bytes { func CountItems() (outside int32, inside int32) { return bufferPool.CountItems() } + +// SetNoPutBack see Pool.SetNoPutBack +func SetNoPutBack(on bool) { + bufferPool.SetNoPutBack(on) +} + +// SetSyncItem see Pool.SetSyncItem +func SetSyncItem(on bool) { + bufferPool.SetSyncItem(on) +} + +// LimitInput see Pool.LimitInput +func LimitInput(n int32) { + bufferPool.LimitInput(n) +} + +// LimitInput see Pool.LimitOutput +func LimitOutput(n int32) { + bufferPool.LimitOutput(n) +} diff --git a/pool.go b/pool.go index d373635..a1aa1d0 100644 --- a/pool.go +++ b/pool.go @@ -21,6 +21,7 @@ type Pool[T any] struct { pooler Pooler[T] noputbak bool + issync bool } // NewPool make a new pool from custom pooler. @@ -43,6 +44,13 @@ func (pool *Pool[T]) SetNoPutBack(on bool) { pool.noputbak = on } +// SetSyncItem make it panic on every read-write conflict. +// +// Enable this to detect coding errors. +func (pool *Pool[T]) SetSyncItem(on bool) { + pool.issync = on +} + // LimitOutput will automatically set new item no-autodestroy // if countout > outlim. func (pool *Pool[T]) LimitOutput(n int32) { diff --git a/status.go b/status.go index 920a1ec..f9f3d7f 100644 --- a/status.go +++ b/status.go @@ -5,6 +5,7 @@ import "sync/atomic" const ( statusisbuffered = 1 << iota statusdestroyed + statusinsyncop ) type status uintptr @@ -38,6 +39,24 @@ func (c *status) setbool(v bool, typ uintptr) { } } +// setboolunique panic on non-unique set +func (c *status) setboolunique(v bool, typ uintptr) { + olds := atomic.LoadUintptr((*uintptr)(c)) + oldv := olds&typ != 0 + if oldv == v { + panic("non-unique operation") + } + news := status(olds).mask(v, typ) + for !atomic.CompareAndSwapUintptr((*uintptr)(c), olds, uintptr(news)) { + olds = atomic.LoadUintptr((*uintptr)(c)) + oldv = olds&typ != 0 + if oldv == v { + panic("non-unique operation") + } + news = status(olds).mask(v, typ) + } +} + func (c *status) loadbool(typ uintptr) bool { return atomic.LoadUintptr((*uintptr)(c))&typ != 0 } @@ -57,3 +76,7 @@ func (c *status) hasdestroyed() bool { func (c *status) setdestroyed(v bool) { c.setbool(v, statusdestroyed) } + +func (c *status) setinsyncop(v bool) { + c.setboolunique(v, statusinsyncop) +}