From 309b51a50e5e0e4b14d9a46d68d9d437f80e4436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 25 Feb 2025 16:50:06 +0900 Subject: [PATCH] fix(pbuf): destroy misuse --- item.go | 6 +-- pbuf/buffer.go | 69 ++++++++++++++++++++++++++- pbuf/buffer_test.go | 65 +++++++++++++++++++++++++ pbuf/bytes.go | 28 ++++------- pbuf/bytes_test.go | 114 ++++++++++++++++++++++++++++++++++++++++++++ pbuf/pbuf.go | 12 ++++- pbuf/pbuf_test.go | 21 -------- pbuf/pooler.go | 16 +++++-- 8 files changed, 280 insertions(+), 51 deletions(-) create mode 100644 pbuf/buffer_test.go create mode 100644 pbuf/bytes_test.go delete mode 100644 pbuf/pbuf_test.go diff --git a/item.go b/item.go index 05117a3..7eacb89 100644 --- a/item.go +++ b/item.go @@ -120,11 +120,7 @@ func (b *Item[T]) setautodestroy() *Item[T] { if item.stat.hasdestroyed() { panic("unexpected hasdestroyed") } - if !item.stat.isintrans() && item.stat.isbuffered() { - item.pool.pooler.Reset(&item.val) - } - item.stat.setdestroyed(true) - item.pool.put(item) + item.destroybystat(item.stat) }) return b } diff --git a/pbuf/buffer.go b/pbuf/buffer.go index 87de22e..a6718ad 100644 --- a/pbuf/buffer.go +++ b/pbuf/buffer.go @@ -2,11 +2,78 @@ package pbuf import ( "bytes" + "io" + "unsafe" "github.com/fumiama/orbyte" ) -// NewBuffer wraps bytes.NewBuffer +// NewBuffer wraps bytes.NewBuffer into Item. func (bufferPool BufferPool) NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] { return bufferPool.p.New(buf) } + +// InvolveBuffer involve external *bytes.Buffer into Item. +func (bufferPool BufferPool) InvolveBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] { + return bufferPool.p.Involve(buf.Len(), buf) +} + +// ParseBuffer convert external *bytes.Buffer into Item. +func (bufferPool BufferPool) ParseBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] { + return bufferPool.p.Parse(buf.Len(), buf) +} + +// A Buffer is a variable-sized buffer of bytes with Read and Write methods. +// The zero value for Buffer is an empty buffer ready to use. +type buffer struct { + buf []byte // contents are the bytes buf[off : len(buf)] + off int // read at &buf[off], write at &buf[len(buf)] + lastRead readOp // last read operation, so that Unread* can work correctly. +} + +func skip(w *bytes.Buffer, n int) (int, error) { + if n == 0 { + return 0, nil + } + b := (*buffer)(unsafe.Pointer(w)) + b.lastRead = opInvalid + if len(b.buf) <= b.off { + // Buffer is empty, reset to recover space. + w.Reset() + if n == 0 { + return 0, nil + } + return 0, io.EOF + } + n = minnum(n, len(b.buf[b.off:])) + b.off += n + if n > 0 { + b.lastRead = opRead + } + return n, nil +} + +// The readOp constants describe the last action performed on +// the buffer, so that UnreadRune and UnreadByte can check for +// invalid usage. opReadRuneX constants are chosen such that +// converted to int they correspond to the rune size that was read. +type readOp int8 + +// Don't use iota for these, as the values need to correspond with the +// names and comments, which is easier to see when being explicit. +const ( + opRead readOp = -1 // Any other read operation. + opInvalid readOp = 0 // Non-read operation. + opReadRune1 readOp = 1 // Read rune of size 1. + opReadRune2 readOp = 2 // Read rune of size 2. + opReadRune3 readOp = 3 // Read rune of size 3. + opReadRune4 readOp = 4 // Read rune of size 4. +) + +// minnum 返回两数最小值,该函数将被内联 +func minnum[T int | int8 | uint8 | int16 | uint16 | int32 | uint32 | int64 | uint64](a, b T) T { + if a > b { + return b + } + return a +} diff --git a/pbuf/buffer_test.go b/pbuf/buffer_test.go new file mode 100644 index 0000000..fe81f90 --- /dev/null +++ b/pbuf/buffer_test.go @@ -0,0 +1,65 @@ +package pbuf + +import ( + "bytes" + "crypto/rand" + "io" + "runtime" + "testing" + + "github.com/fumiama/orbyte" +) + +func TestBuffer(t *testing.T) { + testBuffer(NewBuffer(nil), t) + testBuffer(NewBuffer(make([]byte, 0, 8192)), t) + testBuffer(ParseBuffer(bytes.NewBuffer(nil)), t) + testBuffer(ParseBuffer(bytes.NewBuffer(make([]byte, 0, 8192))), t) + testBuffer(InvolveBuffer(bytes.NewBuffer(nil)), t) + testBuffer(InvolveBuffer(bytes.NewBuffer(make([]byte, 0, 8192))), t) +} + +func testBuffer(buf *orbyte.Item[bytes.Buffer], t *testing.T) { + if buf.Pointer().Len() != 4096 { + io.CopyN(buf.Pointer(), rand.Reader, 4096) + if buf.Pointer().Len() != 4096 { + t.Fatal("got", buf.Pointer().Len()) + } + } + + bufcp := buf.Copy() + if bufcp.Pointer().Len() != 4096 { + t.Fatal("got", bufcp.Pointer().Len()) + } + if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) { + t.Fatal("unexpected") + } + + bufr := buf.Ref() + if bufr.Pointer().Len() != 4096 { + t.Fatal("got", bufr.Pointer().Len()) + } + if !bytes.Equal(bufr.Pointer().Bytes(), buf.Pointer().Bytes()) { + t.Fatal("unexpected") + } + bufr.Destroy() + + bufcp = bufcp.Trans() + if bufcp.Pointer().Len() != 4096 { + t.Fatal("got", bufcp.Pointer().Len()) + } + if !bytes.Equal(bufcp.Pointer().Bytes(), buf.Pointer().Bytes()) { + t.Fatal("unexpected") + } + bufcp.Destroy() + + runtime.GC() + runtime.Gosched() + runtime.GC() + + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 { + t.Fail() + } +} diff --git a/pbuf/bytes.go b/pbuf/bytes.go index 80ec79b..433a8e8 100644 --- a/pbuf/bytes.go +++ b/pbuf/bytes.go @@ -38,14 +38,11 @@ func (bufferPool BufferPool) InvolveBytes(b ...byte) Bytes { // without adding it into pool. func (bufferPool BufferPool) ParseBytes(b ...byte) Bytes { buf := bufferPool.p.Parse(len(b), bytes.NewBuffer(b)) - return Bytes{buf: buf, dat: buf.Pointer().Bytes()[:len(b)]} + return Bytes{buf: buf, dat: buf.Pointer().Bytes()} } // Trans please refer to Item.Trans(). func (b Bytes) Trans() (tb Bytes) { - if b.buf == nil { - return - } tb.buf = b.buf.Trans() tb.dat = b.dat return @@ -53,17 +50,11 @@ func (b Bytes) Trans() (tb Bytes) { // Len of slice. func (b Bytes) Len() int { - if b.buf == nil { - return 0 - } return len(b.dat) } // Cap of slice. func (b Bytes) Cap() int { - if b.buf == nil { - return 0 - } return cap(b.dat) } @@ -74,9 +65,6 @@ func (b Bytes) Bytes() []byte { // Ref please refer to Item.Ref(). func (b Bytes) Ref() (rb Bytes) { - if b.buf == nil { - return - } rb.buf = b.buf.Ref() rb.dat = b.dat return @@ -84,11 +72,8 @@ func (b Bytes) Ref() (rb Bytes) { // Copy please refer to Item.Copy(). func (b Bytes) Copy() (cb Bytes) { - if b.buf == nil { - return - } cb.buf = b.buf.Copy() - cb.dat = b.dat + cb.dat = cb.buf.Pointer().Bytes() return } @@ -98,6 +83,7 @@ func (b Bytes) SliceFrom(from int) Bytes { return InvolveBytes(b.dat[from:]...) } nb := b.Ref() + skip(nb.buf.Pointer(), from) nb.dat = b.dat[from:] return nb } @@ -108,6 +94,7 @@ func (b Bytes) SliceTo(to int) Bytes { return InvolveBytes(b.dat[:to]...) } nb := b.Ref() + nb.buf.Pointer().Truncate(to) nb.dat = b.dat[:to] return nb } @@ -118,13 +105,14 @@ func (b Bytes) Slice(from, to int) Bytes { return InvolveBytes(b.dat[from:to]...) } nb := b.Ref() + buf := nb.buf.Pointer() + skip(buf, from) + buf.Truncate(to - from) nb.dat = b.dat[from:to] return nb } // Destroy please refer to Item.Destroy(). func (b Bytes) Destroy() { - if b.buf != nil { - b.buf.Destroy() - } + b.buf.Destroy() } diff --git a/pbuf/bytes_test.go b/pbuf/bytes_test.go new file mode 100644 index 0000000..027a1b2 --- /dev/null +++ b/pbuf/bytes_test.go @@ -0,0 +1,114 @@ +package pbuf + +import ( + "bytes" + "crypto/rand" + "encoding/hex" + "runtime" + "testing" +) + +// TestBytesSlice sometimes fails at first run because +// GC not collecting all unused items. +func TestBytesSlice(t *testing.T) { + for i := 10; i < 4096; i++ { + b := NewBytes(i) + if b.Len() != i { + t.Fatal("index", i, "excpet len", i, "but got", b.Len()) + } + rand.Read(b.Bytes()) + buf := make([]byte, b.Len()) + copy(buf, b.Bytes()) + // test normal slice + x := b.SliceFrom(5).SliceTo(i - 5 - 5) + if !bytes.Equal(buf[5:i-5], x.Bytes()) { + t.Log("exp:", hex.EncodeToString(buf[5:i-5])) + t.Log("got:", hex.EncodeToString(x.Bytes())) + t.Fatal("index", i, "unexpected") + } + x.Destroy() + // test trans slice + b = b.Trans().SliceFrom(5).SliceTo(i - 5 - 5) + if !bytes.Equal(buf[5:i-5], b.Bytes()) { + t.Log("exp:", hex.EncodeToString(buf[5:i-5])) + t.Log("got:", hex.EncodeToString(b.Bytes())) + t.Fatal("index", i, "unexpected") + } + b.Destroy() + } + runtime.GC() + runtime.Gosched() + runtime.GC() + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 { + t.Fail() + } +} + +func TestBytesInvolve(t *testing.T) { + buf := make([]byte, 4096) + rand.Read(buf) + for i := 0; i < 4096; i++ { + b := InvolveBytes(buf[:i]...) + if b.Len() != i { + t.Fatal("index", i, "excpet len", i, "but got", b.Len()) + } + rand.Read(b.Bytes()) + if !bytes.Equal(b.Bytes(), buf[:i]) { + t.Fatal("index", i, "unexpected") + } + b.Destroy() + } + runtime.GC() + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 { + t.Fail() + } +} + +func TestBytesParse(t *testing.T) { + buf := make([]byte, 4096) + rand.Read(buf) + for i := 0; i < 4096; i++ { + b := ParseBytes(buf[:i]...) + if b.Len() != i { + t.Fatal("index", i, "excpet len", i, "but got", b.Len()) + } + if !bytes.Equal(b.Bytes(), buf[:i]) { + t.Fatal("index", i, "unexpected") + } + b.Destroy() + } + runtime.GC() + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 { + t.Fail() + } +} + +func TestBytesCopy(t *testing.T) { + buf := make([]byte, 4096) + rand.Read(buf) + for i := 10; i < 4096; i++ { + b := ParseBytes(buf...).Slice(5, i-5).Copy() + if b.Len() != i-10 { + t.Fatal("index", i, "excpet len", i, "but got", b.Len()) + } + rand.Read(b.Bytes()) + t.Log("org:", hex.EncodeToString(buf[:i])) + t.Log("new:", hex.EncodeToString(b.Bytes())) + if bytes.Equal(b.Bytes(), buf[:i]) { + t.Fatal("index", i, "unexpected") + } + b.Destroy() + } + runtime.GC() + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 { + t.Fail() + } +} diff --git a/pbuf/pbuf.go b/pbuf/pbuf.go index fe7dabc..8a6384a 100644 --- a/pbuf/pbuf.go +++ b/pbuf/pbuf.go @@ -17,11 +17,21 @@ func NewBufferPool() BufferPool { return BufferPool{p: orbyte.NewPool[bytes.Buffer](bufpooler{})} } -// NewBuffer wraps bytes.NewBuffer +// NewBuffer wraps bytes.NewBuffer into Item. func NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] { return bufferPool.NewBuffer(buf) } +// InvolveBuffer involve external *bytes.Buffer into Item. +func InvolveBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] { + return bufferPool.InvolveBuffer(buf) +} + +// ParseBuffer convert external *bytes.Buffer into Item. +func ParseBuffer(buf *bytes.Buffer) *orbyte.Item[bytes.Buffer] { + return bufferPool.ParseBuffer(buf) +} + // NewBytes alloc sz bytes. func NewBytes(sz int) Bytes { return bufferPool.NewBytes(sz) diff --git a/pbuf/pbuf_test.go b/pbuf/pbuf_test.go deleted file mode 100644 index fff855f..0000000 --- a/pbuf/pbuf_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package pbuf - -import ( - "crypto/rand" - "runtime" - "testing" -) - -func TestBytes(t *testing.T) { - for i := 0; i < 4096; i++ { - b := NewBytes(i) - rand.Read(b.Bytes()) - b.Trans().SliceFrom(0).SliceTo(i).Destroy() - } - runtime.GC() - out, in := bufferPool.p.CountItems() - t.Log(out, in) - if out != 0 { - t.Fail() - } -} diff --git a/pbuf/pooler.go b/pbuf/pooler.go index 70d9e81..4df4d25 100644 --- a/pbuf/pooler.go +++ b/pbuf/pooler.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "reflect" + "unsafe" ) type bufpooler struct{} @@ -12,14 +13,21 @@ func (bufpooler) New(config any, pooled bytes.Buffer) bytes.Buffer { switch c := config.(type) { case int: pooled.Grow(c) + *(*[]byte)(unsafe.Pointer(&pooled)) = pooled.Bytes()[:c] + if c != pooled.Len() { + panic("unexpected bad buffer Grow") + } return pooled case []byte: if len(c) > 0 || pooled.Cap() < cap(c) { - return *bytes.NewBuffer(c) + buf := bytes.NewBuffer(c) + if len(c) != buf.Len() { + panic("unexpected bad bytes.NewBuffer") + } + return *buf } return pooled case string: - pooled.Grow(len(c)) pooled.WriteString(c) return pooled default: @@ -61,7 +69,9 @@ func (bufpooler) Reset(item *bytes.Buffer) { } func (bufpooler) Copy(dst, src *bytes.Buffer) { - _, err := io.Copy(dst, src) + dst.Reset() + srccp := *src + _, err := io.Copy(dst, &srccp) if err != nil { panic(err) }