1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-04 23:40:26 +08:00

optimize(link): drop static lstn buf

This commit is contained in:
源文雨
2025-02-26 00:22:17 +09:00
parent 689dfbc174
commit 60209117b7

View File

@@ -5,7 +5,6 @@ import (
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -31,40 +30,10 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
m.ep = conn.LocalAddr()
logrus.Infoln("[listen] at", m.ep)
go func() {
n := uint(runtime.NumCPU())
if n > 64 {
n = 64 // 只用最多 64 核
}
logrus.Infoln("[listen] use cpu num:", n)
listenbuf := make([]byte, lstnbufgragsz*n)
hasntfinished := make([]sync.Mutex, n)
for {
usenewbuf := false
i := uint(0)
for !hasntfinished[i].TryLock() {
i++
i %= n
if i == 0 { // looked up a full round, make a new buf
usenewbuf = true
if config.ShowDebugLog {
logrus.Debugln("[listen] use new buf")
}
break
}
}
if config.ShowDebugLog && !usenewbuf {
logrus.Debugln("[listen] lock index", i)
}
var lbf pbuf.Bytes
if usenewbuf {
lbf = pbuf.NewBytes(lstnbufgragsz)
} else {
if config.ShowDebugLog {
logrus.Debugln("[listen] take index", i, "slice", i*lstnbufgragsz, (i+1)*lstnbufgragsz, "cap", lstnbufgragsz)
}
lbf = pbuf.ParseBytes(listenbuf[i*lstnbufgragsz : (i+1)*lstnbufgragsz : (i+1)*lstnbufgragsz]...)
}
lbf := pbuf.NewBytes(lstnbufgragsz)
n, addr, err := conn.ReadFromPeer(lbf.Bytes())
lbf.KeepAlive()
if m.connections == nil || errors.Is(err, net.ErrClosed) {
logrus.Warnln("[listen] quit listening")
return
@@ -76,13 +45,6 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
logrus.Errorln("[listen] reconnect udp err:", err)
return
}
if !usenewbuf {
if config.ShowDebugLog {
logrus.Debugln("[listen] unlock index", i)
}
hasntfinished[i].Unlock()
i--
}
continue
}
if n <= 0 {
@@ -91,17 +53,13 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
}
continue
}
index := -1
if !usenewbuf {
index = int(i)
}
go m.waitordispatch(index, addr, lbf.Trans().SliceTo(n), hasntfinished)
go m.waitordispatch(addr, lbf, n)
}
}()
return
}
func (m *Me) waitordispatch(index int, addr p2p.EndPoint, buf pbuf.Bytes, hasntfinished []sync.Mutex) {
func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
recvtotlcnt := atomic.AddUint64(&m.recvtotlcnt, uint64(buf.Len()))
recvloopcnt := atomic.AddUintptr(&m.recvloopcnt, 1)
recvlooptime := atomic.LoadInt64(&m.recvlooptime)
@@ -111,56 +69,41 @@ func (m *Me) waitordispatch(index int, addr p2p.EndPoint, buf pbuf.Bytes, hasntf
atomic.StoreUint64(&m.recvtotlcnt, 0)
atomic.StoreInt64(&m.recvlooptime, now)
}
packet := m.wait(buf.Trans().Bytes())
packet := m.wait(buf.SliceTo(n).Bytes())
buf.KeepAlive()
if packet == nil {
if index < 0 {
if config.ShowDebugLog {
logrus.Debugln("[listen] queue waiting")
}
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] queue waiting, unlock index", index)
logrus.Debugln("[listen] queue waiting")
}
hasntfinished[index].Unlock()
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] index", index, "dispatch", len(packet.Pointer().UnsafeBody()), "bytes packet")
logrus.Debugln("[listen] dispatch", len(packet.Pointer().UnsafeBody()), "bytes packet")
}
if index >= 0 {
defer hasntfinished[index].Unlock()
m.dispatch(packet, addr, index)
return
}
m.dispatch(packet, addr, index)
m.dispatch(packet, addr)
}
func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index int) {
if config.ShowDebugLog {
defer logrus.Debugln("[listen] dispatched, unlock index", index)
logrus.Debugln("[listen] start dispatching index", index)
}
func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint) {
pp := packet.Pointer
r := pp().Len() - pp().BodyLen()
if r > 0 {
logrus.Warnln("[listen] @", index, "packet from endpoint", addr, "len", pp().BodyLen(), "is smaller than it declared len", pp().Len(), ", drop it")
logrus.Warnln("[listen] packet from endpoint", addr, "len", pp().BodyLen(), "is smaller than it declared len", pp().Len(), ", drop it")
return
}
p, ok := m.IsInPeer(pp().Src.String())
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "recv from endpoint", addr, "src", pp().Src, "dst", pp().Dst)
logrus.Debugln("[listen] recv from endpoint", addr, "src", pp().Src, "dst", pp().Dst)
}
if !ok {
logrus.Warnln("[listen] @", index, "packet from", pp().Src, "to", pp().Dst, "is refused")
logrus.Warnln("[listen] packet from", pp().Src, "to", pp().Dst, "is refused")
return
}
if helper.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) {
if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) {
logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String())
logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
} else { // others are all no status link
logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String())
logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
}
}
@@ -169,7 +112,7 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index
switch {
case p.IsToMe(pp().Dst):
if !p.Accept(pp().Src) {
logrus.Warnln("[listen] @", index, "refused packet from", pp().Src.String()+":"+strconv.Itoa(int(pp().SrcPort)))
logrus.Warnln("[listen] refused packet from", pp().Src.String()+":"+strconv.Itoa(int(pp().SrcPort)))
return
}
addt := pp().AdditionalData()
@@ -177,7 +120,7 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index
data, err := p.decode(pp().CipherIndex(), addt, pp().TransBody().Bytes())
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "drop invalid packet key idx:", pp().CipherIndex(), "addt:", addt, "err:", err)
logrus.Debugln("[listen] drop invalid packet key idx:", pp().CipherIndex(), "addt:", addt, "err:", err)
}
return
}
@@ -185,19 +128,19 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index
dat, err := decodezstd(data.Trans().Bytes())
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "drop invalid zstd packet:", err)
logrus.Debugln("[listen] drop invalid zstd packet:", err)
}
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "zstd decoded len:", dat.Len())
logrus.Debugln("[listen] zstd decoded len:", dat.Len())
}
data = dat
}
pp().SetBody(data)
if !pp().IsVaildHash() {
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "drop invalid hash packet")
logrus.Debugln("[listen] drop invalid hash packet")
}
return
}
@@ -205,63 +148,63 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index
case head.ProtoHello:
switch {
case len(pp().UnsafeBody()) == 0:
logrus.Warnln("[listen] @", index, "recv old hello packet, do nothing")
logrus.Warnln("[listen] recv old hello packet, do nothing")
case pp().UnsafeBody()[0] == byte(head.HelloPing):
n, err := p.WritePacket(head.NewPacketPartial(
head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), pbuf.ParseBytes(byte(head.HelloPong))), false)
if err == nil {
logrus.Infoln("[listen] @", index, "recv hello, send", n, "bytes hello ack packet")
logrus.Infoln("[listen] recv hello, send", n, "bytes hello ack packet")
} else {
logrus.Errorln("[listen] @", index, "send hello ack packet error:", err)
logrus.Errorln("[listen] send hello ack packet error:", err)
}
default:
logrus.Infoln("[listen] @", index, "recv hello ack packet, do nothing")
logrus.Infoln("[listen] recv hello ack packet, do nothing")
}
case head.ProtoNotify:
logrus.Infoln("[listen] @", index, "recv notify from", pp().Src)
logrus.Infoln("[listen] recv notify from", pp().Src)
p.onNotify(pp().UnsafeBody())
runtime.KeepAlive(packet)
case head.ProtoQuery:
logrus.Infoln("[listen] @", index, "recv query from", pp().Src)
logrus.Infoln("[listen] recv query from", pp().Src)
p.onQuery(pp().UnsafeBody())
runtime.KeepAlive(packet)
case head.ProtoData:
if p.pipe != nil {
p.pipe <- packet.Copy()
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "deliver to pipe of", p.peerip)
logrus.Debugln("[listen] deliver to pipe of", p.peerip)
}
} else {
_, err := m.nic.Write(pp().UnsafeBody())
if err != nil {
logrus.Errorln("[listen] @", index, "deliver", pp().BodyLen(), "bytes data to nic err:", err)
logrus.Errorln("[listen] deliver", pp().BodyLen(), "bytes data to nic err:", err)
} else if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "deliver", pp().BodyLen(), "bytes data to nic")
logrus.Debugln("[listen] deliver", pp().BodyLen(), "bytes data to nic")
}
}
default:
logrus.Warnln("[listen] @", index, "recv unknown proto:", pp().Proto)
logrus.Warnln("[listen] recv unknown proto:", pp().Proto)
}
case p.Accept(pp().Dst):
if !p.allowtrans {
logrus.Warnln("[listen] @", index, "refused to trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)))
logrus.Warnln("[listen] refused to trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)))
return
}
// 转发
lnk := m.router.NextHop(pp().Dst.String())
if lnk == nil {
logrus.Warnln("[listen] @", index, "transfer drop packet: nil nexthop")
logrus.Warnln("[listen] transfer drop packet: nil nexthop")
return
}
n, err := lnk.WritePacket(packet, true)
if err == nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "trans", n, "bytes packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)))
logrus.Debugln("[listen] trans", n, "bytes packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)))
}
} else {
logrus.Errorln("[listen] @", index, "trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "err:", err)
logrus.Errorln("[listen] trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "err:", err)
}
default:
logrus.Warnln("[listen] @", index, "packet dst", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "is not in peers")
logrus.Warnln("[listen] packet dst", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "is not in peers")
}
}