1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-27 06:10:26 +08:00

feat: split udp protocol to folder p2p

This commit is contained in:
源文雨
2024-07-14 22:26:44 +09:00
parent 4a8e848673
commit 32af3ce142
16 changed files with 180 additions and 49 deletions

View File

@@ -1,7 +1,7 @@
package head
// Notify 是 map[peerip]endpoint
type Notify = map[string]string
// Notify 是 map[peerip]{network, endpoint}
type Notify = map[string][2]string
// Query 是 peerips 组成的数组
type Query = []string

View File

@@ -7,6 +7,7 @@ import (
"sync/atomic"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/helper"
base14 "github.com/fumiama/go-base16384"
)
@@ -24,7 +25,7 @@ type Link struct {
// peer 的虚拟 ip
peerip net.IP
// peer 的公网 endpoint
endpoint *net.UDPAddr
endpoint p2p.EndPoint
// 本机允许接收/发送的 ip 网段
allowedips []*net.IPNet
// 连接所用对称加密密钥集

View File

@@ -5,28 +5,26 @@ import (
"errors"
"io"
"net"
"net/netip"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/klauspost/compress/zstd"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
)
// 监听本机 UDP endpoint
func (m *Me) listenudp() (conn *net.UDPConn, err error) {
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String())))
// 监听本机 endpoint
func (m *Me) listen() (conn p2p.Conn, err error) {
conn, err = m.ep.Listen()
if err != nil {
return
}
m.udpep = conn.LocalAddr()
logrus.Infoln("[listen] at", m.udpep)
m.ep = conn.LocalAddr()
logrus.Infoln("[listen] at", m.ep)
go func() {
recvtotlcnt := uint64(0)
recvloopcnt := uint16(0)
@@ -49,14 +47,14 @@ func (m *Me) listenudp() (conn *net.UDPConn, err error) {
}
logrus.Debugln("[listen] lock index", i)
lbf := listenbuff[i*65536 : (i+1)*65536]
n, addr, err := conn.ReadFromUDP(lbf)
n, addr, err := conn.ReadFromPeer(lbf)
if m.loop == nil || errors.Is(err, net.ErrClosed) {
logrus.Warnln("[listen] quit listening")
return
}
if err != nil {
logrus.Warnln("[listen] read from udp err, reconnect:", err)
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String())))
conn, err = m.ep.Listen()
if err != nil {
logrus.Errorln("[listen] reconnect udp err:", err)
return
@@ -81,13 +79,13 @@ func (m *Me) listenudp() (conn *net.UDPConn, err error) {
i--
continue
}
go m.listenthread(packet, addr, i, hasntfinished[i].Unlock)
go m.dispatch(packet, addr, i, hasntfinished[i].Unlock)
}
}()
return
}
func (m *Me) listenthread(packet *head.Packet, addr *net.UDPAddr, index int, finish func()) {
func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish func()) {
defer finish()
defer logrus.Debugln("[listen] unlock index", index)
r := packet.Len() - len(packet.Data)
@@ -103,9 +101,9 @@ func (m *Me) listenthread(packet *head.Packet, addr *net.UDPAddr, index int, fin
packet.Put()
return
}
if p.endpoint == nil || p.endpoint.String() != addr.String() {
if p.endpoint == nil || !p.endpoint.Euqal(addr) {
logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String())
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.endpoint)), unsafe.Pointer(addr))
p.endpoint = addr
}
switch {
case p.IsToMe(packet.Dst):

View File

@@ -10,10 +10,12 @@ import (
"time"
"github.com/FloatTech/ttl"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/lower"
"github.com/fumiama/water/waterutil"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/lower"
)
// Me 是本机的抽象
@@ -27,16 +29,16 @@ type Me struct {
me net.IP
// 本机子网
subnet net.IPNet
// 本机 UDP endpoint
udpep net.Addr
// 本机 endpoint
ep p2p.EndPoint
// 本机环回 link
loop *Link
// 本机活跃的所有连接
connections map[string]*Link
// 读写同步锁
connmapmu sync.RWMutex
// 本机监听的 udp 连接, 用于向对端直接发送报文
udpconn *net.UDPConn
// 本机监听的连接端点, 用于向对端直接发送报文
conn p2p.Conn
// 本机网卡
nic lower.NICIO
// 本机路由表
@@ -54,6 +56,7 @@ type Me struct {
type MyConfig struct {
MyIPwithMask string
MyEndpoint string
Network string
PrivateKey *[32]byte
NIC lower.NICIO
SrcPort, DstPort, MTU, SpeedLoop uint16
@@ -64,7 +67,11 @@ type MyConfig struct {
func NewMe(cfg *MyConfig) (m Me) {
m.privKey = *cfg.PrivateKey
var err error
m.udpep, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint)
nw := cfg.Network
if nw == "" {
nw = "udp"
}
m.ep, err = p2p.NewEndPoint(nw, cfg.MyEndpoint)
if err != nil {
panic(err)
}
@@ -74,7 +81,7 @@ func NewMe(cfg *MyConfig) (m Me) {
}
m.me = ip
m.subnet = *cidr
m.udpconn, err = m.listenudp()
m.conn, err = m.listen()
if err != nil {
panic(err)
}
@@ -125,16 +132,16 @@ func (m *Me) MTU() uint16 {
return m.mtu
}
func (m *Me) EndPoint() net.Addr {
return m.udpep
func (m *Me) EndPoint() p2p.EndPoint {
return m.ep
}
func (m *Me) Close() error {
m.loop = nil
m.connections = nil
if m.udpconn != nil {
_ = m.udpconn.Close()
m.udpconn = nil
if m.conn != nil {
_ = m.conn.Close()
m.conn = nil
}
m.router = nil
if m.recving != nil {

View File

@@ -2,12 +2,12 @@ package link
import (
"encoding/json"
"net"
"time"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
"github.com/fumiama/WireGold/helper"
)
@@ -44,11 +44,11 @@ func (l *Link) onNotify(packet []byte) {
// ---- 遍历 Notify注册对方的 endpoint 到
// ---- connections注意使用读写锁connmapmu
for peer, ep := range notify {
addr, err := net.ResolveUDPAddr("udp", ep)
addr, err := p2p.NewEndPoint(ep[0], ep[1])
if err == nil {
p, ok := l.me.IsInPeer(peer)
if ok {
if p.endpoint.String() != ep {
if !p.endpoint.Euqal(addr) {
p.endpoint = addr
logrus.Infoln("[nat] notify set ep of peer", peer, "to", ep)
}
@@ -80,7 +80,10 @@ func (l *Link) onQuery(packet []byte) {
for _, p := range peers {
lnk, ok := l.me.IsInPeer(p)
if ok {
notify[p] = lnk.endpoint.String()
notify[p] = [2]string{
lnk.endpoint.Network(),
lnk.endpoint.String(),
}
}
}
if len(notify) > 0 {

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/gold/p2p"
curve "github.com/fumiama/go-x25519"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/chacha20poly1305"
@@ -72,7 +73,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) {
}
}
if cfg.EndPoint != "" {
e, err := net.ResolveUDPAddr("udp", cfg.EndPoint)
e, err := p2p.NewEndPoint(m.ep.Network(), cfg.EndPoint)
if err != nil {
panic(err)
}

View File

@@ -103,11 +103,11 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui
bound = len(d)
endl = "."
}
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.udpconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl)
d = l.me.xorenc(d)
logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl)
n, err = l.me.udpconn.WriteToUDP(d, peerep)
n, err = l.me.conn.WriteToPeer(d, peerep)
cl()
return
}

40
gold/p2p/define.go Normal file
View File

@@ -0,0 +1,40 @@
package p2p
import (
"errors"
"fmt"
"io"
"github.com/RomiChan/syncx"
)
type Initializer func(endpoint string, configs ...any) EndPoint
var factory syncx.Map[string, Initializer]
func Register(network string, initializer Initializer) (actual Initializer, hasexist bool) {
return factory.LoadOrStore(network, initializer)
}
type EndPoint interface {
fmt.Stringer
Network() string
Euqal(EndPoint) bool
Listen() (Conn, error)
}
func NewEndPoint(network, endpoint string, configs ...any) (EndPoint, error) {
initializer, ok := factory.Load(network)
if !ok {
return nil, errors.New("network " + network + " not found")
}
return initializer(endpoint, configs...), nil
}
type Conn interface {
io.Closer
fmt.Stringer // basically, the local address string
LocalAddr() EndPoint
ReadFromPeer([]byte) (int, EndPoint, error)
WriteToPeer([]byte, EndPoint) (int, error)
}

26
gold/p2p/udp/init.go Normal file
View File

@@ -0,0 +1,26 @@
package udp
import (
"errors"
"net"
"net/netip"
"github.com/fumiama/WireGold/gold/p2p"
)
var (
ErrEndpointTypeMistatch = errors.New("endpoint type mismatch")
)
func NewEndpoint(endpoint string, _ ...any) p2p.EndPoint {
return (*EndPoint)(net.UDPAddrFromAddrPort(
netip.MustParseAddrPort(endpoint),
))
}
func init() {
_, hasexist := p2p.Register("udp", NewEndpoint)
if hasexist {
panic("network udp has been registered")
}
}

58
gold/p2p/udp/udp.go Normal file
View File

@@ -0,0 +1,58 @@
package udp
import (
"net"
"github.com/fumiama/WireGold/gold/p2p"
)
type EndPoint net.UDPAddr
func (ep *EndPoint) String() string {
return (*net.UDPAddr)(ep).String()
}
func (ep *EndPoint) Network() string {
return (*net.UDPAddr)(ep).Network()
}
func (ep *EndPoint) Euqal(ep2 p2p.EndPoint) bool {
udpep2, ok := ep2.(*EndPoint)
if !ok {
return false
}
udpep1 := ep
return udpep1.IP.Equal(udpep2.IP) && udpep1.Port == udpep2.Port && udpep1.Zone == udpep2.Zone
}
func (ep *EndPoint) Listen() (p2p.Conn, error) {
conn, err := net.ListenUDP((*net.UDPAddr)(ep).Network(), (*net.UDPAddr)(ep))
return (*Conn)(conn), err
}
type Conn net.UDPConn
func (conn *Conn) Close() error {
return (*net.UDPConn)(conn).Close()
}
func (conn *Conn) String() string {
return (*net.UDPConn)(conn).LocalAddr().String()
}
func (conn *Conn) LocalAddr() p2p.EndPoint {
return NewEndpoint((*net.UDPConn)(conn).LocalAddr().String())
}
func (conn *Conn) ReadFromPeer(b []byte) (int, p2p.EndPoint, error) {
n, addr, err := (*net.UDPConn)(conn).ReadFromUDP(b)
return n, (*EndPoint)(addr), err
}
func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (int, error) {
udpep, ok := ep.(*EndPoint)
if !ok {
return 0, ErrEndpointTypeMistatch
}
return (*net.UDPConn)(conn).WriteTo(b, (*net.UDPAddr)(udpep))
}