1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-13 13:40:30 +08:00

feat: impl. trans & ttl

This commit is contained in:
源文雨
2025-03-13 01:52:35 +09:00
parent 658916268a
commit 6fc45333d8
20 changed files with 335 additions and 271 deletions

View File

@@ -1,22 +1,22 @@
package link
import (
"strconv"
"github.com/RomiChan/syncx"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/orbyte/pbuf"
)
// 事件分发器
var dispachers map[uint8]EventDispacher = make(map[uint8]EventDispacher)
var dispachers syncx.Map[uint8, Dispacher]
type EventDispacher func(header *head.Packet, peer *Link, data pbuf.Bytes)
type Dispacher func(header *head.Packet, peer *Link, data pbuf.Bytes)
// AddProto is thread unsafe. Use in init() only.
func AddProto(p uint8, d EventDispacher) {
_, ok := dispachers[p]
if ok {
panic("proto " + strconv.Itoa(int(p)) + " has been registered")
}
dispachers[p] = d
// RegisterDispacher of proto
func RegisterDispacher(p uint8, d Dispacher) (actual Dispacher, hasexist bool) {
return dispachers.LoadOrStore(p, d)
}
// GetDispacher fn, ok
func GetDispacher(p uint8) (Dispacher, bool) {
return dispachers.Load(p)
}

View File

@@ -6,7 +6,6 @@ import (
"strconv"
"sync/atomic"
"time"
"unsafe"
"github.com/sirupsen/logrus"
@@ -14,7 +13,6 @@ import (
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/internal/algo"
"github.com/fumiama/WireGold/internal/bin"
"github.com/fumiama/WireGold/internal/file"
"github.com/fumiama/orbyte/pbuf"
)
@@ -76,7 +74,7 @@ func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) {
atomic.StoreInt64(&m.recvlooptime, now)
}
buf.V(func(b []byte) {
h := m.wait(b[:n])
h := m.wait(b[:n], addr)
if !h.HasInit() {
if config.ShowDebugLog {
logrus.Debugln("[listen] queue waiting")
@@ -100,94 +98,62 @@ func (m *Me) dispatch(header *head.Packet, body []byte, addr p2p.EndPoint) {
}
srcip := header.Src()
dstip := header.Dst()
p, ok := m.IsInPeer(srcip.String())
if config.ShowDebugLog {
logrus.Debugln("[listen] recv from endpoint", addr, "src", srcip, "dst", dstip)
}
if !ok {
logrus.Warnln("[listen] packet from", srcip, "to", dstip, "is refused")
p := m.extractPeer(srcip, dstip, addr)
if p == nil {
return
}
if bin.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) {
if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) {
logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
} else { // others are all no status link
logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
}
if !p.Accept(srcip) {
logrus.Warnln("[listen] refused packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort)))
return
}
now := time.Now()
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.lastalive)), unsafe.Pointer(&now))
switch {
case p.IsToMe(dstip):
if !p.Accept(srcip) {
logrus.Warnln("[listen] refused packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort)))
return
if !p.IsToMe(dstip) {
logrus.Warnln("[listen] unhandled trans packet from", srcip.String()+":"+strconv.Itoa(int(header.SrcPort)))
return
}
addt := header.AdditionalData()
var err error
data, err := p.decode(header.CipherIndex(), addt, body)
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid packet key idx:", header.CipherIndex(), "addt:", addt, "err:", err)
}
addt := header.AdditionalData()
var err error
data, err := p.decode(header.CipherIndex(), addt, body)
return
}
if data.Len() < 8 {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len())
}
return
}
ok := false
data.V(func(b []byte) {
ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b)
})
if !ok {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid hash packet")
}
return
}
data = data.SliceFrom(8)
if p.usezstd {
data.V(func(b []byte) {
data, err = algo.DecodeZstd(b) // skip hash
})
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid packet key idx:", header.CipherIndex(), "addt:", addt, "err:", err)
logrus.Debugln("[listen] drop invalid zstd packet:", err)
}
return
}
if data.Len() < 8 {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid data len packet key idx:", header.CipherIndex(), "addt:", addt, "len", data.Len())
}
return
}
ok := false
data.V(func(b []byte) {
ok = algo.IsVaildBlake2bHash8(header.PreCRC64(), b)
})
if !ok {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid hash packet")
}
return
}
data = data.SliceFrom(8)
if p.usezstd {
data.V(func(b []byte) {
data, err = algo.DecodeZstd(b) // skip hash
})
if err != nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] drop invalid zstd packet:", err)
}
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] zstd decoded len:", data.Len())
}
}
fn, ok := dispachers[header.Proto.Proto()]
if !ok {
logrus.Warnln(file.Header(), "unsupported proto", header.Proto.Proto())
return
}
fn(header, p, data)
return
case p.Accept(dstip): //TODO: 移除此处转发, 将转发放到 wait
if !p.allowtrans {
logrus.Warnln("[listen] refused to trans packet to", dstip.String()+":"+strconv.Itoa(int(header.DstPort)))
return
}
// 转发
lnk := m.router.NextHop(dstip.String())
if lnk == nil {
logrus.Warnln("[listen] transfer drop packet: nil nexthop")
return
}
lnk.WritePacket(head.ProtoTrans, body)
if config.ShowDebugLog {
logrus.Debugln("[listen] trans", len(body), "bytes body to", dstip.String()+":"+strconv.Itoa(int(header.DstPort)))
logrus.Debugln("[listen] zstd decoded len:", data.Len())
}
default:
logrus.Warnln("[listen] packet dst", dstip.String()+":"+strconv.Itoa(int(header.DstPort)), "is not in peers")
}
fn, ok := GetDispacher(header.Proto.Proto())
if !ok {
logrus.Warnln(file.Header(), "unsupported proto", header.Proto.Proto())
return
}
fn(header, p, data)
}

View File

@@ -62,6 +62,8 @@ type Me struct {
recvloopcnt uintptr
// 是否进行 base16384 编码
base14 bool
// 本机初始 ttl
ttl uint8
// 本机网络端点初始化配置
networkconfigs []any
}
@@ -76,6 +78,7 @@ type MyConfig struct {
SrcPort, DstPort, MTU, SpeedLoop uint16
Mask uint64
Base14 bool
MaxTTL uint8
}
type NICConfig struct {
@@ -131,6 +134,11 @@ func NewMe(cfg *MyConfig) (m Me) {
m.mask = cfg.Mask
m.recvlooptime = time.Now().UnixMilli()
m.base14 = cfg.Base14
if cfg.MaxTTL == 0 {
m.ttl = 64
} else {
m.ttl = cfg.MaxTTL
}
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], m.mask)
logrus.Infoln("[me] xor mask", hex.EncodeToString(buf[:]))
@@ -179,6 +187,10 @@ func (m *Me) MTU() uint16 {
return m.mtu
}
func (m *Me) TTL() uint8 {
return m.ttl
}
func (m *Me) EndPoint() p2p.EndPoint {
return m.ep
}
@@ -298,7 +310,7 @@ func (m *Me) sendAllSameDst(packet []byte) (n int) {
copy(b, packet)
})
go pcp.V(func(b []byte) {
lnk.WritePacket(head.ProtoData, b)
lnk.WritePacket(head.ProtoData, b, lnk.me.ttl)
})
return
}

View File

@@ -33,7 +33,7 @@ func (l *Link) keepAlive(dur int64) {
logrus.Infoln(file.Header(), "re-connect me succeeded")
}
}
l.WritePacket(head.ProtoHello, []byte{byte(head.HelloPing)})
l.WritePacket(head.ProtoHello, []byte{byte(head.HelloPing)}, 64)
logrus.Infoln(file.Header(), "send keep alive to", l.peerip)
}
}
@@ -50,7 +50,7 @@ func (l *Link) sendQuery(tick time.Duration, peers ...string) {
}
t := time.NewTicker(tick)
for range t.C {
l.WritePacket(head.ProtoQuery, data)
l.WritePacket(head.ProtoQuery, data, l.me.ttl)
logrus.Infoln(file.Header(), "send query to", l.peerip)
}
}

View File

@@ -2,13 +2,19 @@ package link
import (
"net"
"sync/atomic"
"time"
"unsafe"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/internal/algo"
curve "github.com/fumiama/go-x25519"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/chacha20poly1305"
"github.com/fumiama/WireGold/config"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/internal/algo"
"github.com/fumiama/WireGold/internal/bin"
"github.com/fumiama/WireGold/internal/file"
)
type PeerConfig struct {
@@ -123,3 +129,26 @@ func (m *Me) IsInPeer(peer string) (p *Link, ok bool) {
m.connmapmu.RUnlock()
return
}
func (m *Me) extractPeer(srcip, dstip net.IP, addr p2p.EndPoint) *Link {
p, ok := m.IsInPeer(srcip.String())
if config.ShowDebugLog {
logrus.Debugln(file.Header(), "recv from endpoint", addr, "src", srcip, "dst", dstip)
}
if !ok {
logrus.Warnln(file.Header(), "packet from", srcip, "to", dstip, "is refused")
return nil
}
if bin.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) {
if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) {
logrus.Infoln(file.Header(), "set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
} else { // others are all no status link
logrus.Infoln(file.Header(), "set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
}
}
now := time.Now()
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.lastalive)), unsafe.Pointer(&now))
return p
}

View File

@@ -8,8 +8,10 @@ import (
"github.com/fumiama/WireGold/config"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/internal/bin"
base14 "github.com/fumiama/go-base16384"
"github.com/fumiama/orbyte/pbuf"
"github.com/sirupsen/logrus"
)
@@ -18,8 +20,7 @@ func (l *Link) Read() LinkData {
return <-l.pipe
}
// wait TODO: 判断是否为 trans 并提前 call dispatch
func (m *Me) wait(data []byte) (h head.PacketBytes) {
func (m *Me) wait(data []byte, addr p2p.EndPoint) (h head.PacketBytes) {
if len(data) < int(head.PacketHeadLen)+8 { // not a valid packet
if config.ShowDebugLog {
logrus.Debugln("[recv] invalid data len", len(data))
@@ -44,7 +45,7 @@ func (m *Me) wait(data []byte) (h head.PacketBytes) {
}
return
}
data = w.ToBytes().Trans()
data = w.ToBytes().Copy().Trans()
if len(data) < bound {
bound = len(data)
endl = "."
@@ -94,6 +95,31 @@ func (m *Me) wait(data []byte) (h head.PacketBytes) {
}
header.B(func(buf []byte, p *head.Packet) {
peer := m.extractPeer(p.Src(), p.Dst(), addr)
if peer == nil {
return
}
if !peer.IsToMe(p.Dst()) { // 提前处理转发
if !peer.allowtrans {
logrus.Warnln("[recv] refused to trans packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort)))
return
}
// 转发
lnk := m.router.NextHop(p.Dst().String())
if lnk == nil {
logrus.Warnln("[recv] transfer drop packet: nil nexthop")
return
}
if head.DecTTL(data) { // need drop
logrus.Warnln("[recv] transfer drop packet: zero ttl")
return
}
go lnk.write2peer(pbuf.ParseBytes(data...).Copy(), seq)
if config.ShowDebugLog {
logrus.Debugln("[listen] trans", len(data), "bytes packet to", p.Dst().String()+":"+strconv.Itoa(int(p.DstPort)))
}
return
}
if !p.Proto.HasMore() {
ok := p.WriteDataSegment(data, buf)
if !ok {

View File

@@ -32,7 +32,7 @@ func randseq(i uint16) uint32 {
// WritePacket 基于 data 向 peer 发包
//
// data 可为空, 因为不保证可达所以不返回错误。
func (l *Link) WritePacket(proto uint8, data []byte) {
func (l *Link) WritePacket(proto uint8, data []byte, ttl uint8) {
teatype := l.randkeyidx()
sndcnt := uint16(l.incgetsndcnt())
mtu := l.mtu
@@ -44,7 +44,7 @@ func (l *Link) WritePacket(proto uint8, data []byte) {
}
pb := head.NewPacketBuilder().
Src(l.me.me, l.me.srcport).Dst(l.peerip, l.me.dstport).
Proto(proto).TTL(64).With(data)
Proto(proto).TTL(ttl).With(data)
if l.usezstd {
pb.Zstd()
}