From eec7c4821d3319968cbab1f927cb78d87a6c3091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Mon, 24 Feb 2025 23:52:18 +0900 Subject: [PATCH] init --- README.md | 15 ++++++ go.mod | 3 ++ item.go | 104 ++++++++++++++++++++++++++++++++++++++++++ pbuf/buffer.go | 12 +++++ pbuf/bytes.go | 95 ++++++++++++++++++++++++++++++++++++++ pbuf/pbuf.go | 39 ++++++++++++++++ pbuf/pbuf_test.go | 21 +++++++++ pbuf/pooler.go | 68 ++++++++++++++++++++++++++++ pool.go | 113 ++++++++++++++++++++++++++++++++++++++++++++++ pool_test.go | 88 ++++++++++++++++++++++++++++++++++++ pooler.go | 9 ++++ status.go | 59 ++++++++++++++++++++++++ 12 files changed, 626 insertions(+) create mode 100644 go.mod create mode 100644 item.go create mode 100644 pbuf/buffer.go create mode 100644 pbuf/bytes.go create mode 100644 pbuf/pbuf.go create mode 100644 pbuf/pbuf_test.go create mode 100644 pbuf/pooler.go create mode 100644 pool.go create mode 100644 pool_test.go create mode 100644 pooler.go create mode 100644 status.go diff --git a/README.md b/README.md index 6916c2f..fe88318 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ # orbyte Lightweight & Safe (buffer-writer | general object) pool. + +## Quick Start +```go +import ( + "crypto/rand" + + "github.com/fumiama/orbyte/pbuf" +) + +func main() { + b := pbuf.NewBytes(1024) // Allocate Bytes from pool. + rand.Read(b.Bytes()) // Do sth. + b.Destroy() // Optional, can be auto-destroyed on GC. +} +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4b6efe0 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/fumiama/orbyte + +go 1.18 diff --git a/item.go b/item.go new file mode 100644 index 0000000..07b5a5e --- /dev/null +++ b/item.go @@ -0,0 +1,104 @@ +package orbyte + +import ( + "runtime" + "sync/atomic" +) + +// Item represents a thread-safe user-defined value. +// +// Only the item that has ownership can be a pointer. +// Do not copy neither Item nor *Item by yourself. +// You must always use the given methods. +type Item[T any] struct { + pool *Pool[T] + cfg any + + stat status + + val T +} + +// Trans ownership to a new item and +// destroy original item immediately. +// +// Call this function to drop your ownership +// before passing it to another function +// that do not return to you. +func (b *Item[T]) Trans() (tb *Item[T]) { + if b.stat.hasdestroyed() { + panic("use after destroy") + } + tb = b.pool.newempty() + *tb = *b + tb.stat = status(atomic.SwapUintptr( + (*uintptr)(&b.stat), uintptr(destroyedstatus), + )) + b.pool.put(b) + return tb +} + +// Unwrap use value of the item +func (b *Item[T]) Unwrap() T { + if b.stat.hasdestroyed() { + panic("use after destroy") + } + return b.val +} + +// Ref gens new item without ownership. +// +// It's a safe reference, thus calling this +// will not destroy the original item +// comparing with Trans(). +func (b *Item[T]) Ref() (rb *Item[T]) { + if b.stat.hasdestroyed() { + panic("use after destroy") + } + rb = b.pool.newempty() + *rb = *b + rb.stat.setbuffered(false) + return +} + +// Copy data completely with separated ownership. +func (b *Item[T]) Copy() (cb *Item[T]) { + if b.stat.hasdestroyed() { + panic("use after destroy") + } + cb = b.pool.New(b.cfg) + b.pool.pooler.Copy(&cb.val, &b.val) + return +} + +// Destroy item and put it back to pool. +func (b *Item[T]) Destroy() { + stat := status(atomic.SwapUintptr( + (*uintptr)(&b.stat), uintptr(destroyedstatus), + )) + if stat.hasdestroyed() { + panic("use after destroy") + } + if b.stat.isbuffered() { + b.pool.pooler.Reset(&b.val) + } + b.pool.put(b) +} + +// setautodestroy item on GC. +// +// Only can call once. +func (b *Item[T]) setautodestroy() *Item[T] { + runtime.SetFinalizer(b, func(item *Item[T]) { + // no one is using, no concurrency issue. + if item.stat.hasdestroyed() { + panic("unexpected hasdestroyed") + } + if item.stat.isbuffered() { + item.pool.pooler.Reset(&item.val) + } + item.stat.setdestroyed(true) + item.pool.put(item) + }) + return b +} diff --git a/pbuf/buffer.go b/pbuf/buffer.go new file mode 100644 index 0000000..87de22e --- /dev/null +++ b/pbuf/buffer.go @@ -0,0 +1,12 @@ +package pbuf + +import ( + "bytes" + + "github.com/fumiama/orbyte" +) + +// NewBuffer wraps bytes.NewBuffer +func (bufferPool BufferPool) NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] { + return bufferPool.p.New(buf) +} diff --git a/pbuf/bytes.go b/pbuf/bytes.go new file mode 100644 index 0000000..9fabc22 --- /dev/null +++ b/pbuf/bytes.go @@ -0,0 +1,95 @@ +package pbuf + +import ( + "bytes" + + "github.com/fumiama/orbyte" +) + +// Bytes wrap pooled buffer into []byte +// while sharing the same pool. +type Bytes struct { + buf *orbyte.Item[bytes.Buffer] + dat []byte +} + +// NewBytes alloc sz bytes. +func (bufferPool BufferPool) NewBytes(sz int) Bytes { + buf := bufferPool.p.New(sz) + x := buf.Unwrap() + return Bytes{buf: buf, dat: x.Bytes()} +} + +// InvolveBytes involve outside buf into pool. +func (bufferPool BufferPool) InvolveBytes(b ...byte) Bytes { + buf := bufferPool.p.Involve(len(b), bytes.NewBuffer(b)) + x := buf.Unwrap() + return Bytes{buf: buf, dat: x.Bytes()} +} + +// ParseBytes convert outside bytes to Bytes safely +// without adding it into pool. +func (bufferPool BufferPool) ParseBytes(b ...byte) Bytes { + buf := bufferPool.p.Parse(len(b), bytes.NewBuffer(b)) + x := buf.Unwrap() + return Bytes{buf: buf, dat: x.Bytes()} +} + +// Trans please refer to Item.Trans(). +func (b Bytes) Trans() (tb Bytes) { + tb.buf = b.buf.Trans() + return +} + +// Len of slice. +func (b Bytes) Len() int { + return len(b.dat) +} + +// Cap of slice. +func (b Bytes) Cap() int { + return cap(b.dat) +} + +// Bytes is the inner value. +func (b Bytes) Bytes() []byte { + return b.dat +} + +// Ref please refer to Item.Ref(). +func (b Bytes) Ref() (rb Bytes) { + rb.buf = b.buf.Ref() + return +} + +// Copy please refer to Item.Copy(). +func (b Bytes) Copy() (cb Bytes) { + cb.buf = b.buf.Copy() + return +} + +// SliceFrom dat[from:] with Ref. +func (b Bytes) SliceFrom(from int) Bytes { + nb := b.Ref() + nb.dat = b.dat[from:] + return nb +} + +// SliceTo dat[:to] with Ref. +func (b Bytes) SliceTo(to int) Bytes { + nb := b.Ref() + nb.dat = b.dat[:to] + return nb +} + +// Slice dat[from:to] with Ref. +func (b Bytes) Slice(from, to int) Bytes { + nb := b.Ref() + nb.dat = b.dat[from:to] + return nb +} + +// Destroy please refer to Item.Destroy(). +func (b Bytes) Destroy() { + b.buf.Destroy() +} diff --git a/pbuf/pbuf.go b/pbuf/pbuf.go new file mode 100644 index 0000000..fe7dabc --- /dev/null +++ b/pbuf/pbuf.go @@ -0,0 +1,39 @@ +// Package pbuf is a lightweight pooled buffer. +package pbuf + +import ( + "bytes" + + "github.com/fumiama/orbyte" +) + +var bufferPool = NewBufferPool() + +type BufferPool struct { + p *orbyte.Pool[bytes.Buffer] +} + +func NewBufferPool() BufferPool { + return BufferPool{p: orbyte.NewPool[bytes.Buffer](bufpooler{})} +} + +// NewBuffer wraps bytes.NewBuffer +func NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] { + return bufferPool.NewBuffer(buf) +} + +// NewBytes alloc sz bytes. +func NewBytes(sz int) Bytes { + return bufferPool.NewBytes(sz) +} + +// InvolveBytes involve outside buf into pool. +func InvolveBytes(b ...byte) Bytes { + return bufferPool.InvolveBytes(b...) +} + +// ParseBytes convert outside bytes to Bytes safely +// without adding it into pool. +func ParseBytes(b ...byte) Bytes { + return bufferPool.ParseBytes(b...) +} diff --git a/pbuf/pbuf_test.go b/pbuf/pbuf_test.go new file mode 100644 index 0000000..41782bc --- /dev/null +++ b/pbuf/pbuf_test.go @@ -0,0 +1,21 @@ +package pbuf + +import ( + "crypto/rand" + "runtime" + "testing" +) + +func TestBytes(t *testing.T) { + for i := 0; i < 4096; i++ { + b := NewBytes(i) + rand.Read(b.Bytes()) + b.Destroy() + } + runtime.GC() + out, in := bufferPool.p.CountItems() + t.Log(out, in) + if out != 0 || in != 1 { + t.Fail() + } +} diff --git a/pbuf/pooler.go b/pbuf/pooler.go new file mode 100644 index 0000000..70d9e81 --- /dev/null +++ b/pbuf/pooler.go @@ -0,0 +1,68 @@ +package pbuf + +import ( + "bytes" + "io" + "reflect" +) + +type bufpooler struct{} + +func (bufpooler) New(config any, pooled bytes.Buffer) bytes.Buffer { + switch c := config.(type) { + case int: + pooled.Grow(c) + return pooled + case []byte: + if len(c) > 0 || pooled.Cap() < cap(c) { + return *bytes.NewBuffer(c) + } + return pooled + case string: + pooled.Grow(len(c)) + pooled.WriteString(c) + return pooled + default: + panic("config type " + reflect.ValueOf(config).Type().String() + " isn't supported") + } +} + +func (bufpooler) Parse(obj any, pooled bytes.Buffer) bytes.Buffer { + switch o := obj.(type) { + case *bytes.Buffer: + return *o + case bytes.Buffer: + return o + case []byte: + pooled.Write(o) + return pooled + case string: + pooled.WriteString(o) + return pooled + case io.Reader: + _, err := io.Copy(&pooled, o) + if err != nil { + panic(err) + } + return pooled + default: + panic("object type " + reflect.ValueOf(obj).Type().String() + " isn't supported") + } +} + +func (bufpooler) Reset(item *bytes.Buffer) { + // See https://golang.org/issue/23199 + const maxSize = 1 << 16 + if item.Cap() > maxSize { // drop large buffer + *item = bytes.Buffer{} + return + } + item.Reset() +} + +func (bufpooler) Copy(dst, src *bytes.Buffer) { + _, err := io.Copy(dst, src) + if err != nil { + panic(err) + } +} diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..66fb6d6 --- /dev/null +++ b/pool.go @@ -0,0 +1,113 @@ +// Package orbyte is a lightweight & safe (buffer-writer | general object) pool. +package orbyte + +import ( + "runtime" + "sync" + "sync/atomic" +) + +// Pool lightweight general pool. +type Pool[T any] struct { + pooler Pooler[T] + pool sync.Pool + countin int32 + countout int32 + isstrict bool +} + +// NewPool make a new pool from custom pooler. +func NewPool[T any](pooler Pooler[T]) *Pool[T] { + p := new(Pool[T]) + p.pooler = pooler + p.pool.New = func() any { + return &Item[T]{pool: p} + } + return p +} + +// SetStrictMode panic on every misuse. +// +// Enable this to detect coding errors. +func (pool *Pool[T]) SetStrictMode(on bool) { + pool.isstrict = on +} + +func (pool *Pool[T]) incin() { + atomic.AddInt32(&pool.countin, 1) +} + +func (pool *Pool[T]) decin() { + atomic.AddInt32(&pool.countin, -1) +} + +func (pool *Pool[T]) incout() { + atomic.AddInt32(&pool.countout, 1) +} + +func (pool *Pool[T]) decout() { + atomic.AddInt32(&pool.countout, -1) +} + +func (pool *Pool[T]) newempty() *Item[T] { + item := pool.pool.Get().(*Item[T]) + if item.stat.hasdestroyed() { // is recycled + pool.decin() + } + item.stat = status(0) + pool.incout() + item.setautodestroy() + return item +} + +func (pool *Pool[T]) put(item *Item[T]) { + runtime.SetFinalizer(item, nil) + + if pool.isstrict { + return + } + + item.cfg = nil + var dt T + item.val = dt + + pool.pool.Put(item) + + pool.decout() + pool.incin() +} + +// New call this to generate an item. +func (pool *Pool[T]) New(config any) *Item[T] { + item := pool.newempty() + item.cfg = config + item.stat.setbuffered(true) + item.val = pool.pooler.New(config, item.val) + return item +} + +// InvolveItem[T any] involve external object into pool. +// +// After that, you must only use the object through Item. +func (pool *Pool[T]) Involve(config, obj any) *Item[T] { + item := pool.newempty() + item.cfg = config + item.stat.setbuffered(true) + item.val = pool.pooler.Parse(obj, item.val) + return item +} + +// ParseItem[T any] safely convert obj into pool item without copy. +// +// You can still use the original object elsewhere. +func (pool *Pool[T]) Parse(config, obj any) *Item[T] { + item := pool.newempty() + item.cfg = config + item.val = pool.pooler.Parse(obj, item.val) + return item +} + +// CountItems returns total item count outside and inside. +func (pool *Pool[T]) CountItems() (outside, inside int32) { + return atomic.LoadInt32(&pool.countout), atomic.LoadInt32(&pool.countin) +} diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..a2dfb44 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,88 @@ +package orbyte + +import ( + "crypto/rand" + "runtime" + "sync" + "testing" +) + +func TestPool(t *testing.T) { + p := NewPool[[]byte](simplepooler{}) + x := p.New(200) + x.Destroy() + out, in := p.CountItems() + t.Log("out", out, "in", in) + if out != 0 || in != 1 { + t.Fatal("unexpected behavior") + } + for i := 0; i < 2000; i++ { + item := p.New(i) + out, in = p.CountItems() + if out != 1 || in != 0 { + t.Fatal("unexpected behavior") + } + item.Destroy() + } + out, in = p.CountItems() + t.Log("out", out, "in", in) + if out != 0 || in != 1 { + t.Fatal("unexpected behavior") + } + wg := sync.WaitGroup{} + for i := 0; i < 4096; i++ { + item := p.New(i) + for j := 0; j < 16; j++ { + wg.Add(1) + user(item.Ref(), &wg) + wg.Add(1) + go usernodestroy(item.Copy(), &wg) + } + wg.Add(1) + go usernodestroy(item.Trans(), &wg) + } + wg.Wait() + runtime.GC() + out, in = p.CountItems() + t.Log("out", out, "in", in) + if out != 0 { + t.Fatal("unexpected behavior") + } +} + +func user(item *Item[[]byte], wg *sync.WaitGroup) { + defer wg.Done() + rand.Read(item.Unwrap()) + item.Destroy() +} + +func usernodestroy(item *Item[[]byte], wg *sync.WaitGroup) { + defer wg.Done() + rand.Read(item.Unwrap()) +} + +type simplepooler struct{} + +func (simplepooler) New(config any, pooled []byte) []byte { + if cap(pooled) >= config.(int) { + return pooled[:config.(int)] + } + return make([]byte, config.(int)) +} + +func (simplepooler) Parse(obj any, pooled []byte) []byte { + src := obj.([]byte) + if cap(pooled) >= len(src) { + copy(pooled[:len(src)], src) + return pooled[:len(src)] + } + return obj.([]byte) +} + +func (simplepooler) Reset(item *[]byte) { + *item = (*item)[:0] +} + +func (simplepooler) Copy(dst, src *[]byte) { + copy(*dst, *src) +} diff --git a/pooler.go b/pooler.go new file mode 100644 index 0000000..d87fb2d --- /dev/null +++ b/pooler.go @@ -0,0 +1,9 @@ +package orbyte + +// Pooler connects to a user-defined struct. +type Pooler[T any] interface { + New(config any, pooled T) T + Parse(obj any, pooled T) T + Reset(item *T) + Copy(dst, src *T) +} diff --git a/status.go b/status.go new file mode 100644 index 0000000..920a1ec --- /dev/null +++ b/status.go @@ -0,0 +1,59 @@ +package orbyte + +import "sync/atomic" + +const ( + statusisbuffered = 1 << iota + statusdestroyed +) + +type status uintptr + +var destroyedstatus status + +func init() { + destroyedstatus.setdestroyed(true) +} + +func (c status) mask(v bool, typ uintptr) (news status) { + news = c + if v { + news |= status(typ) + } else { + news &= ^status(typ) + } + return +} + +func (c *status) setbool(v bool, typ uintptr) { + olds := atomic.LoadUintptr((*uintptr)(c)) + oldv := olds&typ != 0 + if oldv == v { + return + } + news := status(olds).mask(v, typ) + for !atomic.CompareAndSwapUintptr((*uintptr)(c), olds, uintptr(news)) { + olds = atomic.LoadUintptr((*uintptr)(c)) + news = status(olds).mask(v, typ) + } +} + +func (c *status) loadbool(typ uintptr) bool { + return atomic.LoadUintptr((*uintptr)(c))&typ != 0 +} + +func (c *status) isbuffered() bool { + return c.loadbool(statusisbuffered) +} + +func (c *status) setbuffered(v bool) { + c.setbool(v, statusisbuffered) +} + +func (c *status) hasdestroyed() bool { + return c.loadbool(statusdestroyed) +} + +func (c *status) setdestroyed(v bool) { + c.setbool(v, statusdestroyed) +}