1
0
mirror of https://github.com/fumiama/orbyte.git synced 2026-06-26 23:10:33 +08:00

feat: add ref counter

This commit is contained in:
源文雨
2025-02-25 23:30:58 +09:00
parent 8c60967c65
commit 717b07486e
3 changed files with 52 additions and 7 deletions

36
item.go
View File

@@ -2,6 +2,7 @@ package orbyte
import ( import (
"runtime" "runtime"
"strconv"
"sync/atomic" "sync/atomic"
) )
@@ -15,10 +16,20 @@ type Item[T any] struct {
cfg any cfg any
stat status stat status
ref *Item[T]
refc int32 // refc -1 means transferred / destroyed
val T val T
} }
func (b *Item[T]) incref() {
atomic.AddInt32(&b.refc, 1)
}
func (b *Item[T]) decref() {
atomic.AddInt32(&b.refc, -1)
}
// Trans ownership to a new item and // Trans ownership to a new item and
// destroy original item immediately. // destroy original item immediately.
// //
@@ -27,15 +38,21 @@ type Item[T any] struct {
// Call this function to drop your ownership // Call this function to drop your ownership
// before passing it to another function // before passing it to another function
// that is not controlled by you. // that is not controlled by you.
//
// Avoid to call this function after calling Ref().
func (b *Item[T]) Trans() (tb *Item[T]) { func (b *Item[T]) Trans() (tb *Item[T]) {
if b.stat.hasdestroyed() { if b.stat.hasdestroyed() {
panic("use after destroy") panic("use after destroy")
} }
if b.ref != nil {
panic("cannot trans ref")
}
tb = b.pool.newempty() tb = b.pool.newempty()
*tb = *b *tb = *b
tb.stat = status(atomic.SwapUintptr( tb.stat = status(atomic.SwapUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus), (*uintptr)(&b.stat), uintptr(destroyedstatus),
)) ))
tb.refc = 0
tb.stat.setintrans(true) tb.stat.setintrans(true)
b.destroybystat(status(0)) b.destroybystat(status(0))
return tb return tb
@@ -46,6 +63,11 @@ func (b *Item[T]) IsTrans() bool {
return b.stat.isintrans() return b.stat.isintrans()
} }
// IsRef whether this item is a reference.
func (b *Item[T]) IsRef() bool {
return b.ref != nil
}
// Unwrap use value of the item // Unwrap use value of the item
func (b *Item[T]) Unwrap() T { func (b *Item[T]) Unwrap() T {
if b.stat.hasdestroyed() { if b.stat.hasdestroyed() {
@@ -73,6 +95,9 @@ func (b *Item[T]) Ref() (rb *Item[T]) {
} }
rb = b.pool.newempty() rb = b.pool.newempty()
*rb = *b *rb = *b
rb.ref = b
rb.refc = 0
b.incref()
rb.stat.setbuffered(false) rb.stat.setbuffered(false)
rb.stat.setintrans(false) rb.stat.setintrans(false)
return return
@@ -89,9 +114,18 @@ func (b *Item[T]) Copy() (cb *Item[T]) {
} }
func (b *Item[T]) destroybystat(stat status) { func (b *Item[T]) destroybystat(stat status) {
if !atomic.CompareAndSwapInt32(&b.refc, 0, -1) {
if b.refc < 0 {
panic("use imm. after destroy")
}
panic("cannot destroy: " + strconv.Itoa(int(b.refc)) + " refs remained")
}
if b.ref != nil {
defer b.ref.decref()
}
switch { switch {
case stat.hasdestroyed(): case stat.hasdestroyed():
panic("use after destroy") panic("use after put back to pool")
case stat.isintrans(): case stat.isintrans():
var v T var v T
b.val = v b.val = v

View File

@@ -30,13 +30,15 @@ func TestBytesSlice(t *testing.T) {
buf := make([]byte, b.Len()) buf := make([]byte, b.Len())
copy(buf, b.Bytes()) copy(buf, b.Bytes())
// test normal slice // test normal slice
x := b.SliceFrom(5).SliceTo(i - 5 - 5) y := b.SliceFrom(5)
x := y.SliceTo(i - 5 - 5)
if !bytes.Equal(buf[5:i-5], x.Bytes()) { if !bytes.Equal(buf[5:i-5], x.Bytes()) {
t.Log("exp:", hex.EncodeToString(buf[5:i-5])) t.Log("exp:", hex.EncodeToString(buf[5:i-5]))
t.Log("got:", hex.EncodeToString(x.Bytes())) t.Log("got:", hex.EncodeToString(x.Bytes()))
t.Fatal("index", i, "unexpected") t.Fatal("index", i, "unexpected")
} }
x.manualDestroy() x.manualDestroy()
y.manualDestroy()
// test trans slice // test trans slice
b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5) b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5)
if !bytes.Equal(buf[5:i-5], b.Bytes()) { if !bytes.Equal(buf[5:i-5], b.Bytes()) {
@@ -103,19 +105,25 @@ func TestBytesCopy(t *testing.T) {
buf := make([]byte, 4096) buf := make([]byte, 4096)
rand.Read(buf) rand.Read(buf)
for i := 10; i < 4096; i++ { for i := 10; i < 4096; i++ {
b := ParseBytes(buf...).Slice(5, i-5).Copy() a := ParseBytes(buf...)
x := a.Slice(5, i-5)
b := x.Copy()
if b.Len() != i-10 { if b.Len() != i-10 {
t.Fatal("index", i, "excpet len", i, "but got", b.Len()) t.Fatal("index", i, "excpet len", i, "but got", b.Len())
} }
rand.Read(b.Bytes()) rand.Read(b.Bytes())
t.Log("org:", hex.EncodeToString(buf[:i])) // t.Log("org:", hex.EncodeToString(buf[:i]))
t.Log("new:", hex.EncodeToString(b.Bytes())) // t.Log("new:", hex.EncodeToString(b.Bytes()))
if bytes.Equal(b.Bytes(), buf[:i]) { if bytes.Equal(b.Bytes(), buf[:i]) {
t.Fatal("index", i, "unexpected") t.Fatal("index", i, "unexpected")
} }
b.manualDestroy() b.manualDestroy()
x.manualDestroy()
a.manualDestroy()
} }
runtime.GC() runtime.GC()
runtime.Gosched()
runtime.GC()
out, in := bufferPool.p.CountItems() out, in := bufferPool.p.CountItems()
t.Log(out, in) t.Log(out, in)
if out != 0 { if out != 0 {
@@ -125,7 +133,7 @@ func TestBytesCopy(t *testing.T) {
func TestBytesTransMultithread(t *testing.T) { func TestBytesTransMultithread(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i := 0; i < 4096; i++ { for i := 0; i < 2048; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()

View File

@@ -74,9 +74,12 @@ func (pool *Pool[T]) put(item *Item[T]) {
runtime.SetFinalizer(item, nil) runtime.SetFinalizer(item, nil)
} }
item.stat.setdestroyed(true)
item.cfg = nil item.cfg = nil
item.stat.setdestroyed(true)
item.ref = nil
item.refc = 0
if pool.noputbak { if pool.noputbak {
return return
} }