From 717b07486e38172831c56bfb2714c79934b56035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 25 Feb 2025 23:30:58 +0900 Subject: [PATCH] feat: add ref counter --- item.go | 36 +++++++++++++++++++++++++++++++++++- pbuf/bytes_test.go | 18 +++++++++++++----- pool.go | 5 ++++- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/item.go b/item.go index ce08c54..e191f46 100644 --- a/item.go +++ b/item.go @@ -2,6 +2,7 @@ package orbyte import ( "runtime" + "strconv" "sync/atomic" ) @@ -15,10 +16,20 @@ type Item[T any] struct { cfg any stat status + ref *Item[T] + refc int32 // refc -1 means transferred / destroyed val T } +func (b *Item[T]) incref() { + atomic.AddInt32(&b.refc, 1) +} + +func (b *Item[T]) decref() { + atomic.AddInt32(&b.refc, -1) +} + // Trans ownership to a new item and // destroy original item immediately. // @@ -27,15 +38,21 @@ type Item[T any] struct { // Call this function to drop your ownership // before passing it to another function // that is not controlled by you. +// +// Avoid to call this function after calling Ref(). func (b *Item[T]) Trans() (tb *Item[T]) { if b.stat.hasdestroyed() { panic("use after destroy") } + if b.ref != nil { + panic("cannot trans ref") + } tb = b.pool.newempty() *tb = *b tb.stat = status(atomic.SwapUintptr( (*uintptr)(&b.stat), uintptr(destroyedstatus), )) + tb.refc = 0 tb.stat.setintrans(true) b.destroybystat(status(0)) return tb @@ -46,6 +63,11 @@ func (b *Item[T]) IsTrans() bool { return b.stat.isintrans() } +// IsRef whether this item is a reference. +func (b *Item[T]) IsRef() bool { + return b.ref != nil +} + // Unwrap use value of the item func (b *Item[T]) Unwrap() T { if b.stat.hasdestroyed() { @@ -73,6 +95,9 @@ func (b *Item[T]) Ref() (rb *Item[T]) { } rb = b.pool.newempty() *rb = *b + rb.ref = b + rb.refc = 0 + b.incref() rb.stat.setbuffered(false) rb.stat.setintrans(false) return @@ -89,9 +114,18 @@ func (b *Item[T]) Copy() (cb *Item[T]) { } func (b *Item[T]) destroybystat(stat status) { + if !atomic.CompareAndSwapInt32(&b.refc, 0, -1) { + if b.refc < 0 { + panic("use imm. after destroy") + } + panic("cannot destroy: " + strconv.Itoa(int(b.refc)) + " refs remained") + } + if b.ref != nil { + defer b.ref.decref() + } switch { case stat.hasdestroyed(): - panic("use after destroy") + panic("use after put back to pool") case stat.isintrans(): var v T b.val = v diff --git a/pbuf/bytes_test.go b/pbuf/bytes_test.go index d74c763..70c7440 100644 --- a/pbuf/bytes_test.go +++ b/pbuf/bytes_test.go @@ -30,13 +30,15 @@ func TestBytesSlice(t *testing.T) { buf := make([]byte, b.Len()) copy(buf, b.Bytes()) // test normal slice - x := b.SliceFrom(5).SliceTo(i - 5 - 5) + y := b.SliceFrom(5) + x := y.SliceTo(i - 5 - 5) if !bytes.Equal(buf[5:i-5], x.Bytes()) { t.Log("exp:", hex.EncodeToString(buf[5:i-5])) t.Log("got:", hex.EncodeToString(x.Bytes())) t.Fatal("index", i, "unexpected") } x.manualDestroy() + y.manualDestroy() // test trans slice b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5) if !bytes.Equal(buf[5:i-5], b.Bytes()) { @@ -103,19 +105,25 @@ func TestBytesCopy(t *testing.T) { buf := make([]byte, 4096) rand.Read(buf) for i := 10; i < 4096; i++ { - b := ParseBytes(buf...).Slice(5, i-5).Copy() + a := ParseBytes(buf...) + x := a.Slice(5, i-5) + b := x.Copy() if b.Len() != i-10 { t.Fatal("index", i, "excpet len", i, "but got", b.Len()) } rand.Read(b.Bytes()) - t.Log("org:", hex.EncodeToString(buf[:i])) - t.Log("new:", hex.EncodeToString(b.Bytes())) + // t.Log("org:", hex.EncodeToString(buf[:i])) + // t.Log("new:", hex.EncodeToString(b.Bytes())) if bytes.Equal(b.Bytes(), buf[:i]) { t.Fatal("index", i, "unexpected") } b.manualDestroy() + x.manualDestroy() + a.manualDestroy() } runtime.GC() + runtime.Gosched() + runtime.GC() out, in := bufferPool.p.CountItems() t.Log(out, in) if out != 0 { @@ -125,7 +133,7 @@ func TestBytesCopy(t *testing.T) { func TestBytesTransMultithread(t *testing.T) { wg := sync.WaitGroup{} - for i := 0; i < 4096; i++ { + for i := 0; i < 2048; i++ { wg.Add(1) go func() { defer wg.Done() diff --git a/pool.go b/pool.go index a98ccf0..7197d89 100644 --- a/pool.go +++ b/pool.go @@ -74,9 +74,12 @@ func (pool *Pool[T]) put(item *Item[T]) { runtime.SetFinalizer(item, nil) } - item.stat.setdestroyed(true) item.cfg = nil + item.stat.setdestroyed(true) + item.ref = nil + item.refc = 0 + if pool.noputbak { return }