1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-09 02:02:41 +08:00

fix: multi-segment hash checking

This commit is contained in:
源文雨
2025-04-03 00:00:21 +09:00
parent 6fc45333d8
commit 0c2f201bd0
15 changed files with 227 additions and 209 deletions

View File

@@ -17,18 +17,21 @@ import (
// PreCRC64 calculate crc64 checksum without idxdatsz.
func (p *Packet) PreCRC64() (crc uint64) {
w := bin.SelectWriter()
// 固定 TTL 为 0 计算
// 固定 TTL 为 0, flag 为空 计算
if bin.IsLittleEndian {
ttl := p.TTL
f := p.Proto
p.TTL = 0
p.Proto &= protobit
w.Write((*[PacketHeadNoCRCLen]byte)(
(unsafe.Pointer)(p),
)[:])
p.TTL = ttl
p.Proto = f
} else {
w.WriteUInt32(p.idxdatsz)
w.WriteUInt32(uint32(p.randn))
w.WriteUInt16(uint16(p.Proto)) // TTL is set to 0
w.WriteUInt16(uint16(p.Proto & protobit)) // TTL, flags is set to 0
w.WriteUInt16(p.SrcPort)
w.WriteUInt16(p.DstPort)
w.WriteUInt16(p.Offset)
@@ -58,9 +61,9 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
pbuf.NewBytes(buf.Len()).V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8rem = int64(algo.MD5Hash8(b))
p.md5h8 = algo.MD5Hash8(b)
})
_ = binary.Write(buf, binary.LittleEndian, p.md5h8rem)
_ = binary.Write(buf, binary.LittleEndian, p.md5h8)
return
}
w := bin.SelectWriter()
@@ -76,10 +79,10 @@ func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) {
pbuf.NewBytes(buf.Len()).V(func(b []byte) {
copy(b, buf.Bytes())
ClearTTL(b)
p.md5h8rem = int64(algo.MD5Hash8(b))
p.md5h8 = algo.MD5Hash8(b)
})
})
w.WriteUInt64(uint64(p.md5h8rem))
w.WriteUInt64(p.md5h8)
w.P(func(b *pbuf.Buffer) {
_, _ = buf.ReadFrom(b)
})

View File

@@ -69,11 +69,11 @@ func (pb *HeaderBuilder) Dst(ip net.IP, p uint16) *HeaderBuilder {
func (pb *HeaderBuilder) With(data []byte) *DataBuilder {
return (*DataBuilder)(pb.p(func(ub *PacketBuf) {
// header crc64 except idxdatasz
ub.DAT.md5h8rem = int64(ub.DAT.PreCRC64())
ub.DAT.md5h8 = ub.DAT.PreCRC64()
// plain data
ub.Buffer.Write(data)
if config.ShowDebugLog {
logrus.Debugln(file.Header(), strconv.FormatUint(uint64(ub.DAT.md5h8rem), 16), "build with data", file.ToLimitHexString(data, 64))
logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "build with data", file.ToLimitHexString(data, 64))
}
}))
}
@@ -89,16 +89,16 @@ func (pb *DataBuilder) Zstd() *DataBuilder {
ub.Reset()
data.V(func(b []byte) { ub.Write(b) })
if config.ShowDebugLog {
logrus.Debugln(file.Header(), strconv.FormatUint(uint64(ub.DAT.md5h8rem), 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64))
logrus.Debugln(file.Header(), strconv.FormatUint(ub.DAT.md5h8, 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64))
}
})
}
func (pb *DataBuilder) Hash() *DataBuilder {
return pb.p(func(ub *PacketBuf) {
ub.DAT.hash = algo.Blake2bHash8(
uint64(ub.DAT.md5h8rem), ub.Bytes(),
)
ub.DAT.hashrem = int64(algo.Blake2bHash8(
ub.DAT.md5h8, ub.Bytes(),
))
})
}
@@ -119,7 +119,7 @@ func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) *
p(func(ub *PacketBuf) {
// encrypted data: chacha20(hash + plain)
w := bin.SelectWriter()
w.WriteUInt64(ub.DAT.hash)
w.WriteUInt64(uint64(ub.DAT.hashrem))
w.Write(ub.Bytes())
w.P(func(b *pbuf.Buffer) {
data := algo.EncodeAEAD(aead, additional, b.Bytes())
@@ -133,7 +133,7 @@ func (pb *DataBuilder) Plain(teatyp uint8, additional uint16) *PacketBuilder {
return (*PacketBuilder)(pb.tea(teatyp).additional(additional).
p(func(ub *PacketBuf) {
w := bin.SelectWriter()
w.WriteUInt64(ub.DAT.hash)
w.WriteUInt64(uint64(ub.DAT.hashrem))
w.Write(ub.Bytes())
w.P(func(b *pbuf.Buffer) {
ub.Reset()

View File

@@ -3,6 +3,7 @@ package head
import (
"errors"
"net"
"sync/atomic"
"unsafe"
"github.com/fumiama/orbyte"
@@ -13,8 +14,8 @@ const (
// PacketHeadPreCRCIdx skip idxdatsz, which will be set at Seal().
PacketHeadPreCRCIdx = unsafe.Offsetof(Packet{}.randn)
// PacketHeadNoCRCLen without final crc
PacketHeadNoCRCLen = unsafe.Offsetof(Packet{}.md5h8rem)
PacketHeadLen = unsafe.Offsetof(Packet{}.hash)
PacketHeadNoCRCLen = unsafe.Offsetof(Packet{}.md5h8)
PacketHeadLen = unsafe.Offsetof(Packet{}.hashrem)
)
var (
@@ -57,19 +58,18 @@ type Packet struct {
src [4]byte
// dst 目的 ip (ipv4)
dst [4]byte
// md5h8rem 发送时记录包头字段除自身外的 checksum 值,
// 接收时记录剩余字节数.
// md5h8 发送时记录包头字段除自身外的 checksum 值.
//
// 可以认为在一定时间内唯一 (现已更改算法为 md5 但名字未变)。
md5h8rem int64
md5h8 uint64
// 以下字段为包体, 与 data 一起加密
// hash 使用 BLAKE2B 生成加密前 packet data+crc64 的摘要,
// 取其前 8 字节, 小端序读写.
// hashrem 使用 BLAKE2B 生成加密前 packet data+crc64 的摘要,
// 取其前 8 字节, 小端序读写. 接收时记录剩余字节数.
//
// https://github.com/fumiama/blake2b-simd
hash uint64
hashrem int64
// Buffer 用于 builder with 暂存原始包体数据
// 以及接收时保存 body, 通过 PacketBytes 截取偏移.
@@ -92,7 +92,7 @@ func (p *Packet) Size() int {
// CRC64 extract md5h8rem field
func (p *Packet) CRC64() uint64 {
return uint64(p.md5h8rem)
return p.md5h8
}
func (p *Packet) Src() net.IP {
@@ -102,3 +102,7 @@ func (p *Packet) Src() net.IP {
func (p *Packet) Dst() net.IP {
return append(net.IP{}, p.dst[:]...)
}
func (p *Packet) HasFinished() bool {
return atomic.LoadInt64(&p.hashrem) <= 0
}

View File

@@ -37,7 +37,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
pb.DAT.Offset = binary.LittleEndian.Uint16(data[14:16])
copy(pb.DAT.src[:], data[16:20])
copy(pb.DAT.dst[:], data[20:24])
pb.DAT.md5h8rem = int64(binary.LittleEndian.Uint64(data[24:32]))
pb.DAT.md5h8 = binary.LittleEndian.Uint64(data[24:32])
}
sz = pb.DAT.Size()
if !pb.DAT.Proto.IsValid() {
@@ -60,10 +60,10 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
ClearTTL(b)
crc = algo.MD5Hash8(b)
})
if crc != uint64(pb.DAT.md5h8rem) {
if crc != pb.DAT.md5h8 {
err = ErrBadCRCChecksum
if config.ShowDebugLog {
logrus.Warnf("[unbox] exp crc %016x but got %016x", pb.DAT.md5h8rem, crc)
logrus.Warnf("[unbox] exp crc %016x but got %016x", pb.DAT.md5h8, crc)
}
return
}
@@ -72,12 +72,12 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
}
if sz+int(PacketHeadLen) == len(data) {
pb.Buffer.Write(data[PacketHeadLen:])
pb.DAT.md5h8rem = -1
pb.DAT.hashrem = -1
return
}
pb.Buffer.Grow(sz)
pb.Buffer.Write(make([]byte, sz))
pb.DAT.md5h8rem = int64(sz)
pb.DAT.hashrem = int64(sz)
})
if err != nil {
return
@@ -92,7 +92,7 @@ func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) {
//
// return: complete.
func (p *Packet) WriteDataSegment(data, buf []byte) bool {
if atomic.LoadInt64(&p.md5h8rem) <= 0 {
if p.HasFinished() {
return true
}
@@ -103,21 +103,24 @@ func (p *Packet) WriteDataSegment(data, buf []byte) bool {
}
if offset == 0 {
p.randn = int32(binary.LittleEndian.Uint32(data[4:8]))
p.Proto = flags
p.TTL = data[9]
p.Offset = 0
p.md5h8 = binary.LittleEndian.Uint64(data[24:32])
if config.ShowDebugLog {
logrus.Debugln("[unbox] parse data set zero offset flags", flags)
}
}
rembytes := atomic.LoadInt64(&p.md5h8rem)
rembytes := atomic.LoadInt64(&p.hashrem)
if rembytes > 0 {
n := int64(copy(buf[offset:], data[PacketHeadLen:]))
newrem := rembytes - n
for !atomic.CompareAndSwapInt64(&p.md5h8rem, rembytes, newrem) {
rembytes = atomic.LoadInt64(&p.md5h8rem)
for !atomic.CompareAndSwapInt64(&p.hashrem, rembytes, newrem) {
rembytes = atomic.LoadInt64(&p.hashrem)
newrem = rembytes - n
}
}
return atomic.LoadInt64(&p.md5h8rem) <= 0
return p.HasFinished()
}

View File

@@ -1,9 +1,14 @@
package link
import (
"github.com/fumiama/orbyte/pbuf"
"encoding/hex"
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/config"
"github.com/fumiama/WireGold/internal/algo"
"github.com/fumiama/WireGold/internal/file"
)
func (l *Link) randkeyidx() uint8 {
@@ -19,11 +24,20 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes
return
}
if l.keys[0] == nil {
if config.ShowDebugLog {
n := len(b)
endl := "."
if n > 64 {
n = 64
endl = "..."
}
logrus.Debugln(file.Header(), "copy plain text", hex.EncodeToString(b[:n]), endl)
}
return pbuf.ParseBytes(b...).Copy(), nil
}
aead := l.keys[teatype]
if aead == nil {
return
panic("unexpected empty aead")
}
return algo.DecodeAEAD(aead, additional, b)
}

View File

@@ -69,7 +69,17 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
recvlooptime := atomic.LoadInt64(&m.recvlooptime)
if recvloopcnt%uintptr(m.speedloop) == 0 {
now := time.Now().UnixMilli()
logrus.Infof("[listen] queue recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime))
kb := float64(recvtotlcnt) / float64(now-recvlooptime)
if kb < 1024 {
logrus.Infof("[listen] queue recv avg speed: %.2f KB/s", kb)
} else {
kb /= 1024
if kb < 1024 {
logrus.Infof("[listen] queue recv avg speed: %.2f MB/s", kb)
} else {
logrus.Infof("[listen] queue recv avg speed: %.2f GB/s", kb/1024)
}
}
atomic.StoreUint64(&m.recvtotlcnt, 0)
atomic.StoreInt64(&m.recvlooptime, now)
}
@@ -85,6 +95,9 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
if config.ShowDebugLog {
logrus.Debugln("[listen] dispatch", len(b), "bytes packet")
}
if !p.HasFinished() {
panic("unexpected unfinished")
}
m.dispatch(p, b, addr)
})
})

View File

@@ -49,7 +49,7 @@ type Me struct {
// 本机未接收完全分片池
recving *ttl.Cache[uint16, head.PacketBytes]
// 抗重放攻击记录池
recved *ttl.Cache[uint32, struct{}]
recved *ttl.Cache[uint64, struct{}]
// 本机上层配置
srcport, dstport, mtu, speedloop uint16
// 报头掩码
@@ -143,7 +143,7 @@ func NewMe(cfg *MyConfig) (m Me) {
binary.BigEndian.PutUint64(buf[:], m.mask)
logrus.Infoln("[me] xor mask", hex.EncodeToString(buf[:]))
m.recving = ttl.NewCache[uint16, head.PacketBytes](time.Second * 10)
m.recved = ttl.NewCache[uint32, struct{}](time.Minute)
m.recved = ttl.NewCache[uint64, struct{}](time.Minute)
return
}

View File

@@ -78,9 +78,13 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
if config.ShowDebugLog {
logrus.Debugf("[recv] packet seq %08x", seq)
}
if _, got := m.recved.GetOrSet(seq, struct{}{}); got {
crc := uint64(0)
header.B(func(_ []byte, p *head.Packet) {
crc = p.CRC64()
})
if _, got := m.recved.GetOrSet(uint64(seq)^crc, struct{}{}); got {
if config.ShowDebugLog {
logrus.Debugln("[recv] ignore duplicated seq packet", strconv.FormatUint(uint64(seq), 16))
logrus.Debugln("[recv] ignore duplicated seq^crc packet, seq", strconv.FormatUint(uint64(seq), 16), "crc", strconv.FormatUint(crc, 16))
}
return
}
@@ -145,8 +149,9 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
if config.ShowDebugLog {
logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "get frag part isnew:", !got)
}
ok := false
h.B(func(buf []byte, p *head.Packet) {
ok := p.WriteDataSegment(data, buf)
ok = p.WriteDataSegment(data, buf)
if !ok {
if config.ShowDebugLog {
logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "wait other frag parts isnew:", !got)
@@ -158,5 +163,8 @@ func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "all parts has reached")
}
})
if !ok {
return head.PacketBytes{}
}
return
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
@@ -40,7 +41,7 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) {
mtu -= uint16(rand.Intn(int(l.mturandomrange)))
}
if config.ShowDebugLog {
logrus.Debugln("[send] write mtu:", mtu, ", addt:", sndcnt&0x07ff, ", key index:", teatype)
logrus.Debugln("[send] write mtu:", mtu, ", addt:", sndcnt&0x07ff, ", key index:", teatype, ", data len:", len(data))
}
pb := head.NewPacketBuilder().
Src(l.me.me, l.me.srcport).Dst(l.peerip, l.me.dstport).
@@ -55,7 +56,11 @@ func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) {
} else {
pktb = pb.Seal(l.keys[teatype], teatype, sndcnt&0x07ff)
}
for _, b := range pktb.Split(int(mtu), false) { //TODO: impl. nofrag
bs := pktb.Split(int(mtu), false)
if config.ShowDebugLog {
logrus.Debugln("[send] split packet into", len(bs), "parts")
}
for _, b := range bs { //TODO: impl. nofrag
go l.write2peer(head.BuildPacketFromBytes(b), randseq(sndcnt))
}
}
@@ -99,7 +104,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
bound = len(data)
endl = "."
}
logrus.Debugln("[send] raw data bytes", hex.EncodeToString(data[:bound]), endl)
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "raw data bytes", hex.EncodeToString(data[:bound]), endl)
}
b = l.me.xorenc(data, seq)
if config.ShowDebugLog {
@@ -110,7 +115,7 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
endl = "."
}
b.V(func(b []byte) {
logrus.Debugln("[send] xored data bytes", hex.EncodeToString(b[:bound]), endl)
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "xored data bytes", hex.EncodeToString(b[:bound]), endl)
})
}
})
@@ -125,14 +130,14 @@ func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (err error) {
endl = "."
}
b.V(func(b []byte) {
logrus.Debugln("[send] xored data bytes", hex.EncodeToString(b[:bound]), endl)
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "b14ed data bytes", hex.EncodeToString(b[:bound]), endl)
})
}
})
}
b.V(func(b []byte) {
if config.ShowDebugLog {
logrus.Debugln("[send] write", len(b), "bytes data from ep", conn.LocalAddr(), "to", peerep)
logrus.Debugln("[send] crc seq", fmt.Sprintf("%08x", seq), "write", len(b), "bytes data from ep", conn.LocalAddr(), "to", peerep)
}
_, err = conn.WriteToPeer(b, peerep)
})