mirror of
https://github.com/fumiama/WireGold.git
synced 2026-06-27 14:20:27 +08:00
optimize(gold): apply more buffer pools
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"math/bits"
|
||||
mrand "math/rand"
|
||||
|
||||
"github.com/fumiama/WireGold/helper"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -51,13 +52,13 @@ func expandkeyunit(v1, v2 byte) (v uint16) {
|
||||
return
|
||||
}
|
||||
|
||||
// Encode 使用 xchacha20poly1305 和密钥序列加密
|
||||
// Encode by aead and put b into pool
|
||||
func (l *Link) Encode(teatype uint8, additional uint16, b []byte) (eb []byte) {
|
||||
if len(b) == 0 || teatype >= 32 {
|
||||
return
|
||||
}
|
||||
if l.keys[0] == nil {
|
||||
eb = make([]byte, len(b))
|
||||
eb = helper.MakeBytes(len(b))
|
||||
copy(eb, b)
|
||||
return
|
||||
}
|
||||
@@ -70,13 +71,14 @@ func (l *Link) Encode(teatype uint8, additional uint16, b []byte) (eb []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
// Decode 使用 xchacha20poly1305 和密钥序列解密
|
||||
// Decode by aead and put b into pool
|
||||
func (l *Link) Decode(teatype uint8, additional uint16, b []byte) (db []byte, err error) {
|
||||
if len(b) == 0 || teatype >= 32 {
|
||||
return
|
||||
}
|
||||
if l.keys[0] == nil {
|
||||
db = b
|
||||
db = helper.MakeBytes(len(b))
|
||||
copy(db, b)
|
||||
return
|
||||
}
|
||||
aead := l.keys[teatype]
|
||||
@@ -86,11 +88,10 @@ func (l *Link) Decode(teatype uint8, additional uint16, b []byte) (db []byte, er
|
||||
return decode(aead, additional, b)
|
||||
}
|
||||
|
||||
// encode 使用 xchacha20poly1305 加密
|
||||
func encode(aead cipher.AEAD, additional uint16, b []byte) []byte {
|
||||
nsz := aead.NonceSize()
|
||||
// Accocate capacity for all the stuffs.
|
||||
buf := make([]byte, 2+nsz+len(b)+aead.Overhead())
|
||||
buf := helper.MakeBytes(2 + nsz + len(b) + aead.Overhead())
|
||||
binary.LittleEndian.PutUint16(buf[:2], additional)
|
||||
nonce := buf[2 : 2+nsz]
|
||||
// Select a random nonce
|
||||
@@ -103,7 +104,6 @@ func encode(aead cipher.AEAD, additional uint16, b []byte) []byte {
|
||||
return nonce[:nsz+len(eb)]
|
||||
}
|
||||
|
||||
// decode 使用 xchacha20poly1305 解密
|
||||
func decode(aead cipher.AEAD, additional uint16, b []byte) ([]byte, error) {
|
||||
nsz := aead.NonceSize()
|
||||
if len(b) < nsz {
|
||||
@@ -117,7 +117,7 @@ func decode(aead cipher.AEAD, additional uint16, b []byte) ([]byte, error) {
|
||||
// Decrypt the message and check it wasn't tampered with.
|
||||
var buf [2]byte
|
||||
binary.LittleEndian.PutUint16(buf[:], additional)
|
||||
return aead.Open(nil, nonce, ciphertext, buf[:])
|
||||
return aead.Open(helper.SelectWriter().Bytes(), nonce, ciphertext, buf[:])
|
||||
}
|
||||
|
||||
// xorenc 按 8 字节, 以初始 m.mask 循环异或编码 data
|
||||
|
||||
@@ -15,8 +15,11 @@ import (
|
||||
|
||||
"github.com/fumiama/WireGold/gold/head"
|
||||
"github.com/fumiama/WireGold/gold/p2p"
|
||||
"github.com/fumiama/WireGold/helper"
|
||||
)
|
||||
|
||||
const lstnbufgragsz = 65536
|
||||
|
||||
// 监听本机 endpoint
|
||||
func (m *Me) listen() (conn p2p.Conn, err error) {
|
||||
conn, err = m.ep.Listen()
|
||||
@@ -34,7 +37,7 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
|
||||
n = 64 // 只用最多 64 核
|
||||
}
|
||||
logrus.Infoln("[listen] use cpu num:", n)
|
||||
listenbuff := make([]byte, 65536*n)
|
||||
listenbuff := make([]byte, lstnbufgragsz*n)
|
||||
hasntfinished := make([]sync.Mutex, n)
|
||||
for i := 0; err == nil; i++ {
|
||||
i %= n
|
||||
@@ -46,7 +49,7 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
|
||||
}
|
||||
}
|
||||
logrus.Debugln("[listen] lock index", i)
|
||||
lbf := listenbuff[i*65536 : (i+1)*65536]
|
||||
lbf := listenbuff[i*lstnbufgragsz : (i+1)*lstnbufgragsz]
|
||||
n, addr, err := conn.ReadFromPeer(lbf)
|
||||
if m.loop == nil || errors.Is(err, net.ErrClosed) {
|
||||
logrus.Warnln("[listen] quit listening")
|
||||
@@ -72,9 +75,9 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
|
||||
recvtotlcnt = 0
|
||||
recvlooptime = now
|
||||
}
|
||||
packet := m.wait(lbf[:n])
|
||||
packet := m.wait(lbf[:n:lstnbufgragsz])
|
||||
if packet == nil {
|
||||
logrus.Debugln("[listen] unlock index", i)
|
||||
logrus.Debugln("[listen] waiting, unlock index", i)
|
||||
hasntfinished[i].Unlock()
|
||||
i--
|
||||
continue
|
||||
@@ -87,10 +90,11 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
|
||||
|
||||
func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish func()) {
|
||||
defer finish()
|
||||
defer logrus.Debugln("[listen] unlock index", index)
|
||||
r := packet.Len() - len(packet.Data)
|
||||
defer logrus.Debugln("[listen] dispatched, unlock index", index)
|
||||
logrus.Debugln("[listen] start dispatching index", index)
|
||||
r := packet.Len() - packet.BodyLen()
|
||||
if r > 0 {
|
||||
logrus.Warnln("[listen] @", index, "packet from endpoint", addr, "is smaller than it declared: drop it")
|
||||
logrus.Warnln("[listen] @", index, "packet from endpoint", addr, "len", packet.BodyLen(), "is smaller than it declared len", packet.Len(), ", drop it")
|
||||
packet.Put()
|
||||
return
|
||||
}
|
||||
@@ -114,22 +118,25 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish
|
||||
}
|
||||
addt := packet.AdditionalData()
|
||||
var err error
|
||||
packet.Data, err = p.Decode(packet.CipherIndex(), addt, packet.Data)
|
||||
data, err := p.Decode(packet.CipherIndex(), addt, packet.Body())
|
||||
if err != nil {
|
||||
logrus.Debugln("[listen] @", index, "drop invalid packet", ", key idx:", packet.CipherIndex(), "addt:", addt, "err:", err)
|
||||
packet.Put()
|
||||
return
|
||||
}
|
||||
packet.SetBody(data, true)
|
||||
if p.usezstd {
|
||||
dec, _ := zstd.NewReader(bytes.NewReader(packet.Data))
|
||||
dec, _ := zstd.NewReader(bytes.NewReader(packet.Body()))
|
||||
var err error
|
||||
packet.Data, err = io.ReadAll(dec)
|
||||
w := helper.SelectWriter()
|
||||
_, err = io.Copy(w, dec)
|
||||
dec.Close()
|
||||
if err != nil {
|
||||
logrus.Debugln("[listen] @", index, "drop invalid zstd packet:", err)
|
||||
packet.Put()
|
||||
return
|
||||
}
|
||||
packet.SetBody(w.Bytes(), true)
|
||||
}
|
||||
if !packet.IsVaildHash() {
|
||||
logrus.Debugln("[listen] @", index, "drop invalid hash packet")
|
||||
@@ -154,22 +161,22 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish
|
||||
packet.Put()
|
||||
case head.ProtoNotify:
|
||||
logrus.Infoln("[listen] @", index, "recv notify from", packet.Src)
|
||||
go p.onNotify(packet.Data)
|
||||
go p.onNotify(packet.Body())
|
||||
packet.Put()
|
||||
case head.ProtoQuery:
|
||||
logrus.Infoln("[listen] @", index, "recv query from", packet.Src)
|
||||
go p.onQuery(packet.Data)
|
||||
go p.onQuery(packet.Body())
|
||||
packet.Put()
|
||||
case head.ProtoData:
|
||||
if p.pipe != nil {
|
||||
p.pipe <- packet
|
||||
logrus.Debugln("[listen] @", index, "deliver to pipe of", p.peerip)
|
||||
} else {
|
||||
_, err := m.nic.Write(packet.Data)
|
||||
_, err := m.nic.Write(packet.Body())
|
||||
if err != nil {
|
||||
logrus.Errorln("[listen] @", index, "deliver", len(packet.Data), "bytes data to nic err:", err)
|
||||
logrus.Errorln("[listen] @", index, "deliver", packet.BodyLen(), "bytes data to nic err:", err)
|
||||
} else {
|
||||
logrus.Debugln("[listen] @", index, "deliver", len(packet.Data), "bytes data to nic")
|
||||
logrus.Debugln("[listen] @", index, "deliver", packet.BodyLen(), "bytes data to nic")
|
||||
}
|
||||
packet.Put()
|
||||
}
|
||||
|
||||
@@ -32,32 +32,33 @@ func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) {
|
||||
logrus.Warnln("[send] reset invalid data frag len", delta, "to 8")
|
||||
delta = 8
|
||||
}
|
||||
if len(p.Data) <= delta {
|
||||
return l.write(p, teatype, sndcnt, uint32(len(p.Data)), 0, istransfer, false)
|
||||
remlen := p.BodyLen()
|
||||
if remlen <= delta {
|
||||
return l.write(p, teatype, sndcnt, uint32(remlen), 0, istransfer, false)
|
||||
}
|
||||
if istransfer && p.Flags.DontFrag() && len(p.Data) > delta {
|
||||
if istransfer && p.Flags.DontFrag() && remlen > delta {
|
||||
return 0, errors.New("drop don't fragmnet big trans packet")
|
||||
}
|
||||
data := p.Data
|
||||
ttl := p.TTL
|
||||
totl := uint32(len(data))
|
||||
totl := uint32(remlen)
|
||||
pos := 0
|
||||
packet := head.SelectPacket()
|
||||
*packet = *p
|
||||
for ; int(totl)-pos > delta; pos += delta {
|
||||
logrus.Debugln("[send] split frag [", pos, "~", pos+delta, "], remain:", int(totl)-pos-delta)
|
||||
packet.Data = data[:delta]
|
||||
packet := p.Copy()
|
||||
for remlen > delta {
|
||||
remlen -= delta
|
||||
logrus.Debugln("[send] split frag [", pos, "~", pos+delta, "], remain:", remlen)
|
||||
packet.CropBody(pos, pos+delta)
|
||||
cnt, err := l.write(packet, teatype, sndcnt, totl, uint16(pos>>3), istransfer, true)
|
||||
n += cnt
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
data = data[delta:]
|
||||
packet.TTL = ttl
|
||||
pos += delta
|
||||
}
|
||||
packet.Put()
|
||||
if len(data) > 0 {
|
||||
p.Data = data
|
||||
if remlen > 0 {
|
||||
logrus.Debugln("[send] last frag [", pos, "~", pos+remlen, "]")
|
||||
p.CropBody(pos, pos+remlen)
|
||||
cnt := 0
|
||||
cnt, err = l.write(p, teatype, sndcnt, totl, uint16(pos>>3), istransfer, false)
|
||||
n += cnt
|
||||
@@ -67,18 +68,19 @@ func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) {
|
||||
|
||||
func (l *Link) encrypt(p *head.Packet, sndcnt uint16, teatype uint8) {
|
||||
p.FillHash()
|
||||
logrus.Debugln("[send] data len before encrypt:", len(p.Data))
|
||||
logrus.Debugln("[send] data len before encrypt:", p.BodyLen())
|
||||
data := p.Body()
|
||||
if l.usezstd {
|
||||
w := helper.SelectWriter()
|
||||
defer helper.PutWriter(w)
|
||||
enc, _ := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedFastest))
|
||||
_, _ = io.Copy(enc, bytes.NewReader(p.Data))
|
||||
_, _ = io.Copy(enc, bytes.NewReader(data))
|
||||
enc.Close()
|
||||
p.Data = w.Bytes()
|
||||
logrus.Debugln("[send] data len after zstd:", len(p.Data))
|
||||
data = w.Bytes()
|
||||
logrus.Debugln("[send] data len after zstd:", len(data))
|
||||
}
|
||||
p.Data = l.Encode(teatype, sndcnt&0x07ff, p.Data)
|
||||
logrus.Debugln("[send] data len after xchacha20:", len(p.Data), "addt:", sndcnt)
|
||||
p.SetBody(l.Encode(teatype, sndcnt&0x07ff, data), true)
|
||||
logrus.Debugln("[send] data len after xchacha20:", p.BodyLen(), "addt:", sndcnt)
|
||||
}
|
||||
|
||||
// write 向 peer 发一个包
|
||||
|
||||
Reference in New Issue
Block a user