1
0
mirror of https://github.com/fumiama/orbyte.git synced 2026-06-05 02:00:30 +08:00

feat: add async conflict detection

This commit is contained in:
源文雨
2025-04-14 22:41:19 +09:00
parent ac9d26dfb9
commit 162407d64e
8 changed files with 86 additions and 17 deletions

22
item.go
View File

@@ -35,6 +35,9 @@ func (b *Item[T]) Trans() T {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
if b.pool.issync {
b.stat.setinsyncop(true)
}
val := b.val
atomic.StoreUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus),
@@ -47,6 +50,10 @@ func (b *Item[T]) Trans() T {
// HasInvolved whether this item is buffered
// and will be Reset on putting back.
func (b *Item[T]) HasInvolved() bool {
if b.pool.issync {
b.stat.setinsyncop(true)
defer b.stat.setinsyncop(false)
}
return b.stat.isbuffered()
}
@@ -57,6 +64,10 @@ func (b *Item[T]) V(f func(T)) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
if b.pool.issync {
b.stat.setinsyncop(true)
defer b.stat.setinsyncop(false)
}
f(b.val)
runtime.KeepAlive(b)
}
@@ -68,6 +79,10 @@ func (b *Item[T]) P(f func(*T)) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
if b.pool.issync {
b.stat.setinsyncop(true)
defer b.stat.setinsyncop(false)
}
f(&b.val)
runtime.KeepAlive(b)
}
@@ -77,6 +92,10 @@ func (b *Item[T]) Copy() (cb *Item[T]) {
if b.stat.hasdestroyed() {
panic("use after destroy")
}
if b.pool.issync {
b.stat.setinsyncop(true)
defer b.stat.setinsyncop(false)
}
cb = b.pool.New(b.cfg)
b.pool.pooler.Copy(&cb.val, &b.val)
return
@@ -100,6 +119,9 @@ func (b *Item[T]) destroybystat(stat status) {
// Calling this method only when you're sure that
// no one will use it, or it will cause a panic.
func (b *Item[T]) ManualDestroy() {
if b.pool.issync {
b.stat.setinsyncop(true)
}
runtime.SetFinalizer(b, nil)
b.destroybystat(status(atomic.SwapUintptr(
(*uintptr)(&b.stat), uintptr(destroyedstatus),

View File

@@ -10,23 +10,19 @@ import (
func (bufferPool BufferPool[USRDAT]) NewBuffer(
buf []byte,
) *orbyte.Item[UserBuffer[USRDAT]] {
return bufferPool.p.New(buf)
return bufferPool.New(buf)
}
// InvolveBuffer involve external *bytes.Buffer into Item.
func (bufferPool BufferPool[USRDAT]) InvolveBuffer(
buf *bytes.Buffer,
) *orbyte.Item[UserBuffer[USRDAT]] {
return bufferPool.p.Involve(buf.Len(), buf)
return bufferPool.Involve(buf.Len(), buf)
}
// ParseBuffer convert external *bytes.Buffer into Item.
func (bufferPool BufferPool[USRDAT]) ParseBuffer(
buf *bytes.Buffer,
) *orbyte.Item[UserBuffer[USRDAT]] {
return bufferPool.p.Parse(buf.Len(), buf)
}
func (bufferPool BufferPool[USRDAT]) CountItems() (outside int32, inside int32) {
return bufferPool.p.CountItems()
return bufferPool.Parse(buf.Len(), buf)
}

View File

@@ -59,7 +59,7 @@ func testBuffer(buf *OBuffer, t *testing.T) {
runtime.Gosched()
runtime.GC()
out, in := bufferPool.p.CountItems()
out, in := bufferPool.CountItems()
t.Log(out, in)
if out != 0 {
t.Fail()

View File

@@ -36,7 +36,7 @@ func (b UserBytes[USRDAT]) B(f func([]byte, *USRDAT)) {
// NewBytes alloc sz bytes.
func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) {
buf := bufferPool.p.New(sz)
buf := bufferPool.New(sz)
b.buf = buf
buf.P(func(buf *UserBuffer[USRDAT]) {
b.b = buf.Len()
@@ -46,7 +46,7 @@ func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) {
// InvolveBytes involve outside buf into pool.
func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT]) {
buf := bufferPool.p.Involve(len(p), bytes.NewBuffer(p))
buf := bufferPool.Involve(len(p), bytes.NewBuffer(p))
b.buf = buf
buf.P(func(buf *UserBuffer[USRDAT]) {
b.b = buf.Len()
@@ -57,7 +57,7 @@ func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT
// ParseBytes convert outside bytes to Bytes safely
// without adding it into pool.
func (bufferPool BufferPool[USRDAT]) ParseBytes(p ...byte) (b UserBytes[USRDAT]) {
buf := bufferPool.p.Parse(len(p), bytes.NewBuffer(p))
buf := bufferPool.Parse(len(p), bytes.NewBuffer(p))
b.buf = buf
buf.P(func(buf *UserBuffer[USRDAT]) {
b.b = buf.Len()

View File

@@ -37,7 +37,7 @@ func TestBytesSlice(t *testing.T) {
runtime.GC()
runtime.Gosched()
runtime.GC()
out, in := bufferPool.p.CountItems()
out, in := bufferPool.CountItems()
t.Log(out, in)
if out != 0 {
t.Fail()
@@ -60,7 +60,7 @@ func TestBytesInvolve(t *testing.T) {
}
}
runtime.GC()
out, in := bufferPool.p.CountItems()
out, in := bufferPool.CountItems()
t.Log(out, in)
if out != 0 {
t.Fail()
@@ -80,7 +80,7 @@ func TestBytesParse(t *testing.T) {
}
}
runtime.GC()
out, in := bufferPool.p.CountItems()
out, in := bufferPool.CountItems()
t.Log(out, in)
if out != 0 {
t.Fail()
@@ -107,7 +107,7 @@ func TestBytesCopy(t *testing.T) {
runtime.GC()
runtime.Gosched()
runtime.GC()
out, in := bufferPool.p.CountItems()
out, in := bufferPool.CountItems()
t.Log(out, in)
if out != 0 {
t.Fail()

View File

@@ -16,12 +16,12 @@ type (
)
type BufferPool[USRDAT any] struct {
p *orbyte.Pool[UserBuffer[USRDAT]]
*orbyte.Pool[UserBuffer[USRDAT]]
}
func NewBufferPool[USRDAT any]() BufferPool[USRDAT] {
return BufferPool[USRDAT]{
p: orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}),
orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}),
}
}
@@ -60,3 +60,23 @@ func ParseBytes(b ...byte) Bytes {
func CountItems() (outside int32, inside int32) {
return bufferPool.CountItems()
}
// SetNoPutBack see Pool.SetNoPutBack
func SetNoPutBack(on bool) {
bufferPool.SetNoPutBack(on)
}
// SetSyncItem see Pool.SetSyncItem
func SetSyncItem(on bool) {
bufferPool.SetSyncItem(on)
}
// LimitInput see Pool.LimitInput
func LimitInput(n int32) {
bufferPool.LimitInput(n)
}
// LimitInput see Pool.LimitOutput
func LimitOutput(n int32) {
bufferPool.LimitOutput(n)
}

View File

@@ -21,6 +21,7 @@ type Pool[T any] struct {
pooler Pooler[T]
noputbak bool
issync bool
}
// NewPool make a new pool from custom pooler.
@@ -43,6 +44,13 @@ func (pool *Pool[T]) SetNoPutBack(on bool) {
pool.noputbak = on
}
// SetSyncItem make it panic on every read-write conflict.
//
// Enable this to detect coding errors.
func (pool *Pool[T]) SetSyncItem(on bool) {
pool.issync = on
}
// LimitOutput will automatically set new item no-autodestroy
// if countout > outlim.
func (pool *Pool[T]) LimitOutput(n int32) {

View File

@@ -5,6 +5,7 @@ import "sync/atomic"
const (
statusisbuffered = 1 << iota
statusdestroyed
statusinsyncop
)
type status uintptr
@@ -38,6 +39,24 @@ func (c *status) setbool(v bool, typ uintptr) {
}
}
// setboolunique panic on non-unique set
func (c *status) setboolunique(v bool, typ uintptr) {
olds := atomic.LoadUintptr((*uintptr)(c))
oldv := olds&typ != 0
if oldv == v {
panic("non-unique operation")
}
news := status(olds).mask(v, typ)
for !atomic.CompareAndSwapUintptr((*uintptr)(c), olds, uintptr(news)) {
olds = atomic.LoadUintptr((*uintptr)(c))
oldv = olds&typ != 0
if oldv == v {
panic("non-unique operation")
}
news = status(olds).mask(v, typ)
}
}
func (c *status) loadbool(typ uintptr) bool {
return atomic.LoadUintptr((*uintptr)(c))&typ != 0
}
@@ -57,3 +76,7 @@ func (c *status) hasdestroyed() bool {
func (c *status) setdestroyed(v bool) {
c.setbool(v, statusdestroyed)
}
func (c *status) setinsyncop(v bool) {
c.setboolunique(v, statusinsyncop)
}