1
0
mirror of https://github.com/fumiama/orbyte.git synced 2026-06-05 02:00:30 +08:00
This commit is contained in:
源文雨
2025-02-24 23:52:18 +09:00
parent 8f413ecfdf
commit eec7c4821d
12 changed files with 626 additions and 0 deletions

View File

@@ -1,2 +1,17 @@
# orbyte
Lightweight & Safe (buffer-writer | general object) pool.
## Quick Start
```go
import (
"crypto/rand"
"github.com/fumiama/orbyte/pbuf"
)
func main() {
b := pbuf.NewBytes(1024) // Allocate Bytes from pool.
rand.Read(b.Bytes()) // Do sth.
b.Destroy() // Optional, can be auto-destroyed on GC.
}
```

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module github.com/fumiama/orbyte
go 1.18

104
item.go Normal file
View File

@@ -0,0 +1,104 @@
package orbyte
import (
"runtime"
"sync/atomic"
)
// Item represents a thread-safe user-defined value.
//
// Only the item that has ownership can be a pointer.
// Do not copy neither Item nor *Item by yourself.
// You must always use the given methods.
type Item[T any] struct {
pool *Pool[T]
cfg any
stat status
val T
}
// Trans ownership to a new item and
// destroy original item immediately.
//
// Call this function to drop your ownership
// before passing it to another function
// that do not return to you.
func (b *Item[T]) Trans() (tb *Item[T]) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
tb = b.pool.newempty()
*tb = *b
tb.stat = status(atomic.SwapUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus),
))
b.pool.put(b)
return tb
}
// Unwrap use value of the item
func (b *Item[T]) Unwrap() T {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
return b.val
}
// Ref gens new item without ownership.
//
// It's a safe reference, thus calling this
// will not destroy the original item
// comparing with Trans().
func (b *Item[T]) Ref() (rb *Item[T]) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
rb = b.pool.newempty()
*rb = *b
rb.stat.setbuffered(false)
return
}
// Copy data completely with separated ownership.
func (b *Item[T]) Copy() (cb *Item[T]) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
cb = b.pool.New(b.cfg)
b.pool.pooler.Copy(&cb.val, &b.val)
return
}
// Destroy item and put it back to pool.
func (b *Item[T]) Destroy() {
stat := status(atomic.SwapUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus),
))
if stat.hasdestroyed() {
panic("use after destroy")
}
if b.stat.isbuffered() {
b.pool.pooler.Reset(&b.val)
}
b.pool.put(b)
}
// setautodestroy item on GC.
//
// Only can call once.
func (b *Item[T]) setautodestroy() *Item[T] {
runtime.SetFinalizer(b, func(item *Item[T]) {
// no one is using, no concurrency issue.
if item.stat.hasdestroyed() {
panic("unexpected hasdestroyed")
}
if item.stat.isbuffered() {
item.pool.pooler.Reset(&item.val)
}
item.stat.setdestroyed(true)
item.pool.put(item)
})
return b
}

12
pbuf/buffer.go Normal file
View File

@@ -0,0 +1,12 @@
package pbuf
import (
"bytes"
"github.com/fumiama/orbyte"
)
// NewBuffer wraps bytes.NewBuffer
func (bufferPool BufferPool) NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] {
return bufferPool.p.New(buf)
}

95
pbuf/bytes.go Normal file
View File

@@ -0,0 +1,95 @@
package pbuf
import (
"bytes"
"github.com/fumiama/orbyte"
)
// Bytes wrap pooled buffer into []byte
// while sharing the same pool.
type Bytes struct {
buf *orbyte.Item[bytes.Buffer]
dat []byte
}
// NewBytes alloc sz bytes.
func (bufferPool BufferPool) NewBytes(sz int) Bytes {
buf := bufferPool.p.New(sz)
x := buf.Unwrap()
return Bytes{buf: buf, dat: x.Bytes()}
}
// InvolveBytes involve outside buf into pool.
func (bufferPool BufferPool) InvolveBytes(b ...byte) Bytes {
buf := bufferPool.p.Involve(len(b), bytes.NewBuffer(b))
x := buf.Unwrap()
return Bytes{buf: buf, dat: x.Bytes()}
}
// ParseBytes convert outside bytes to Bytes safely
// without adding it into pool.
func (bufferPool BufferPool) ParseBytes(b ...byte) Bytes {
buf := bufferPool.p.Parse(len(b), bytes.NewBuffer(b))
x := buf.Unwrap()
return Bytes{buf: buf, dat: x.Bytes()}
}
// Trans please refer to Item.Trans().
func (b Bytes) Trans() (tb Bytes) {
tb.buf = b.buf.Trans()
return
}
// Len of slice.
func (b Bytes) Len() int {
return len(b.dat)
}
// Cap of slice.
func (b Bytes) Cap() int {
return cap(b.dat)
}
// Bytes is the inner value.
func (b Bytes) Bytes() []byte {
return b.dat
}
// Ref please refer to Item.Ref().
func (b Bytes) Ref() (rb Bytes) {
rb.buf = b.buf.Ref()
return
}
// Copy please refer to Item.Copy().
func (b Bytes) Copy() (cb Bytes) {
cb.buf = b.buf.Copy()
return
}
// SliceFrom dat[from:] with Ref.
func (b Bytes) SliceFrom(from int) Bytes {
nb := b.Ref()
nb.dat = b.dat[from:]
return nb
}
// SliceTo dat[:to] with Ref.
func (b Bytes) SliceTo(to int) Bytes {
nb := b.Ref()
nb.dat = b.dat[:to]
return nb
}
// Slice dat[from:to] with Ref.
func (b Bytes) Slice(from, to int) Bytes {
nb := b.Ref()
nb.dat = b.dat[from:to]
return nb
}
// Destroy please refer to Item.Destroy().
func (b Bytes) Destroy() {
b.buf.Destroy()
}

39
pbuf/pbuf.go Normal file
View File

@@ -0,0 +1,39 @@
// Package pbuf is a lightweight pooled buffer.
package pbuf
import (
"bytes"
"github.com/fumiama/orbyte"
)
var bufferPool = NewBufferPool()
type BufferPool struct {
p *orbyte.Pool[bytes.Buffer]
}
func NewBufferPool() BufferPool {
return BufferPool{p: orbyte.NewPool[bytes.Buffer](bufpooler{})}
}
// NewBuffer wraps bytes.NewBuffer
func NewBuffer(buf []byte) *orbyte.Item[bytes.Buffer] {
return bufferPool.NewBuffer(buf)
}
// NewBytes alloc sz bytes.
func NewBytes(sz int) Bytes {
return bufferPool.NewBytes(sz)
}
// InvolveBytes involve outside buf into pool.
func InvolveBytes(b ...byte) Bytes {
return bufferPool.InvolveBytes(b...)
}
// ParseBytes convert outside bytes to Bytes safely
// without adding it into pool.
func ParseBytes(b ...byte) Bytes {
return bufferPool.ParseBytes(b...)
}

21
pbuf/pbuf_test.go Normal file
View File

@@ -0,0 +1,21 @@
package pbuf
import (
"crypto/rand"
"runtime"
"testing"
)
func TestBytes(t *testing.T) {
for i := 0; i < 4096; i++ {
b := NewBytes(i)
rand.Read(b.Bytes())
b.Destroy()
}
runtime.GC()
out, in := bufferPool.p.CountItems()
t.Log(out, in)
if out != 0 || in != 1 {
t.Fail()
}
}

68
pbuf/pooler.go Normal file
View File

@@ -0,0 +1,68 @@
package pbuf
import (
"bytes"
"io"
"reflect"
)
type bufpooler struct{}
func (bufpooler) New(config any, pooled bytes.Buffer) bytes.Buffer {
switch c := config.(type) {
case int:
pooled.Grow(c)
return pooled
case []byte:
if len(c) > 0 || pooled.Cap() < cap(c) {
return *bytes.NewBuffer(c)
}
return pooled
case string:
pooled.Grow(len(c))
pooled.WriteString(c)
return pooled
default:
panic("config type " + reflect.ValueOf(config).Type().String() + " isn't supported")
}
}
func (bufpooler) Parse(obj any, pooled bytes.Buffer) bytes.Buffer {
switch o := obj.(type) {
case *bytes.Buffer:
return *o
case bytes.Buffer:
return o
case []byte:
pooled.Write(o)
return pooled
case string:
pooled.WriteString(o)
return pooled
case io.Reader:
_, err := io.Copy(&pooled, o)
if err != nil {
panic(err)
}
return pooled
default:
panic("object type " + reflect.ValueOf(obj).Type().String() + " isn't supported")
}
}
func (bufpooler) Reset(item *bytes.Buffer) {
// See https://golang.org/issue/23199
const maxSize = 1 << 16
if item.Cap() > maxSize { // drop large buffer
*item = bytes.Buffer{}
return
}
item.Reset()
}
func (bufpooler) Copy(dst, src *bytes.Buffer) {
_, err := io.Copy(dst, src)
if err != nil {
panic(err)
}
}

113
pool.go Normal file
View File

@@ -0,0 +1,113 @@
// Package orbyte is a lightweight & safe (buffer-writer | general object) pool.
package orbyte
import (
"runtime"
"sync"
"sync/atomic"
)
// Pool lightweight general pool.
type Pool[T any] struct {
pooler Pooler[T]
pool sync.Pool
countin int32
countout int32
isstrict bool
}
// NewPool make a new pool from custom pooler.
func NewPool[T any](pooler Pooler[T]) *Pool[T] {
p := new(Pool[T])
p.pooler = pooler
p.pool.New = func() any {
return &Item[T]{pool: p}
}
return p
}
// SetStrictMode panic on every misuse.
//
// Enable this to detect coding errors.
func (pool *Pool[T]) SetStrictMode(on bool) {
pool.isstrict = on
}
func (pool *Pool[T]) incin() {
atomic.AddInt32(&pool.countin, 1)
}
func (pool *Pool[T]) decin() {
atomic.AddInt32(&pool.countin, -1)
}
func (pool *Pool[T]) incout() {
atomic.AddInt32(&pool.countout, 1)
}
func (pool *Pool[T]) decout() {
atomic.AddInt32(&pool.countout, -1)
}
func (pool *Pool[T]) newempty() *Item[T] {
item := pool.pool.Get().(*Item[T])
if item.stat.hasdestroyed() { // is recycled
pool.decin()
}
item.stat = status(0)
pool.incout()
item.setautodestroy()
return item
}
func (pool *Pool[T]) put(item *Item[T]) {
runtime.SetFinalizer(item, nil)
if pool.isstrict {
return
}
item.cfg = nil
var dt T
item.val = dt
pool.pool.Put(item)
pool.decout()
pool.incin()
}
// New call this to generate an item.
func (pool *Pool[T]) New(config any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.stat.setbuffered(true)
item.val = pool.pooler.New(config, item.val)
return item
}
// InvolveItem[T any] involve external object into pool.
//
// After that, you must only use the object through Item.
func (pool *Pool[T]) Involve(config, obj any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.stat.setbuffered(true)
item.val = pool.pooler.Parse(obj, item.val)
return item
}
// ParseItem[T any] safely convert obj into pool item without copy.
//
// You can still use the original object elsewhere.
func (pool *Pool[T]) Parse(config, obj any) *Item[T] {
item := pool.newempty()
item.cfg = config
item.val = pool.pooler.Parse(obj, item.val)
return item
}
// CountItems returns total item count outside and inside.
func (pool *Pool[T]) CountItems() (outside, inside int32) {
return atomic.LoadInt32(&pool.countout), atomic.LoadInt32(&pool.countin)
}

88
pool_test.go Normal file
View File

@@ -0,0 +1,88 @@
package orbyte
import (
"crypto/rand"
"runtime"
"sync"
"testing"
)
func TestPool(t *testing.T) {
p := NewPool[[]byte](simplepooler{})
x := p.New(200)
x.Destroy()
out, in := p.CountItems()
t.Log("out", out, "in", in)
if out != 0 || in != 1 {
t.Fatal("unexpected behavior")
}
for i := 0; i < 2000; i++ {
item := p.New(i)
out, in = p.CountItems()
if out != 1 || in != 0 {
t.Fatal("unexpected behavior")
}
item.Destroy()
}
out, in = p.CountItems()
t.Log("out", out, "in", in)
if out != 0 || in != 1 {
t.Fatal("unexpected behavior")
}
wg := sync.WaitGroup{}
for i := 0; i < 4096; i++ {
item := p.New(i)
for j := 0; j < 16; j++ {
wg.Add(1)
user(item.Ref(), &wg)
wg.Add(1)
go usernodestroy(item.Copy(), &wg)
}
wg.Add(1)
go usernodestroy(item.Trans(), &wg)
}
wg.Wait()
runtime.GC()
out, in = p.CountItems()
t.Log("out", out, "in", in)
if out != 0 {
t.Fatal("unexpected behavior")
}
}
func user(item *Item[[]byte], wg *sync.WaitGroup) {
defer wg.Done()
rand.Read(item.Unwrap())
item.Destroy()
}
func usernodestroy(item *Item[[]byte], wg *sync.WaitGroup) {
defer wg.Done()
rand.Read(item.Unwrap())
}
type simplepooler struct{}
func (simplepooler) New(config any, pooled []byte) []byte {
if cap(pooled) >= config.(int) {
return pooled[:config.(int)]
}
return make([]byte, config.(int))
}
func (simplepooler) Parse(obj any, pooled []byte) []byte {
src := obj.([]byte)
if cap(pooled) >= len(src) {
copy(pooled[:len(src)], src)
return pooled[:len(src)]
}
return obj.([]byte)
}
func (simplepooler) Reset(item *[]byte) {
*item = (*item)[:0]
}
func (simplepooler) Copy(dst, src *[]byte) {
copy(*dst, *src)
}

9
pooler.go Normal file
View File

@@ -0,0 +1,9 @@
package orbyte
// Pooler connects to a user-defined struct.
type Pooler[T any] interface {
New(config any, pooled T) T
Parse(obj any, pooled T) T
Reset(item *T)
Copy(dst, src *T)
}

59
status.go Normal file
View File

@@ -0,0 +1,59 @@
package orbyte
import "sync/atomic"
const (
statusisbuffered = 1 << iota
statusdestroyed
)
type status uintptr
var destroyedstatus status
func init() {
destroyedstatus.setdestroyed(true)
}
func (c status) mask(v bool, typ uintptr) (news status) {
news = c
if v {
news |= status(typ)
} else {
news &= ^status(typ)
}
return
}
func (c *status) setbool(v bool, typ uintptr) {
olds := atomic.LoadUintptr((*uintptr)(c))
oldv := olds&typ != 0
if oldv == v {
return
}
news := status(olds).mask(v, typ)
for !atomic.CompareAndSwapUintptr((*uintptr)(c), olds, uintptr(news)) {
olds = atomic.LoadUintptr((*uintptr)(c))
news = status(olds).mask(v, typ)
}
}
func (c *status) loadbool(typ uintptr) bool {
return atomic.LoadUintptr((*uintptr)(c))&typ != 0
}
func (c *status) isbuffered() bool {
return c.loadbool(statusisbuffered)
}
func (c *status) setbuffered(v bool) {
c.setbool(v, statusisbuffered)
}
func (c *status) hasdestroyed() bool {
return c.loadbool(statusdestroyed)
}
func (c *status) setdestroyed(v bool) {
c.setbool(v, statusdestroyed)
}