From 32af3ce1425d5299750872f8c98340e814249b0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Sun, 14 Jul 2024 22:26:44 +0900 Subject: [PATCH] feat: split udp protocol to folder p2p --- README.md | 2 -- config/cfg.go | 11 +------ go.mod | 1 + go.sum | 2 ++ gold/head/nat.go | 4 +-- gold/link/link.go | 3 +- gold/link/listen.go | 26 +++++++-------- gold/link/me.go | 33 +++++++++++-------- gold/link/nat.go | 11 ++++--- gold/link/peer.go | 3 +- gold/link/send.go | 4 +-- gold/p2p/define.go | 40 +++++++++++++++++++++++ gold/p2p/udp/init.go | 26 +++++++++++++++ gold/p2p/udp/udp.go | 58 +++++++++++++++++++++++++++++++++ upper/services/tunnel/tunnel.go | 2 ++ upper/services/wg/wg.go | 3 ++ 16 files changed, 180 insertions(+), 49 deletions(-) create mode 100644 gold/p2p/define.go create mode 100644 gold/p2p/udp/init.go create mode 100644 gold/p2p/udp/udp.go diff --git a/README.md b/README.md index eccb394..17e5767 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,6 @@ Mask: 0x1234567890abcdef Peers: - IP: "192.168.233.2" - SubNet: 192.168.233.0/24 PublicKey: 徯萃嵾爻燸攗窍褃冔蒔犡緇袿屿組待族砇嘀 PresharedKey: 瀸敀爅崾嘊嵜紼樴稍毯攣矐訷蟷扛嬋庩崛昀 EndPoint: 1.2.3.4:56789 @@ -58,7 +57,6 @@ Peers: AllowTrans: true - IP: "192.168.233.3" - SubNet: 192.168.233.0/24 PublicKey: 牢喨粷詸衭譛浾蘹櫠砙杹蟫瑳叩刋橋経挵蘀 PresharedKey: 竅琚喫従痸告烈兇厕趭萨假蔛瀇譄施烸蝫瘀 EndPoint: "" diff --git a/config/cfg.go b/config/cfg.go index cb0e747..a851396 100644 --- a/config/cfg.go +++ b/config/cfg.go @@ -8,20 +8,12 @@ import ( "gopkg.in/yaml.v3" ) -// EndPoint 一个终结点的信息 -type EndPoint struct { - Host string `yaml:"Host"` - Port int64 `yaml:"Port"` - Poly uint64 `yaml:"Poly"` // Poly 是 port 随机切换算法的生成多项式, 0 为禁用 - ReconnectSeconds int64 `yaml:"ReconnectSeconds"` // ReconnectSeconds 断开重连间隔, 每次到时即向对端通报并切换到新的端口, 0 为禁用 - FECMethod string `yaml:"FECMethod"` // FECMethod 可选 1/2 2/3 -} - // Config WireGold 配置文件 type Config struct { IP string `yaml:"IP"` SubNet string `yaml:"SubNet"` PrivateKey string `yaml:"PrivateKey"` + Network string `yaml:"Network"` // Network udp or ws (WIP) EndPoint string `yaml:"EndPoint"` MTU int64 `yaml:"MTU"` SpeedLoop uint16 `yaml:"SpeedLoop"` @@ -32,7 +24,6 @@ type Config struct { // Peer 对端信息 type Peer struct { IP string `yaml:"IP"` - SubNet string `yaml:"SubNet"` PublicKey string `yaml:"PublicKey"` PresharedKey string `yaml:"PresharedKey"` EndPoint string `yaml:"EndPoint"` diff --git a/go.mod b/go.mod index d01545d..9dc1031 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1 + github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7 github.com/fumiama/blake2b-simd v0.0.0-20220412110131-4481822068bb github.com/fumiama/go-base16384 v1.7.0 github.com/fumiama/go-x25519 v1.0.0 diff --git a/go.sum b/go.sum index 339497e..6d1ff6b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1 h1:g4pTnDJUW4VbJ9NvoRfUvdjDrHz/6QhfN/LoIIpICbo= github.com/FloatTech/ttl v0.0.0-20230307105452-d6f7b2b647d1/go.mod h1:fHZFWGquNXuHttu9dUYoKuNbm3dzLETnIOnm1muSfDs= +github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7 h1:S/ferNiehVjNaBMNNBxUjLtVmP/YWD6Yh79RfPv4ehU= +github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7/go.mod h1:vD7Ra3Q9onRtojoY5sMCLQ7JBgjUsrXDnDKyFxqpf9w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/gold/head/nat.go b/gold/head/nat.go index 2aedf8c..86e167d 100644 --- a/gold/head/nat.go +++ b/gold/head/nat.go @@ -1,7 +1,7 @@ package head -// Notify 是 map[peerip]endpoint -type Notify = map[string]string +// Notify 是 map[peerip]{network, endpoint} +type Notify = map[string][2]string // Query 是 peerips 组成的数组 type Query = []string diff --git a/gold/link/link.go b/gold/link/link.go index bc0613c..1fe4b0c 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" "github.com/fumiama/WireGold/helper" base14 "github.com/fumiama/go-base16384" ) @@ -24,7 +25,7 @@ type Link struct { // peer 的虚拟 ip peerip net.IP // peer 的公网 endpoint - endpoint *net.UDPAddr + endpoint p2p.EndPoint // 本机允许接收/发送的 ip 网段 allowedips []*net.IPNet // 连接所用对称加密密钥集 diff --git a/gold/link/listen.go b/gold/link/listen.go index 0b0faff..ec60a71 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -5,28 +5,26 @@ import ( "errors" "io" "net" - "net/netip" "runtime" "strconv" "sync" - "sync/atomic" "time" - "unsafe" "github.com/klauspost/compress/zstd" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" ) -// 监听本机 UDP endpoint -func (m *Me) listenudp() (conn *net.UDPConn, err error) { - conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String()))) +// 监听本机 endpoint +func (m *Me) listen() (conn p2p.Conn, err error) { + conn, err = m.ep.Listen() if err != nil { return } - m.udpep = conn.LocalAddr() - logrus.Infoln("[listen] at", m.udpep) + m.ep = conn.LocalAddr() + logrus.Infoln("[listen] at", m.ep) go func() { recvtotlcnt := uint64(0) recvloopcnt := uint16(0) @@ -49,14 +47,14 @@ func (m *Me) listenudp() (conn *net.UDPConn, err error) { } logrus.Debugln("[listen] lock index", i) lbf := listenbuff[i*65536 : (i+1)*65536] - n, addr, err := conn.ReadFromUDP(lbf) + n, addr, err := conn.ReadFromPeer(lbf) if m.loop == nil || errors.Is(err, net.ErrClosed) { logrus.Warnln("[listen] quit listening") return } if err != nil { logrus.Warnln("[listen] read from udp err, reconnect:", err) - conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String()))) + conn, err = m.ep.Listen() if err != nil { logrus.Errorln("[listen] reconnect udp err:", err) return @@ -81,13 +79,13 @@ func (m *Me) listenudp() (conn *net.UDPConn, err error) { i-- continue } - go m.listenthread(packet, addr, i, hasntfinished[i].Unlock) + go m.dispatch(packet, addr, i, hasntfinished[i].Unlock) } }() return } -func (m *Me) listenthread(packet *head.Packet, addr *net.UDPAddr, index int, finish func()) { +func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish func()) { defer finish() defer logrus.Debugln("[listen] unlock index", index) r := packet.Len() - len(packet.Data) @@ -103,9 +101,9 @@ func (m *Me) listenthread(packet *head.Packet, addr *net.UDPAddr, index int, fin packet.Put() return } - if p.endpoint == nil || p.endpoint.String() != addr.String() { + if p.endpoint == nil || !p.endpoint.Euqal(addr) { logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.endpoint)), unsafe.Pointer(addr)) + p.endpoint = addr } switch { case p.IsToMe(packet.Dst): diff --git a/gold/link/me.go b/gold/link/me.go index 7985874..4309182 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -10,10 +10,12 @@ import ( "time" "github.com/FloatTech/ttl" - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/lower" "github.com/fumiama/water/waterutil" "github.com/sirupsen/logrus" + + "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" + "github.com/fumiama/WireGold/lower" ) // Me 是本机的抽象 @@ -27,16 +29,16 @@ type Me struct { me net.IP // 本机子网 subnet net.IPNet - // 本机 UDP endpoint - udpep net.Addr + // 本机 endpoint + ep p2p.EndPoint // 本机环回 link loop *Link // 本机活跃的所有连接 connections map[string]*Link // 读写同步锁 connmapmu sync.RWMutex - // 本机监听的 udp 连接, 用于向对端直接发送报文 - udpconn *net.UDPConn + // 本机监听的连接端点, 也用于向对端直接发送报文 + conn p2p.Conn // 本机网卡 nic lower.NICIO // 本机路由表 @@ -54,6 +56,7 @@ type Me struct { type MyConfig struct { MyIPwithMask string MyEndpoint string + Network string PrivateKey *[32]byte NIC lower.NICIO SrcPort, DstPort, MTU, SpeedLoop uint16 @@ -64,7 +67,11 @@ type MyConfig struct { func NewMe(cfg *MyConfig) (m Me) { m.privKey = *cfg.PrivateKey var err error - m.udpep, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint) + nw := cfg.Network + if nw == "" { + nw = "udp" + } + m.ep, err = p2p.NewEndPoint(nw, cfg.MyEndpoint) if err != nil { panic(err) } @@ -74,7 +81,7 @@ func NewMe(cfg *MyConfig) (m Me) { } m.me = ip m.subnet = *cidr - m.udpconn, err = m.listenudp() + m.conn, err = m.listen() if err != nil { panic(err) } @@ -125,16 +132,16 @@ func (m *Me) MTU() uint16 { return m.mtu } -func (m *Me) EndPoint() net.Addr { - return m.udpep +func (m *Me) EndPoint() p2p.EndPoint { + return m.ep } func (m *Me) Close() error { m.loop = nil m.connections = nil - if m.udpconn != nil { - _ = m.udpconn.Close() - m.udpconn = nil + if m.conn != nil { + _ = m.conn.Close() + m.conn = nil } m.router = nil if m.recving != nil { diff --git a/gold/link/nat.go b/gold/link/nat.go index b05072b..9850400 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -2,12 +2,12 @@ package link import ( "encoding/json" - "net" "time" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" "github.com/fumiama/WireGold/helper" ) @@ -44,11 +44,11 @@ func (l *Link) onNotify(packet []byte) { // ---- 遍历 Notify,注册对方的 endpoint 到 // ---- connections,注意使用读写锁connmapmu for peer, ep := range notify { - addr, err := net.ResolveUDPAddr("udp", ep) + addr, err := p2p.NewEndPoint(ep[0], ep[1]) if err == nil { p, ok := l.me.IsInPeer(peer) if ok { - if p.endpoint.String() != ep { + if !p.endpoint.Euqal(addr) { p.endpoint = addr logrus.Infoln("[nat] notify set ep of peer", peer, "to", ep) } @@ -80,7 +80,10 @@ func (l *Link) onQuery(packet []byte) { for _, p := range peers { lnk, ok := l.me.IsInPeer(p) if ok { - notify[p] = lnk.endpoint.String() + notify[p] = [2]string{ + lnk.endpoint.Network(), + lnk.endpoint.String(), + } } } if len(notify) > 0 { diff --git a/gold/link/peer.go b/gold/link/peer.go index 23cb0c0..ae2e6c0 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -5,6 +5,7 @@ import ( "time" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/gold/p2p" curve "github.com/fumiama/go-x25519" "github.com/sirupsen/logrus" "golang.org/x/crypto/chacha20poly1305" @@ -72,7 +73,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) { } } if cfg.EndPoint != "" { - e, err := net.ResolveUDPAddr("udp", cfg.EndPoint) + e, err := p2p.NewEndPoint(m.ep.Network(), cfg.EndPoint) if err != nil { panic(err) } diff --git a/gold/link/send.go b/gold/link/send.go index f7d04fa..df512e9 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -103,11 +103,11 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui bound = len(d) endl = "." } - logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.udpconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) + logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl) d = l.me.xorenc(d) logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl) - n, err = l.me.udpconn.WriteToUDP(d, peerep) + n, err = l.me.conn.WriteToPeer(d, peerep) cl() return } diff --git a/gold/p2p/define.go b/gold/p2p/define.go new file mode 100644 index 0000000..b6e1795 --- /dev/null +++ b/gold/p2p/define.go @@ -0,0 +1,40 @@ +package p2p + +import ( + "errors" + "fmt" + "io" + + "github.com/RomiChan/syncx" +) + +type Initializer func(endpoint string, configs ...any) EndPoint + +var factory syncx.Map[string, Initializer] + +func Register(network string, initializer Initializer) (actual Initializer, hasexist bool) { + return factory.LoadOrStore(network, initializer) +} + +type EndPoint interface { + fmt.Stringer + Network() string + Euqal(EndPoint) bool + Listen() (Conn, error) +} + +func NewEndPoint(network, endpoint string, configs ...any) (EndPoint, error) { + initializer, ok := factory.Load(network) + if !ok { + return nil, errors.New("network " + network + " not found") + } + return initializer(endpoint, configs...), nil +} + +type Conn interface { + io.Closer + fmt.Stringer // basically, the local address string + LocalAddr() EndPoint + ReadFromPeer([]byte) (int, EndPoint, error) + WriteToPeer([]byte, EndPoint) (int, error) +} diff --git a/gold/p2p/udp/init.go b/gold/p2p/udp/init.go new file mode 100644 index 0000000..0cdf014 --- /dev/null +++ b/gold/p2p/udp/init.go @@ -0,0 +1,26 @@ +package udp + +import ( + "errors" + "net" + "net/netip" + + "github.com/fumiama/WireGold/gold/p2p" +) + +var ( + ErrEndpointTypeMistatch = errors.New("endpoint type mismatch") +) + +func NewEndpoint(endpoint string, _ ...any) p2p.EndPoint { + return (*EndPoint)(net.UDPAddrFromAddrPort( + netip.MustParseAddrPort(endpoint), + )) +} + +func init() { + _, hasexist := p2p.Register("udp", NewEndpoint) + if hasexist { + panic("network udp has been registered") + } +} diff --git a/gold/p2p/udp/udp.go b/gold/p2p/udp/udp.go new file mode 100644 index 0000000..3b40374 --- /dev/null +++ b/gold/p2p/udp/udp.go @@ -0,0 +1,58 @@ +package udp + +import ( + "net" + + "github.com/fumiama/WireGold/gold/p2p" +) + +type EndPoint net.UDPAddr + +func (ep *EndPoint) String() string { + return (*net.UDPAddr)(ep).String() +} + +func (ep *EndPoint) Network() string { + return (*net.UDPAddr)(ep).Network() +} + +func (ep *EndPoint) Euqal(ep2 p2p.EndPoint) bool { + udpep2, ok := ep2.(*EndPoint) + if !ok { + return false + } + udpep1 := ep + return udpep1.IP.Equal(udpep2.IP) && udpep1.Port == udpep2.Port && udpep1.Zone == udpep2.Zone +} + +func (ep *EndPoint) Listen() (p2p.Conn, error) { + conn, err := net.ListenUDP((*net.UDPAddr)(ep).Network(), (*net.UDPAddr)(ep)) + return (*Conn)(conn), err +} + +type Conn net.UDPConn + +func (conn *Conn) Close() error { + return (*net.UDPConn)(conn).Close() +} + +func (conn *Conn) String() string { + return (*net.UDPConn)(conn).LocalAddr().String() +} + +func (conn *Conn) LocalAddr() p2p.EndPoint { + return NewEndpoint((*net.UDPConn)(conn).LocalAddr().String()) +} + +func (conn *Conn) ReadFromPeer(b []byte) (int, p2p.EndPoint, error) { + n, addr, err := (*net.UDPConn)(conn).ReadFromUDP(b) + return n, (*EndPoint)(addr), err +} + +func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (int, error) { + udpep, ok := ep.(*EndPoint) + if !ok { + return 0, ErrEndpointTypeMistatch + } + return (*net.UDPConn)(conn).WriteTo(b, (*net.UDPAddr)(udpep)) +} diff --git a/upper/services/tunnel/tunnel.go b/upper/services/tunnel/tunnel.go index 0a4bed1..4670d31 100644 --- a/upper/services/tunnel/tunnel.go +++ b/upper/services/tunnel/tunnel.go @@ -8,6 +8,8 @@ import ( "github.com/sirupsen/logrus" + _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection + "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/gold/link" ) diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index a23a3c1..0cc4bab 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -9,6 +9,8 @@ import ( curve "github.com/fumiama/go-x25519" "github.com/sirupsen/logrus" + _ "github.com/fumiama/WireGold/gold/p2p/udp" // support udp connection + "github.com/fumiama/WireGold/config" "github.com/fumiama/WireGold/gold/link" "github.com/fumiama/WireGold/helper" @@ -93,6 +95,7 @@ func (wg *WG) init(srcport, dstport uint16) { wg.me = link.NewMe(&link.MyConfig{ MyIPwithMask: wg.c.IP + "/32", MyEndpoint: wg.c.EndPoint, + Network: wg.c.Network, PrivateKey: &wg.key, NIC: lower.NewNIC(wg.c.IP, wg.c.SubNet, strconv.FormatInt(wg.c.MTU, 10), cidrs...), SrcPort: srcport,