From 8e79f419b85d551aab07a050b5f6d32521185178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Mon, 14 Apr 2025 20:56:06 +0900 Subject: [PATCH] optimize: add thread pool --- go.mod | 2 +- go.sum | 4 ++-- gold/head/pool.go | 20 ++++++++++++++++++++ gold/link/listen.go | 33 ++++++++++++++++++++++++++++++++- gold/link/me.go | 8 ++++++++ gold/link/recv.go | 8 ++++++-- 6 files changed, 69 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 1bce183..54d9342 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb github.com/fumiama/go-base16384 v1.7.0 github.com/fumiama/go-x25519 v1.0.0 - github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d + github.com/fumiama/orbyte v0.0.0-20250404130147-ac9d26dfb903 github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac github.com/klauspost/compress v1.17.9 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 2d0c2f2..688d1ab 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,8 @@ github.com/fumiama/go-base16384 v1.7.0 h1:6fep7XPQWxRlh4Hu+KsdH+6+YdUp+w6CwRXtMW github.com/fumiama/go-base16384 v1.7.0/go.mod h1:OEn+947GV5gsbTAnyuUW/SrfxJYUdYupSIQXOuGOcXM= github.com/fumiama/go-x25519 v1.0.0 h1:hiGg9EhseVmGCc8T1jECVkj8Keu/aJ1ZK05RM8Vuavo= github.com/fumiama/go-x25519 v1.0.0/go.mod h1:8VOhfyGZzw4IUs4nCjQFqW9cA3V/QpSCtP3fo2dLNg4= -github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d h1:paZ9NH9v7pwULgypKZytzuYNtODDGQGMvXcD6mzpXEs= -github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= +github.com/fumiama/orbyte v0.0.0-20250404130147-ac9d26dfb903 h1:7suHE2A8JCLP/DA2j3mL7ytEhFSUKwz8KIsqQlAHAYw= +github.com/fumiama/orbyte v0.0.0-20250404130147-ac9d26dfb903/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac h1:A/5A0rODsg+EQHH61Ew5mMUtDpRXaSNqHhPvW+fN4C4= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac/go.mod h1:BBnNY9PwK+UUn4trAU+H0qsMEypm7+3Bj1bVFuJItlo= github.com/fumiama/wintun v0.0.0-20211229152851-8bc97c8034c0 h1:WfrSFlIlCAtg6Rt2IGna0HhJYSDE45YVHiYqO4wwsEw= diff --git a/gold/head/pool.go b/gold/head/pool.go index a60d022..b0880c7 100644 --- a/gold/head/pool.go +++ b/gold/head/pool.go @@ -1,12 +1,32 @@ package head import ( + "time" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/internal/file" "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" ) var packetPool = pbuf.NewBufferPool[Packet]() +func init() { + if config.ShowDebugLog { + go status() + } +} + // selectPacket 从池中取出一个 Packet func selectPacket(buf ...byte) *PacketItem { return packetPool.NewBuffer(buf) } + +func status() { + for range time.NewTicker(time.Minute).C { + out, in := packetPool.CountItems() + logrus.Infoln(file.Header(), "packet outside:", out, "inside:", in) + out, in = pbuf.CountItems() + logrus.Infoln(file.Header(), "default outside:", out, "inside:", in) + } +} diff --git a/gold/link/listen.go b/gold/link/listen.go index fe57b4c..a0e19e7 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -20,6 +20,26 @@ import ( const lstnbufgragsz = 65536 +type job struct { + addr p2p.EndPoint + buf pbuf.Bytes + n int + fil *uintptr +} + +func (m *Me) runworkers() { + ncpu := runtime.NumCPU() + m.jobs = make([]chan job, ncpu) + for i := 0; i < ncpu; i++ { + m.jobs[i] = make(chan job, 4096) + go func(jobs <-chan job) { + for jb := range jobs { + m.waitordispatch(jb.addr, jb.buf, jb.n, jb.fil) + } + }(m.jobs[i]) + } +} + // 监听本机 endpoint func (m *Me) listen() (conn p2p.Conn, err error) { conn, err = m.ep.Listen() @@ -31,6 +51,7 @@ func (m *Me) listen() (conn p2p.Conn, err error) { ncpu := runtime.NumCPU() bufs := make([]byte, lstnbufgragsz*ncpu) fils := make([]uintptr, ncpu) + go m.runworkers() go func() { var ( n int @@ -80,7 +101,17 @@ func (m *Me) listen() (conn p2p.Conn, err error) { } continue } - go m.waitordispatch(addr, lbf, n, fil) + if idx < 0 { + if config.ShowDebugLog { + logrus.Infoln("[listen] go dispatch") + } + go m.waitordispatch(addr, lbf, n, fil) + } else { + if config.ShowDebugLog { + logrus.Infoln("[listen] send dispatch to cpu", idx) + } + m.jobs[idx] <- job{addr: addr, buf: lbf, n: n, fil: fil} + } } }() return diff --git a/gold/link/me.go b/gold/link/me.go index 4571a7b..2652d79 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -63,6 +63,8 @@ type Me struct { connections map[string]*Link // 本机监听的连接端点, 也用于向对端直接发送报文 conn p2p.Conn + // 供内部使用的包分发任务列 + jobs []chan job // 是否进行 base16384 编码 base14 bool // 本机初始 ttl @@ -152,6 +154,9 @@ func NewMe(cfg *MyConfig) (m Me) { // Restart 重新连接 func (m *Me) Restart() error { + for i := 0; i < len(m.jobs); i++ { + close(m.jobs[i]) + } oldconn := m.conn m.conn = nil if bin.IsNonNilInterface(oldconn) { @@ -203,6 +208,9 @@ func (m *Me) NetworkConfigs() []any { } func (m *Me) Close() error { + for i := 0; i < len(m.jobs); i++ { + close(m.jobs[i]) + } m.connections = nil if bin.IsNonNilInterface(m.conn) { _ = m.conn.Close() diff --git a/gold/link/recv.go b/gold/link/recv.go index 835bb8c..93f67ab 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -87,6 +87,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { if config.ShowDebugLog { logrus.Debugln("[recv] ignore duplicated seq^crc packet, seq", strconv.FormatUint(uint64(seq), 16), "crc", strconv.FormatUint(crc, 16)) } + header.ManualDestroy() return } if config.ShowDebugLog { @@ -150,8 +151,11 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { } h, got := m.recving.GetOrSet(uint16(seq), header) - if got && h == header { - panic("unexpected multi-put found") + if got { + if h == header { + panic("unexpected multi-put found") + } + header.ManualDestroy() } if config.ShowDebugLog { logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "get frag part isnew:", !got)