1
0
mirror of https://github.com/fumiama/orbyte.git synced 2026-06-05 02:00:30 +08:00

fix(all): impl. new apis to make sure safety

This commit is contained in:
源文雨
2025-02-26 14:50:39 +09:00
parent 190281785c
commit 72938b1e5f
8 changed files with 130 additions and 300 deletions

120
item.go
View File

@@ -2,7 +2,6 @@ package orbyte
import (
"runtime"
"strconv"
"sync/atomic"
)
@@ -16,96 +15,59 @@ type Item[T any] struct {
cfg any
stat status
ref *Item[T]
refc int32 // refc -1 means transferred / destroyed
val T
}
func (b *Item[T]) incref() {
atomic.AddInt32(&b.refc, 1)
}
func (b *Item[T]) decref() {
atomic.AddInt32(&b.refc, -1)
}
// Trans ownership to a new item and
// destroy original item immediately.
// Trans disable inner val being reset by
// destroy and return a safe copy of val.
//
// The value in new item will not be Reset().
// This method is not thread-safe.
// Only call once on one item.
// The item will be destroyed after calling Trans().
//
// Call this function to drop your ownership
// before passing it to another function
// that is not controlled by you.
//
// Avoid to call this function after calling Ref().
func (b *Item[T]) Trans() (tb *Item[T]) {
// Use it to drop your ownership
// before passing val (not its pointer)
// to another function that is not controlled by you.
func (b *Item[T]) Trans() T {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
if b.ref != nil {
panic("cannot trans ref")
}
tb = b.pool.newempty()
*tb = *b
tb.stat = status(atomic.SwapUintptr(
val := b.val
atomic.StoreUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus),
))
tb.refc = 0
tb.stat.setintrans(true)
b.destroybystat(status(0))
return tb
)
runtime.KeepAlive(b)
b.destroybystat(0)
return val
}
// HasInvolved whether this item is buffered.
// HasInvolved whether this item is buffered
// and will be Reset on putting back.
func (b *Item[T]) HasInvolved() bool {
return b.stat.isbuffered()
}
// IsTrans whether this item has been marked as trans.
func (b *Item[T]) IsTrans() bool {
return b.stat.isintrans()
}
// IsRef whether this item is a reference.
func (b *Item[T]) IsRef() bool {
return b.ref != nil
}
// Unwrap use value of the item
func (b *Item[T]) Unwrap() T {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
return b.val
}
// Pointer use pointer value of the item
func (b *Item[T]) Pointer() *T {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
return &b.val
}
// Ref gens new item without ownership.
// V use value of the item.
//
// It's a safe reference, thus calling this
// will not destroy the original item
// comparing with Trans().
func (b *Item[T]) Ref() (rb *Item[T]) {
// This operation is safe in function f.
func (b *Item[T]) V(f func(T)) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
rb = b.pool.newempty()
*rb = *b
rb.ref = b
rb.refc = 0
b.incref()
rb.stat.setbuffered(false)
rb.stat.setintrans(false)
return
f(b.val)
runtime.KeepAlive(b)
}
// P use pointer value of the item.
//
// This operation is safe in function f.
func (b *Item[T]) P(f func(*T)) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
f(&b.val)
runtime.KeepAlive(b)
}
// Copy data completely with separated ownership.
@@ -119,21 +81,9 @@ func (b *Item[T]) Copy() (cb *Item[T]) {
}
func (b *Item[T]) destroybystat(stat status) {
if !atomic.CompareAndSwapInt32(&b.refc, 0, -1) {
if b.refc < 0 {
panic("use imm. after destroy")
}
panic("cannot destroy: " + strconv.Itoa(int(b.refc)) + " refs remained")
}
if b.ref != nil {
defer b.ref.decref()
}
switch {
case stat.hasdestroyed():
panic("use after put back to pool")
case stat.isintrans():
var v T
b.val = v
panic("destroy after destroy")
case stat.isbuffered():
b.pool.pooler.Reset(&b.val)
default:
@@ -158,10 +108,10 @@ func (b *Item[T]) ManualDestroy() {
// Only can call once.
func (b *Item[T]) setautodestroy() *Item[T] {
runtime.SetFinalizer(b, func(item *Item[T]) {
// no one is using, no concurrency issue.
if item.stat.hasdestroyed() {
panic("unexpected hasdestroyed")
}
// no one is using, no concurrency issue.
item.destroybystat(item.stat)
})
return b

View File

@@ -2,7 +2,6 @@ package pbuf
import (
"bytes"
"unsafe"
"github.com/fumiama/orbyte"
)
@@ -21,54 +20,3 @@ func (bufferPool BufferPool) InvolveBuffer(buf *bytes.Buffer) *orbyte.Item[bytes
func (bufferPool BufferPool) ParseBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] {
return bufferPool.p.Parse(buf.Len(), buf)
}
// A Buffer is a variable-sized buffer of bytes with Read and Write methods.
// The zero value for Buffer is an empty buffer ready to use.
type buffer struct {
buf []byte // contents are the bytes buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
lastRead readOp // last read operation, so that Unread* can work correctly.
}
func skip(w *bytes.Buffer, n int) {
if n == 0 {
return
}
b := (*buffer)(unsafe.Pointer(w))
b.lastRead = opInvalid
if len(b.buf) <= b.off {
// Buffer is empty, reset to recover space.
w.Reset()
return
}
n = minnum(n, len(b.buf[b.off:]))
b.off += n
if n > 0 {
b.lastRead = opRead
}
}
// The readOp constants describe the last action performed on
// the buffer, so that UnreadRune and UnreadByte can check for
// invalid usage. opReadRuneX constants are chosen such that
// converted to int they correspond to the rune size that was read.
type readOp int8
// Don't use iota for these, as the values need to correspond with the
// names and comments, which is easier to see when being explicit.
const (
opRead readOp = -1 // Any other read operation.
opInvalid readOp = 0 // Non-read operation.
opReadRune1 readOp = 1 // Read rune of size 1.
opReadRune2 readOp = 2 // Read rune of size 2.
opReadRune3 readOp = 3 // Read rune of size 3.
opReadRune4 readOp = 4 // Read rune of size 4.
)
// minnum 返回两数最小值,该函数将被内联
func minnum[T int | int8 | uint8 | int16 | uint16 | int32 | uint32 | int64 | uint64](a, b T) T {
if a > b {
return b
}
return a
}

View File

@@ -20,38 +20,24 @@ func TestBuffer(t *testing.T) {
}
func testBuffer(buf *orbyte.Item[bytes.Buffer], t *testing.T) {
if buf.Pointer().Len() != 4096 {
io.CopyN(buf.Pointer(), rand.Reader, 4096)
if buf.Pointer().Len() != 4096 {
t.Fatal("got", buf.Pointer().Len())
buf.P(func(buf *bytes.Buffer) {
if buf.Len() != 4096 {
io.CopyN(buf, rand.Reader, 4096)
if buf.Len() != 4096 {
t.Fatal("got", buf.Len())
}
}
}
})
bufcp := buf.Copy()
if bufcp.Pointer().Len() != 4096 {
t.Fatal("got", bufcp.Pointer().Len())
}
if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) {
t.Fatal("unexpected")
}
bufr := buf.Ref()
if bufr.Pointer().Len() != 4096 {
t.Fatal("got", bufr.Pointer().Len())
}
if !bytes.Equal(bufr.Pointer().Bytes(), buf.Pointer().Bytes()) {
t.Fatal("unexpected")
}
bufr.ManualDestroy()
bufcp = bufcp.Trans()
if bufcp.Pointer().Len() != 4096 {
t.Fatal("got", bufcp.Pointer().Len())
}
if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) {
t.Fatal("unexpected")
}
bufcp.ManualDestroy()
dat := buf.Trans()
bufcp.P(func(bufcp *bytes.Buffer) {
if bufcp.Len() != 4096 {
t.Fatal("got", bufcp.Len())
}
if !bytes.Equal(bufcp.Bytes(), dat.Bytes()) {
t.Fatal("unexpected")
}
})
runtime.GC()
runtime.Gosched()

View File

@@ -2,7 +2,6 @@ package pbuf
import (
"bytes"
"runtime"
"github.com/fumiama/orbyte"
)
@@ -10,8 +9,8 @@ import (
// Bytes wrap pooled buffer into []byte
// while sharing the same pool.
type Bytes struct {
buf *orbyte.Item[bytes.Buffer]
dat []byte
buf *orbyte.Item[bytes.Buffer]
a, b int
}
// BufferItemToBytes convert between *orbyte.Item[bytes.Buffer]
@@ -19,27 +18,43 @@ type Bytes struct {
//
// Please notice that Bytes cannnot convert back to
// *orbyte.Item[bytes.Buffer] again.
func BufferItemToBytes(buf *orbyte.Item[bytes.Buffer]) Bytes {
return Bytes{buf: buf, dat: buf.Pointer().Bytes()}
func BufferItemToBytes(buf *orbyte.Item[bytes.Buffer]) (b Bytes) {
b.buf = buf
buf.P(func(buf *bytes.Buffer) {
b.b = buf.Len()
})
return
}
// NewBytes alloc sz bytes.
func (bufferPool BufferPool) NewBytes(sz int) Bytes {
func (bufferPool BufferPool) NewBytes(sz int) (b Bytes) {
buf := bufferPool.p.New(sz)
return Bytes{buf: buf, dat: buf.Pointer().Bytes()[:sz]}
b.buf = buf
buf.P(func(buf *bytes.Buffer) {
b.b = buf.Len()
})
return
}
// InvolveBytes involve outside buf into pool.
func (bufferPool BufferPool) InvolveBytes(b ...byte) Bytes {
buf := bufferPool.p.Involve(len(b), bytes.NewBuffer(b))
return Bytes{buf: buf, dat: buf.Pointer().Bytes()[:len(b)]}
func (bufferPool BufferPool) InvolveBytes(p ...byte) (b Bytes) {
buf := bufferPool.p.Involve(len(p), bytes.NewBuffer(p))
b.buf = buf
buf.P(func(buf *bytes.Buffer) {
b.b = buf.Len()
})
return
}
// ParseBytes convert outside bytes to Bytes safely
// without adding it into pool.
func (bufferPool BufferPool) ParseBytes(b ...byte) Bytes {
buf := bufferPool.p.Parse(len(b), bytes.NewBuffer(b))
return Bytes{buf: buf, dat: buf.Pointer().Bytes()}
func (bufferPool BufferPool) ParseBytes(p ...byte) (b Bytes) {
buf := bufferPool.p.Parse(len(p), bytes.NewBuffer(p))
b.buf = buf
buf.P(func(buf *bytes.Buffer) {
b.b = buf.Len()
})
return
}
// HasInit whether this Bytes is made by pool or
@@ -49,89 +64,49 @@ func (b Bytes) HasInit() bool {
}
// Trans please refer to Item.Trans().
func (b Bytes) Trans() (tb Bytes) {
tb.buf = b.buf.Trans()
tb.dat = b.dat
return
func (b Bytes) Trans() []byte {
buf := b.buf.Trans()
return buf.Bytes()[b.a:b.b]
}
// Len of slice.
func (b Bytes) Len() int {
return len(b.dat)
return b.b - b.a
}
// Cap of slice.
func (b Bytes) Cap() int {
return cap(b.dat)
func (b Bytes) Cap() (c int) {
b.buf.P(func(b *bytes.Buffer) {
c = b.Cap()
})
return c
}
// Bytes is the inner value.
func (b Bytes) Bytes() []byte {
return b.dat
}
// Ref please refer to Item.Ref().
func (b Bytes) Ref() (rb Bytes) {
rb.buf = b.buf.Ref()
rb.dat = b.dat
return
// V use the inner value safely
func (b Bytes) V(f func([]byte)) {
b.buf.P(func(buf *bytes.Buffer) {
f(buf.Bytes()[b.a:b.b])
})
}
// Copy please refer to Item.Copy().
func (b Bytes) Copy() (cb Bytes) {
cb.buf = b.buf.Copy()
cb.dat = cb.buf.Pointer().Bytes()
cb.a, cb.b = b.a, b.b
return
}
// SliceFrom dat[from:] with Ref.
func (b Bytes) SliceFrom(from int) Bytes {
if b.buf.IsTrans() {
if b.buf.HasInvolved() {
return InvolveBytes(b.dat[from:]...)
}
return ParseBytes(b.dat[from:]...)
}
nb := b.Ref()
skip(nb.buf.Pointer(), from)
nb.dat = b.dat[from:]
return nb
return Bytes{buf: b.buf, a: b.a + from, b: b.b}
}
// SliceTo dat[:to] with Ref.
func (b Bytes) SliceTo(to int) Bytes {
if b.buf.IsTrans() {
if b.buf.HasInvolved() {
return InvolveBytes(b.dat[:to]...)
}
return ParseBytes(b.dat[:to]...)
}
nb := b.Ref()
nb.buf.Pointer().Truncate(to)
nb.dat = b.dat[:to]
return nb
return Bytes{buf: b.buf, a: b.a, b: b.a + to}
}
// Slice dat[from:to] with Ref.
func (b Bytes) Slice(from, to int) Bytes {
if b.buf.IsTrans() {
if b.buf.HasInvolved() {
return InvolveBytes(b.dat[from:to]...)
}
return ParseBytes(b.dat[from:to]...)
}
nb := b.Ref()
buf := nb.buf.Pointer()
skip(buf, from)
buf.Truncate(to - from)
nb.dat = b.dat[from:to]
return nb
}
// KeepAlive marks Bytes value as reachable.
func (b Bytes) KeepAlive() {
_ = b.buf
_ = b.dat
runtime.KeepAlive(b.buf)
runtime.KeepAlive(b.dat)
return Bytes{buf: b.buf, a: b.a + from, b: b.a + to}
}

View File

@@ -11,13 +11,6 @@ import (
"time"
)
// manualDestroy please refer to Item.manualDestroy().
//
// Only for test purposes.
func (b Bytes) manualDestroy() {
b.buf.ManualDestroy()
}
// TestBytesSlice sometimes fails at first run because
// GC not collecting all unused items.
func TestBytesSlice(t *testing.T) {
@@ -26,27 +19,20 @@ func TestBytesSlice(t *testing.T) {
if b.Len() != i {
t.Fatal("index", i, "excpet len", i, "but got", b.Len())
}
rand.Read(b.Bytes())
b.V(func(b []byte) {
rand.Read(b)
})
buf := make([]byte, b.Len())
copy(buf, b.Bytes())
// test normal slice
y := b.SliceFrom(5)
x := y.SliceTo(i - 5 - 5)
if !bytes.Equal(buf[5:i-5], x.Bytes()) {
b.V(func(b []byte) {
copy(buf, b)
})
x := b.SliceFrom(5).SliceTo(i - 5 - 5)
dat := x.Trans()
if !bytes.Equal(buf[5:i-5], dat) {
t.Log("exp:", hex.EncodeToString(buf[5:i-5]))
t.Log("got:", hex.EncodeToString(x.Bytes()))
t.Log("got:", hex.EncodeToString(dat))
t.Fatal("index", i, "unexpected")
}
x.manualDestroy()
y.manualDestroy()
// test trans slice
b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5)
if !bytes.Equal(buf[5:i-5], b.Bytes()) {
t.Log("exp:", hex.EncodeToString(buf[5:i-5]))
t.Log("got:", hex.EncodeToString(b.Bytes()))
t.Fatal("index", i, "unexpected")
}
b.manualDestroy()
}
runtime.GC()
runtime.Gosched()
@@ -66,11 +52,12 @@ func TestBytesInvolve(t *testing.T) {
if b.Len() != i {
t.Fatal("index", i, "excpet len", i, "but got", b.Len())
}
rand.Read(b.Bytes())
if !bytes.Equal(b.Bytes(), buf[:i]) {
b.V(func(b []byte) {
rand.Read(b)
})
if !bytes.Equal(b.Trans(), buf[:i]) {
t.Fatal("index", i, "unexpected")
}
b.manualDestroy()
}
runtime.GC()
out, in := bufferPool.p.CountItems()
@@ -88,10 +75,9 @@ func TestBytesParse(t *testing.T) {
if b.Len() != i {
t.Fatal("index", i, "excpet len", i, "but got", b.Len())
}
if !bytes.Equal(b.Bytes(), buf[:i]) {
if !bytes.Equal(b.Trans(), buf[:i]) {
t.Fatal("index", i, "unexpected")
}
b.manualDestroy()
}
runtime.GC()
out, in := bufferPool.p.CountItems()
@@ -111,15 +97,12 @@ func TestBytesCopy(t *testing.T) {
if b.Len() != i-10 {
t.Fatal("index", i, "excpet len", i, "but got", b.Len())
}
rand.Read(b.Bytes())
// t.Log("org:", hex.EncodeToString(buf[:i]))
// t.Log("new:", hex.EncodeToString(b.Bytes()))
if bytes.Equal(b.Bytes(), buf[:i]) {
b.V(func(b []byte) {
rand.Read(b)
})
if bytes.Equal(b.Trans(), buf[:i]) {
t.Fatal("index", i, "unexpected")
}
b.manualDestroy()
x.manualDestroy()
a.manualDestroy()
}
runtime.GC()
runtime.Gosched()
@@ -141,15 +124,16 @@ func TestBytesTransMultithread(t *testing.T) {
buf := NewBytes(65536)
refer := make([]byte, 65536)
rand.Read(refer)
copy(buf.Bytes(), refer)
buf.V(func(b []byte) {
copy(b, refer)
})
wg.Add(1)
go func(buf Bytes) {
go func(buf []byte) {
defer wg.Done()
time.Sleep(time.Millisecond * time.Duration(mrand.Intn(10)))
if !bytes.Equal(refer, buf.Bytes()) {
if !bytes.Equal(refer, buf) {
panic("unexpected")
}
buf.manualDestroy()
}(buf.Trans())
}()
}

View File

@@ -77,8 +77,6 @@ func (pool *Pool[T]) put(item *Item[T]) {
item.cfg = nil
item.stat.setdestroyed(true)
item.ref = nil
item.refc = 0
if pool.noputbak {
return

View File

@@ -32,14 +32,10 @@ func TestPool(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 4096; i++ {
item := p.New(i)
for j := 0; j < 16; j++ {
wg.Add(1)
user(item.Ref(), &wg)
wg.Add(1)
go usernodestroy(item.Copy(), &wg)
}
wg.Add(1)
go usernodestroy(item.Trans(), &wg)
go useranddes(item.Copy(), &wg)
wg.Add(1)
go userv(item.Trans(), &wg)
}
wg.Wait()
runtime.GC()
@@ -50,15 +46,17 @@ func TestPool(t *testing.T) {
}
}
func user(item *Item[[]byte], wg *sync.WaitGroup) {
func useranddes(item *Item[[]byte], wg *sync.WaitGroup) {
defer wg.Done()
rand.Read(item.Unwrap())
item.V(func(b []byte) {
rand.Read(b)
})
item.ManualDestroy()
}
func usernodestroy(item *Item[[]byte], wg *sync.WaitGroup) {
func userv(b []byte, wg *sync.WaitGroup) {
defer wg.Done()
rand.Read(item.Unwrap())
rand.Read(b)
}
type simplepooler struct{}

View File

@@ -5,7 +5,6 @@ import "sync/atomic"
const (
statusisbuffered = 1 << iota
statusdestroyed
statusisintrans
)
type status uintptr
@@ -58,11 +57,3 @@ func (c *status) hasdestroyed() bool {
func (c *status) setdestroyed(v bool) {
c.setbool(v, statusdestroyed)
}
func (c *status) isintrans() bool {
return c.loadbool(statusisintrans)
}
func (c *status) setintrans(v bool) {
c.setbool(v, statusisintrans)
}