From f4fd9b1423ef16e03c9b1fc9c39de72204baf9a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Wed, 12 Mar 2025 22:20:02 +0900 Subject: [PATCH] feat: impl. new protol design & new head --- .golangci.yml | 4 +- config/global.go | 2 +- go.mod | 4 +- go.sum | 6 +- gold/head/box.go | 73 +++++ gold/head/builder.go | 242 ++++++++++++++++ gold/head/flags.go | 50 ++-- gold/head/hello.go | 8 - gold/head/nat.go | 7 - gold/head/packet.go | 297 ++++---------------- gold/head/packet_test.go | 210 +++++++++++--- gold/head/pool.go | 31 +- gold/head/protos.go | 20 ++ gold/head/raw.go | 22 -- gold/head/unbox.go | 118 ++++++++ gold/link/crypto.go | 142 +--------- gold/link/event.go | 22 ++ gold/link/link.go | 59 +++- gold/link/listen.go | 163 +++++------ gold/link/me.go | 36 +-- gold/link/nat.go | 132 +-------- gold/link/peer.go | 9 +- gold/link/recv.go | 123 ++++---- gold/link/send.go | 200 ++++++------- gold/link/zstd.go | 41 --- gold/p2p/ip/init.go | 4 +- gold/p2p/tcp/init.go | 4 +- gold/p2p/tcp/pdu.go | 10 +- gold/p2p/udp/init.go | 4 +- gold/p2p/udplite/init.go | 4 +- gold/p2p/udplite/lite.go | 4 +- gold/proto/data.go | 14 + gold/proto/hello.go | 30 ++ gold/proto/nat.go | 121 ++++++++ helper/writer.go | 83 ------ internal/algo/crypto.go | 115 ++++++++ {gold/link => internal/algo}/crypto_test.go | 16 +- internal/algo/hash.go | 47 ++++ internal/algo/key.go | 40 +++ internal/algo/zstd.go | 41 +++ {helper => internal/bin}/data.go | 6 +- {helper => internal/bin}/pool.go | 2 +- internal/bin/writer.go | 66 +++++ {helper => internal/file}/file.go | 2 +- internal/file/log.go | 52 ++++ main.go | 20 +- upper/services/tunnel/tunnel.go | 56 ++-- upper/services/tunnel/tunnel_test.go | 7 +- upper/services/wg/wg.go | 11 +- 49 files changed, 1643 insertions(+), 1137 deletions(-) create mode 100644 gold/head/box.go create mode 100644 gold/head/builder.go delete mode 100644 gold/head/hello.go delete mode 100644 gold/head/nat.go delete mode 100644 gold/head/raw.go create mode 100644 gold/head/unbox.go create mode 100644 gold/link/event.go delete mode 100644 gold/link/zstd.go create mode 100644 gold/proto/data.go create mode 100644 gold/proto/hello.go create mode 100644 gold/proto/nat.go delete mode 100644 helper/writer.go create mode 100644 internal/algo/crypto.go rename {gold/link => internal/algo}/crypto_test.go (88%) create mode 100644 internal/algo/hash.go create mode 100644 internal/algo/key.go create mode 100644 internal/algo/zstd.go rename {helper => internal/bin}/data.go (82%) rename {helper => internal/bin}/pool.go (91%) create mode 100644 internal/bin/writer.go rename {helper => internal/file}/file.go (97%) create mode 100644 internal/file/log.go diff --git a/.golangci.yml b/.golangci.yml index 1f04f9b..ca6c686 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,8 +4,8 @@ linters-settings: ignoretests: true exclude-functions: - (*os.File).Write - - (*github.com/fumiama/WireGold/helper.Writer).Write - - (*github.com/fumiama/WireGold/helper.Writer).WriteByte + - (*github.com/fumiama/WireGold/bin.Writer).Write + - (*github.com/fumiama/WireGold/bin.Writer).WriteByte - (*github.com/fumiama/WireGold/upper/services/tunnel.Tunnel).Write - (*github.com/fumiama/WireGold/upper/services/tunnel.Tunnel).Read diff --git a/config/global.go b/config/global.go index 4bcfe7d..df62328 100644 --- a/config/global.go +++ b/config/global.go @@ -1,3 +1,3 @@ package config -const ShowDebugLog = false +const ShowDebugLog = true diff --git a/go.mod b/go.mod index 89f51a0..fd8a16f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/fumiama/WireGold -go 1.20 +go 1.21 require ( github.com/FloatTech/ttl v0.0.0-20250224045156-012b1463287d @@ -8,7 +8,7 @@ require ( github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb github.com/fumiama/go-base16384 v1.7.0 github.com/fumiama/go-x25519 v1.0.0 - github.com/fumiama/orbyte v0.0.0-20250225150552-190281785ccc + github.com/fumiama/orbyte v0.0.0-20250228175313-326f247ad703 github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac github.com/klauspost/compress v1.17.9 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 4af92c9..5e67ebd 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,8 @@ github.com/fumiama/go-base16384 v1.7.0 h1:6fep7XPQWxRlh4Hu+KsdH+6+YdUp+w6CwRXtMW github.com/fumiama/go-base16384 v1.7.0/go.mod h1:OEn+947GV5gsbTAnyuUW/SrfxJYUdYupSIQXOuGOcXM= github.com/fumiama/go-x25519 v1.0.0 h1:hiGg9EhseVmGCc8T1jECVkj8Keu/aJ1ZK05RM8Vuavo= github.com/fumiama/go-x25519 v1.0.0/go.mod h1:8VOhfyGZzw4IUs4nCjQFqW9cA3V/QpSCtP3fo2dLNg4= -github.com/fumiama/orbyte v0.0.0-20250225150552-190281785ccc h1:fW8dJIg6Yoz8k7dQ6zdUyhud5Pgi6khbqJw/IydqrtU= -github.com/fumiama/orbyte v0.0.0-20250225150552-190281785ccc/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= +github.com/fumiama/orbyte v0.0.0-20250228175313-326f247ad703 h1:NJ3V9S03x9g8e8+patxO13CqR0+OqUq0DDQiMkIUtz4= +github.com/fumiama/orbyte v0.0.0-20250228175313-326f247ad703/go.mod h1:qkUllQ1+gTx5sGrmKvIsqUgsnOO21Hiq847YHJRifbk= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac h1:A/5A0rODsg+EQHH61Ew5mMUtDpRXaSNqHhPvW+fN4C4= github.com/fumiama/water v0.0.0-20211231134027-da391938d6ac/go.mod h1:BBnNY9PwK+UUn4trAU+H0qsMEypm7+3Bj1bVFuJItlo= github.com/fumiama/wintun v0.0.0-20211229152851-8bc97c8034c0 h1:WfrSFlIlCAtg6Rt2IGna0HhJYSDE45YVHiYqO4wwsEw= @@ -24,10 +24,12 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 h1:TG/diQgUe0pntT/2D9tmUCz4VNwm9MfrtPr0SU2qSX8= +github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= diff --git a/gold/head/box.go b/gold/head/box.go new file mode 100644 index 0000000..7a3c02d --- /dev/null +++ b/gold/head/box.go @@ -0,0 +1,73 @@ +package head + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "unsafe" + + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" +) + +// PreCRC64 calculate crc64 checksum without idxdatsz. +func (p *Packet) PreCRC64() (crc uint64) { + w := bin.SelectWriter() + if bin.IsLittleEndian { + w.Write((*[PacketHeadNoCRCLen]byte)( + (unsafe.Pointer)(p), + )[:]) + } else { + w.WriteUInt32(p.idxdatsz) + w.WriteUInt32(uint32(p.randn)) + w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto)) + w.WriteUInt16(p.SrcPort) + w.WriteUInt16(p.DstPort) + w.WriteUInt16(p.Offset) + w.Write(p.src[:]) + w.Write(p.dst[:]) + } + w.P(func(b *pbuf.Buffer) { + crc = algo.MD5Hash8(b.Bytes()[PacketHeadPreCRCIdx:]) + if config.ShowDebugLog { + logrus.Debugf( + "[box] calc pre-crc64 %016x, dat %s", crc, + hex.EncodeToString(b.Bytes()[PacketHeadPreCRCIdx:]), + ) + } + }) + return +} + +// WriteHeaderTo write header bytes to buf +// with crc64 checksum. +func (p *Packet) WriteHeaderTo(buf *bytes.Buffer) { + if bin.IsLittleEndian { + buf.Write((*[PacketHeadNoCRCLen]byte)( + (unsafe.Pointer)(p), + )[:]) + p.md5h8rem = int64(algo.MD5Hash8(buf.Bytes())) + binary.Write(buf, binary.LittleEndian, p.md5h8rem) + return + } + w := bin.SelectWriter() + w.WriteUInt32(p.idxdatsz) + w.WriteUInt32(uint32(p.randn)) + w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto)) + w.WriteUInt16(p.SrcPort) + w.WriteUInt16(p.DstPort) + w.WriteUInt16(p.Offset) + w.Write(p.src[:]) + w.Write(p.dst[:]) + w.P(func(b *pbuf.Buffer) { + p.md5h8rem = int64(algo.MD5Hash8(b.Bytes())) + }) + w.WriteUInt64(uint64(p.md5h8rem)) + w.P(func(b *pbuf.Buffer) { + buf.ReadFrom(b) + }) +} diff --git a/gold/head/builder.go b/gold/head/builder.go new file mode 100644 index 0000000..c05839e --- /dev/null +++ b/gold/head/builder.go @@ -0,0 +1,242 @@ +package head + +import ( + "crypto/cipher" + "crypto/rand" + "encoding/binary" + "net" + "strconv" + + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" +) + +type ( + HeaderBuilder PacketItem + DataBuilder PacketItem + PacketBuilder PacketItem +) + +func NewPacketBuilder() *HeaderBuilder { + p := selectPacket() + p.P(func(ub *PacketBuf) { + err := binary.Read( + rand.Reader, binary.LittleEndian, &ub.DAT.randn, + ) + if err != nil { + panic(err) + } + }) + return (*HeaderBuilder)(p) +} + +func (pb *HeaderBuilder) p(f func(*PacketBuf)) *HeaderBuilder { + (*PacketItem)(pb).P(f) + return pb +} + +func (pb *HeaderBuilder) Proto(proto uint8) *HeaderBuilder { + return pb.p(func(ub *PacketBuf) { + ub.DAT.Proto |= FlagsProto(proto) & protobit + }) +} + +func (pb *HeaderBuilder) TTL(ttl uint8) *HeaderBuilder { + return pb.p(func(ub *PacketBuf) { + ub.DAT.TTL = ttl + }) +} + +func (pb *HeaderBuilder) Src(ip net.IP, p uint16) *HeaderBuilder { + return pb.p(func(ub *PacketBuf) { + copy(ub.DAT.src[:], ip.To4()) + ub.DAT.SrcPort = p + }) +} + +func (pb *HeaderBuilder) Dst(ip net.IP, p uint16) *HeaderBuilder { + return pb.p(func(ub *PacketBuf) { + copy(ub.DAT.dst[:], ip.To4()) + ub.DAT.DstPort = p + }) +} + +func (pb *HeaderBuilder) With(data []byte) *DataBuilder { + return (*DataBuilder)(pb.p(func(ub *PacketBuf) { + // header crc64 except idxdatasz + ub.DAT.md5h8rem = int64(ub.DAT.PreCRC64()) + // plain data + ub.Buffer.Write(data) + if config.ShowDebugLog { + logrus.Debugln(file.Header(), strconv.FormatUint(uint64(ub.DAT.md5h8rem), 16), "build with data", file.ToLimitHexString(data, 64)) + } + })) +} + +func (pb *DataBuilder) p(f func(*PacketBuf)) *DataBuilder { + (*PacketItem)(pb).P(f) + return pb +} + +func (pb *DataBuilder) Zstd() *DataBuilder { + return pb.p(func(ub *PacketBuf) { + data := algo.EncodeZstd(ub.Bytes()) + ub.Reset() + data.V(func(b []byte) { ub.Write(b) }) + if config.ShowDebugLog { + logrus.Debugln(file.Header(), strconv.FormatUint(uint64(ub.DAT.md5h8rem), 16), "data after zstd", file.ToLimitHexString(ub.Bytes(), 64)) + } + }) +} + +func (pb *DataBuilder) Hash() *DataBuilder { + return (*DataBuilder)(pb.p(func(ub *PacketBuf) { + ub.DAT.hash = algo.Blake2bHash8( + uint64(ub.DAT.md5h8rem), ub.Bytes(), + ) + })) +} + +func (pb *DataBuilder) tea(typ uint8) *DataBuilder { + return pb.p(func(ub *PacketBuf) { + ub.DAT.idxdatsz |= (uint32(typ) << 27) + }) +} + +func (pb *DataBuilder) additional(additional uint16) *DataBuilder { + return pb.p(func(ub *PacketBuf) { + ub.DAT.idxdatsz |= (uint32(additional&0x07ff) << 16) + }) +} + +func (pb *DataBuilder) Seal(aead cipher.AEAD, teatyp uint8, additional uint16) *PacketBuilder { + return (*PacketBuilder)(pb.tea(teatyp).additional(additional). + p(func(ub *PacketBuf) { + // encrypted data: chacha20(hash + plain) + w := bin.SelectWriter() + w.WriteUInt64(ub.DAT.hash) + w.Write(ub.Bytes()) + w.P(func(b *pbuf.Buffer) { + data := algo.EncodeAEAD(aead, additional, b.Bytes()) + ub.Reset() + data.V(func(b []byte) { ub.Write(b) }) + }) + })) +} + +func (pb *DataBuilder) Plain(teatyp uint8, additional uint16) *PacketBuilder { + return (*PacketBuilder)(pb.tea(teatyp).additional(additional). + p(func(ub *PacketBuf) { + w := bin.SelectWriter() + w.WriteUInt64(ub.DAT.hash) + w.Write(ub.Bytes()) + w.P(func(b *pbuf.Buffer) { + ub.Reset() + ub.ReadFrom(b) + }) + })) +} + +func (pb *DataBuilder) Trans(teatyp uint8, additional uint16) *PacketBuilder { + return (*PacketBuilder)(pb.tea(teatyp).additional(additional)) +} + +func (pb *PacketBuilder) copy() *PacketBuilder { + return (*PacketBuilder)((*PacketItem)(pb).Copy()) +} + +func (pb *PacketBuilder) p(f func(*PacketBuf)) *PacketBuilder { + (*PacketItem)(pb).P(f) + return pb +} + +// datasize fill encrypted datasize by calling data.Len(). +func (pb *PacketBuilder) datasize() *PacketBuilder { + return pb.p(func(ub *PacketBuf) { + l := uint32(ub.Len()) & 0xffff + ub.DAT.idxdatsz |= l + }) +} + +func (pb *PacketBuilder) noFrag(on bool) *PacketBuilder { + return pb.p(func(ub *PacketBuf) { + if on { + ub.DAT.Proto |= nofragbit + } else { + ub.DAT.Proto &= ^nofragbit + } + }) +} + +func (pb *PacketBuilder) hasMore(on bool) *PacketBuilder { + return pb.p(func(ub *PacketBuf) { + if on { + ub.DAT.Proto |= hasmorebit + } else { + ub.DAT.Proto &= ^hasmorebit + } + }) +} + +func (pb *PacketBuilder) offset(off uint16) *PacketBuilder { + return pb.p(func(ub *PacketBuf) { + ub.DAT.Offset = off + }) +} + +// Split mtu based on the total len, which includes +// header and body and padding after outer xor. +func (pb *PacketBuilder) Split(mtu int, nofrag bool) (pbs []PacketBytes) { + pb.datasize().p(func(ub *PacketBuf) { + bodylen := ub.Len() + datalen := bodylen + int(PacketHeadLen) + udplen := algo.EncodeXORLen(datalen) + if udplen <= mtu { // can be sent in a single packet + pbs = []PacketBytes{ + pbuf.BufferItemToBytes((*PacketItem)( + pb.copy().noFrag(nofrag).hasMore(false).offset(0), + )), + } + return + } + if nofrag { // drop oversized packet + return + } + pb.noFrag(false).hasMore(true) + datalim := mtu - 9 - int(PacketHeadLen) + n := bodylen / datalim + r := bodylen % datalim + if r > 0 { + n++ + } + pbs = make([]PacketBytes, n) + for i := 0; i < n; i++ { + a, b := i*datalim, (i+1)*datalim + if b > bodylen { + b = bodylen + } + pbs[i] = pbuf.BufferItemToBytes((*PacketItem)( + pb.copy().offset(uint16(i*datalim)), + )).Slice(a, b) + } + }) + return +} + +func BuildPacketFromBytes(pb PacketBytes) pbuf.Bytes { + w := bin.SelectWriter() + pb.B(func(_ []byte, p *Packet) { + w.P(func(b *pbuf.Buffer) { + p.WriteHeaderTo(&b.Buffer) + }) + }) + pb.V(func(b []byte) { + w.Write(b) + }) + return w.ToBytes() +} diff --git a/gold/head/flags.go b/gold/head/flags.go index 7dfeb17..bf913a6 100644 --- a/gold/head/flags.go +++ b/gold/head/flags.go @@ -1,41 +1,37 @@ package head import ( - "encoding/binary" "fmt" ) -type PacketFlags uint16 +const ( + hasmorebit FlagsProto = 0x20 << iota + nofragbit + topbit //TODO: 改为 trans 标记 +) -func (pf PacketFlags) String() string { - return fmt.Sprintf("%04x", uint16(pf)) +const ( + impossiblebit = hasmorebit | nofragbit + flagsbit = topbit | impossiblebit + protobit = ^flagsbit +) + +type FlagsProto uint8 + +func (pf FlagsProto) String() string { + return fmt.Sprintf("%02x", uint8(pf)) } -func (pf PacketFlags) IsValid() bool { - return pf&0x8000 == 0 +func (pf FlagsProto) IsValid() bool { + return pf&topbit == 0 && + pf&impossiblebit != impossiblebit && + pf.Proto() < ProtoTop } -func (pf PacketFlags) DontFrag() bool { - return pf&0x4000 == 0x4000 +func (pf FlagsProto) HasMore() bool { + return pf&hasmorebit != 0 } -func (pf PacketFlags) NoFrag() bool { - return pf == 0x4000 -} - -func (pf PacketFlags) IsSingle() bool { - return pf == 0 -} - -func (pf PacketFlags) ZeroOffset() bool { - return pf&0x1fff == 0 -} - -func (pf PacketFlags) Offset() uint16 { - return uint16(pf << 3) -} - -// Flags extract flags from raw data -func Flags(data []byte) PacketFlags { - return PacketFlags(binary.LittleEndian.Uint16(data[10:12])) +func (pf FlagsProto) NoFrag() bool { + return pf&nofragbit != 0 } diff --git a/gold/head/hello.go b/gold/head/hello.go deleted file mode 100644 index bd5ccee..0000000 --- a/gold/head/hello.go +++ /dev/null @@ -1,8 +0,0 @@ -package head - -type Hello uint8 - -const ( - HelloPing Hello = iota - HelloPong -) diff --git a/gold/head/nat.go b/gold/head/nat.go deleted file mode 100644 index 86e167d..0000000 --- a/gold/head/nat.go +++ /dev/null @@ -1,7 +0,0 @@ -package head - -// Notify 是 map[peerip]{network, endpoint} -type Notify = map[string][2]string - -// Query 是 peerips 组成的数组 -type Query = []string diff --git a/gold/head/packet.go b/gold/head/packet.go index a5cfc4c..e1c0174 100644 --- a/gold/head/packet.go +++ b/gold/head/packet.go @@ -1,238 +1,78 @@ package head import ( - "encoding/binary" - "encoding/hex" "errors" "net" - "sync/atomic" + "unsafe" - blake2b "github.com/fumiama/blake2b-simd" "github.com/fumiama/orbyte" "github.com/fumiama/orbyte/pbuf" - "github.com/sirupsen/logrus" - - "github.com/fumiama/WireGold/config" - "github.com/fumiama/WireGold/helper" ) -const PacketHeadLen = 60 +const ( + // PacketHeadPreCRCIdx skip idxdatsz, which will be set at Seal(). + PacketHeadPreCRCIdx = unsafe.Offsetof(Packet{}.randn) + // PacketHeadNoCRCLen without final crc + PacketHeadNoCRCLen = unsafe.Offsetof(Packet{}.md5h8rem) + PacketHeadLen = unsafe.Offsetof(Packet{}.hash) +) var ( - ErrBadCRCChecksum = errors.New("bad crc checksum") - ErrDataLenLT60 = errors.New("data len < 60") + ErrBadCRCChecksum = errors.New("bad crc checksum") + ErrDataLenLEHeader = errors.New("data len <= header len") + ErrInvalidOffset = errors.New("invalid offset") +) + +type ( + PacketBuf = pbuf.UserBuffer[Packet] + PacketItem = orbyte.Item[PacketBuf] + PacketBytes = pbuf.UserBytes[Packet] ) // Packet 是发送和接收的最小单位 type Packet struct { - // idxdatsz len(Data) + // idxdatsz + // + // idx // 高 5 位指定加密所用 key index // 高 5-16 位是递增值, 用于 xchacha20 验证 additionalData + // + // datsz // 不得超过 65507-head 字节 idxdatsz uint32 - // Proto 详见 head - Proto uint8 + // randn + // 在发送报文时填入随机值. + randn int32 + // Proto 高3位为标志(xDM),低5位为协议类型 + Proto FlagsProto // TTL is time to live TTL uint8 // SrcPort 源端口 SrcPort uint16 // DstPort 目的端口 DstPort uint16 - // Flags 高3位为标志(xDM),低13位为分片偏移 - Flags PacketFlags - // 记录还有多少字节未到达 - rembytes int32 - // Src 源 ip (ipv4) - Src net.IP - // Dst 目的 ip (ipv4) - Dst net.IP - // Hash 使用 BLAKE2 生成加密前 Packet 的摘要 - // 生成时 Hash 全 0 + // Offset 分片偏移量 + Offset uint16 + // src 源 ip (ipv4) + src [4]byte + // dst 目的 ip (ipv4) + dst [4]byte + // md5h8rem 发送时记录包头字段除自身外的 checksum 值, + // 接收时记录剩余字节数. + // + // 可以认为在一定时间内唯一 (现已更改算法为 md5 但名字未变)。 + md5h8rem int64 + + // 以下字段为包体, 与 data 一起加密 + + // hash 使用 BLAKE2B 生成加密前 packet data+crc64 的摘要, + // 取其前 8 字节, 小端序读写. + // // https://github.com/fumiama/blake2b-simd - Hash [32]byte - // crc64 包头字段的 checksum 值,可以认为在一定时间内唯一 (现已更改算法为 md5 但名字未变) - crc64 uint64 - // data 承载的数据 - data pbuf.Bytes - // Data 当前的偏移 - a, b int -} + hash uint64 -// NewPacketPartial 从一些预设参数生成一个新包 -func NewPacketPartial( - proto uint8, srcPort uint16, - dst net.IP, dstPort uint16, - data pbuf.Bytes, -) *orbyte.Item[Packet] { - p := selectPacket() - pp := p.Pointer() - pp.Proto = proto - pp.TTL = 16 - pp.SrcPort = srcPort - pp.DstPort = dstPort - pp.Dst = dst - pp.data = data - pp.b = data.Len() - return p -} - -func ParsePacket(p Packet) *orbyte.Item[Packet] { - return packetPool.Parse(nil, p) -} - -func ParsePacketHeader(data []byte) (p *orbyte.Item[Packet], err error) { - if len(data) < 60 { - err = ErrDataLenLT60 - return - } - p = selectPacket() - pp := p.Pointer() - pp.crc64 = CRC64(data) - if CalcCRC64(data) != pp.crc64 { - err = ErrBadCRCChecksum - return - } - - pp.idxdatsz = binary.LittleEndian.Uint32(data[:4]) - sz := pp.Len() - if config.ShowDebugLog { - logrus.Debugln("[packet] header data len", sz, "read data len", len(data)) - } - pt := binary.LittleEndian.Uint16(data[4:6]) - pp.Proto = uint8(pt) - pp.TTL = uint8(pt >> 8) - pp.SrcPort = binary.LittleEndian.Uint16(data[6:8]) - pp.DstPort = binary.LittleEndian.Uint16(data[8:10]) - - flags := PacketFlags(binary.LittleEndian.Uint16(data[10:12])) - pp.Flags = flags - pp.Src = make(net.IP, 4) - copy(pp.Src, data[12:16]) - pp.Dst = make(net.IP, 4) - copy(pp.Dst, data[16:20]) - copy(pp.Hash[:], data[20:52]) - - switch { - case sz+PacketHeadLen == len(data): - pp.b = sz - pp.rembytes = -1 - case pp.rembytes == 0: - pp.data = pbuf.NewBytes(sz) - pp.b = sz - pp.rembytes = int32(sz) - } - - return -} - -// ParseData 将 data 的数据解码到自身 -// -// 必须先调用 ParsePacketHeader -func (p *Packet) ParseData(data []byte) (complete bool) { - sz := p.Len() - if sz+PacketHeadLen == len(data) { - p.data = pbuf.ParseBytes(data[PacketHeadLen:]...).Copy() - return true - } - - flags := PacketFlags(binary.LittleEndian.Uint16(data[10:12])) - if config.ShowDebugLog { - logrus.Debugln("[packet] parse data flags", flags, "off", flags.Offset()) - } - if flags.ZeroOffset() { - p.Flags = flags - if config.ShowDebugLog { - logrus.Debugln("[packet] parse data set zero offset flags", flags) - } - } - - rembytes := atomic.LoadInt32(&p.rembytes) - if rembytes > 0 { - n := int32(copy(p.data.Bytes()[flags.Offset():], data[PacketHeadLen:])) - newrem := rembytes - n - for !atomic.CompareAndSwapInt32(&p.rembytes, rembytes, newrem) { - rembytes = atomic.LoadInt32(&p.rembytes) - newrem = rembytes - n - } - if config.ShowDebugLog { - logrus.Debugln("[packet] copied frag", hex.EncodeToString(data[20:52]), "rembytes:", p.rembytes) - } - } - - return p.rembytes <= 0 -} - -// DecreaseAndGetTTL TTL 自减后返回 -func (p *Packet) DecreaseAndGetTTL() uint8 { - p.TTL-- - return p.TTL -} - -// MarshalWith 补全剩余参数, 将自身数据编码为 []byte -// offset 必须为 8 的倍数,表示偏移的 8 位 -func (p *Packet) MarshalWith( - src net.IP, teatype uint8, additional uint16, - datasz uint32, offset uint16, - dontfrag, hasmore bool, -) pbuf.Bytes { - if src != nil { - p.Src = src - p.idxdatsz = (uint32(teatype) << 27) | (uint32(additional&0x07ff) << 16) | datasz&0xffff - } - - offset &= 0x1fff - if dontfrag { - offset |= 0x4000 - } - if hasmore { - offset |= 0x2000 - } - p.Flags = PacketFlags(offset) - return helper.NewWriterF(func(w *helper.Writer) { - w.WriteUInt32(p.idxdatsz) - w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto)) - w.WriteUInt16(p.SrcPort) - w.WriteUInt16(p.DstPort) - w.WriteUInt16(uint16(p.Flags)) - w.Write(p.Src.To4()) - w.Write(p.Dst.To4()) - w.Write(p.Hash[:]) - p.crc64 = CalcCRC64(w.UnsafeBytes()) - w.WriteUInt64(p.crc64) - w.Write(p.UnsafeBody()) - }) -} - -// FillHash 生成 p.Data 的 Hash -func (p *Packet) FillHash() { - h := blake2b.New256() - _, err := h.Write(p.UnsafeBody()) - if err != nil { - logrus.Errorln("[packet] err when fill hash:", err) - return - } - hsh := h.Sum(p.Hash[:0]) - if config.ShowDebugLog { - logrus.Debugln("[packet] sum calulated:", hex.EncodeToString(hsh)) - } -} - -// IsVaildHash 验证 packet 合法性 -func (p *Packet) IsVaildHash() bool { - h := blake2b.New256() - _, err := h.Write(p.UnsafeBody()) - if err != nil { - logrus.Errorln("[packet] err when check hash:", err) - return false - } - var sum [32]byte - _ = h.Sum(sum[:0]) - if config.ShowDebugLog { - logrus.Debugln("[packet] sum data len:", len(p.UnsafeBody())) - logrus.Debugln("[packet] sum calulated:", hex.EncodeToString(sum[:])) - logrus.Debugln("[packet] sum in packet:", hex.EncodeToString(p.Hash[:])) - } - return sum == p.Hash + // Buffer 用于 builder with 暂存原始包体数据 + // 以及接收时保存 body, 通过 PacketBytes 截取偏移. } // AdditionalData 获得 packet 的 additionalData @@ -246,48 +86,19 @@ func (p *Packet) CipherIndex() uint8 { } // Len is packet size -func (p *Packet) Len() int { +func (p *Packet) Size() int { return int(p.idxdatsz & 0xffff) } +// CRC64 extract md5h8rem field func (p *Packet) CRC64() uint64 { - return p.crc64 + return uint64(p.md5h8rem) } -// TransBody returns item.Trans().Slice() -func (p *Packet) TransBody() pbuf.Bytes { - d := p.data.Trans().Slice(p.a, p.b) - p.data = pbuf.Bytes{} - return d +func (p *Packet) Src() net.IP { + return append(net.IP{}, p.src[:]...) } -// UnsafeBody returns data -func (p *Packet) UnsafeBody() []byte { - return p.data.Bytes()[p.a:p.b] -} - -func (p *Packet) BodyLen() int { - return p.b - p.a -} - -func (p *Packet) SetBody(b pbuf.Bytes) { - p.a = 0 - p.b = b.Len() - p.data = b -} - -func (p *Packet) CropBody(a, b int) { - if b > p.data.Len() { - b = p.data.Len() - } - if a < 0 || b < 0 || a > b { - return - } - p.a, p.b = a, b -} - -func (p *Packet) ShallowCopy() (newp Packet) { - newp = *p - newp.data = p.data.Ref() - return newp +func (p *Packet) Dst() net.IP { + return append(net.IP{}, p.dst[:]...) } diff --git a/gold/head/packet_test.go b/gold/head/packet_test.go index c3ca074..37ff826 100644 --- a/gold/head/packet_test.go +++ b/gold/head/packet_test.go @@ -1,31 +1,157 @@ package head import ( + "bytes" crand "crypto/rand" "encoding/hex" "math/rand" "net" + "runtime" + "sync" "testing" - "github.com/fumiama/orbyte/pbuf" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" ) +func TestBuilderNative(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(4096) + for i := 0; i < 4096; i++ { + go func(i int) { + defer runtime.GC() + defer wg.Done() + dat := BuildPacketFromBytes(NewPacketBuilder().Proto(3).TTL(0xff). + Src(net.IPv4(1, 2, 3, 4), 5).Dst(net.IPv4(6, 7, 8, 9), 10). + With([]byte("0123456789")).Hash().Plain(0x12, 0x0345). + Split(16384, false)[0]).Trans() + s := hex.EncodeToString(dat) + if s[:8] != "12004593" { + panic("1") + } + if s[16:48] != "03ff05000a0000000102030406070809" { + panic("2") + } + if s[80:] != "30313233343536373839" { + panic("3") + } + p, err := ParsePacketHeader(dat) + if err != nil { + panic(err) + } + p.B(func(buf []byte, p *Packet) { + ok := p.WriteDataSegment(dat, buf) + if !ok { + panic(i) + } + if !algo.IsVaildBlake2bHash8(p.PreCRC64(), buf) { + panic(i) + } + if p.Proto != 3 { + panic(i) + } + if p.CipherIndex() != 0x12 { + panic(i) + } + if p.SrcPort != 5 { + panic(i) + } + if p.DstPort != 10 { + panic(i) + } + if !bytes.Equal(p.src[:], net.IPv4(1, 2, 3, 4).To4()) { + panic(i) + } + if !bytes.Equal(p.dst[:], net.IPv4(6, 7, 8, 9).To4()) { + panic(i) + } + if p.AdditionalData() != 0x0345 { + panic(i) + } + }) + }(i) + } + wg.Wait() +} + +func TestBuilderBE(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(4096) + bin.IsLittleEndian = false + for i := 0; i < 4096; i++ { + go func(i int) { + defer runtime.GC() + defer wg.Done() + dat := BuildPacketFromBytes(NewPacketBuilder().Proto(3).TTL(0xff). + Src(net.IPv4(1, 2, 3, 4), 5).Dst(net.IPv4(6, 7, 8, 9), 10). + With([]byte("0123456789")).Hash().Plain(0x12, 0x0345). + Split(16384, false)[0]).Trans() + s := hex.EncodeToString(dat) + if s[:8] != "12004593" { + panic("1") + } + if s[16:48] != "03ff05000a0000000102030406070809" { + panic("2") + } + if s[80:] != "30313233343536373839" { + panic("3") + } + p, err := ParsePacketHeader(dat) + if err != nil { + panic(err) + } + p.B(func(buf []byte, p *Packet) { + ok := p.WriteDataSegment(dat, buf) + if !ok { + panic(i) + } + if !algo.IsVaildBlake2bHash8(p.PreCRC64(), buf) { + panic(i) + } + if p.Proto != 3 { + panic(i) + } + if p.CipherIndex() != 0x12 { + panic(i) + } + if p.SrcPort != 5 { + panic(i) + } + if p.DstPort != 10 { + panic(i) + } + if !bytes.Equal(p.src[:], net.IPv4(1, 2, 3, 4).To4()) { + panic(i) + } + if !bytes.Equal(p.dst[:], net.IPv4(6, 7, 8, 9).To4()) { + panic(i) + } + if p.AdditionalData() != 0x0345 { + panic(i) + } + }) + }(i) + } + wg.Wait() +} + func TestMarshalUnmarshal(t *testing.T) { - data := pbuf.NewBytes(4096) - n, err := crand.Read(data.Bytes()) + // logrus.SetLevel(logrus.DebugLevel) + data := make([]byte, 4096) + n, err := crand.Read(data) if n != 4096 { t.Fatal("unexpected") } if err != nil { t.Fatal(err) } - for i := 0; i < 0x7ff; i++ { - proto := uint8(rand.Intn(255)) + for i := 0; i < 4096; i++ { + proto := uint8(rand.Intn(int(ProtoTop))) teatype := uint8(rand.Intn(32)) srcPort := uint16(rand.Intn(65535)) dstPort := uint16(rand.Intn(65535)) src := make(net.IP, 4) - _, err = crand.Read(src) + _, err := crand.Read(src) if err != nil { t.Fatal(err) } @@ -34,41 +160,47 @@ func TestMarshalUnmarshal(t *testing.T) { if err != nil { t.Fatal(err) } - p := NewPacketPartial(proto, srcPort, dst, dstPort, data.SliceTo(i)) - p.Pointer().FillHash() - d := p.Pointer().MarshalWith(src, teatype, uint16(i), uint32(i), 0, true, false) - t.Log("data:", hex.EncodeToString(d.Bytes())) - p, err := ParsePacketHeader(d.Bytes()) + dat := BuildPacketFromBytes(NewPacketBuilder().Proto(proto). + Src(src, srcPort).Dst(dst, dstPort). + With(data[:i]).Hash().Plain(teatype, uint16(i&0x7ff)). + Split(16384, false)[0]).Trans() + t.Log("pkt:", hex.EncodeToString(dat)) + p, err := ParsePacketHeader(dat) if err != nil { t.Fatal("index", i, err) } - ok := p.Pointer().ParseData(d.Bytes()) - if !ok { - t.Fatal("index", i) - } - if !p.Pointer().IsVaildHash() { - t.Fatal("index", i, "expect body", hex.EncodeToString(data.SliceTo(i).Bytes()), "got", hex.EncodeToString(p.Pointer().UnsafeBody())) - } - if p.Pointer().Proto != proto { - t.Fatal("index", i) - } - if p.Pointer().CipherIndex() != teatype { - t.Fatal("index", i, "expect", teatype, "got", p.Pointer().CipherIndex()) - } - if p.Pointer().SrcPort != srcPort { - t.Fatal("index", i) - } - if p.Pointer().DstPort != dstPort { - t.Fatal("index", i) - } - if !p.Pointer().Src.Equal(src) { - t.Fatal("index", i) - } - if !p.Pointer().Dst.Equal(dst) { - t.Fatal("index", i) - } - if p.Pointer().AdditionalData() != uint16(i) { - t.Fatal("index", i) - } + p.B(func(buf []byte, p *Packet) { + ok := p.WriteDataSegment(dat, buf) + if !ok { + t.Fatal("index", i) + } + if !algo.IsVaildBlake2bHash8(p.PreCRC64(), buf) { + t.Fatal("index", i, "expect body", hex.EncodeToString(data[:i]), "got", hex.EncodeToString(buf[8:])) + } + if p.Proto != FlagsProto(proto) { + t.Fatal("index", i) + } + if p.CipherIndex() != teatype { + t.Fatal("index", i, "expect", teatype, "got", p.CipherIndex()) + } + if p.SrcPort != srcPort { + t.Fatal("index", i) + } + if p.DstPort != dstPort { + t.Fatal("index", i) + } + if !bytes.Equal(p.src[:], src) { + t.Fatal("index", i) + } + if !bytes.Equal(p.dst[:], dst) { + t.Fatal("index", i) + } + if p.AdditionalData() != uint16(i&0x7ff) { + t.Fatal("index", i) + } + if !bytes.Equal(buf[8:], data[:i]) { + t.Fatal("index", i) + } + }) } } diff --git a/gold/head/pool.go b/gold/head/pool.go index 60233bd..fca7e05 100644 --- a/gold/head/pool.go +++ b/gold/head/pool.go @@ -1,37 +1,12 @@ package head import ( - "github.com/fumiama/orbyte" "github.com/fumiama/orbyte/pbuf" ) -type packetPooler struct { - orbyte.Pooler[Packet] -} - -func (packetPooler) New(_ any, pooled Packet) Packet { - return pooled -} - -func (packetPooler) Parse(obj any, _ Packet) Packet { - return obj.(Packet) -} - -func (packetPooler) Reset(p *Packet) { - p.idxdatsz = 0 - p.data = pbuf.Bytes{} - p.a, p.b = 0, 0 - p.rembytes = 0 -} - -func (packetPooler) Copy(dst, src *Packet) { - *dst = *src - dst.data = src.data.Copy() -} - -var packetPool = orbyte.NewPool[Packet](packetPooler{}) +var packetPool = pbuf.NewBufferPool[Packet]() // selectPacket 从池中取出一个 Packet -func selectPacket() *orbyte.Item[Packet] { - return packetPool.New(nil) +func selectPacket(buf ...byte) *PacketItem { + return (*PacketItem)(packetPool.NewBuffer(buf)) } diff --git a/gold/head/protos.go b/gold/head/protos.go index ce5d257..a7dbc7d 100644 --- a/gold/head/protos.go +++ b/gold/head/protos.go @@ -6,4 +6,24 @@ const ( ProtoNotify ProtoQuery ProtoData + ProtoTrans ) + +const ProtoTop = uint8(protobit + 1) + +func (pf FlagsProto) Proto() uint8 { + return uint8(pf & protobit) +} + +type Hello uint8 + +const ( + HelloPing Hello = iota + HelloPong +) + +// Notify 是 map[peerip]{network, endpoint} +type Notify = map[string][2]string + +// Query 是 peerips 组成的数组 +type Query = []string diff --git a/gold/head/raw.go b/gold/head/raw.go deleted file mode 100644 index 646873b..0000000 --- a/gold/head/raw.go +++ /dev/null @@ -1,22 +0,0 @@ -package head - -import ( - "crypto/md5" - "encoding/binary" -) - -// CRC64 extract packet header checksum -func CRC64(data []byte) uint64 { - return binary.LittleEndian.Uint64(data[52:PacketHeadLen]) -} - -// CalcCRC64 calculate packet header checksum -func CalcCRC64(data []byte) uint64 { - m := md5.Sum(data[:52]) - return binary.LittleEndian.Uint64(m[:8]) -} - -// Hash extract 32 bytes blake2b hash from raw bytes -func Hash(data []byte) []byte { - return data[20:52] -} diff --git a/gold/head/unbox.go b/gold/head/unbox.go new file mode 100644 index 0000000..39abdc1 --- /dev/null +++ b/gold/head/unbox.go @@ -0,0 +1,118 @@ +package head + +import ( + "encoding/binary" + "errors" + "sync/atomic" + "unsafe" + + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/orbyte/pbuf" +) + +func ParsePacketHeader(data []byte) (pbytes PacketBytes, err error) { + if len(data) <= int(PacketHeadLen) { + err = ErrDataLenLEHeader + return + } + p := selectPacket() + sz := 0 + p.P(func(pb *PacketBuf) { + if bin.IsLittleEndian { + copy((*[PacketHeadLen]byte)( + (unsafe.Pointer)(&pb.DAT), + )[:], data) + } else { + pb.DAT.idxdatsz = binary.LittleEndian.Uint32(data[:4]) + pb.DAT.randn = int32(binary.LittleEndian.Uint32(data[4:8])) + pt := binary.LittleEndian.Uint16(data[8:10]) + pb.DAT.Proto = FlagsProto(pt) + pb.DAT.TTL = uint8(pt >> 8) + pb.DAT.SrcPort = binary.LittleEndian.Uint16(data[10:12]) + pb.DAT.DstPort = binary.LittleEndian.Uint16(data[12:14]) + pb.DAT.Offset = binary.LittleEndian.Uint16(data[14:16]) + copy(pb.DAT.src[:], data[16:20]) + copy(pb.DAT.dst[:], data[20:24]) + pb.DAT.md5h8rem = int64(binary.LittleEndian.Uint64(data[24:32])) + } + sz = pb.DAT.Size() + if !pb.DAT.Proto.IsValid() { + err = errors.New("invalid proto " + pb.DAT.Proto.String()) + return + } + if (!pb.DAT.Proto.HasMore() && (pb.DAT.Offset != 0 || + sz+int(PacketHeadLen) != len(data))) || + (pb.DAT.Proto.HasMore() && pb.DAT.Offset+ + uint16(len(data[PacketHeadLen:])) > uint16(sz)) { + err = ErrInvalidOffset + if config.ShowDebugLog { + logrus.Warnf("[unbox] invalid offset %04x size %04x", pb.DAT.Offset, sz) + } + return + } + crc := algo.MD5Hash8(data[:PacketHeadNoCRCLen]) + if crc != uint64(pb.DAT.md5h8rem) { + err = ErrBadCRCChecksum + if config.ShowDebugLog { + logrus.Warnf("[unbox] exp crc %016x but got %016x", pb.DAT.md5h8rem, crc) + } + return + } + if config.ShowDebugLog { + logrus.Debugln("[unbox] header data len", sz, "read data len", len(data)-int(PacketHeadLen)) + } + if sz+int(PacketHeadLen) == len(data) { + pb.Buffer.Write(data[PacketHeadLen:]) + pb.DAT.md5h8rem = -1 + return + } + pb.Buffer.Grow(sz) + pb.Buffer.Write(make([]byte, sz)) + pb.DAT.md5h8rem = int64(sz) + }) + if err != nil { + return + } + pbytes = pbuf.BufferItemToBytes(p) + return +} + +// WriteDataSegment 将 data 的数据并发解码到自身 buf. +// +// 必须先调用 ParsePacketHeader 获得 packet. +// +// return: complete. +func (p *Packet) WriteDataSegment(data, buf []byte) bool { + if atomic.LoadInt64(&p.md5h8rem) <= 0 { + return true + } + + flags := FlagsProto(data[8]) + offset := binary.LittleEndian.Uint16(data[14:16]) + if config.ShowDebugLog { + logrus.Debugln("[unbox] parse data flags", flags, "off", offset) + } + + if offset == 0 { + p.Proto = flags + p.Offset = 0 + if config.ShowDebugLog { + logrus.Debugln("[unbox] parse data set zero offset flags", flags) + } + } + + rembytes := atomic.LoadInt64(&p.md5h8rem) + if rembytes > 0 { + n := int64(copy(buf[offset:], data[PacketHeadLen:])) + newrem := rembytes - n + for !atomic.CompareAndSwapInt64(&p.md5h8rem, rembytes, newrem) { + rembytes = atomic.LoadInt64(&p.md5h8rem) + newrem = rembytes - n + } + } + return atomic.LoadInt64(&p.md5h8rem) <= 0 +} diff --git a/gold/link/crypto.go b/gold/link/crypto.go index c717aa2..dd6e38e 100644 --- a/gold/link/crypto.go +++ b/gold/link/crypto.go @@ -1,72 +1,16 @@ package link import ( - "crypto/cipher" - "crypto/rand" - "encoding/binary" - "errors" - "math/bits" - mrand "math/rand" - "github.com/fumiama/orbyte/pbuf" - "github.com/sirupsen/logrus" -) -var ( - ErrCipherTextTooShort = errors.New("ciphertext too short") + "github.com/fumiama/WireGold/internal/algo" ) func (l *Link) randkeyidx() uint8 { if l.keys[1] == nil { return 0 } - return uint8(mrand.Intn(32)) -} - -func mixkeys(k1, k2 []byte) []byte { - if len(k1) != 32 || len(k2) != 32 { - panic("unexpected key len") - } - k := make([]byte, 64) - for i := range k1 { - k1i, k2i := i, 31-i - k1v, k2v := k1[k1i], k2[k2i] - binary.LittleEndian.PutUint16( - k[i*2:(i+1)*2], - expandkeyunit(k1v, k2v), - ) - } - return k -} - -func expandkeyunit(v1, v2 byte) (v uint16) { - v1s, v2s := uint16(v1), uint16(bits.Reverse8(v2)) - for i := 0; i < 8; i++ { - v |= v1s & (1 << (i * 2)) - v1s <<= 1 - } - for i := 0; i < 8; i++ { - v2s <<= 1 - v |= v2s & (2 << (i * 2)) - } - return -} - -// encode by aead and put b into pool -func (l *Link) encode(teatype uint8, additional uint16, b []byte) (eb pbuf.Bytes) { - if len(b) == 0 || teatype >= 32 { - return - } - if l.keys[0] == nil { - return pbuf.ParseBytes(b...).Copy() - } - aead := l.keys[teatype] - if aead == nil { - logrus.Warnln("[crypto] cipher key at index", teatype, "is empty") - return - } - eb = encode(aead, additional, b) - return + return algo.RandKeyIndex() } // decode by aead and put b into pool @@ -81,91 +25,15 @@ func (l *Link) decode(teatype uint8, additional uint16, b []byte) (db pbuf.Bytes if aead == nil { return } - return decode(aead, additional, b) -} - -func encode(aead cipher.AEAD, additional uint16, b []byte) pbuf.Bytes { - nsz := aead.NonceSize() - // Accocate capacity for all the stuffs. - buf := pbuf.NewBytes(2 + nsz + len(b) + aead.Overhead()) - binary.LittleEndian.PutUint16(buf.Bytes()[:2], additional) - nonce := buf.Bytes()[2 : 2+nsz] - // Select a random nonce - _, err := rand.Read(nonce) - if err != nil { - panic(err) - } - // Encrypt the message and append the ciphertext to the nonce. - eb := aead.Seal(nonce[nsz:nsz], nonce, b, buf.Bytes()[:2]) - return buf.Trans().Slice(2, 2+nsz+len(eb)) -} - -func decode(aead cipher.AEAD, additional uint16, b []byte) (pbuf.Bytes, error) { - nsz := aead.NonceSize() - if len(b) < nsz { - return pbuf.Bytes{}, ErrCipherTextTooShort - } - // Split nonce and ciphertext. - nonce, ciphertext := b[:nsz], b[nsz:] - if len(ciphertext) == 0 { - return pbuf.Bytes{}, nil - } - // Decrypt the message and check it wasn't tampered with. - var buf [2]byte - binary.LittleEndian.PutUint16(buf[:], additional) - data, err := aead.Open( - pbuf.NewBytes(4096).Trans().Bytes()[:0], - nonce, ciphertext, buf[:], - ) - if err != nil { - return pbuf.Bytes{}, nil - } - return pbuf.ParseBytes(data...), nil + return algo.DecodeAEAD(aead, additional, b) } // xorenc 按 8 字节, 以初始 m.mask 循环异或编码 data func (m *Me) xorenc(data []byte, seq uint32) pbuf.Bytes { - batchsz := len(data) / 8 - remain := len(data) % 8 - sum := m.mask - newdat := pbuf.NewBytes(8 + batchsz*8 + 8) // seqrand dat tail - binary.LittleEndian.PutUint32(newdat.Bytes()[:4], seq) - _, _ = rand.Read(newdat.Bytes()[4:8]) // seqrand - sum ^= binary.LittleEndian.Uint64(newdat.Bytes()[:8]) // init from seqrand - binary.LittleEndian.PutUint64(newdat.Bytes()[:8], sum) - for i := 0; i < batchsz; i++ { // range on batch data - a := i * 8 - b := (i + 1) * 8 - sum ^= binary.LittleEndian.Uint64(data[a:b]) - binary.LittleEndian.PutUint64(newdat.Bytes()[a+8:b+8], sum) - } - p := batchsz * 8 - copy(newdat.Bytes()[8+p:], data[p:]) - newdat.Bytes()[newdat.Len()-1] = byte(remain) - sum ^= binary.LittleEndian.Uint64(newdat.Bytes()[8+p:]) - binary.LittleEndian.PutUint64(newdat.Bytes()[8+p:], sum) - return newdat + return algo.EncodeXOR(data, m.mask, seq) } // xordec 按 8 字节, 以初始 m.mask 循环异或解码 data func (m *Me) xordec(data []byte) (uint32, []byte) { - if len(data) < 16 || len(data)%8 != 0 { - return 0, nil - } - batchsz := len(data) / 8 - sum := m.mask - for i := 0; i < batchsz; i++ { - a := i * 8 - b := (i + 1) * 8 - x := binary.LittleEndian.Uint64(data[a:b]) - sum ^= x - binary.LittleEndian.PutUint64(data[a:b], sum) - sum = x - } - remain := data[len(data)-1] - if remain >= 8 { - return 0, nil - } - return binary.LittleEndian.Uint32(data[:4]), - data[8 : len(data)-8+int(remain)] + return algo.DecodeXOR(data, m.mask) } diff --git a/gold/link/event.go b/gold/link/event.go new file mode 100644 index 0000000..eea7451 --- /dev/null +++ b/gold/link/event.go @@ -0,0 +1,22 @@ +package link + +import ( + "strconv" + + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/orbyte/pbuf" +) + +// 事件分发器 +var dispachers map[uint8]EventDispacher = make(map[uint8]EventDispacher) + +type EventDispacher func(header *head.Packet, peer *Link, data pbuf.Bytes) + +// AddProto is thread unsafe. Use in init() only. +func AddProto(p uint8, d EventDispacher) { + _, ok := dispachers[p] + if ok { + panic("proto " + strconv.Itoa(int(p)) + " has been registered") + } + dispachers[p] = d +} diff --git a/gold/link/link.go b/gold/link/link.go index 8e8a7d0..16eb55a 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -7,17 +7,24 @@ import ( "sync/atomic" "time" + "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" base14 "github.com/fumiama/go-base16384" - "github.com/fumiama/orbyte" + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" ) var ( ErrPerrNotExist = errors.New("peer not exist") ) +type LinkData struct { + H head.Packet + D pbuf.Bytes +} + // Link 是本机到 peer 的连接抽象 type Link struct { // peer 的公钥 @@ -27,7 +34,7 @@ type Link struct { // 收到的包的队列 // 没有下层 nic 时 // 包会分发到此 - pipe chan *orbyte.Item[head.Packet] + pipe chan LinkData // peer 的虚拟 ip peerip net.IP // peer 的公网 endpoint @@ -63,11 +70,55 @@ func (m *Me) Connect(peer string) (*Link, error) { return nil, ErrPerrNotExist } +func (l *Link) ToLower(header *head.Packet, data pbuf.Bytes) { + if l.pipe != nil { + l.pipe <- LinkData{ + H: *header, + D: data, + } + if config.ShowDebugLog { + logrus.Debugln("[listen] deliver to pipe of", l.peerip) + } + return + } + var err error + data.V(func(b []byte) { + _, err = l.me.nic.Write(b) + }) + if err != nil { + logrus.Errorln("[listen] deliver", data.Len(), "bytes data to nic err:", err) + } else if config.ShowDebugLog { + logrus.Debugln("[listen] deliver", data.Len(), "bytes data to nic") + } +} + // Close 关闭到 peer 的连接 func (l *Link) Close() { l.Destroy() } +// IP is wiregold peer ip +func (l *Link) IP() net.IP { + return l.peerip +} + +// RawEndPoint is initial ep in cfg +func (l *Link) RawEndPoint() string { + return l.rawep +} + +func (l *Link) EndPoint() p2p.EndPoint { + return l.endpoint +} + +func (l *Link) SetEndPoint(ep p2p.EndPoint) { + l.endpoint = ep +} + +func (l *Link) Me() *Me { + return l.me +} + // Destroy 从 connections 移除 peer func (l *Link) Destroy() { l.me.connmapmu.Lock() @@ -80,7 +131,7 @@ func (l *Link) String() (n string) { if l.pubk != nil { b, err := base14.UTF16BE2UTF8(base14.Encode(l.pubk[:7])) if err == nil { - n = helper.BytesToString(b) + n = bin.BytesToString(b) } else { n = err.Error() } diff --git a/gold/link/listen.go b/gold/link/listen.go index 53a13e2..e524a0d 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -3,7 +3,6 @@ package link import ( "errors" "net" - "runtime" "strconv" "sync/atomic" "time" @@ -14,8 +13,9 @@ import ( "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" - "github.com/fumiama/orbyte" + "github.com/fumiama/WireGold/internal/algo" + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" "github.com/fumiama/orbyte/pbuf" ) @@ -30,10 +30,16 @@ func (m *Me) listen() (conn p2p.Conn, err error) { m.ep = conn.LocalAddr() logrus.Infoln("[listen] at", m.ep) go func() { + var ( + n int + addr p2p.EndPoint + err error + ) for { lbf := pbuf.NewBytes(lstnbufgragsz) - n, addr, err := conn.ReadFromPeer(lbf.Bytes()) - lbf.KeepAlive() + lbf.V(func(b []byte) { + n, addr, err = conn.ReadFromPeer(b) + }) if m.connections == nil || errors.Is(err, net.ErrClosed) { logrus.Warnln("[listen] quit listening") return @@ -69,36 +75,40 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) { atomic.StoreUint64(&m.recvtotlcnt, 0) atomic.StoreInt64(&m.recvlooptime, now) } - packet := m.wait(buf.SliceTo(n).Bytes()) - buf.KeepAlive() - if packet == nil { - if config.ShowDebugLog { - logrus.Debugln("[listen] queue waiting") + buf.V(func(b []byte) { + h := m.wait(b[:n]) + if !h.HasInit() { + if config.ShowDebugLog { + logrus.Debugln("[listen] queue waiting") + } + return } - return - } - if config.ShowDebugLog { - logrus.Debugln("[listen] dispatch", len(packet.Pointer().UnsafeBody()), "bytes packet") - } - m.dispatch(packet, addr) + h.B(func(b []byte, p *head.Packet) { + if config.ShowDebugLog { + logrus.Debugln("[listen] dispatch", len(b), "bytes packet") + } + m.dispatch(p, b, addr) + }) + }) } -func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint) { - pp := packet.Pointer - r := pp().Len() - pp().BodyLen() +func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) { + r := header.Size() - len(body) if r > 0 { - logrus.Warnln("[listen] packet from endpoint", addr, "len", pp().BodyLen(), "is smaller than it declared len", pp().Len(), ", drop it") + logrus.Warnln("[listen] packet from endpoint", addr, "len", len(body), "is smaller than it declared len", header.Size(), ", drop it") return } - p, ok := m.IsInPeer(pp().Src.String()) + srcip := header.Src() + dstip := header.Dst() + p, ok := m.IsInPeer(srcip.String()) if config.ShowDebugLog { - logrus.Debugln("[listen] recv from endpoint", addr, "src", pp().Src, "dst", pp().Dst) + logrus.Debugln("[listen] recv from endpoint", addr, "src", srcip, "dst", dstip) } if !ok { - logrus.Warnln("[listen] packet from", pp().Src, "to", pp().Dst, "is refused") + logrus.Warnln("[listen] packet from", srcip, "to", dstip, "is refused") return } - if helper.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { + if bin.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) { logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr @@ -110,22 +120,41 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint) { now := time.Now() atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.lastalive)), unsafe.Pointer(&now)) switch { - case p.IsToMe(pp().Dst): - if !p.Accept(pp().Src) { - logrus.Warnln("[listen] refused packet from", pp().Src.String()+":"+strconv.Itoa(int(pp().SrcPort))) + case p.IsToMe(dstip): + if !p.Accept(srcip) { + logrus.Warnln("[listen] refused packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort))) return } - addt := pp().AdditionalData() + addt := header.AdditionalData() var err error - data, err := p.decode(pp().CipherIndex(), addt, pp().TransBody().Bytes()) + data, err := p.decode(header.CipherIndex(), addt, body) if err != nil { if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid packet key idx:", pp().CipherIndex(), "addt:", addt, "err:", err) + logrus.Debugln("[listen] drop invalid packet key idx:", header.CipherIndex(), "addt:", addt, "err:", err) } return } + if data.Len() < 8 { + if config.ShowDebugLog { + logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len()) + } + return + } + ok := false + data.V(func(b []byte) { + ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b) + }) + if !ok { + if config.ShowDebugLog { + logrus.Debugln("[listen] drop invalid hash packet") + } + return + } + data = data.SliceFrom(8) if p.usezstd { - dat, err := decodezstd(data.Trans().Bytes()) + data.V(func(b []byte) { + data, err = algo.DecodeZstd(b) // skip hash + }) if err != nil { if config.ShowDebugLog { logrus.Debugln("[listen] drop invalid zstd packet:", err) @@ -133,78 +162,32 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint) { return } if config.ShowDebugLog { - logrus.Debugln("[listen] zstd decoded len:", dat.Len()) + logrus.Debugln("[listen] zstd decoded len:", data.Len()) } - data = dat } - pp().SetBody(data) - if !pp().IsVaildHash() { - if config.ShowDebugLog { - logrus.Debugln("[listen] drop invalid hash packet") - } + fn, ok := dispachers[header.Proto.Proto()] + if !ok { + logrus.Warnln(file.Header(), "unsupported proto", header.Proto.Proto()) return } - switch pp().Proto { - case head.ProtoHello: - switch { - case len(pp().UnsafeBody()) == 0: - logrus.Warnln("[listen] recv old hello packet, do nothing") - case pp().UnsafeBody()[0] == byte(head.HelloPing): - n, err := p.WritePacket(head.NewPacketPartial( - head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), pbuf.ParseBytes(byte(head.HelloPong))), false) - if err == nil { - logrus.Infoln("[listen] recv hello, send", n, "bytes hello ack packet") - } else { - logrus.Errorln("[listen] send hello ack packet error:", err) - } - default: - logrus.Infoln("[listen] recv hello ack packet, do nothing") - } - case head.ProtoNotify: - logrus.Infoln("[listen] recv notify from", pp().Src) - p.onNotify(pp().UnsafeBody()) - runtime.KeepAlive(packet) - case head.ProtoQuery: - logrus.Infoln("[listen] recv query from", pp().Src) - p.onQuery(pp().UnsafeBody()) - runtime.KeepAlive(packet) - case head.ProtoData: - if p.pipe != nil { - p.pipe <- packet.Copy() - if config.ShowDebugLog { - logrus.Debugln("[listen] deliver to pipe of", p.peerip) - } - } else { - _, err := m.nic.Write(pp().UnsafeBody()) - if err != nil { - logrus.Errorln("[listen] deliver", pp().BodyLen(), "bytes data to nic err:", err) - } else if config.ShowDebugLog { - logrus.Debugln("[listen] deliver", pp().BodyLen(), "bytes data to nic") - } - } - default: - logrus.Warnln("[listen] recv unknown proto:", pp().Proto) - } - case p.Accept(pp().Dst): + fn(header, p, data) + return + case p.Accept(dstip): //TODO: 移除此处转发, 将转发放到 wait if !p.allowtrans { - logrus.Warnln("[listen] refused to trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) + logrus.Warnln("[listen] refused to trans packet to", dstip.String()+":"+strconv.Itoa(int(header.DstPort))) return } // 转发 - lnk := m.router.NextHop(pp().Dst.String()) + lnk := m.router.NextHop(dstip.String()) if lnk == nil { logrus.Warnln("[listen] transfer drop packet: nil nexthop") return } - n, err := lnk.WritePacket(packet, true) - if err == nil { - if config.ShowDebugLog { - logrus.Debugln("[listen] trans", n, "bytes packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) - } - } else { - logrus.Errorln("[listen] trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "err:", err) + lnk.WritePacket(head.ProtoTrans, body) + if config.ShowDebugLog { + logrus.Debugln("[listen] trans", len(body), "bytes body to", dstip.String()+":"+strconv.Itoa(int(header.DstPort))) } default: - logrus.Warnln("[listen] packet dst", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "is not in peers") + logrus.Warnln("[listen] packet dst", dstip.String()+":"+strconv.Itoa(int(header.DstPort)), "is not in peers") } } diff --git a/gold/link/me.go b/gold/link/me.go index a89664f..e141077 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -10,7 +10,6 @@ import ( "time" "github.com/FloatTech/ttl" - "github.com/fumiama/orbyte" "github.com/fumiama/orbyte/pbuf" "github.com/fumiama/water/waterutil" "github.com/sirupsen/logrus" @@ -18,7 +17,7 @@ import ( "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" "github.com/fumiama/WireGold/lower" ) @@ -48,9 +47,9 @@ type Me struct { // 本机路由表 router *Router // 本机未接收完全分片池 - recving *ttl.Cache[uint64, *orbyte.Item[head.Packet]] + recving *ttl.Cache[uint16, head.PacketBytes] // 抗重放攻击记录池 - recved *ttl.Cache[uint64, struct{}] + recved *ttl.Cache[uint32, struct{}] // 本机上层配置 srcport, dstport, mtu, speedloop uint16 // 报头掩码 @@ -122,11 +121,11 @@ func NewMe(cfg *MyConfig) (m Me) { m.router.SetDefault(nil) m.srcport = cfg.SrcPort m.dstport = cfg.DstPort - m.mtu = (cfg.MTU - head.PacketHeadLen) & 0xfff8 + m.mtu = cfg.MTU if cfg.NICConfig != nil { m.nic = lower.NewNIC( cfg.NICConfig.IP, cfg.NICConfig.SubNet, - strconv.FormatUint(uint64(m.MTU()), 10), cfg.NICConfig.CIDRs..., + strconv.FormatUint(uint64(m.mtu), 10), cfg.NICConfig.CIDRs..., ) } m.mask = cfg.Mask @@ -135,8 +134,8 @@ func NewMe(cfg *MyConfig) (m Me) { var buf [8]byte binary.BigEndian.PutUint64(buf[:], m.mask) logrus.Infoln("[me] xor mask", hex.EncodeToString(buf[:])) - m.recving = ttl.NewCache[uint64, *orbyte.Item[head.Packet]](time.Second * 10) - m.recved = ttl.NewCache[uint64, struct{}](time.Minute) + m.recving = ttl.NewCache[uint16, head.PacketBytes](time.Second * 10) + m.recved = ttl.NewCache[uint32, struct{}](time.Minute) return } @@ -144,7 +143,7 @@ func NewMe(cfg *MyConfig) (m Me) { func (m *Me) Restart() error { oldconn := m.conn m.conn = nil - if helper.IsNonNilInterface(oldconn) { + if bin.IsNonNilInterface(oldconn) { _ = oldconn.Close() } var err error @@ -184,9 +183,13 @@ func (m *Me) EndPoint() p2p.EndPoint { return m.ep } +func (m *Me) NetworkConfigs() []any { + return m.networkconfigs +} + func (m *Me) Close() error { m.connections = nil - if helper.IsNonNilInterface(m.conn) { + if bin.IsNonNilInterface(m.conn) { _ = m.conn.Close() m.conn = nil } @@ -291,12 +294,11 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) { return } pcp := pbuf.NewBytes(len(packet)) - copy(pcp.Bytes(), packet) - go func(packet pbuf.Bytes) { - _, err := lnk.WritePacket(head.NewPacketPartial(head.ProtoData, m.SrcPort(), lnk.peerip, m.DstPort(), packet), false) - if err != nil { - logrus.Warnln("[me] write to peer", lnk.peerip, "err:", err) - } - }(pcp) + pcp.V(func(b []byte) { + copy(b, packet) + }) + go pcp.V(func(b []byte) { + lnk.WritePacket(head.ProtoData, b) + }) return } diff --git a/gold/link/nat.go b/gold/link/nat.go index 1178adf..95ba55b 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -1,7 +1,6 @@ package link import ( - "bytes" "encoding/json" "sync/atomic" "time" @@ -9,12 +8,8 @@ import ( "github.com/sirupsen/logrus" - "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" - "github.com/fumiama/orbyte" - "github.com/fumiama/orbyte/pbuf" + "github.com/fumiama/WireGold/internal/file" ) // 保持 NAT @@ -22,7 +17,7 @@ import ( // 以秒为单位,小于等于 0 不发送 func (l *Link) keepAlive(dur int64) { if dur > 0 { - logrus.Infoln("[nat] start to keep alive") + logrus.Infoln(file.Header(), "start to keep alive") t := time.NewTicker(time.Second * time.Duration(dur)) for range t.C { if l.me.connections == nil { @@ -30,123 +25,22 @@ func (l *Link) keepAlive(dur int64) { } la := (*time.Time)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&l.lastalive)))) if la != nil && time.Since(*la) > 10*time.Second*time.Duration(dur) { // 可能已经被阻断, 断开重连 - logrus.Warnln("[nat] no response after 10 keep alive tries, re-connecting...") + logrus.Warnln(file.Header(), "no response after 10 keep alive tries, re-connecting...") err := l.me.Restart() if err != nil { - logrus.Errorln("[nat] re-connect me err:", err) + logrus.Errorln(file.Header(), "re-connect me err:", err) } else { - logrus.Infoln("[nat] re-connect me succeeded") + logrus.Infoln(file.Header(), "re-connect me succeeded") } } - n, err := l.WritePacket(head.NewPacketPartial(head.ProtoHello, l.me.srcport, l.peerip, l.me.dstport, pbuf.ParseBytes(byte(head.HelloPing))), false) - if err == nil { - logrus.Infoln("[nat] send", n, "bytes keep alive packet") - } else { - logrus.Warnln("[nat] send keep alive packet error:", err) - } - } - } -} - -// 收到通告包的处理函数 -func (l *Link) onNotify(packet []byte) { - // TODO: 完成data解包与endpoint注册 - // 1. Data 解包 - // ---- 使用 head.Notify 解释 packet - notify := make(head.Notify, 32) - err := json.Unmarshal(packet, ¬ify) - if err != nil { - logrus.Errorln("[nat] notify json unmarshal err:", err) - return - } - // 2. endpoint注册 - // ---- 遍历 Notify,注册对方的 endpoint 到 - // ---- connections,注意使用读写锁connmapmu - for peer, ep := range notify { - nw, epstr := ep[0], ep[1] - if nw != l.me.ep.Network() { - logrus.Warnln("[nat] ignore different network notify", nw, "addr", epstr) - continue - } - addr, err := p2p.NewEndPoint(nw, epstr, l.me.networkconfigs...) - if err == nil { - p, ok := l.me.IsInPeer(peer) - if ok { - if helper.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { - p.endpoint = addr - logrus.Infoln("[nat] notify set ep of peer", peer, "to", ep) - } - continue - } - } - if config.ShowDebugLog { - logrus.Debugln("[nat] notify drop invalid peer:", peer, "ep:", ep) - } - } -} - -// 收到询问包的处理函数 -func (l *Link) onQuery(packet []byte) { - // 完成data解包与notify分发 - - // 1. Data 解包 - // ---- 使用 head.Query 解释 packet - // ---- 根据 Query 确定需要封装的 Notify - var peers head.Query - err := json.Unmarshal(packet, &peers) - if err != nil { - logrus.Errorln("[nat] query json unmarshal err:", err) - return - } - - if l == nil || l.me == nil { - logrus.Errorln("[nat] nil link/me") - return - } - - // 2. notify分发 - // ---- 封装 Notify 到 新的 packet - // ---- 调用 l.Send 发送到对方 - notify := make(head.Notify, len(peers)) - for _, p := range peers { - lnk, ok := l.me.IsInPeer(p) - eps := "" - if l.me.ep.Network() == "udp" { // udp has real p2p - if helper.IsNilInterface(lnk.endpoint) { - continue - } - eps = lnk.endpoint.String() - } - if eps == "" { - eps = l.rawep // use registered ep only - } - if eps == "" { - continue - } - if ok && helper.IsNonNilInterface(lnk.endpoint) { - notify[p] = [2]string{ - lnk.endpoint.Network(), - eps, - } - } - } - if len(notify) > 0 { - logrus.Infoln("[nat] query wrap", len(notify), "notify") - w := helper.SelectWriter() - _ = json.NewEncoder(w).Encode(¬ify) - _, err = l.WritePacket(head.NewPacketPartial( - head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, - pbuf.BufferItemToBytes((*orbyte.Item[bytes.Buffer])(w).Trans()), - ), false) - if err != nil { - logrus.Errorln("[nat] notify peer", l, "err:", err) - return + l.WritePacket(head.ProtoHello, []byte{byte(head.HelloPing)}) + logrus.Infoln(file.Header(), "send keep alive to", l.peerip) } } } // sendquery 主动发起查询,询问对方是否可以到达 peers -func (l *Link) sendquery(tick time.Duration, peers ...string) { +func (l *Link) sendQuery(tick time.Duration, peers ...string) { if len(peers) == 0 { return } @@ -156,13 +50,7 @@ func (l *Link) sendquery(tick time.Duration, peers ...string) { } t := time.NewTicker(tick) for range t.C { - logrus.Infoln("[nat] query send query to", l.peerip) - _, err = l.WritePacket(head.NewPacketPartial( - head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, - pbuf.ParseBytes(data...), - ), false) - if err != nil { - logrus.Errorln("[nat] query write err:", err) - } + l.WritePacket(head.ProtoQuery, data) + logrus.Infoln(file.Header(), "send query to", l.peerip) } } diff --git a/gold/link/peer.go b/gold/link/peer.go index 864adaf..d662714 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -4,10 +4,9 @@ import ( "net" "time" - "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/p2p" + "github.com/fumiama/WireGold/internal/algo" curve "github.com/fumiama/go-x25519" - "github.com/fumiama/orbyte" "github.com/sirupsen/logrus" "golang.org/x/crypto/chacha20poly1305" ) @@ -50,7 +49,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) { } if !cfg.NoPipe { - l.pipe = make(chan *orbyte.Item[head.Packet], 65536) + l.pipe = make(chan LinkData, 4096) } var k, p []byte if cfg.PubicKey != nil { @@ -62,7 +61,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) { if len(k) == 32 { var err error if len(p) == 32 { - mixk := mixkeys(k, p) + mixk := algo.MixKeys(k, p) for i := range k { l.keys[i], err = chacha20poly1305.NewX(mixk[i : i+32]) if err != nil { @@ -113,7 +112,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) { } logrus.Infoln("[peer] add peer:", cfg.PeerIP, "allow:", cfg.AllowedIPs) go l.keepAlive(cfg.KeepAliveDur) - go l.sendquery(time.Second*time.Duration(cfg.QueryTick), cfg.Querys...) + go l.sendQuery(time.Second*time.Duration(cfg.QueryTick), cfg.Querys...) return } diff --git a/gold/link/recv.go b/gold/link/recv.go index 89cd1ca..b405705 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -2,31 +2,29 @@ package link import ( "bytes" - "encoding/binary" "encoding/hex" - "hash/crc64" "io" "strconv" "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" base14 "github.com/fumiama/go-base16384" - "github.com/fumiama/orbyte" "github.com/sirupsen/logrus" ) // Read 从 peer 收包 -func (l *Link) Read() *orbyte.Item[head.Packet] { +func (l *Link) Read() LinkData { return <-l.pipe } -func (m *Me) wait(data []byte) *orbyte.Item[head.Packet] { - if len(data) < head.PacketHeadLen { // not a valid packet +// wait TODO: 判断是否为 trans 并提前 call dispatch +func (m *Me) wait(data []byte) (h head.PacketBytes) { + if len(data) < int(head.PacketHeadLen)+8 { // not a valid packet if config.ShowDebugLog { logrus.Debugln("[recv] invalid data len", len(data)) } - return nil + return } bound := 64 endl := "..." @@ -38,15 +36,15 @@ func (m *Me) wait(data []byte) *orbyte.Item[head.Packet] { logrus.Debugln("[recv] data bytes, len", len(data), "val", hex.EncodeToString(data[:bound]), endl) } if m.base14 { - w := helper.SelectWriter() + w := bin.SelectWriter() _, err := io.Copy(w, base14.NewDecoder(bytes.NewReader(data))) if err != nil { // not a valid packet if config.ShowDebugLog { logrus.Debugln("[recv] decode base14 err:", err) } - return nil + return } - data = w.TransUnderlyingBytes() + data = w.ToBytes().Trans() if len(data) < bound { bound = len(data) endl = "." @@ -54,11 +52,11 @@ func (m *Me) wait(data []byte) *orbyte.Item[head.Packet] { if config.ShowDebugLog { logrus.Debugln("[recv] data b14ed, len", len(data), "val", hex.EncodeToString(data[:bound]), endl) } - if len(data) < head.PacketHeadLen { // not a valid packet + if len(data) < int(head.PacketHeadLen)+8 { // not a valid packet if config.ShowDebugLog { logrus.Debugln("[recv] invalid data len", len(data)) } - return nil + return } } seq, data := m.xordec(data) // inplace decoding @@ -74,68 +72,65 @@ func (m *Me) wait(data []byte) *orbyte.Item[head.Packet] { if config.ShowDebugLog { logrus.Debugln("[recv] invalid packet header:", err) } - return nil - } - if !header.Pointer().Flags.IsValid() { - if config.ShowDebugLog { - logrus.Debugln("[recv] drop invalid flags packet:", header.Pointer().Flags) - } - return nil - } - crc := header.Pointer().CRC64() - crclog := crc - crc ^= (uint64(seq) << 16) - if config.ShowDebugLog { - logrus.Debugf("[recv] packet crc %016x, seq %08x, xored crc %016x", crclog, seq, crc) - } - if _, got := m.recved.GetOrSet(crc, struct{}{}); got { - if config.ShowDebugLog { - logrus.Debugln("[recv] ignore duplicated crc packet", strconv.FormatUint(crc, 16)) - } - return nil + return } if config.ShowDebugLog { - logrus.Debugln( - "[recv]", strconv.FormatUint(crc, 16), - len(data), "bytes data with flag", header.Pointer().Flags, - "offset", header.Pointer().Flags.Offset(), - ) + logrus.Debugf("[recv] packet seq %08x", seq) } - if header.Pointer().Flags.IsSingle() || header.Pointer().Flags.NoFrag() { - ok := header.Pointer().ParseData(data) - if !ok { - logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unexpected !ok") - return nil - } + if _, got := m.recved.GetOrSet(seq, struct{}{}); got { if config.ShowDebugLog { - logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), len(data), "bytes full data waited") + logrus.Debugln("[recv] ignore duplicated seq packet", strconv.FormatUint(uint64(seq), 16)) } - return header + return + } + if config.ShowDebugLog { + header.B(func(_ []byte, p *head.Packet) { + logrus.Debugln( + "[recv]", strconv.FormatUint(uint64(seq), 16), + len(data), "bytes data with protoflag", p.Proto, + "offset", p.Offset, + ) + }) } - crchash := crc64.New(crc64.MakeTable(crc64.ISO)) - _, _ = crchash.Write(head.Hash(data)) - var buf [4]byte - binary.LittleEndian.PutUint32(buf[:], seq) - _, _ = crchash.Write(buf[:]) - hsh := crchash.Sum64() - h, got := m.recving.GetOrSet(hsh, header) + header.B(func(buf []byte, p *head.Packet) { + if !p.Proto.HasMore() { + ok := p.WriteDataSegment(data, buf) + if !ok { + logrus.Errorln("[recv]", strconv.FormatUint(uint64(seq), 16), "unexpected !ok") + return + } + if config.ShowDebugLog { + logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq), 16), len(data), "bytes full data waited") + } + h = header + return + } + }) + + if h.HasInit() { + return + } + + h, got := m.recving.GetOrSet(uint16(seq), header) if got && h == header { panic("unexpected multi-put found") } if config.ShowDebugLog { - logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "get frag part of", strconv.FormatUint(hsh, 16), "isnew:", !got) + logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "get frag part isnew:", !got) } - ok := h.Pointer().ParseData(data) - if !ok { - if config.ShowDebugLog { - logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "wait other frag parts of", strconv.FormatUint(hsh, 16), "isnew:", !got) + h.B(func(buf []byte, p *head.Packet) { + ok := p.WriteDataSegment(data, buf) + if !ok { + if config.ShowDebugLog { + logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "wait other frag parts isnew:", !got) + } + return } - return nil - } - m.recving.Delete(hsh) - if config.ShowDebugLog { - logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "all parts of", strconv.FormatUint(hsh, 16), "has reached") - } - return h + m.recving.Delete(uint16(seq)) + if config.ShowDebugLog { + logrus.Debugln("[recv]", strconv.FormatUint(uint64(seq&0xffff), 16), "all parts has reached") + } + }) + return } diff --git a/gold/link/send.go b/gold/link/send.go index 1aa686b..2bf659e 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -5,18 +5,15 @@ import ( "encoding/binary" "encoding/hex" "errors" - "fmt" "io" "math/rand" - "runtime" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" base14 "github.com/fumiama/go-base16384" - "github.com/fumiama/orbyte" "github.com/fumiama/orbyte/pbuf" ) @@ -32,137 +29,112 @@ func randseq(i uint16) uint32 { return binary.BigEndian.Uint32(buf[:]) } -// WritePacket 向 peer 发包 -func (l *Link) WritePacket(p *orbyte.Item[head.Packet], istransfer bool) (n int, err error) { - pp := p.Pointer() +// WritePacket 基于 data 向 peer 发包 +// +// data 可为空, 因为不保证可达所以不返回错误。 +func (l *Link) WritePacket(proto uint8, data []byte) { teatype := l.randkeyidx() sndcnt := uint16(l.incgetsndcnt()) - seq := randseq(sndcnt) mtu := l.mtu if l.mturandomrange > 0 { mtu -= uint16(rand.Intn(int(l.mturandomrange))) } if config.ShowDebugLog { - logrus.Debugln("[send] mtu:", mtu, ", addt:", sndcnt&0x07ff, ", key index:", teatype) + logrus.Debugln("[send] write mtu:", mtu, ", addt:", sndcnt&0x07ff, ", key index:", teatype) } - if !istransfer { - l.encrypt(pp, sndcnt, teatype) - } - delta := (int(mtu) - head.PacketHeadLen) & 0x0000fff8 - if delta <= 0 { - logrus.Warnln("[send] reset invalid data frag len", delta, "to 8") - delta = 8 - } - remlen := pp.BodyLen() - if remlen <= delta { - return l.write(p, teatype, sndcnt, uint32(remlen), 0, istransfer, false, seq) - } - if istransfer && pp.Flags.DontFrag() && remlen > delta { - return 0, ErrDropBigDontFragPkt - } - ttl := pp.TTL - totl := uint32(remlen) - pos := 0 - packet := head.ParsePacket(pp.ShallowCopy()) - for remlen > delta { - remlen -= delta - if config.ShowDebugLog { - logrus.Debugln("[send] split frag [", pos, "~", pos+delta, "], remain:", remlen) - } - packet.Pointer().CropBody(pos, pos+delta) - cnt, err := l.write(packet, teatype, sndcnt, totl, uint16(pos>>3), istransfer, true, seq) - n += cnt - if err != nil { - return n, err - } - packet.Pointer().TTL = ttl - pos += delta - } - if remlen > 0 { - if config.ShowDebugLog { - logrus.Debugln("[send] last frag [", pos, "~", pos+remlen, "]") - } - pp.CropBody(pos, pos+remlen) - cnt := 0 - cnt, err = l.write(p, teatype, sndcnt, totl, uint16(pos>>3), istransfer, false, seq) - n += cnt - } - runtime.KeepAlive(p) - return n, err -} - -func (l *Link) encrypt(p *head.Packet, sndcnt uint16, teatype uint8) { - p.FillHash() - if config.ShowDebugLog { - logrus.Debugln("[send] data len before encrypt:", p.BodyLen()) - } - data := p.TransBody().Bytes() + pb := head.NewPacketBuilder(). + Src(l.me.me, l.me.srcport).Dst(l.peerip, l.me.dstport). + Proto(proto).TTL(64).With(data) if l.usezstd { - data = encodezstd(data).Trans().Bytes() - if config.ShowDebugLog { - logrus.Debugln("[send] data len after zstd:", len(data)) + pb.Zstd() + } + pb = pb.Hash() + var pktb *head.PacketBuilder + if l.keys[0] == nil { + pktb = pb.Plain(teatype, sndcnt&0x07ff) + } else { + pktb = pb.Seal(l.keys[teatype], teatype, sndcnt&0x07ff) + } + for _, b := range pktb.Split(int(mtu), false) { //TODO: impl. nofrag + go l.write2peer(head.BuildPacketFromBytes(b), randseq(sndcnt)) + } +} + +// write2peer 计算 xor + b14 后向 peer 发包 +// +// 因为不保证可达所以不返回错误。 +func (l *Link) write2peer(b pbuf.Bytes, seq uint32) { + if l.doublepacket { + _, err := l.write2peer1(b, seq) + if err != nil { + if config.ShowDebugLog { + logrus.Warnln("[send] double wr2peer", l.peerip, "err:", err) + } } } - p.SetBody(l.encode(teatype, sndcnt&0x07ff, data).Trans()) - if config.ShowDebugLog { - logrus.Debugln("[send] data len after xchacha20:", p.BodyLen(), "addt:", sndcnt) + _, err := l.write2peer1(b, seq) + if err != nil { + if config.ShowDebugLog { + logrus.Warnln("[send] wr2peer", l.peerip, "err:", err) + } } } -// write 向 peer 发包 -func (l *Link) write( - p *orbyte.Item[head.Packet], teatype uint8, additional uint16, - datasz uint32, offset uint16, istransfer, - hasmore bool, seq uint32, -) (int, error) { - if p.Pointer().DecreaseAndGetTTL() <= 0 { - return 0, ErrTTL - } - if l.doublepacket { - _, _ = l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq) - } - return l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq) -} - -// write 向 peer 发一个包 -func (l *Link) writeonce( - p *orbyte.Item[head.Packet], teatype uint8, additional uint16, - datasz uint32, offset uint16, - istransfer, hasmore bool, seq uint32, -) (int, error) { +// write2peer1 计算 xor + b14 后向 peer 发一个包 +func (l *Link) write2peer1(b pbuf.Bytes, seq uint32) (n int, err error) { peerep := l.endpoint - if helper.IsNilInterface(peerep) { - return 0, errors.New("nil endpoint of " + p.Pointer().Dst.String()) + if bin.IsNilInterface(peerep) { + return 0, errors.New("nil endpoint of " + l.peerip.String()) } - var d pbuf.Bytes - // TODO: now all packet allow frag, adapt to DF - if istransfer { - d = p.Pointer().MarshalWith(nil, 0, 0, 0, offset, false, hasmore) - } else { - d = p.Pointer().MarshalWith(l.me.me, teatype, additional, datasz, offset, false, hasmore) - } - - bound := 64 - endl := "..." - if d.Len() < bound { - bound = d.Len() - endl = "." - } conn := l.me.conn if conn == nil { return 0, io.ErrClosedPipe } - if config.ShowDebugLog { - logrus.Debugln("[send] write", d.Len(), "bytes data from ep", conn.LocalAddr(), "to", peerep, "offset", fmt.Sprintf("%04x", offset), "crc", fmt.Sprintf("%016x", p.Pointer().CRC64())) - logrus.Debugln("[send] data bytes", hex.EncodeToString(d.Bytes()[:bound]), endl) - } - d = l.me.xorenc(d.Bytes(), seq) + b.V(func(data []byte) { + if config.ShowDebugLog { + bound := 64 + endl := "..." + if len(data) < bound { + bound = len(data) + endl = "." + } + logrus.Debugln("[send] raw data bytes", hex.EncodeToString(data[:bound]), endl) + } + b = l.me.xorenc(data, seq) + if config.ShowDebugLog { + bound := 64 + endl := "..." + if b.Len() < bound { + bound = b.Len() + endl = "." + } + b.V(func(b []byte) { + logrus.Debugln("[send] xored data bytes", hex.EncodeToString(b[:bound]), endl) + }) + } + }) if l.me.base14 { - d = pbuf.ParseBytes(base14.Encode(d.Bytes())...) + b.V(func(data []byte) { + b = pbuf.ParseBytes(base14.Encode(data)...) + if config.ShowDebugLog { + bound := 64 + endl := "..." + if b.Len() < bound { + bound = b.Len() + endl = "." + } + b.V(func(b []byte) { + logrus.Debugln("[send] xored data bytes", hex.EncodeToString(b[:bound]), endl) + }) + } + }) } - if config.ShowDebugLog { - logrus.Debugln("[send] data xored", hex.EncodeToString(d.Bytes()[:bound]), endl) - } - return conn.WriteToPeer(d.Trans().Bytes(), peerep) + b.V(func(b []byte) { + if config.ShowDebugLog { + logrus.Debugln("[send] write", len(b), "bytes data from ep", conn.LocalAddr(), "to", peerep) + } + n, err = conn.WriteToPeer(b, peerep) + }) + return } diff --git a/gold/link/zstd.go b/gold/link/zstd.go deleted file mode 100644 index 380acdc..0000000 --- a/gold/link/zstd.go +++ /dev/null @@ -1,41 +0,0 @@ -package link - -import ( - "bytes" - "io" - - "github.com/fumiama/WireGold/helper" - "github.com/fumiama/orbyte/pbuf" - "github.com/klauspost/compress/zstd" -) - -func encodezstd(data []byte) pbuf.Bytes { - w := helper.SelectWriter() - enc, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedFastest)) - if err != nil { - panic(err) - } - _, err = io.Copy(enc, bytes.NewReader(data)) - if err != nil { - panic(err) - } - err = enc.Close() - if err != nil { - panic(err) - } - return w.TransBytes() -} - -func decodezstd(data []byte) (pbuf.Bytes, error) { - dec, err := zstd.NewReader(bytes.NewReader(data)) - if err != nil { - return pbuf.Bytes{}, err - } - w := helper.SelectWriter() - _, err = io.Copy(w, dec) - dec.Close() - if err != nil { - return pbuf.Bytes{}, err - } - return w.TransBytes(), nil -} diff --git a/gold/p2p/ip/init.go b/gold/p2p/ip/init.go index 66b075f..7cc76a9 100644 --- a/gold/p2p/ip/init.go +++ b/gold/p2p/ip/init.go @@ -5,7 +5,7 @@ import ( "net/netip" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/file" ) func NewEndpoint(endpoint string, configs ...any) (p2p.EndPoint, error) { @@ -27,7 +27,7 @@ func NewEndpoint(endpoint string, configs ...any) (p2p.EndPoint, error) { } func init() { - name := helper.FolderName() + name := file.FolderName() _, hasexist := p2p.Register(name, NewEndpoint) if hasexist { panic("network " + name + " has been registered") diff --git a/gold/p2p/tcp/init.go b/gold/p2p/tcp/init.go index 0e8f4e0..1d71586 100644 --- a/gold/p2p/tcp/init.go +++ b/gold/p2p/tcp/init.go @@ -6,7 +6,7 @@ import ( "time" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/file" ) type Config struct { @@ -41,7 +41,7 @@ func newEndpoint(endpoint string, configs ...any) (*EndPoint, error) { } func init() { - name := helper.FolderName() + name := file.FolderName() _, hasexist := p2p.Register(name, NewEndpoint) if hasexist { panic("network " + name + " has been registered") diff --git a/gold/p2p/tcp/pdu.go b/gold/p2p/tcp/pdu.go index 6346786..cc5f5af 100644 --- a/gold/p2p/tcp/pdu.go +++ b/gold/p2p/tcp/pdu.go @@ -8,7 +8,7 @@ import ( "time" "github.com/fumiama/WireGold/config" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" "github.com/sirupsen/logrus" ) @@ -39,10 +39,10 @@ type packet struct { } func (p *packet) pack() *net.Buffers { - return &net.Buffers{magicbuf, helper.NewWriterF(func(w *helper.Writer) { + return &net.Buffers{magicbuf, bin.NewWriterF(func(w *bin.Writer) { w.WriteByte(byte(p.typ)) w.WriteUInt16(p.len) - }).Trans().Bytes(), p.dat} + }).Trans(), p.dat} } func (p *packet) Read(_ []byte) (int, error) { @@ -74,13 +74,13 @@ func (p *packet) ReadFrom(r io.Reader) (n int64, err error) { } p.typ = packetType(buf[0]) p.len = binary.LittleEndian.Uint16(buf[1:3]) - w := helper.SelectWriter() + w := bin.SelectWriter() copied, err := io.CopyN(w, r, int64(p.len)) n += copied if err != nil { return } - p.dat = w.TransUnderlyingBytes() + p.dat = w.ToBytes().Trans() return } diff --git a/gold/p2p/udp/init.go b/gold/p2p/udp/init.go index 9c2f92b..e49f6f7 100644 --- a/gold/p2p/udp/init.go +++ b/gold/p2p/udp/init.go @@ -5,7 +5,7 @@ import ( "net/netip" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/file" ) func NewEndpoint(endpoint string, _ ...any) (p2p.EndPoint, error) { @@ -17,7 +17,7 @@ func NewEndpoint(endpoint string, _ ...any) (p2p.EndPoint, error) { } func init() { - name := helper.FolderName() + name := file.FolderName() _, hasexist := p2p.Register(name, NewEndpoint) if hasexist { panic("network " + name + " has been registered") diff --git a/gold/p2p/udplite/init.go b/gold/p2p/udplite/init.go index a7896f1..117b67d 100644 --- a/gold/p2p/udplite/init.go +++ b/gold/p2p/udplite/init.go @@ -7,7 +7,7 @@ import ( "net/netip" "github.com/fumiama/WireGold/gold/p2p" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/file" ) func NewEndpoint(endpoint string, _ ...any) (p2p.EndPoint, error) { @@ -19,7 +19,7 @@ func NewEndpoint(endpoint string, _ ...any) (p2p.EndPoint, error) { } func init() { - name := helper.FolderName() + name := file.FolderName() _, hasexist := p2p.Register(name, NewEndpoint) if hasexist { panic("network " + name + " has been registered") diff --git a/gold/p2p/udplite/lite.go b/gold/p2p/udplite/lite.go index f0ab9d5..5b4f3a0 100644 --- a/gold/p2p/udplite/lite.go +++ b/gold/p2p/udplite/lite.go @@ -75,11 +75,11 @@ func listenUDPLite(network string, laddr *net.UDPAddr) (*net.UDPConn, error) { } var errsys error err = rc.Control(func(fd uintptr) { - errsys = syscall.SetsockoptInt(int(fd), SOL_UDPLITE, UDPLITE_SEND_CSCOV, head.PacketHeadLen+8) // for xor rand + errsys = syscall.SetsockoptInt(int(fd), SOL_UDPLITE, UDPLITE_SEND_CSCOV, int(head.PacketHeadLen)) // for xor rand if errsys != nil { return } - errsys = syscall.SetsockoptInt(int(fd), SOL_UDPLITE, UDPLITE_RECV_CSCOV, head.PacketHeadLen+8) // for xor rand + errsys = syscall.SetsockoptInt(int(fd), SOL_UDPLITE, UDPLITE_RECV_CSCOV, int(head.PacketHeadLen)) // for xor rand }) if err != nil { _ = conn.Close() diff --git a/gold/proto/data.go b/gold/proto/data.go new file mode 100644 index 0000000..59b7cac --- /dev/null +++ b/gold/proto/data.go @@ -0,0 +1,14 @@ +package proto + +import ( + "github.com/fumiama/orbyte/pbuf" + + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/link" +) + +func init() { + link.AddProto(head.ProtoData, func(header *head.Packet, peer *link.Link, data pbuf.Bytes) { + peer.ToLower(header, data) + }) +} diff --git a/gold/proto/hello.go b/gold/proto/hello.go new file mode 100644 index 0000000..b5e9922 --- /dev/null +++ b/gold/proto/hello.go @@ -0,0 +1,30 @@ +package proto + +import ( + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/link" + "github.com/fumiama/WireGold/internal/file" +) + +func init() { + link.AddProto(head.ProtoHello, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + onHello(data, peer) + }) +} + +func onHello(data pbuf.Bytes, p *link.Link) { + data.V(func(b []byte) { + switch { + case len(b) == 0: + logrus.Warnln(file.Header(), "recv old packet, do nothing") + case b[0] == byte(head.HelloPing): + go p.WritePacket(head.ProtoHello, []byte{byte(head.HelloPong)}) + logrus.Infoln(file.Header(), "recv, send ack") + default: + logrus.Infoln(file.Header(), "recv ack, do nothing") + } + }) +} diff --git a/gold/proto/nat.go b/gold/proto/nat.go new file mode 100644 index 0000000..00ea949 --- /dev/null +++ b/gold/proto/nat.go @@ -0,0 +1,121 @@ +package proto + +import ( + "encoding/json" + + "github.com/fumiama/orbyte/pbuf" + "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/link" + "github.com/fumiama/WireGold/gold/p2p" + + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" +) + +func init() { + link.AddProto(head.ProtoNotify, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + data.V(func(b []byte) { + onNotify(peer, b) + }) + }) + link.AddProto(head.ProtoQuery, func(_ *head.Packet, peer *link.Link, data pbuf.Bytes) { + data.V(func(b []byte) { + onQuery(peer, b) + }) + }) +} + +// 收到通告包的处理函数 +func onNotify(l *link.Link, packet []byte) { + // TODO: 完成data解包与endpoint注册 + // 1. Data 解包 + // ---- 使用 head.Notify 解释 packet + notify := make(head.Notify, 32) + err := json.Unmarshal(packet, ¬ify) + if err != nil { + logrus.Errorln(file.Header(), "notify json unmarshal err:", err) + return + } + // 2. endpoint注册 + // ---- 遍历 Notify,注册对方的 endpoint 到 + // ---- connections,注意使用读写锁connmapmu + for peer, ep := range notify { + nw, epstr := ep[0], ep[1] + if nw != l.Me().EndPoint().Network() { + logrus.Warnln(file.Header(), "ignore different network notify", nw, "addr", epstr) + continue + } + addr, err := p2p.NewEndPoint(nw, epstr, l.Me().NetworkConfigs()...) + if err == nil { + p, ok := l.Me().IsInPeer(peer) + if ok { + if bin.IsNilInterface(p.EndPoint()) || !p.EndPoint().Euqal(addr) { + p.SetEndPoint(addr) + logrus.Infoln(file.Header(), "notify set ep of peer", peer, "to", ep) + } + continue + } + } + if config.ShowDebugLog { + logrus.Debugln(file.Header(), "notify drop invalid peer:", peer, "ep:", ep) + } + } +} + +// 收到询问包的处理函数 +func onQuery(l *link.Link, packet []byte) { + // 完成data解包与notify分发 + + // 1. Data 解包 + // ---- 使用 head.Query 解释 packet + // ---- 根据 Query 确定需要封装的 Notify + var peers head.Query + err := json.Unmarshal(packet, &peers) + if err != nil { + logrus.Errorln(file.Header(), "query json unmarshal err:", err) + return + } + + if l == nil || l.Me() == nil { + logrus.Errorln(file.Header(), "nil link/me") + return + } + + // 2. notify分发 + // ---- 封装 Notify 到 新的 packet + // ---- 调用 l.Send 发送到对方 + notify := make(head.Notify, len(peers)) + for _, p := range peers { + lnk, ok := l.Me().IsInPeer(p) + eps := "" + if l.Me().EndPoint().Network() == "udp" { // udp has real p2p + if bin.IsNilInterface(lnk.EndPoint()) { + continue + } + eps = lnk.EndPoint().String() + } + if eps == "" { + eps = l.RawEndPoint() // use registered ep only + } + if eps == "" { + continue + } + if ok && bin.IsNonNilInterface(lnk.EndPoint()) { + notify[p] = [2]string{ + lnk.EndPoint().Network(), + eps, + } + } + } + if len(notify) > 0 { + logrus.Infoln(file.Header(), "query wrap", len(notify), "notify") + w := bin.SelectWriter() + _ = json.NewEncoder(w).Encode(¬ify) + w.P(func(b *pbuf.Buffer) { + l.WritePacket(head.ProtoNotify, b.Bytes()) + }) + } +} diff --git a/helper/writer.go b/helper/writer.go deleted file mode 100644 index 8e9d627..0000000 --- a/helper/writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package helper - -// https://github.com/Mrs4s/MiraiGo/blob/master/binary/writer.go - -import ( - "bytes" - "encoding/binary" - - "github.com/fumiama/orbyte" - "github.com/fumiama/orbyte/pbuf" -) - -// Writer 写入 -type Writer orbyte.Item[bytes.Buffer] - -func NewWriterF(f func(writer *Writer)) pbuf.Bytes { - w := SelectWriter() - f(w) - return w.TransBytes() -} - -func (w *Writer) p() *orbyte.Item[bytes.Buffer] { - return (*orbyte.Item[bytes.Buffer])(w) -} - -func (w *Writer) pp() *bytes.Buffer { - return w.p().Pointer() -} - -func (w *Writer) Write(b []byte) (n int, err error) { - return w.pp().Write(b) -} - -func (w *Writer) WriteByte(b byte) error { - return w.pp().WriteByte(b) -} - -func (w *Writer) WriteUInt16(v uint16) { - b := make([]byte, 2) - binary.LittleEndian.PutUint16(b, v) - w.Write(b) -} - -func (w *Writer) WriteUInt32(v uint32) { - b := make([]byte, 4) - binary.LittleEndian.PutUint32(b, v) - w.Write(b) -} - -func (w *Writer) WriteUInt64(v uint64) { - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, v) - w.Write(b) -} - -func (w *Writer) WriteString(v string) { - //w.WriteUInt32(uint32(len(v) + 4)) - w.pp().WriteString(v) -} - -func (w *Writer) Len() int { - return w.pp().Len() -} - -func (w *Writer) UnsafeBytes() []byte { - return w.pp().Bytes() -} - -func (w *Writer) TransUnderlyingBytes() []byte { - return w.p().Trans().Pointer().Bytes() -} - -func (w *Writer) TransBytes() pbuf.Bytes { - return pbuf.BufferItemToBytes(w.p().Trans()) -} - -func (w *Writer) Reset() { - w.pp().Reset() -} - -func (w *Writer) Grow(n int) { - w.pp().Grow(n) -} diff --git a/internal/algo/crypto.go b/internal/algo/crypto.go new file mode 100644 index 0000000..300ff75 --- /dev/null +++ b/internal/algo/crypto.go @@ -0,0 +1,115 @@ +package algo + +import ( + "crypto/cipher" + "crypto/rand" + "encoding/binary" + "errors" + + "github.com/fumiama/orbyte/pbuf" +) + +var ( + ErrCipherTextTooShort = errors.New("ciphertext too short") +) + +func EncodeAEAD(aead cipher.AEAD, additional uint16, b []byte) pbuf.Bytes { + nsz := aead.NonceSize() + // Accocate capacity for all the stuffs. + buf := pbuf.NewBytes(2 + nsz + len(b) + aead.Overhead()) + n := 0 + buf.V(func(buf []byte) { + binary.LittleEndian.PutUint16(buf[:2], additional) + nonce := buf[2 : 2+nsz] + // Select a random nonce + _, err := rand.Read(nonce) + if err != nil { + panic(err) + } + // Encrypt the message and append the ciphertext to the nonce. + eb := aead.Seal(nonce[nsz:nsz], nonce, b, buf[:2]) + n = len(eb) + }) + return buf.Slice(2, 2+nsz+n) +} + +func DecodeAEAD(aead cipher.AEAD, additional uint16, b []byte) (data pbuf.Bytes, err error) { + nsz := aead.NonceSize() + if len(b) < nsz { + return pbuf.Bytes{}, ErrCipherTextTooShort + } + // Split nonce and ciphertext. + nonce, ciphertext := b[:nsz], b[nsz:] + if len(ciphertext) == 0 { + return pbuf.Bytes{}, nil + } + // Decrypt the message and check it wasn't tampered with. + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], additional) + data = pbuf.NewBytes(len(ciphertext)) + n := 0 + data.V(func(b []byte) { + var d []byte + d, err = aead.Open(b[:0], nonce, ciphertext, buf[:]) + n = len(d) + }) + if err != nil { + return + } + return data.SliceTo(n), nil +} + +func EncodeXORLen(datalen int) int { + batchsz := datalen / 8 + return 8 + batchsz*8 + 8 // seqrand dat tail +} + +// EncodeXOR 按 8 字节, 以初始 mask 循环异或编码 data +func EncodeXOR(data []byte, mask uint64, seq uint32) pbuf.Bytes { + batchsz := len(data) / 8 + remain := len(data) % 8 + sum := mask + newdat := pbuf.NewBytes(EncodeXORLen(len(data))) + newdat.V(func(buf []byte) { + binary.LittleEndian.PutUint32(buf[:4], seq) + _, _ = rand.Read(buf[4:8]) // seqrand + sum ^= binary.LittleEndian.Uint64(buf[:8]) // init from seqrand + binary.LittleEndian.PutUint64(buf[:8], sum) + for i := 0; i < batchsz; i++ { // range on batch data + a := i * 8 + b := (i + 1) * 8 + sum ^= binary.LittleEndian.Uint64(data[a:b]) + binary.LittleEndian.PutUint64(buf[a+8:b+8], sum) + } + p := batchsz * 8 + copy(buf[8+p:], data[p:]) + buf[newdat.Len()-1] = byte(remain) + sum ^= binary.LittleEndian.Uint64(buf[8+p:]) + binary.LittleEndian.PutUint64(buf[8+p:], sum) + }) + return newdat +} + +// DecodeXOR 按 8 字节, 以初始 mask 循环异或解码 data, +// 解码结果完全覆盖 data. +func DecodeXOR(data []byte, mask uint64) (uint32, []byte) { + if len(data) < 16 || len(data)%8 != 0 { + return 0, nil + } + batchsz := len(data) / 8 + sum := mask + for i := 0; i < batchsz; i++ { + a := i * 8 + b := (i + 1) * 8 + x := binary.LittleEndian.Uint64(data[a:b]) + sum ^= x + binary.LittleEndian.PutUint64(data[a:b], sum) + sum = x + } + remain := data[len(data)-1] + if remain >= 8 { + return 0, nil + } + return binary.LittleEndian.Uint32(data[:4]), + data[8 : len(data)-8+int(remain)] +} diff --git a/gold/link/crypto_test.go b/internal/algo/crypto_test.go similarity index 88% rename from gold/link/crypto_test.go rename to internal/algo/crypto_test.go index bcb2efb..317a9ae 100644 --- a/gold/link/crypto_test.go +++ b/internal/algo/crypto_test.go @@ -1,4 +1,4 @@ -package link +package algo import ( "bytes" @@ -12,9 +12,7 @@ import ( ) func TestXOR(t *testing.T) { - m := Me{ - mask: 0x12345678_90abcdef, - } + mask := uint64(0x12345678_90abcdef) buf := make([]byte, 4096) buf2 := make([]byte, 4096) for i := 0; i < 4096; i++ { @@ -27,7 +25,7 @@ func TestXOR(t *testing.T) { if err != nil { t.Fatal(err) } - seq, dec := m.xordec(m.xorenc(r1.Bytes(), uint32(i)).Trans().Bytes()) + seq, dec := DecodeXOR(EncodeXOR(r1.Bytes(), mask, uint32(i)).Trans(), mask) if !bytes.Equal(dec, r2.Bytes()) { t.Fatal("unexpected xor at", i, "except", hex.EncodeToString(r2.Bytes()), "got", hex.EncodeToString(dec)) } @@ -53,11 +51,11 @@ func TestXChacha20(t *testing.T) { t.Fatal(err) } for i := 0; i < 4096; i++ { - db, err := decode(aead, uint16(i), encode(aead, uint16(i), data[:i]).Trans().Bytes()) + db, err := DecodeAEAD(aead, uint16(i), EncodeAEAD(aead, uint16(i), data[:i]).Trans()) if err != nil { t.Fatal(err) } - if !bytes.Equal(db.Bytes(), data[:i]) { + if !bytes.Equal(db.Trans(), data[:i]) { t.Fatal("unexpected preshared at idx(len)", i, "addt", uint16(i)) } } @@ -77,14 +75,14 @@ func TestExpandKeyUnit(t *testing.T) { func TestMixKeys(t *testing.T) { k1, _ := hex.DecodeString("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") k2, _ := hex.DecodeString("0000000000000000000000000000000000000000000000000000000000000000") - k := mixkeys(k1, k2) + k := MixKeys(k1, k2) kexp, _ := hex.DecodeString("55555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555") if !bytes.Equal(k, kexp) { t.Fatal(hex.EncodeToString(k)) } k1, _ = hex.DecodeString("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") k2, _ = hex.DecodeString("deadbeef1239876540deadbeef1239876540deadbeef1239876540abcdef4567") - k = mixkeys(k1, k2) + k = MixKeys(k1, k2) kexp, _ = hex.DecodeString("2ca9188d3ebb4a9f22e34d4479d857fca48390253ebbe23f22cbcf6e59507ddc06a9b08794316abfa26b67cedb7a5d542c8912adb493c0352aebe76e73dadf7e") if !bytes.Equal(k, kexp) { t.Fatal(hex.EncodeToString(k)) diff --git a/internal/algo/hash.go b/internal/algo/hash.go new file mode 100644 index 0000000..8841c4a --- /dev/null +++ b/internal/algo/hash.go @@ -0,0 +1,47 @@ +package algo + +import ( + "bytes" + "crypto/md5" + "encoding/binary" + "encoding/hex" + + "github.com/fumiama/WireGold/config" + "github.com/fumiama/blake2b-simd" + "github.com/sirupsen/logrus" +) + +// Blake2bHash8 生成 data 的 blake2b hash, 返回前八位 +func Blake2bHash8(precrc64 uint64, data []byte) uint64 { + var tgt [32]byte + h := blake2b.New256() + binary.LittleEndian.PutUint64(tgt[:8], precrc64) + _, _ = h.Write(tgt[:8]) + _, _ = h.Write(data) + b := h.Sum(tgt[:0])[:8] + if config.ShowDebugLog { + logrus.Debugln("[algo] blk2b hash:", hex.EncodeToString(b)) + } + return binary.LittleEndian.Uint64(b) +} + +// IsVaildBlake2bHash8 在收齐全部分片并解密后验证 packet 合法性 +func IsVaildBlake2bHash8(precrc64 uint64, hash8data []byte) bool { + var tgt [32]byte + h := blake2b.New256() + binary.LittleEndian.PutUint64(tgt[:8], precrc64) + _, _ = h.Write(tgt[:8]) + _, _ = h.Write(hash8data[8:]) + b := h.Sum(tgt[:0])[:8] + if config.ShowDebugLog { + logrus.Debugln("[algo] blk2b sum calulated:", hex.EncodeToString(b)) + logrus.Debugln("[algo] blk2b sum in packet:", hex.EncodeToString(hash8data[:8])) + } + return bytes.Equal(b, hash8data[:8]) +} + +// MD5Hash8 calculate packet header checksum +func MD5Hash8(data []byte) uint64 { + m := md5.Sum(data) + return binary.LittleEndian.Uint64(m[:8]) +} diff --git a/internal/algo/key.go b/internal/algo/key.go new file mode 100644 index 0000000..79560db --- /dev/null +++ b/internal/algo/key.go @@ -0,0 +1,40 @@ +package algo + +import ( + "encoding/binary" + "math/bits" + "math/rand" +) + +func RandKeyIndex() uint8 { + return uint8(rand.Intn(32)) +} + +func MixKeys(k1, k2 []byte) []byte { + if len(k1) != 32 || len(k2) != 32 { + panic("unexpected key len") + } + k := make([]byte, 64) + for i := range k1 { + k1i, k2i := i, 31-i + k1v, k2v := k1[k1i], k2[k2i] + binary.LittleEndian.PutUint16( + k[i*2:(i+1)*2], + expandkeyunit(k1v, k2v), + ) + } + return k +} + +func expandkeyunit(v1, v2 byte) (v uint16) { + v1s, v2s := uint16(v1), uint16(bits.Reverse8(v2)) + for i := 0; i < 8; i++ { + v |= v1s & (1 << (i * 2)) + v1s <<= 1 + } + for i := 0; i < 8; i++ { + v2s <<= 1 + v |= v2s & (2 << (i * 2)) + } + return +} diff --git a/internal/algo/zstd.go b/internal/algo/zstd.go new file mode 100644 index 0000000..47ceea7 --- /dev/null +++ b/internal/algo/zstd.go @@ -0,0 +1,41 @@ +package algo + +import ( + "bytes" + "io" + + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/orbyte/pbuf" + "github.com/klauspost/compress/zstd" +) + +func EncodeZstd(data []byte) pbuf.Bytes { + return bin.SelectWriter().P(func(w *pbuf.Buffer) { + enc, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + panic(err) + } + _, err = io.Copy(enc, bytes.NewReader(data)) + if err != nil { + panic(err) + } + err = enc.Close() + if err != nil { + panic(err) + } + }).ToBytes() +} + +func DecodeZstd(data []byte) (b pbuf.Bytes, err error) { + dec, err := zstd.NewReader(bytes.NewReader(data)) + if err != nil { + return pbuf.Bytes{}, err + } + + b = bin.SelectWriter().P(func(w *pbuf.Buffer) { + _, err = io.Copy(w, dec) + dec.Close() + }).ToBytes() + + return +} diff --git a/helper/data.go b/internal/bin/data.go similarity index 82% rename from helper/data.go rename to internal/bin/data.go index 6b860cd..2a52276 100644 --- a/helper/data.go +++ b/internal/bin/data.go @@ -1,10 +1,14 @@ -package helper +package bin import ( + "encoding/binary" "reflect" "unsafe" ) +// IsLittleEndian judge by binary packet. +var IsLittleEndian = reflect.ValueOf(&binary.NativeEndian).Elem().Field(0).Type().String() == "binary.littleEndian" + // slice is the runtime representation of a slice. // It cannot be used safely or portably and its representation may // change in a later release. diff --git a/helper/pool.go b/internal/bin/pool.go similarity index 91% rename from helper/pool.go rename to internal/bin/pool.go index 0130511..161a479 100644 --- a/helper/pool.go +++ b/internal/bin/pool.go @@ -1,4 +1,4 @@ -package helper +package bin import ( "github.com/fumiama/orbyte/pbuf" diff --git a/internal/bin/writer.go b/internal/bin/writer.go new file mode 100644 index 0000000..d2bde2a --- /dev/null +++ b/internal/bin/writer.go @@ -0,0 +1,66 @@ +package bin + +// https://github.com/Mrs4s/MiraiGo/blob/master/binary/writer.go + +import ( + "encoding/binary" + + "github.com/fumiama/orbyte/pbuf" +) + +// Writer 写入 +type Writer pbuf.OBuffer + +func NewWriterF(f func(writer *Writer)) pbuf.Bytes { + w := SelectWriter() + f(w) + return w.ToBytes() +} + +func (w *Writer) P(f func(*pbuf.Buffer)) *Writer { + (*pbuf.OBuffer)(w).P(f) + return w +} + +func (w *Writer) Write(b []byte) (n int, err error) { + w.P(func(buf *pbuf.Buffer) { + n, err = buf.Write(b) + }) + return +} + +func (w *Writer) WriteByte(b byte) (err error) { + w.P(func(buf *pbuf.Buffer) { + err = buf.WriteByte(b) + }) + return +} + +func (w *Writer) WriteString(s string) (n int, err error) { + w.P(func(buf *pbuf.Buffer) { + n, err = buf.WriteString(s) + }) + return +} + +func (w *Writer) WriteUInt16(v uint16) { + b := make([]byte, 2) + binary.LittleEndian.PutUint16(b, v) + w.Write(b) +} + +func (w *Writer) WriteUInt32(v uint32) { + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, v) + w.Write(b) +} + +func (w *Writer) WriteUInt64(v uint64) { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, v) + w.Write(b) +} + +func (w *Writer) ToBytes() pbuf.Bytes { + return pbuf.BufferItemToBytes((*pbuf.OBuffer)(w)) +} diff --git a/helper/file.go b/internal/file/file.go similarity index 97% rename from helper/file.go rename to internal/file/file.go index 510dbc4..0632c88 100644 --- a/helper/file.go +++ b/internal/file/file.go @@ -1,4 +1,4 @@ -package helper +package file import ( "os" diff --git a/internal/file/log.go b/internal/file/log.go new file mode 100644 index 0000000..d01e60c --- /dev/null +++ b/internal/file/log.go @@ -0,0 +1,52 @@ +package file + +import ( + "encoding/hex" + "runtime" + "strings" +) + +func Header() string { + file, fn := fileFuncName(2) + sb := strings.Builder{} + sb.WriteString("[") + sb.WriteString(file) + sb.WriteString("] ") + sb.WriteString(fn) + return sb.String() +} + +func fileFuncName(skip int) (string, string) { + pc, file, _, ok := runtime.Caller(skip) + if !ok { + return "unknown", "unknown" + } + fn := runtime.FuncForPC(pc).Name() + i := strings.LastIndex(fn, "/") + fn = fn[i+1:] + i = strings.LastIndex(file, "/") + if i < 0 { + i = strings.LastIndex(file, "\\") + if i < 0 { + return file, fn + } + } + nm := file[i+1:] + if len(nm) == 0 { + return file, fn + } + i = strings.LastIndex(nm, ".") + if i <= 0 { + return nm, fn + } + return nm[:i], fn +} + +func ToLimitHexString(data []byte, bound int) string { + endl := "..." + if len(data) < bound { + bound = len(data) + endl = "." + } + return hex.EncodeToString(data[:bound]) + endl +} diff --git a/main.go b/main.go index c511b73..f3d5e77 100644 --- a/main.go +++ b/main.go @@ -15,8 +15,8 @@ import ( "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/config" - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" + "github.com/fumiama/WireGold/internal/file" "github.com/fumiama/WireGold/upper" "github.com/fumiama/WireGold/upper/services/wg" ) @@ -26,7 +26,7 @@ func main() { gen := flag.Bool("g", false, "generate key pair") pshgen := flag.Bool("pg", false, "generate preshared key") showp := flag.Bool("p", false, "show my publickey") - file := flag.String("c", "config.yaml", "specify conf file") + cfile := flag.String("c", "config.yaml", "specify conf file") debug := flag.Bool("d", false, "print debug logs") warn := flag.Bool("w", false, "only show logs above warn level") logfile := flag.String("l", "-", "write log to file") @@ -52,8 +52,8 @@ func main() { if err != nil { panic(err) } - fmt.Println("PublicKey:", helper.BytesToString(pubk[:57])) - fmt.Println("PrivateKey:", helper.BytesToString(prvk[:57])) + fmt.Println("PublicKey:", bin.BytesToString(pubk[:57])) + fmt.Println("PrivateKey:", bin.BytesToString(prvk[:57])) os.Exit(0) } if *pshgen { @@ -66,7 +66,7 @@ func main() { if err != nil { panic(err) } - fmt.Println("PresharedKey:", helper.BytesToString(pshk[:57])) + fmt.Println("PresharedKey:", bin.BytesToString(pshk[:57])) os.Exit(0) } if *logfile != "-" { @@ -77,7 +77,7 @@ func main() { defer f.Close() logrus.SetOutput(f) } - if helper.IsNotExist(*file) { + if file.IsNotExist(*cfile) { f := new(bytes.Buffer) var r string fmt.Print("IP: ") @@ -125,14 +125,14 @@ func main() { f.WriteString("MTU: " + strings.TrimSpace(r) + "\n") r = "" - cfgf, err := os.Create(*file) + cfgf, err := os.Create(*cfile) if err != nil { panic(err) } cfgf.Write(f.Bytes()) cfgf.Close() } - c := config.Parse(*file) + c := config.Parse(*cfile) if c.IP == "" { displayHelp("nil ip") } @@ -145,7 +145,7 @@ func main() { if c.EndPoint == "" { displayHelp("nil endpoint") } - if c.MTU <= head.PacketHeadLen { + if c.MTU < 128 { displayHelp("invalid mtu") } w, err := wg.NewWireGold(&c) diff --git a/upper/services/tunnel/tunnel.go b/upper/services/tunnel/tunnel.go index 03b52df..677f20b 100644 --- a/upper/services/tunnel/tunnel.go +++ b/upper/services/tunnel/tunnel.go @@ -12,8 +12,7 @@ import ( _ "github.com/fumiama/WireGold/gold/p2p/tcp" // support tcp connection _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection _ "github.com/fumiama/WireGold/gold/p2p/udplite" // support udplite connection - "github.com/fumiama/orbyte" - "github.com/fumiama/orbyte/pbuf" + _ "github.com/fumiama/WireGold/gold/proto" // support basic protos "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/head" @@ -23,7 +22,7 @@ import ( type Tunnel struct { l *link.Link in chan []byte - out chan *orbyte.Item[head.Packet] + out chan link.LinkData outcache []byte peerip net.IP src uint16 @@ -35,7 +34,7 @@ func Create(me *link.Me, peer string) (s Tunnel, err error) { s.l, err = me.Connect(peer) if err == nil { s.in = make(chan []byte, 4) - s.out = make(chan *orbyte.Item[head.Packet], 4) + s.out = make(chan link.LinkData, 4) s.peerip = net.ParseIP(peer) } else { logrus.Errorln("[tunnel] create err:", err) @@ -72,14 +71,14 @@ func (s *Tunnel) Read(p []byte) (int, error) { d = s.outcache } else { pkt := <-s.out - if pkt == nil { + if !pkt.D.HasInit() { return 0, io.EOF } - if pkt.Pointer().BodyLen() < 4 { - logrus.Warnln("[tunnel] unexpected packet data len", pkt.Pointer().BodyLen(), "content", hex.EncodeToString(pkt.Pointer().UnsafeBody())) + if pkt.H.Size() < 4 { + logrus.Warnln("[tunnel] unexpected packet data len", pkt.H.Size(), "content", hex.EncodeToString(pkt.D.Trans())) return 0, io.EOF } - d = pkt.Pointer().UnsafeBody()[4:] + d = pkt.D.Trans()[4:] } if d != nil { if len(p) >= len(d) { @@ -126,37 +125,25 @@ func (s *Tunnel) handleWrite() { binary.LittleEndian.PutUint32(buf[:4], seq) seq++ copy(buf[4:], b[:s.mtu-4]) - _, err := s.l.WritePacket( - head.NewPacketPartial(head.ProtoData, s.src, s.peerip, s.dest, pbuf.ParseBytes(buf...)), false, - ) - if err != nil { - logrus.Errorln("[tunnel] seq", seq-1, "write err:", err) - return - } + s.l.WritePacket(head.ProtoData, buf) if config.ShowDebugLog { - logrus.Debugln("[tunnel] seq", seq-1, "write succeeded") + logrus.Debugln("[tunnel] seq", seq-1, "written") } b = b[s.mtu-4:] } binary.LittleEndian.PutUint32(buf[:4], seq) seq++ copy(buf[4:], b) - _, err := s.l.WritePacket( - head.NewPacketPartial(head.ProtoData, s.src, s.peerip, s.dest, pbuf.ParseBytes(buf[:len(b)+4]...)), false, - ) - if err != nil { - logrus.Errorln("[tunnel] seq", seq-1, "write err:", err) - break - } + s.l.WritePacket(head.ProtoData, buf[:len(b)+4]) if config.ShowDebugLog { - logrus.Debugln("[tunnel] seq", seq-1, "write succeeded") + logrus.Debugln("[tunnel] seq", seq-1, "written") } } } func (s *Tunnel) handleRead() { seq := uint32(0) - seqmap := make(map[uint32]*orbyte.Item[head.Packet]) + seqmap := make(map[uint32]link.LinkData) for { if p, ok := seqmap[seq]; ok { if config.ShowDebugLog { @@ -168,21 +155,24 @@ func (s *Tunnel) handleRead() { continue } p := s.l.Read() - if p == nil { + if !p.D.HasInit() { logrus.Errorln("[tunnel] read recv nil") break } end := 64 endl := "..." - pp := p.Pointer() - if pp.BodyLen() < 64 { - end = pp.BodyLen() + pp := &p.H + if pp.Size() < 64 { + end = pp.Size() endl = "." } - if config.ShowDebugLog { - logrus.Debugln("[tunnel] read recv", hex.EncodeToString(pp.UnsafeBody()[:end]), endl) - } - recvseq := binary.LittleEndian.Uint32(pp.UnsafeBody()[:4]) + var recvseq uint32 + p.D.V(func(b []byte) { + if config.ShowDebugLog { + logrus.Debugln("[tunnel] read recv", hex.EncodeToString(b[:end]), endl) + } + recvseq = binary.LittleEndian.Uint32(b[:4]) + }) if recvseq == seq { if config.ShowDebugLog { logrus.Debugln("[tunnel] dispatch seq", seq) diff --git a/upper/services/tunnel/tunnel_test.go b/upper/services/tunnel/tunnel_test.go index d2a14eb..2be5f39 100644 --- a/upper/services/tunnel/tunnel_test.go +++ b/upper/services/tunnel/tunnel_test.go @@ -12,11 +12,10 @@ import ( "time" curve "github.com/fumiama/go-x25519" - "github.com/fumiama/orbyte" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/link" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" ) func TestTunnelUDP(t *testing.T) { @@ -432,7 +431,7 @@ type logFormat struct { func (f logFormat) Format(entry *logrus.Entry) ([]byte, error) { // this writer will not be put back - buf := (*orbyte.Item[bytes.Buffer])(helper.SelectWriter()).Trans().Pointer() + buf := bin.SelectWriter() buf.WriteByte('[') if f.enableColor { @@ -446,7 +445,7 @@ func (f logFormat) Format(entry *logrus.Entry) ([]byte, error) { buf.WriteString(entry.Message) buf.WriteString("\n") - return buf.Bytes(), nil + return buf.ToBytes().Trans(), nil } const ( diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index 84ca7cd..9fab70b 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -13,10 +13,11 @@ import ( _ "github.com/fumiama/WireGold/gold/p2p/tcp" // support tcp connection _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection _ "github.com/fumiama/WireGold/gold/p2p/udplite" // support udplite connection + _ "github.com/fumiama/WireGold/gold/proto" // support basic protos "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/link" - "github.com/fumiama/WireGold/helper" + "github.com/fumiama/WireGold/internal/bin" ) const suffix32 = "㴄" @@ -32,7 +33,7 @@ func NewWireGold(c *config.Config) (wg WG, err error) { wg.c = c var k []byte - k, err = base14.UTF82UTF16BE(helper.StringToBytes(c.PrivateKey + suffix32)) + k, err = base14.UTF82UTF16BE(bin.StringToBytes(c.PrivateKey + suffix32)) if err != nil { return } @@ -47,7 +48,7 @@ func NewWireGold(c *config.Config) (wg WG, err error) { if err != nil { return } - wg.PublicKey = helper.BytesToString(pubk[:57]) + wg.PublicKey = bin.BytesToString(pubk[:57]) return } @@ -118,7 +119,7 @@ func (wg *WG) init(srcport, dstport uint16) { for _, peer := range wg.c.Peers { var peerkey [32]byte - k, err := base14.UTF82UTF16BE(helper.StringToBytes(peer.PublicKey + suffix32)) + k, err := base14.UTF82UTF16BE(bin.StringToBytes(peer.PublicKey + suffix32)) if err != nil { panic(err) } @@ -128,7 +129,7 @@ func (wg *WG) init(srcport, dstport uint16) { } var pshk *[32]byte if peer.PresharedKey != "" { - k, err := base14.UTF82UTF16BE(helper.StringToBytes(peer.PresharedKey + suffix32)) + k, err := base14.UTF82UTF16BE(bin.StringToBytes(peer.PresharedKey + suffix32)) if err != nil { panic(err) }