From b5992574ec06d1b164b30d5b0eeabd85362fa1b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Fri, 4 Apr 2025 01:26:37 +0900 Subject: [PATCH] optimize(orbyte): use manual destroy --- go.mod | 2 +- go.sum | 4 ++-- gold/head/box.go | 10 ++++++++-- gold/head/builder.go | 9 +++++++++ gold/head/unbox.go | 5 ++++- gold/link/link.go | 4 ++-- gold/link/listen.go | 6 ++++++ gold/link/me.go | 9 ++++++--- gold/link/recv.go | 4 ++++ gold/link/send.go | 13 +++++++++++++ gold/p2p/tcp/pdu.go | 5 +++-- gold/proto/nat/nat.go | 1 + internal/bin/pool.go | 2 ++ internal/bin/writer.go | 10 ++++++++-- upper/services/tunnel/tunnel.go | 22 ++++++++++------------ upper/services/tunnel/tunnel_test.go | 5 ++++- 16 files changed, 83 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index a1748a7..1bce183 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb github.com/fumiama/go-base16384 v1.7.0 github.com/fumiama/go-x25519 v1.0.0 - github.com/fumiama/orbyte v0.0.0-20250403074130-f9204e9924a1 + github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac github.com/klauspost/compress v1.17.9 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index c186f2b..2d0c2f2 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,8 @@ github.com/fumiama/go-base16384 v1.7.0 h1:6fep7XPQWxRlh4Hu+KsdH+6+YdUp+w6CwRXtMW github.com/fumiama/go-base16384 v1.7.0/go.mod h1:OEn+947GV5gsbTAnyuUW/SrfxJYUdYupSIQXOuGOcXM= github.com/fumiama/go-x25519 v1.0.0 h1:hiGg9EhseVmGCc8T1jECVkj8Keu/aJ1ZK05RM8Vuavo= github.com/fumiama/go-x25519 v1.0.0/go.mod h1:8VOhfyGZzw4IUs4nCjQFqW9cA3V/QpSCtP3fo2dLNg4= -github.com/fumiama/orbyte v0.0.0-20250403074130-f9204e9924a1 h1:E6/hCWUUcwJBlbNRHDl2VyqX0l48Xv6sVYPLh4ZFpPQ= -github.com/fumiama/orbyte v0.0.0-20250403074130-f9204e9924a1/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= +github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d h1:paZ9NH9v7pwULgypKZytzuYNtODDGQGMvXcD6mzpXEs= +github.com/fumiama/orbyte v0.0.0-20250403152421-da8338096f2d/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac h1:A/5A0rODsg+EQHH61Ew5mMUtDpRXaSNqHhPvW+fN4C4= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac/go.mod h1:BBnNY9PwK+UUn4trAU+H0qsMEypm7+3Bj1bVFuJItlo= github.com/fumiama/wintun v0.0.0-20211229152851-8bc97c8034c0 h1:WfrSFlIlCAtg6Rt2IGna0HhJYSDE45YVHiYqO4wwsEw= diff --git a/gold/head/box.go b/gold/head/box.go index 2434cb8..c718cbe 100644 --- a/gold/head/box.go +++ b/gold/head/box.go @@ -47,6 +47,7 @@ func (p *Packet) PreCRC64() (crc uint64) { ) } }) + w.Destroy() return } @@ -58,11 +59,13 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { buf.Write((*[PacketHeadNoCRCLen]byte)( (unsafe.Pointer)(p), )[:]) - pbuf.NewBytes(buf.Len()).V(func(b []byte) { + b := pbuf.NewBytes(buf.Len()) + b.V(func(b []byte) { copy(b, buf.Bytes()) ClearTTL(b) p.md5h8 = algo.MD5Hash8(b) }) + b.ManualDestroy() _ = binary.Write(buf, binary.LittleEndian, p.md5h8) return } @@ -76,14 +79,17 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { w.Write(p.src[:]) w.Write(p.dst[:]) w.P(func(buf *pbuf.Buffer) { - pbuf.NewBytes(buf.Len()).V(func(b []byte) { + b := pbuf.NewBytes(buf.Len()) + b.V(func(b []byte) { copy(b, buf.Bytes()) ClearTTL(b) p.md5h8 = algo.MD5Hash8(b) }) + b.ManualDestroy() }) w.WriteUInt64(p.md5h8) w.P(func(b *pbuf.Buffer) { _, _ = buf.ReadFrom(b) }) + w.Destroy() } diff --git a/gold/head/builder.go b/gold/head/builder.go index 24235de..5a6d376 100644 --- a/gold/head/builder.go +++ b/gold/head/builder.go @@ -91,6 +91,7 @@ func (pb *DataBuilder) Zstd() *DataBuilder { if config.ShowDebugLog { logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64)) } + data.ManualDestroy() }) } @@ -125,7 +126,9 @@ func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) * data := algo.EncodeAEAD(aead, additional, b.Bytes()) ub.Reset() data.V(func(b []byte) { ub.Write(b) }) + data.ManualDestroy() }) + w.Destroy() })) } @@ -139,6 +142,7 @@ func (pb *DataBuilder) Plain(teatyp uint8, additional uint16) *PacketBuilder { ub.Reset() _, _ = ub.ReadFrom(b) }) + w.Destroy() })) } @@ -228,6 +232,11 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) { return } +// Destroy call this once no one use it. +func (pb *PacketBuilder) Destroy() { + (*PacketItem)(pb).ManualDestroy() +} + func BuildPacketFromBytes(pb PacketBytes) pbuf.Bytes { w := bin.SelectWriter() pb.B(func(_ []byte, p *Packet) { diff --git a/gold/head/unbox.go b/gold/head/unbox.go index 89cebbc..a01807f 100644 --- a/gold/head/unbox.go +++ b/gold/head/unbox.go @@ -55,11 +55,13 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { return } var crc uint64 - pbuf.NewBytes(int(PacketHeadNoCRCLen)).V(func(b []byte) { + b := pbuf.NewBytes(int(PacketHeadNoCRCLen)) + b.V(func(b []byte) { copy(b, data[:PacketHeadNoCRCLen]) ClearTTL(b) crc = algo.MD5Hash8(b) }) + b.ManualDestroy() if crc != pb.DAT.md5h8 { err = ErrBadCRCChecksum if config.ShowDebugLog { @@ -80,6 +82,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { pb.DAT.hashrem = int64(sz) }) if err != nil { + p.ManualDestroy() return } pbytes = pbuf.BufferItemToBytes(p) diff --git a/gold/link/link.go b/gold/link/link.go index 16eb55a..df0b61a 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -22,7 +22,7 @@ var ( type LinkData struct { H head.Packet - D pbuf.Bytes + D []byte } // Link 是本机到 peer 的连接抽象 @@ -74,7 +74,7 @@ func (l *Link) ToLower(header *head.Packet, data pbuf.Bytes) { if l.pipe != nil { l.pipe <- LinkData{ H: *header, - D: data, + D: data.Copy().Trans(), } if config.ShowDebugLog { logrus.Debugln("[listen] deliver to pipe of", l.peerip) diff --git a/gold/link/listen.go b/gold/link/listen.go index fcae9bc..f22aaef 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -64,6 +64,8 @@ func (m *Me) listen() (conn p2p.Conn, err error) { } func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) { + defer buf.ManualDestroy() + recvtotlcnt := atomic.AddUint64(&m.recvtotlcnt, uint64(buf.Len())) recvloopcnt := atomic.AddUintptr(&m.recvloopcnt, 1) recvlooptime := atomic.LoadInt64(&m.recvlooptime) @@ -100,6 +102,7 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) { } m.dispatch(p, b, addr) }) + h.ManualDestroy() }) } @@ -151,7 +154,9 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { data = data.SliceFrom(8) if p.usezstd { data.V(func(b []byte) { + old := data data, err = algo.DecodeZstd(b) // skip hash + old.ManualDestroy() }) if err != nil { if config.ShowDebugLog { @@ -169,4 +174,5 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { return } fn(header, p, data) + data.ManualDestroy() } diff --git a/gold/link/me.go b/gold/link/me.go index 39696f2..4571a7b 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -312,8 +312,11 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) { pcp.V(func(b []byte) { copy(b, packet) }) - go pcp.V(func(b []byte) { - lnk.WritePacket(head.ProtoData, b, lnk.me.ttl) - }) + go func() { + pcp.V(func(b []byte) { + lnk.WritePacket(head.ProtoData, b, lnk.me.ttl) + }) + pcp.ManualDestroy() + }() return } diff --git a/gold/link/recv.go b/gold/link/recv.go index f3ca616..835bb8c 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -46,6 +46,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { return } data = w.ToBytes().Copy().Trans() + w.Destroy() if len(data) < bound { bound = len(data) endl = "." @@ -142,6 +143,9 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) { }) if ok { + if !h.HasInit() { + header.ManualDestroy() + } return } diff --git a/gold/link/send.go b/gold/link/send.go index a6c47ec..149d07f 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -57,11 +57,13 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) { pktb = pb.Seal(l.keys[teatype], teatype, sndcnt&0x07ff) } bs := pktb.Split(int(mtu), false) + pktb.Destroy() if config.ShowDebugLog { logrus.Debugln("[send] split packet into", len(bs), "parts") } for _, b := range bs { //TODO: impl. nofrag go l.write2peer(head.BuildPacketFromBytes(b), randseq(sndcnt)) + b.ManualDestroy() } } @@ -69,6 +71,7 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) { // // 因为不保证可达所以不返回错误。 func (l *Link) write2peer(b pbuf.Bytes, seq uint32) { + defer b.ManualDestroy() if l.doublepacket { err := l.write2peer1(b, seq) if err != nil { @@ -96,6 +99,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) { if conn == nil { return io.ErrClosedPipe } + isnewb := false b.V(func(data []byte) { if config.ShowDebugLog { bound := 64 @@ -107,6 +111,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) { logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "raw data bytes", hex.EncodeToString(data[:bound]), endl) } b = l.me.xorenc(data, seq) + isnewb = true if config.ShowDebugLog { bound := 64 endl := "..." @@ -121,7 +126,12 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) { }) if l.me.base14 { b.V(func(data []byte) { + old := b b = pbuf.ParseBytes(base14.Encode(data)...) + if isnewb { + old.ManualDestroy() + } + isnewb = true if config.ShowDebugLog { bound := 64 endl := "..." @@ -141,5 +151,8 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) { } _, err = conn.WriteToPeer(b, peerep) }) + if isnewb { + b.ManualDestroy() + } return } diff --git a/gold/p2p/tcp/pdu.go b/gold/p2p/tcp/pdu.go index cc5f5af..05433e2 100644 --- a/gold/p2p/tcp/pdu.go +++ b/gold/p2p/tcp/pdu.go @@ -42,7 +42,7 @@ func (p *packet) pack() *net.Buffers { return &net.Buffers{magicbuf, bin.NewWriterF(func(w *bin.Writer) { w.WriteByte(byte(p.typ)) w.WriteUInt16(p.len) - }).Trans(), p.dat} + }), p.dat} } func (p *packet) Read(_ []byte) (int, error) { @@ -80,7 +80,8 @@ func (p *packet) ReadFrom(r io.Reader) (n int64, err error) { if err != nil { return } - p.dat = w.ToBytes().Trans() + p.dat = w.ToBytes().Copy().Trans() + w.Destroy() return } diff --git a/gold/proto/nat/nat.go b/gold/proto/nat/nat.go index eeb5b47..8996763 100644 --- a/gold/proto/nat/nat.go +++ b/gold/proto/nat/nat.go @@ -106,6 +106,7 @@ func init() { w.P(func(b *pbuf.Buffer) { peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL()) }) + w.Destroy() } }) }) diff --git a/internal/bin/pool.go b/internal/bin/pool.go index 161a479..58549aa 100644 --- a/internal/bin/pool.go +++ b/internal/bin/pool.go @@ -5,6 +5,8 @@ import ( ) // SelectWriter 从池中取出一个 Writer +// +// 不要忘记调用 Destroy 以快速回收资源 func SelectWriter() *Writer { return (*Writer)(pbuf.NewBuffer(nil)) } diff --git a/internal/bin/writer.go b/internal/bin/writer.go index d2bde2a..eb4c050 100644 --- a/internal/bin/writer.go +++ b/internal/bin/writer.go @@ -11,10 +11,12 @@ import ( // Writer 写入 type Writer pbuf.OBuffer -func NewWriterF(f func(writer *Writer)) pbuf.Bytes { +func NewWriterF(f func(writer *Writer)) []byte { w := SelectWriter() f(w) - return w.ToBytes() + b := w.ToBytes().Copy() + w.Destroy() + return b.Trans() } func (w *Writer) P(f func(*pbuf.Buffer)) *Writer { @@ -64,3 +66,7 @@ func (w *Writer) WriteUInt64(v uint64) { func (w *Writer) ToBytes() pbuf.Bytes { return pbuf.BufferItemToBytes((*pbuf.OBuffer)(w)) } + +func (w *Writer) Destroy() { + (*pbuf.OBuffer)(w).ManualDestroy() +} diff --git a/upper/services/tunnel/tunnel.go b/upper/services/tunnel/tunnel.go index 9f5a692..0146eb8 100644 --- a/upper/services/tunnel/tunnel.go +++ b/upper/services/tunnel/tunnel.go @@ -73,14 +73,14 @@ func (s *Tunnel) Read(p []byte) (int, error) { d = s.outcache } else { pkt := <-s.out - if !pkt.D.HasInit() { + if len(pkt.D) == 0 { return 0, io.EOF } if pkt.H.Size() < 4 { - logrus.Warnln("[tunnel] unexpected packet data len", pkt.H.Size(), "content", hex.EncodeToString(pkt.D.Trans())) + logrus.Warnln("[tunnel] unexpected packet data len", pkt.H.Size(), "content", hex.EncodeToString(pkt.D)) return 0, io.EOF } - d = pkt.D.Trans()[4:] + d = pkt.D[4:] } if d != nil { if len(p) >= len(d) { @@ -157,23 +157,21 @@ func (s *Tunnel) handleRead() { continue } p := s.l.Read() - if !p.D.HasInit() { + if len(p.D) == 0 { logrus.Errorln("[tunnel] read recv nil") break } end := 64 endl := "..." - if p.D.Len() < 64 { - end = p.D.Len() + if len(p.D) < 64 { + end = len(p.D) endl = "." } var recvseq uint32 - p.D.V(func(b []byte) { - if config.ShowDebugLog { - logrus.Debugln("[tunnel] read recv", hex.EncodeToString(b[:end]), endl) - } - recvseq = binary.LittleEndian.Uint32(b[:4]) - }) + if config.ShowDebugLog { + logrus.Debugln("[tunnel] read recv", hex.EncodeToString(p.D[:end]), endl) + } + recvseq = binary.LittleEndian.Uint32(p.D[:4]) if recvseq == seq { if config.ShowDebugLog { logrus.Debugln("[tunnel] dispatch seq", seq) diff --git a/upper/services/tunnel/tunnel_test.go b/upper/services/tunnel/tunnel_test.go index 7717f24..6543f18 100644 --- a/upper/services/tunnel/tunnel_test.go +++ b/upper/services/tunnel/tunnel_test.go @@ -413,7 +413,10 @@ func (f logFormat) Format(entry *logrus.Entry) ([]byte, error) { buf.WriteString(entry.Message) buf.WriteString("\n") - return buf.ToBytes().Trans(), nil + b := buf.ToBytes().Copy() + buf.Destroy() + + return b.Trans(), nil } const (