1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-24 21:00:49 +08:00

optimize: add thread pool

This commit is contained in:
源文雨
2025-04-14 20:56:06 +09:00
parent 8b28dbcd3c
commit 8e79f419b8
6 changed files with 69 additions and 6 deletions

View File

@@ -1,12 +1,32 @@
package head
import (
"time"
"github.com/fumiama/WireGold/config"
"github.com/fumiama/WireGold/internal/file"
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
)
var packetPool = pbuf.NewBufferPool[Packet]()
func init() {
if config.ShowDebugLog {
go status()
}
}
// selectPacket 从池中取出一个 Packet
func selectPacket(buf ...byte) *PacketItem {
return packetPool.NewBuffer(buf)
}
func status() {
for range time.NewTicker(time.Minute).C {
out, in := packetPool.CountItems()
logrus.Infoln(file.Header(), "packet outside:", out, "inside:", in)
out, in = pbuf.CountItems()
logrus.Infoln(file.Header(), "default outside:", out, "inside:", in)
}
}

View File

@@ -20,6 +20,26 @@ import (
const lstnbufgragsz = 65536
type job struct {
addr p2p.EndPoint
buf pbuf.Bytes
n int
fil *uintptr
}
func (m *Me) runworkers() {
ncpu := runtime.NumCPU()
m.jobs = make([]chan job, ncpu)
for i := 0; i < ncpu; i++ {
m.jobs[i] = make(chan job, 4096)
go func(jobs <-chan job) {
for jb := range jobs {
m.waitordispatch(jb.addr, jb.buf, jb.n, jb.fil)
}
}(m.jobs[i])
}
}
// 监听本机 endpoint
func (m *Me) listen() (conn p2p.Conn, err error) {
conn, err = m.ep.Listen()
@@ -31,6 +51,7 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
ncpu := runtime.NumCPU()
bufs := make([]byte, lstnbufgragsz*ncpu)
fils := make([]uintptr, ncpu)
go m.runworkers()
go func() {
var (
n int
@@ -80,7 +101,17 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
}
continue
}
go m.waitordispatch(addr, lbf, n, fil)
if idx < 0 {
if config.ShowDebugLog {
logrus.Infoln("[listen] go dispatch")
}
go m.waitordispatch(addr, lbf, n, fil)
} else {
if config.ShowDebugLog {
logrus.Infoln("[listen] send dispatch to cpu", idx)
}
m.jobs[idx] <- job{addr: addr, buf: lbf, n: n, fil: fil}
}
}
}()
return

View File

@@ -63,6 +63,8 @@ type Me struct {
connections map[string]*Link
// 本机监听的连接端点, 也用于向对端直接发送报文
conn p2p.Conn
// 供内部使用的包分发任务列
jobs []chan job
// 是否进行 base16384 编码
base14 bool
// 本机初始 ttl
@@ -152,6 +154,9 @@ func NewMe(cfg *MyConfig) (m Me) {
// Restart 重新连接
func (m *Me) Restart() error {
for i := 0; i < len(m.jobs); i++ {
close(m.jobs[i])
}
oldconn := m.conn
m.conn = nil
if bin.IsNonNilInterface(oldconn) {
@@ -203,6 +208,9 @@ func (m *Me) NetworkConfigs() []any {
}
func (m *Me) Close() error {
for i := 0; i < len(m.jobs); i++ {
close(m.jobs[i])
}
m.connections = nil
if bin.IsNonNilInterface(m.conn) {
_ = m.conn.Close()

View File

@@ -87,6 +87,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
if config.ShowDebugLog {
logrus.Debugln("[recv] ignore duplicated seq^crc packet, seq", strconv.FormatUint(uint64(seq), 16), "crc", strconv.FormatUint(crc, 16))
}
header.ManualDestroy()
return
}
if config.ShowDebugLog {
@@ -150,8 +151,11 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
}
h, got := m.recving.GetOrSet(uint16(seq), header)
if got && h == header {
panic("unexpected multi-put found")
if got {
if h == header {
panic("unexpected multi-put found")
}
header.ManualDestroy()
}
if config.ShowDebugLog {
logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "get frag part isnew:", !got)