1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-13 05:31:08 +08:00

optimize(orbyte): use manual destroy

This commit is contained in:
源文雨
2025-04-04 01:26:37 +09:00
parent 82c6136782
commit b5992574ec
16 changed files with 83 additions and 28 deletions

View File

@@ -47,6 +47,7 @@ func (p *Packet) PreCRC64() (crc uint64) {
)
}
})
w.Destroy()
return
}
@@ -58,11 +59,13 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
buf.Write((*[PacketHeadNoCRCLen]byte)(
(unsafe.Pointer)(p),
)[:])
pbuf.NewBytes(buf.Len()).V(func(b []byte) {
b := pbuf.NewBytes(buf.Len())
b.V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
})
b.ManualDestroy()
_ = binary.Write(buf, binary.LittleEndian, p.md5h8)
return
}
@@ -76,14 +79,17 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
w.Write(p.src[:])
w.Write(p.dst[:])
w.P(func(buf *pbuf.Buffer) {
pbuf.NewBytes(buf.Len()).V(func(b []byte) {
b := pbuf.NewBytes(buf.Len())
b.V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8 = algo.MD5Hash8(b)
})
b.ManualDestroy()
})
w.WriteUInt64(p.md5h8)
w.P(func(b *pbuf.Buffer) {
_, _ = buf.ReadFrom(b)
})
w.Destroy()
}

View File

@@ -91,6 +91,7 @@ func (pb *DataBuilder) Zstd() *DataBuilder {
if config.ShowDebugLog {
logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64))
}
data.ManualDestroy()
})
}
@@ -125,7 +126,9 @@ func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) *
data := algo.EncodeAEAD(aead, additional, b.Bytes())
ub.Reset()
data.V(func(b []byte) { ub.Write(b) })
data.ManualDestroy()
})
w.Destroy()
}))
}
@@ -139,6 +142,7 @@ func (pb *DataBuilder) Plain(teatyp uint8, additional uint16) *PacketBuilder {
ub.Reset()
_, _ = ub.ReadFrom(b)
})
w.Destroy()
}))
}
@@ -228,6 +232,11 @@ func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) {
return
}
// Destroy call this once no one use it.
func (pb *PacketBuilder) Destroy() {
(*PacketItem)(pb).ManualDestroy()
}
func BuildPacketFromBytes(pb PacketBytes) pbuf.Bytes {
w := bin.SelectWriter()
pb.B(func(_ []byte, p *Packet) {

View File

@@ -55,11 +55,13 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
return
}
var crc uint64
pbuf.NewBytes(int(PacketHeadNoCRCLen)).V(func(b []byte) {
b := pbuf.NewBytes(int(PacketHeadNoCRCLen))
b.V(func(b []byte) {
copy(b, data[:PacketHeadNoCRCLen])
ClearTTL(b)
crc = algo.MD5Hash8(b)
})
b.ManualDestroy()
if crc != pb.DAT.md5h8 {
err = ErrBadCRCChecksum
if config.ShowDebugLog {
@@ -80,6 +82,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
pb.DAT.hashrem = int64(sz)
})
if err != nil {
p.ManualDestroy()
return
}
pbytes = pbuf.BufferItemToBytes(p)

View File

@@ -22,7 +22,7 @@ var (
type LinkData struct {
H head.Packet
D pbuf.Bytes
D []byte
}
// Link 是本机到 peer 的连接抽象
@@ -74,7 +74,7 @@ func (l *Link) ToLower(header *head.Packet, data pbuf.Bytes) {
if l.pipe != nil {
l.pipe <- LinkData{
H: *header,
D: data,
D: data.Copy().Trans(),
}
if config.ShowDebugLog {
logrus.Debugln("[listen] deliver to pipe of", l.peerip)

View File

@@ -64,6 +64,8 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
}
func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
defer buf.ManualDestroy()
recvtotlcnt := atomic.AddUint64(&m.recvtotlcnt, uint64(buf.Len()))
recvloopcnt := atomic.AddUintptr(&m.recvloopcnt, 1)
recvlooptime := atomic.LoadInt64(&m.recvlooptime)
@@ -100,6 +102,7 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
}
m.dispatch(p, b, addr)
})
h.ManualDestroy()
})
}
@@ -151,7 +154,9 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
data = data.SliceFrom(8)
if p.usezstd {
data.V(func(b []byte) {
old := data
data, err = algo.DecodeZstd(b) // skip hash
old.ManualDestroy()
})
if err != nil {
if config.ShowDebugLog {
@@ -169,4 +174,5 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
return
}
fn(header, p, data)
data.ManualDestroy()
}

View File

@@ -312,8 +312,11 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) {
pcp.V(func(b []byte) {
copy(b, packet)
})
go pcp.V(func(b []byte) {
lnk.WritePacket(head.ProtoData, b, lnk.me.ttl)
})
go func() {
pcp.V(func(b []byte) {
lnk.WritePacket(head.ProtoData, b, lnk.me.ttl)
})
pcp.ManualDestroy()
}()
return
}

View File

@@ -46,6 +46,7 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
return
}
data = w.ToBytes().Copy().Trans()
w.Destroy()
if len(data) < bound {
bound = len(data)
endl = "."
@@ -142,6 +143,9 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
})
if ok {
if !h.HasInit() {
header.ManualDestroy()
}
return
}

View File

@@ -57,11 +57,13 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) {
pktb = pb.Seal(l.keys[teatype], teatype, sndcnt&0x07ff)
}
bs := pktb.Split(int(mtu), false)
pktb.Destroy()
if config.ShowDebugLog {
logrus.Debugln("[send] split packet into", len(bs), "parts")
}
for _, b := range bs { //TODO: impl. nofrag
go l.write2peer(head.BuildPacketFromBytes(b), randseq(sndcnt))
b.ManualDestroy()
}
}
@@ -69,6 +71,7 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) {
//
// 因为不保证可达所以不返回错误。
func (l *Link) write2peer(b pbuf.Bytes, seq uint32) {
defer b.ManualDestroy()
if l.doublepacket {
err := l.write2peer1(b, seq)
if err != nil {
@@ -96,6 +99,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
if conn == nil {
return io.ErrClosedPipe
}
isnewb := false
b.V(func(data []byte) {
if config.ShowDebugLog {
bound := 64
@@ -107,6 +111,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "raw data bytes", hex.EncodeToString(data[:bound]), endl)
}
b = l.me.xorenc(data, seq)
isnewb = true
if config.ShowDebugLog {
bound := 64
endl := "..."
@@ -121,7 +126,12 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
})
if l.me.base14 {
b.V(func(data []byte) {
old := b
b = pbuf.ParseBytes(base14.Encode(data)...)
if isnewb {
old.ManualDestroy()
}
isnewb = true
if config.ShowDebugLog {
bound := 64
endl := "..."
@@ -141,5 +151,8 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
}
_, err = conn.WriteToPeer(b, peerep)
})
if isnewb {
b.ManualDestroy()
}
return
}

View File

@@ -42,7 +42,7 @@ func (p *packet) pack() *net.Buffers {
return &net.Buffers{magicbuf, bin.NewWriterF(func(w *bin.Writer) {
w.WriteByte(byte(p.typ))
w.WriteUInt16(p.len)
}).Trans(), p.dat}
}), p.dat}
}
func (p *packet) Read(_ []byte) (int, error) {
@@ -80,7 +80,8 @@ func (p *packet) ReadFrom(r io.Reader) (n int64, err error) {
if err != nil {
return
}
p.dat = w.ToBytes().Trans()
p.dat = w.ToBytes().Copy().Trans()
w.Destroy()
return
}

View File

@@ -106,6 +106,7 @@ func init() {
w.P(func(b *pbuf.Buffer) {
peer.WritePacket(head.ProtoNotify, b.Bytes(), peer.Me().TTL())
})
w.Destroy()
}
})
})