diff --git a/item.go b/item.go index 27c5c76..a32a6ad 100644 --- a/item.go +++ b/item.go @@ -2,7 +2,6 @@ package orbyte import ( "runtime" - "strconv" "sync/atomic" ) @@ -16,96 +15,59 @@ type Item[T any] struct { cfg any stat status - ref *Item[T] - refc int32 // refc -1 means transferred / destroyed val T } -func (b *Item[T]) incref() { - atomic.AddInt32(&b.refc, 1) -} - -func (b *Item[T]) decref() { - atomic.AddInt32(&b.refc, -1) -} - -// Trans ownership to a new item and -// destroy original item immediately. +// Trans disable inner val being reset by +// destroy and return a safe copy of val. // -// The value in new item will not be Reset(). +// This method is not thread-safe. +// Only call once on one item. +// The item will be destroyed after calling Trans(). // -// Call this function to drop your ownership -// before passing it to another function -// that is not controlled by you. -// -// Avoid to call this function after calling Ref(). -func (b *Item[T]) Trans() (tb *Item[T]) { +// Use it to drop your ownership +// before passing val (not its pointer) +// to another function that is not controlled by you. +func (b *Item[T]) Trans() T { if b.stat.hasdestroyed() { panic("use after destroy") } - if b.ref != nil { - panic("cannot trans ref") - } - tb = b.pool.newempty() - *tb = *b - tb.stat = status(atomic.SwapUintptr( + val := b.val + atomic.StoreUintptr( (*uintptr)(&b.stat), uintptr(destroyedstatus), - )) - tb.refc = 0 - tb.stat.setintrans(true) - b.destroybystat(status(0)) - return tb + ) + runtime.KeepAlive(b) + b.destroybystat(0) + return val } -// HasInvolved whether this item is buffered. +// HasInvolved whether this item is buffered +// and will be Reset on putting back. func (b *Item[T]) HasInvolved() bool { return b.stat.isbuffered() } -// IsTrans whether this item has been marked as trans. -func (b *Item[T]) IsTrans() bool { - return b.stat.isintrans() -} - -// IsRef whether this item is a reference. -func (b *Item[T]) IsRef() bool { - return b.ref != nil -} - -// Unwrap use value of the item -func (b *Item[T]) Unwrap() T { - if b.stat.hasdestroyed() { - panic("use after destroy") - } - return b.val -} - -// Pointer use pointer value of the item -func (b *Item[T]) Pointer() *T { - if b.stat.hasdestroyed() { - panic("use after destroy") - } - return &b.val -} - -// Ref gens new item without ownership. +// V use value of the item. // -// It's a safe reference, thus calling this -// will not destroy the original item -// comparing with Trans(). -func (b *Item[T]) Ref() (rb *Item[T]) { +// This operation is safe in function f. +func (b *Item[T]) V(f func(T)) { if b.stat.hasdestroyed() { panic("use after destroy") } - rb = b.pool.newempty() - *rb = *b - rb.ref = b - rb.refc = 0 - b.incref() - rb.stat.setbuffered(false) - rb.stat.setintrans(false) - return + f(b.val) + runtime.KeepAlive(b) +} + +// P use pointer value of the item. +// +// This operation is safe in function f. +func (b *Item[T]) P(f func(*T)) { + if b.stat.hasdestroyed() { + panic("use after destroy") + } + f(&b.val) + runtime.KeepAlive(b) } // Copy data completely with separated ownership. @@ -119,21 +81,9 @@ func (b *Item[T]) Copy() (cb *Item[T]) { } func (b *Item[T]) destroybystat(stat status) { - if !atomic.CompareAndSwapInt32(&b.refc, 0, -1) { - if b.refc < 0 { - panic("use imm. after destroy") - } - panic("cannot destroy: " + strconv.Itoa(int(b.refc)) + " refs remained") - } - if b.ref != nil { - defer b.ref.decref() - } switch { case stat.hasdestroyed(): - panic("use after put back to pool") - case stat.isintrans(): - var v T - b.val = v + panic("destroy after destroy") case stat.isbuffered(): b.pool.pooler.Reset(&b.val) default: @@ -158,10 +108,10 @@ func (b *Item[T]) ManualDestroy() { // Only can call once. func (b *Item[T]) setautodestroy() *Item[T] { runtime.SetFinalizer(b, func(item *Item[T]) { - // no one is using, no concurrency issue. if item.stat.hasdestroyed() { panic("unexpected hasdestroyed") } + // no one is using, no concurrency issue. item.destroybystat(item.stat) }) return b diff --git a/pbuf/buffer.go b/pbuf/buffer.go index 93dd842..f391108 100644 --- a/pbuf/buffer.go +++ b/pbuf/buffer.go @@ -2,7 +2,6 @@ package pbuf import ( "bytes" - "unsafe" "github.com/fumiama/orbyte" ) @@ -21,54 +20,3 @@ func (bufferPool BufferPool) InvolveBuffer(buf *bytes.Buffer) *orbyte.Item[bytes func (bufferPool BufferPool) ParseBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] { return bufferPool.p.Parse(buf.Len(), buf) } - -// A Buffer is a variable-sized buffer of bytes with Read and Write methods. -// The zero value for Buffer is an empty buffer ready to use. -type buffer struct { - buf []byte // contents are the bytes buf[off : len(buf)] - off int // read at &buf[off], write at &buf[len(buf)] - lastRead readOp // last read operation, so that Unread* can work correctly. -} - -func skip(w *bytes.Buffer, n int) { - if n == 0 { - return - } - b := (*buffer)(unsafe.Pointer(w)) - b.lastRead = opInvalid - if len(b.buf) <= b.off { - // Buffer is empty, reset to recover space. - w.Reset() - return - } - n = minnum(n, len(b.buf[b.off:])) - b.off += n - if n > 0 { - b.lastRead = opRead - } -} - -// The readOp constants describe the last action performed on -// the buffer, so that UnreadRune and UnreadByte can check for -// invalid usage. opReadRuneX constants are chosen such that -// converted to int they correspond to the rune size that was read. -type readOp int8 - -// Don't use iota for these, as the values need to correspond with the -// names and comments, which is easier to see when being explicit. -const ( - opRead readOp = -1 // Any other read operation. - opInvalid readOp = 0 // Non-read operation. - opReadRune1 readOp = 1 // Read rune of size 1. - opReadRune2 readOp = 2 // Read rune of size 2. - opReadRune3 readOp = 3 // Read rune of size 3. - opReadRune4 readOp = 4 // Read rune of size 4. -) - -// minnum 返回两数最小值,该函数将被内联 -func minnum[T int | int8 | uint8 | int16 | uint16 | int32 | uint32 | int64 | uint64](a, b T) T { - if a > b { - return b - } - return a -} diff --git a/pbuf/buffer_test.go b/pbuf/buffer_test.go index ea1ad1a..9d8f139 100644 --- a/pbuf/buffer_test.go +++ b/pbuf/buffer_test.go @@ -20,38 +20,24 @@ func TestBuffer(t *testing.T) { } func testBuffer(buf *orbyte.Item[bytes.Buffer], t *testing.T) { - if buf.Pointer().Len() != 4096 { - io.CopyN(buf.Pointer(), rand.Reader, 4096) - if buf.Pointer().Len() != 4096 { - t.Fatal("got", buf.Pointer().Len()) + buf.P(func(buf *bytes.Buffer) { + if buf.Len() != 4096 { + io.CopyN(buf, rand.Reader, 4096) + if buf.Len() != 4096 { + t.Fatal("got", buf.Len()) + } } - } - + }) bufcp := buf.Copy() - if bufcp.Pointer().Len() != 4096 { - t.Fatal("got", bufcp.Pointer().Len()) - } - if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) { - t.Fatal("unexpected") - } - - bufr := buf.Ref() - if bufr.Pointer().Len() != 4096 { - t.Fatal("got", bufr.Pointer().Len()) - } - if !bytes.Equal(bufr.Pointer().Bytes(), buf.Pointer().Bytes()) { - t.Fatal("unexpected") - } - bufr.ManualDestroy() - - bufcp = bufcp.Trans() - if bufcp.Pointer().Len() != 4096 { - t.Fatal("got", bufcp.Pointer().Len()) - } - if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) { - t.Fatal("unexpected") - } - bufcp.ManualDestroy() + dat := buf.Trans() + bufcp.P(func(bufcp *bytes.Buffer) { + if bufcp.Len() != 4096 { + t.Fatal("got", bufcp.Len()) + } + if !bytes.Equal(bufcp.Bytes(), dat.Bytes()) { + t.Fatal("unexpected") + } + }) runtime.GC() runtime.Gosched() diff --git a/pbuf/bytes.go b/pbuf/bytes.go index 3f07b07..cca2245 100644 --- a/pbuf/bytes.go +++ b/pbuf/bytes.go @@ -2,7 +2,6 @@ package pbuf import ( "bytes" - "runtime" "github.com/fumiama/orbyte" ) @@ -10,8 +9,8 @@ import ( // Bytes wrap pooled buffer into []byte // while sharing the same pool. type Bytes struct { - buf *orbyte.Item[bytes.Buffer] - dat []byte + buf *orbyte.Item[bytes.Buffer] + a, b int } // BufferItemToBytes convert between *orbyte.Item[bytes.Buffer] @@ -19,27 +18,43 @@ type Bytes struct { // // Please notice that Bytes cannnot convert back to // *orbyte.Item[bytes.Buffer] again. -func BufferItemToBytes(buf *orbyte.Item[bytes.Buffer]) Bytes { - return Bytes{buf: buf, dat: buf.Pointer().Bytes()} +func BufferItemToBytes(buf *orbyte.Item[bytes.Buffer]) (b Bytes) { + b.buf = buf + buf.P(func(buf *bytes.Buffer) { + b.b = buf.Len() + }) + return } // NewBytes alloc sz bytes. -func (bufferPool BufferPool) NewBytes(sz int) Bytes { +func (bufferPool BufferPool) NewBytes(sz int) (b Bytes) { buf := bufferPool.p.New(sz) - return Bytes{buf: buf, dat: buf.Pointer().Bytes()[:sz]} + b.buf = buf + buf.P(func(buf *bytes.Buffer) { + b.b = buf.Len() + }) + return } // InvolveBytes involve outside buf into pool. -func (bufferPool BufferPool) InvolveBytes(b ...byte) Bytes { - buf := bufferPool.p.Involve(len(b), bytes.NewBuffer(b)) - return Bytes{buf: buf, dat: buf.Pointer().Bytes()[:len(b)]} +func (bufferPool BufferPool) InvolveBytes(p ...byte) (b Bytes) { + buf := bufferPool.p.Involve(len(p), bytes.NewBuffer(p)) + b.buf = buf + buf.P(func(buf *bytes.Buffer) { + b.b = buf.Len() + }) + return } // ParseBytes convert outside bytes to Bytes safely // without adding it into pool. -func (bufferPool BufferPool) ParseBytes(b ...byte) Bytes { - buf := bufferPool.p.Parse(len(b), bytes.NewBuffer(b)) - return Bytes{buf: buf, dat: buf.Pointer().Bytes()} +func (bufferPool BufferPool) ParseBytes(p ...byte) (b Bytes) { + buf := bufferPool.p.Parse(len(p), bytes.NewBuffer(p)) + b.buf = buf + buf.P(func(buf *bytes.Buffer) { + b.b = buf.Len() + }) + return } // HasInit whether this Bytes is made by pool or @@ -49,89 +64,49 @@ func (b Bytes) HasInit() bool { } // Trans please refer to Item.Trans(). -func (b Bytes) Trans() (tb Bytes) { - tb.buf = b.buf.Trans() - tb.dat = b.dat - return +func (b Bytes) Trans() []byte { + buf := b.buf.Trans() + return buf.Bytes()[b.a:b.b] } // Len of slice. func (b Bytes) Len() int { - return len(b.dat) + return b.b - b.a } // Cap of slice. -func (b Bytes) Cap() int { - return cap(b.dat) +func (b Bytes) Cap() (c int) { + b.buf.P(func(b *bytes.Buffer) { + c = b.Cap() + }) + return c } -// Bytes is the inner value. -func (b Bytes) Bytes() []byte { - return b.dat -} - -// Ref please refer to Item.Ref(). -func (b Bytes) Ref() (rb Bytes) { - rb.buf = b.buf.Ref() - rb.dat = b.dat - return +// V use the inner value safely +func (b Bytes) V(f func([]byte)) { + b.buf.P(func(buf *bytes.Buffer) { + f(buf.Bytes()[b.a:b.b]) + }) } // Copy please refer to Item.Copy(). func (b Bytes) Copy() (cb Bytes) { cb.buf = b.buf.Copy() - cb.dat = cb.buf.Pointer().Bytes() + cb.a, cb.b = b.a, b.b return } // SliceFrom dat[from:] with Ref. func (b Bytes) SliceFrom(from int) Bytes { - if b.buf.IsTrans() { - if b.buf.HasInvolved() { - return InvolveBytes(b.dat[from:]...) - } - return ParseBytes(b.dat[from:]...) - } - nb := b.Ref() - skip(nb.buf.Pointer(), from) - nb.dat = b.dat[from:] - return nb + return Bytes{buf: b.buf, a: b.a + from, b: b.b} } // SliceTo dat[:to] with Ref. func (b Bytes) SliceTo(to int) Bytes { - if b.buf.IsTrans() { - if b.buf.HasInvolved() { - return InvolveBytes(b.dat[:to]...) - } - return ParseBytes(b.dat[:to]...) - } - nb := b.Ref() - nb.buf.Pointer().Truncate(to) - nb.dat = b.dat[:to] - return nb + return Bytes{buf: b.buf, a: b.a, b: b.a + to} } // Slice dat[from:to] with Ref. func (b Bytes) Slice(from, to int) Bytes { - if b.buf.IsTrans() { - if b.buf.HasInvolved() { - return InvolveBytes(b.dat[from:to]...) - } - return ParseBytes(b.dat[from:to]...) - } - nb := b.Ref() - buf := nb.buf.Pointer() - skip(buf, from) - buf.Truncate(to - from) - nb.dat = b.dat[from:to] - return nb -} - -// KeepAlive marks Bytes value as reachable. -func (b Bytes) KeepAlive() { - _ = b.buf - _ = b.dat - runtime.KeepAlive(b.buf) - runtime.KeepAlive(b.dat) + return Bytes{buf: b.buf, a: b.a + from, b: b.a + to} } diff --git a/pbuf/bytes_test.go b/pbuf/bytes_test.go index 70c7440..125273c 100644 --- a/pbuf/bytes_test.go +++ b/pbuf/bytes_test.go @@ -11,13 +11,6 @@ import ( "time" ) -// manualDestroy please refer to Item.manualDestroy(). -// -// Only for test purposes. -func (b Bytes) manualDestroy() { - b.buf.ManualDestroy() -} - // TestBytesSlice sometimes fails at first run because // GC not collecting all unused items. func TestBytesSlice(t *testing.T) { @@ -26,27 +19,20 @@ func TestBytesSlice(t *testing.T) { if b.Len() != i { t.Fatal("index", i, "excpet len", i, "but got", b.Len()) } - rand.Read(b.Bytes()) + b.V(func(b []byte) { + rand.Read(b) + }) buf := make([]byte, b.Len()) - copy(buf, b.Bytes()) - // test normal slice - y := b.SliceFrom(5) - x := y.SliceTo(i - 5 - 5) - if !bytes.Equal(buf[5:i-5], x.Bytes()) { + b.V(func(b []byte) { + copy(buf, b) + }) + x := b.SliceFrom(5).SliceTo(i - 5 - 5) + dat := x.Trans() + if !bytes.Equal(buf[5:i-5], dat) { t.Log("exp:", hex.EncodeToString(buf[5:i-5])) - t.Log("got:", hex.EncodeToString(x.Bytes())) + t.Log("got:", hex.EncodeToString(dat)) t.Fatal("index", i, "unexpected") } - x.manualDestroy() - y.manualDestroy() - // test trans slice - b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5) - if !bytes.Equal(buf[5:i-5], b.Bytes()) { - t.Log("exp:", hex.EncodeToString(buf[5:i-5])) - t.Log("got:", hex.EncodeToString(b.Bytes())) - t.Fatal("index", i, "unexpected") - } - b.manualDestroy() } runtime.GC() runtime.Gosched() @@ -66,11 +52,12 @@ func TestBytesInvolve(t *testing.T) { if b.Len() != i { t.Fatal("index", i, "excpet len", i, "but got", b.Len()) } - rand.Read(b.Bytes()) - if !bytes.Equal(b.Bytes(), buf[:i]) { + b.V(func(b []byte) { + rand.Read(b) + }) + if !bytes.Equal(b.Trans(), buf[:i]) { t.Fatal("index", i, "unexpected") } - b.manualDestroy() } runtime.GC() out, in := bufferPool.p.CountItems() @@ -88,10 +75,9 @@ func TestBytesParse(t *testing.T) { if b.Len() != i { t.Fatal("index", i, "excpet len", i, "but got", b.Len()) } - if !bytes.Equal(b.Bytes(), buf[:i]) { + if !bytes.Equal(b.Trans(), buf[:i]) { t.Fatal("index", i, "unexpected") } - b.manualDestroy() } runtime.GC() out, in := bufferPool.p.CountItems() @@ -111,15 +97,12 @@ func TestBytesCopy(t *testing.T) { if b.Len() != i-10 { t.Fatal("index", i, "excpet len", i, "but got", b.Len()) } - rand.Read(b.Bytes()) - // t.Log("org:", hex.EncodeToString(buf[:i])) - // t.Log("new:", hex.EncodeToString(b.Bytes())) - if bytes.Equal(b.Bytes(), buf[:i]) { + b.V(func(b []byte) { + rand.Read(b) + }) + if bytes.Equal(b.Trans(), buf[:i]) { t.Fatal("index", i, "unexpected") } - b.manualDestroy() - x.manualDestroy() - a.manualDestroy() } runtime.GC() runtime.Gosched() @@ -141,15 +124,16 @@ func TestBytesTransMultithread(t *testing.T) { buf := NewBytes(65536) refer := make([]byte, 65536) rand.Read(refer) - copy(buf.Bytes(), refer) + buf.V(func(b []byte) { + copy(b, refer) + }) wg.Add(1) - go func(buf Bytes) { + go func(buf []byte) { defer wg.Done() time.Sleep(time.Millisecond * time.Duration(mrand.Intn(10))) - if !bytes.Equal(refer, buf.Bytes()) { + if !bytes.Equal(refer, buf) { panic("unexpected") } - buf.manualDestroy() }(buf.Trans()) }() } diff --git a/pool.go b/pool.go index 7197d89..af0a305 100644 --- a/pool.go +++ b/pool.go @@ -77,8 +77,6 @@ func (pool *Pool[T]) put(item *Item[T]) { item.cfg = nil item.stat.setdestroyed(true) - item.ref = nil - item.refc = 0 if pool.noputbak { return diff --git a/pool_test.go b/pool_test.go index 3e00c3e..d91c597 100644 --- a/pool_test.go +++ b/pool_test.go @@ -32,14 +32,10 @@ func TestPool(t *testing.T) { wg := sync.WaitGroup{} for i := 0; i < 4096; i++ { item := p.New(i) - for j := 0; j < 16; j++ { - wg.Add(1) - user(item.Ref(), &wg) - wg.Add(1) - go usernodestroy(item.Copy(), &wg) - } wg.Add(1) - go usernodestroy(item.Trans(), &wg) + go useranddes(item.Copy(), &wg) + wg.Add(1) + go userv(item.Trans(), &wg) } wg.Wait() runtime.GC() @@ -50,15 +46,17 @@ func TestPool(t *testing.T) { } } -func user(item *Item[[]byte], wg *sync.WaitGroup) { +func useranddes(item *Item[[]byte], wg *sync.WaitGroup) { defer wg.Done() - rand.Read(item.Unwrap()) + item.V(func(b []byte) { + rand.Read(b) + }) item.ManualDestroy() } -func usernodestroy(item *Item[[]byte], wg *sync.WaitGroup) { +func userv(b []byte, wg *sync.WaitGroup) { defer wg.Done() - rand.Read(item.Unwrap()) + rand.Read(b) } type simplepooler struct{} diff --git a/status.go b/status.go index e2e9707..920a1ec 100644 --- a/status.go +++ b/status.go @@ -5,7 +5,6 @@ import "sync/atomic" const ( statusisbuffered = 1 << iota statusdestroyed - statusisintrans ) type status uintptr @@ -58,11 +57,3 @@ func (c *status) hasdestroyed() bool { func (c *status) setdestroyed(v bool) { c.setbool(v, statusdestroyed) } - -func (c *status) isintrans() bool { - return c.loadbool(statusisintrans) -} - -func (c *status) setintrans(v bool) { - c.setbool(v, statusisintrans) -}