1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-12 04:43:22 +08:00

optimize: memory consumption

This commit is contained in:
源文雨
2025-05-13 00:59:05 +09:00
parent d65fc4dd71
commit df8f6affa3
19 changed files with 204 additions and 242 deletions

View File

@@ -59,13 +59,10 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
buf.Write((*[PacketHeadNoCRCLen]byte)(
(unsafe.Pointer)(p),
)[:])
b := pbuf.NewBytes(buf.Len())
b.V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
})
b.ManualDestroy()
b := make([]byte, buf.Len())
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
_ = binary.Write(buf, binary.LittleEndian, p.md5h8)
return
}
@@ -79,13 +76,10 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
w.Write(p.src[:])
w.Write(p.dst[:])
w.P(func(buf *pbuf.Buffer) {
b := pbuf.NewBytes(buf.Len())
b.V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
})
b.ManualDestroy()
b := make([]byte, buf.Len())
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
})
w.WriteUInt64(p.md5h8)
w.P(func(b *pbuf.Buffer) {

View File

@@ -87,11 +87,10 @@ func (pb *DataBuilder) Zstd() *DataBuilder {
return pb.p(func(ub *PacketBuf) {
data := algo.EncodeZstd(ub.Bytes())
ub.Reset()
data.V(func(b []byte) { ub.Write(b) })
ub.Write(data)
if config.ShowDebugLog {
logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64))
}
data.ManualDestroy()
})
}
@@ -125,8 +124,7 @@ func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) *
w.P(func(b *pbuf.Buffer) {
data := algo.EncodeAEAD(aead, additional, b.Bytes())
ub.Reset()
data.V(func(b []byte) { ub.Write(b) })
data.ManualDestroy()
ub.Write(data)
})
w.Destroy()
}))
@@ -204,7 +202,7 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) {
pbs = []PacketBytes{
pbuf.BufferItemToBytes((*PacketItem)(
pb.copy().noFrag(nofrag).hasMore(false).offset(0),
)),
)).Ignore(),
}
return
}
@@ -226,7 +224,7 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) {
}
pbs[i] = pbuf.BufferItemToBytes((*PacketItem)(
pb.copy().offset(uint16(i*datalim)),
)).Slice(a, b)
)).Ignore().Slice(a, b)
}
})
return

View File

@@ -12,6 +12,10 @@ import (
var packetPool = pbuf.NewBufferPool[Packet]()
func init() {
packetPool.LimitInput(256)
packetPool.LimitOutput(256)
pbuf.LimitInput(256)
pbuf.LimitOutput(256)
if config.ShowDebugLog {
go status()
}
@@ -23,7 +27,7 @@ func selectPacket(buf ...byte) *PacketItem {
}
func status() {
for range time.NewTicker(time.Minute).C {
for range time.NewTicker(time.Second).C {
out, in := packetPool.CountItems()
logrus.Infoln(file.Header(), "packet outside:", out, "inside:", in)
out, in = pbuf.CountItems()

View File

@@ -55,13 +55,10 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
return
}
var crc uint64
b := pbuf.NewBytes(int(PacketHeadNoCRCLen))
b.V(func(b []byte) {
copy(b, data[:PacketHeadNoCRCLen])
ClearTTL(b)
crc = algo.MD5Hash8(b)
})
b.ManualDestroy()
var b [PacketHeadNoCRCLen]byte
copy(b[:], data[:PacketHeadNoCRCLen])
ClearTTL(b[:])
crc = algo.MD5Hash8(b[:])
if crc != pb.DAT.md5h8 {
err = ErrBadCRCChecksum
if config.ShowDebugLog {
@@ -85,7 +82,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
p.ManualDestroy()
return
}
pbytes = pbuf.BufferItemToBytes(p)
pbytes = pbuf.BufferItemToBytes(p).Ignore()
return
}

View File

@@ -3,7 +3,6 @@ package link
import (
"encoding/hex"
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/config"
@@ -19,7 +18,7 @@ func (l *Link) randkeyidx() uint8 {
}
// decode by aead and put b into pool
func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes, err error) {
func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db []byte, err error) {
if len(b) == 0 || teatype >= 32 {
return
}
@@ -33,7 +32,9 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes
}
logrus.Debugln(file.Header(), "copy plain text", hex.EncodeToString(b[:n]), endl)
}
return pbuf.ParseBytes(b...).Copy(), nil
db = make([]byte, len(b))
copy(db, b)
return
}
aead := l.keys[teatype]
if aead == nil {
@@ -43,7 +44,7 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes
}
// xorenc 按 8 字节, 以初始 m.mask 循环异或编码 data
func (m *Me) xorenc(data []byte, seq uint32) pbuf.Bytes {
func (m *Me) xorenc(data []byte, seq uint32) []byte {
return algo.EncodeXOR(data, m.mask, seq)
}

View File

@@ -3,13 +3,12 @@ package link
import (
"github.com/RomiChan/syncx"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/orbyte/pbuf"
)
// 事件分发器
var dispachers syncx.Map[uint8, Dispacher]
type Dispacher func(header *head.Packet, peer *Link, data pbuf.Bytes)
type Dispacher func(header *head.Packet, peer *Link, data []byte)
// RegisterDispacher of proto
func RegisterDispacher(p uint8, d Dispacher) (actual Dispacher, hasexist bool) {

View File

@@ -12,7 +12,6 @@ import (
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/internal/bin"
base14 "github.com/fumiama/go-base16384"
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
)
@@ -70,25 +69,24 @@ func (m *Me) Connect(peer string) (*Link, error) {
return nil, ErrPerrNotExist
}
func (l *Link) ToLower(header *head.Packet, data pbuf.Bytes) {
func (l *Link) ToLower(header *head.Packet, data []byte) {
if l.pipe != nil {
d := make([]byte, len(data))
copy(d, data)
l.pipe <- LinkData{
H: *header,
D: data.Copy().Trans(),
D: d,
}
if config.ShowDebugLog {
logrus.Debugln("[listen] deliver to pipe of", l.peerip)
}
return
}
var err error
data.V(func(b []byte) {
_, err = l.me.nic.Write(b)
})
_, err := l.me.nic.Write(data)
if err != nil {
logrus.Errorln("[listen] deliver", data.Len(), "bytes data to nic err:", err)
logrus.Errorln("[listen] deliver", len(data), "bytes data to nic err:", err)
} else if config.ShowDebugLog {
logrus.Debugln("[listen] deliver", data.Len(), "bytes data to nic")
logrus.Debugln("[listen] deliver", len(data), "bytes data to nic")
}
}

View File

@@ -32,11 +32,17 @@ func (m *Me) runworkers() {
m.jobs = make([]chan job, ncpu)
for i := 0; i < ncpu; i++ {
m.jobs[i] = make(chan job, 4096)
go func(jobs <-chan job) {
go func(i int, jobs <-chan job) {
for jb := range jobs {
if config.ShowDebugLog {
logrus.Debugln("[listen] job thread", i, "call waitordispatch")
}
m.waitordispatch(jb.addr, jb.buf, jb.n, jb.fil)
if config.ShowDebugLog {
logrus.Debugln("[listen] job thread", i, "fin waitordispatch")
}
}
}(m.jobs[i])
}(i, m.jobs[i])
}
}
@@ -73,9 +79,9 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
fil *uintptr
)
if idx < 0 {
lbf = pbuf.NewBytes(lstnbufgragsz)
lbf = pbuf.NewLargeBytes(lstnbufgragsz)
} else {
lbf = pbuf.ParseBytes(bufs[idx*lstnbufgragsz : (idx+1)*lstnbufgragsz]...)
lbf = pbuf.ParseBytes(bufs[idx*lstnbufgragsz : (idx+1)*lstnbufgragsz : (idx+1)*lstnbufgragsz]...).Ignore()
fil = &fils[idx]
}
@@ -194,29 +200,23 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
}
return
}
if data.Len() < 8 {
if len(data) < 8 {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len())
logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", len(data))
}
return
}
ok := false
data.V(func(b []byte) {
ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b)
})
ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), data)
if !ok {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid hash packet")
}
return
}
data = data.SliceFrom(8)
data = data[8:]
if p.usezstd {
data.V(func(b []byte) {
old := data
data, err = algo.DecodeZstd(b) // skip hash
old.ManualDestroy()
})
data, err = algo.DecodeZstd(data) // skip hash
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid zstd packet:", err)
@@ -224,7 +224,7 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] zstd decoded len:", data.Len())
logrus.Debugln("[listen] zstd decoded len:", len(data))
}
}
fn, ok := GetDispacher(header.Proto.Proto())
@@ -233,5 +233,4 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
return
}
fn(header, p, data)
data.ManualDestroy()
}

View File

@@ -10,7 +10,6 @@ import (
"time"
"github.com/FloatTech/ttl"
"github.com/fumiama/orbyte/pbuf"
"github.com/fumiama/water/waterutil"
"github.com/sirupsen/logrus"
@@ -316,15 +315,6 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) {
logrus.Warnln("[me] drop packet to", dst.String()+":"+strconv.Itoa(int(m.DstPort())), ": nil nexthop")
return
}
pcp := pbuf.NewBytes(len(packet))
pcp.V(func(b []byte) {
copy(b, packet)
})
go func() {
pcp.V(func(b []byte) {
lnk.WritePacket(head.ProtoData, b, lnk.me.ttl)
})
pcp.ManualDestroy()
}()
lnk.WritePacket(head.ProtoData, packet, lnk.me.ttl)
return
}

View File

@@ -123,7 +123,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
logrus.Warnln("[recv] transfer drop packet: zero ttl")
return
}
go lnk.write2peer(pbuf.ParseBytes(data...).Copy(), seq)
go lnk.write2peer(pbuf.ParseBytes(data...).Ignore().Copy(), seq)
if config.ShowDebugLog {
logrus.Debugln("[listen] trans", len(data), "bytes packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort)))
}

View File

@@ -110,24 +110,24 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
}
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "raw data bytes", hex.EncodeToString(data[:bound]), endl)
}
b = l.me.xorenc(data, seq)
b = pbuf.ParseBytes(l.me.xorenc(data, seq)...).Ignore()
isnewb = true
if config.ShowDebugLog {
bound := 64
endl := "..."
if b.Len() < bound {
bound = b.Len()
endl = "."
}
b.V(func(b []byte) {
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "xored data bytes", hex.EncodeToString(b[:bound]), endl)
})
}
})
if config.ShowDebugLog {
bound := 64
endl := "..."
if b.Len() < bound {
bound = b.Len()
endl = "."
}
b.V(func(b []byte) {
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "xored data bytes", hex.EncodeToString(b[:bound]), endl)
})
}
if l.me.base14 {
b.V(func(data []byte) {
old := b
b = pbuf.ParseBytes(base14.Encode(data)...)
b = pbuf.ParseBytes(base14.Encode(data)...).Ignore()
if isnewb {
old.ManualDestroy()
}

View File

@@ -1,14 +1,12 @@
package data
import (
"github.com/fumiama/orbyte/pbuf"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/link"
)
func init() {
link.RegisterDispacher(head.ProtoData, func(header *head.Packet, peer *link.Link, data pbuf.Bytes) {
link.RegisterDispacher(head.ProtoData, func(header *head.Packet, peer *link.Link, data []byte) {
peer.ToLower(header, data)
})
}

View File

@@ -1,7 +1,6 @@
package hello
import (
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
@@ -10,17 +9,15 @@ import (
)
func init() {
link.RegisterDispacher(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) {
data.V(func(b []byte) {
switch {
case len(b) == 0:
logrus.Warnln(file.Header(), "recv old packet, do nothing")
case b[0] == byte(head.HelloPing):
go peer.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}, peer.Me().TTL())
logrus.Infoln(file.Header(), "recv, send ack")
default:
logrus.Infoln(file.Header(), "recv ack, do nothing")
}
})
link.RegisterDispacher(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data []byte) {
switch {
case len(data) == 0:
logrus.Warnln(file.Header(), "recv old packet, do nothing")
case data[0] == byte(head.HelloPing):
go peer.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}, peer.Me().TTL())
logrus.Infoln(file.Header(), "recv, send ack")
default:
logrus.Infoln(file.Header(), "recv ack, do nothing")
}
})
}

View File

@@ -17,97 +17,93 @@ import (
func init() {
// 收到通告包的处理
link.RegisterDispacher(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) {
data.V(func(b []byte) {
// 1. Data 解包
// ---- 使用 head.Notify 解释 packet
notify := make(head.Notify, 32)
err := json.Unmarshal(b, &notify)
if err != nil {
logrus.Errorln(file.Header(), "notify json unmarshal err:", err)
return
link.RegisterDispacher(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data []byte) {
// 1. Data 解包
// ---- 使用 head.Notify 解释 packet
notify := make(head.Notify, 32)
err := json.Unmarshal(data, &notify)
if err != nil {
logrus.Errorln(file.Header(), "notify json unmarshal err:", err)
return
}
// 2. endpoint注册
// ---- 遍历 Notify注册对方的 endpoint 到
// ---- connections注意使用读写锁connmapmu
for ps, ep := range notify {
nw, epstr := ep[0], ep[1]
if nw != peer.Me().EndPoint().Network() {
logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr)
continue
}
// 2. endpoint注册
// ---- 遍历 Notify注册对方的 endpoint 到
// ---- connections注意使用读写锁connmapmu
for ps, ep := range notify {
nw, epstr := ep[0], ep[1]
if nw != peer.Me().EndPoint().Network() {
logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr)
addr, err := p2p.NewEndPoint(nw, epstr, peer.Me().NetworkConfigs()...)
if err == nil {
p, ok := peer.Me().IsInPeer(ps)
if ok {
if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) {
p.SetEndPoint(addr)
logrus.Infoln(file.Header(), "notify set ep of peer", ps, "to", ep)
}
continue
}
addr, err := p2p.NewEndPoint(nw, epstr, peer.Me().NetworkConfigs()...)
if err == nil {
p, ok := peer.Me().IsInPeer(ps)
if ok {
if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) {
p.SetEndPoint(addr)
logrus.Infoln(file.Header(), "notify set ep of peer", ps, "to", ep)
}
continue
}
}
if config.ShowDebugLog {
logrus.Debugln(file.Header(), "notify drop invalid peer:", ps, "ep:", ep)
}
}
})
if config.ShowDebugLog {
logrus.Debugln(file.Header(), "notify drop invalid peer:", ps, "ep:", ep)
}
}
})
// 收到询问包的处理
link.RegisterDispacher(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) {
data.V(func(b []byte) {
// 完成data解包与notify分发
link.RegisterDispacher(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data []byte) {
// 完成data解包与notify分发
// 1. Data 解包
// ---- 使用 head.Query 解释 packet
// ---- 根据 Query 确定需要封装的 Notify
var peers head.Query
err := json.Unmarshal(b, &peers)
if err != nil {
logrus.Errorln(file.Header(), "query json unmarshal err:", err)
return
}
// 1. Data 解包
// ---- 使用 head.Query 解释 packet
// ---- 根据 Query 确定需要封装的 Notify
var peers head.Query
err := json.Unmarshal(data, &peers)
if err != nil {
logrus.Errorln(file.Header(), "query json unmarshal err:", err)
return
}
if peer == nil || peer.Me() == nil {
logrus.Errorln(file.Header(), "nil link/me")
return
}
if peer == nil || peer.Me() == nil {
logrus.Errorln(file.Header(), "nil link/me")
return
}
// 2. notify分发
// ---- 封装 Notify 到 新的 packet
// ---- 发送到对方
notify := make(head.Notify, len(peers))
for _, p := range peers {
lnk, ok := peer.Me().IsInPeer(p)
eps := ""
if peer.Me().EndPoint().Network() == "udp" { // udp has real p2p
if bin.IsNilInterface(lnk.EndPoint()) {
continue
}
eps = lnk.EndPoint().String()
}
if eps == "" {
eps = peer.RawEndPoint() // use registered ep only
}
if eps == "" {
// 2. notify分发
// ---- 封装 Notify 到 新的 packet
// ---- 发送到对方
notify := make(head.Notify, len(peers))
for _, p := range peers {
lnk, ok := peer.Me().IsInPeer(p)
eps := ""
if peer.Me().EndPoint().Network() == "udp" { // udp has real p2p
if bin.IsNilInterface(lnk.EndPoint()) {
continue
}
if ok && bin.IsNonNilInterface(lnk.EndPoint()) {
notify[p] = [2]string{
lnk.EndPoint().Network(),
eps,
}
eps = lnk.EndPoint().String()
}
if eps == "" {
eps = peer.RawEndPoint() // use registered ep only
}
if eps == "" {
continue
}
if ok && bin.IsNonNilInterface(lnk.EndPoint()) {
notify[p] = [2]string{
lnk.EndPoint().Network(),
eps,
}
}
if len(notify) > 0 {
logrus.Infoln(file.Header(), "query wrap", len(notify), "notify")
w := bin.SelectWriter()
_ = json.NewEncoder(w).Encode(&notify)
w.P(func(b *pbuf.Buffer) {
peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL())
})
w.Destroy()
}
})
}
if len(notify) > 0 {
logrus.Infoln(file.Header(), "query wrap", len(notify), "notify")
w := bin.SelectWriter()
_ = json.NewEncoder(w).Encode(&notify)
w.P(func(b *pbuf.Buffer) {
peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL())
})
w.Destroy()
}
})
}