diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..be85b79 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,43 @@ +name: golang-ci + +on: [push, pull_request] + +jobs: + + golang-ci: + name: CI + runs-on: ubuntu-latest + steps: + - name: Set up Go 1.x + uses: actions/setup-go@master + with: + go-version: ^1.20 + + - name: Check out code into the Go module directory + uses: actions/checkout@master + + - name: Get dependencies + run: go mod tidy + + - name: Build + run: go build -v ./... + + - name: Test + run: go test $(go list ./...) + + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - name: Set up Go 1.x + uses: actions/setup-go@master + with: + go-version: ^1.20 + + - name: Check out code into the Go module directory + uses: actions/checkout@master + + - name: golangci-lint + uses: golangci/golangci-lint-action@master + with: + version: latest diff --git a/config/cfg.go b/config/cfg.go index 9e3e7f0..cb0e747 100644 --- a/config/cfg.go +++ b/config/cfg.go @@ -24,6 +24,7 @@ type Config struct { PrivateKey string `yaml:"PrivateKey"` EndPoint string `yaml:"EndPoint"` MTU int64 `yaml:"MTU"` + SpeedLoop uint16 `yaml:"SpeedLoop"` Mask uint64 `yaml:"Mask"` // Mask 是异或报文所用掩码, 必须保证各端统一 Peers []Peer `yaml:"Peers"` } diff --git a/gold/link/listen.go b/gold/link/listen.go index e2da4a6..965f236 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -7,6 +7,7 @@ import ( "net/netip" "runtime" "strconv" + "sync" "sync/atomic" "time" "unsafe" @@ -17,17 +18,17 @@ import ( "github.com/fumiama/WireGold/gold/head" ) -// 监听本机 endpoint -func (m *Me) listen() (conn *net.UDPConn, err error) { - conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.myend.String()))) +// 监听本机 UDP endpoint +func (m *Me) listenudp() (conn *net.UDPConn, err error) { + conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String()))) if err != nil { return } - m.myend = conn.LocalAddr() - logrus.Infoln("[listen] at", m.myend) + m.udpep = conn.LocalAddr() + logrus.Infoln("[listen] at", m.udpep) go func() { - recvtotlcnt := 0 - recvloopcnt := 0 + recvtotlcnt := uint64(0) + recvloopcnt := uint16(0) recvlooptime := time.Now().UnixMilli() n := runtime.NumCPU() if n > 64 { @@ -35,42 +36,45 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { } logrus.Infoln("[listen] use cpu num:", n) listenbuff := make([]byte, 65536*n) - hasntfinished := make([]bool, n) + hasntfinished := make([]sync.Mutex, n) for i := 0; err == nil; i++ { i %= n - for hasntfinished[i] { - time.Sleep(time.Millisecond) + for !hasntfinished[i].TryLock() { i++ i %= n + if i == 0 { // looked up a full round + time.Sleep(time.Millisecond * 10) + } } + logrus.Debugln("[listen] lock index", i) lbf := listenbuff[i*65536 : (i+1)*65536] n, addr, err := conn.ReadFromUDP(lbf) if err != nil { logrus.Warnln("[listen] read from udp err, reconnect:", err) - conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.myend.String()))) + conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String()))) if err != nil { logrus.Errorln("[listen] reconnect udp err:", err) return } + hasntfinished[i].Unlock() i-- continue } - recvtotlcnt += n + recvtotlcnt += uint64(n) recvloopcnt++ - if recvloopcnt >= 4096 { + if recvloopcnt%m.speedloop == 0 { now := time.Now().UnixMilli() logrus.Infof("[listen] recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime)) recvtotlcnt = 0 - recvloopcnt = 0 recvlooptime = now } packet := m.wait(lbf[:n]) if packet == nil { + hasntfinished[i].Unlock() i-- continue } - hasntfinished[i] = true - go m.listenthread(packet, addr, i, func() { hasntfinished[i] = false }) + go m.listenthread(packet, addr, i, hasntfinished[i].Unlock) } }() return diff --git a/gold/link/me.go b/gold/link/me.go index d0a04f5..70d4012 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -27,16 +27,16 @@ type Me struct { me net.IP // 本机子网 subnet net.IPNet - // 本机 endpoint - myend net.Addr + // 本机 UDP endpoint + udpep net.Addr // 本机环回 link loop *Link // 本机活跃的所有连接 connections map[string]*Link // 读写同步锁 connmapmu sync.RWMutex - // 本机监听的 endpoint - myep *net.UDPConn + // 本机监听的 udp 连接, 用于向对端直接发送报文 + udpconn *net.UDPConn // 本机网卡 nic lower.NICIO // 本机路由表 @@ -46,25 +46,25 @@ type Me struct { // 抗重放攻击记录池 recved *ttl.Cache[uint64, bool] // 本机上层配置 - srcport, dstport, mtu uint16 + srcport, dstport, mtu, speedloop uint16 // 报头掩码 mask uint64 } type MyConfig struct { - MyIPwithMask string - MyEndpoint string - PrivateKey *[32]byte - NIC lower.NICIO - SrcPort, DstPort, MTU uint16 - Mask uint64 + MyIPwithMask string + MyEndpoint string + PrivateKey *[32]byte + NIC lower.NICIO + SrcPort, DstPort, MTU, SpeedLoop uint16 + Mask uint64 } // NewMe 设置本机参数 func NewMe(cfg *MyConfig) (m Me) { m.privKey = *cfg.PrivateKey var err error - m.myend, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint) + m.udpep, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint) if err != nil { panic(err) } @@ -74,7 +74,7 @@ func NewMe(cfg *MyConfig) (m Me) { } m.me = ip m.subnet = *cidr - m.myep, err = m.listen() + m.udpconn, err = m.listenudp() if err != nil { panic(err) } @@ -96,6 +96,10 @@ func NewMe(cfg *MyConfig) (m Me) { m.srcport = cfg.SrcPort m.dstport = cfg.DstPort m.mtu = cfg.MTU & 0xfff8 + m.speedloop = cfg.SpeedLoop + if m.speedloop == 0 { + m.speedloop = 4096 + } m.mask = cfg.Mask var buf [8]byte binary.BigEndian.PutUint64(buf[:], m.mask) diff --git a/gold/link/send.go b/gold/link/send.go index 0366747..d10d721 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -98,23 +98,21 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui if d == nil { return 0, errors.New("[send] ttl exceeded") } - if err == nil { - peerep := l.endpoint - if peerep == nil { - return 0, errors.New("[send] nil endpoint of " + p.Dst.String()) - } - bound := 64 - endl := "..." - if len(d) < bound { - bound = len(d) - endl = "." - } - logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.myep.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) - logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl) - d = l.me.xorenc(d) - logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl) - n, err = l.me.myep.WriteToUDP(d, peerep) - cl() + peerep := l.endpoint + if peerep == nil { + return 0, errors.New("[send] nil endpoint of " + p.Dst.String()) } + bound := 64 + endl := "..." + if len(d) < bound { + bound = len(d) + endl = "." + } + logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.udpconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) + logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl) + d = l.me.xorenc(d) + logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl) + n, err = l.me.udpconn.WriteToUDP(d, peerep) + cl() return } diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index f873610..1208134 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -97,6 +97,7 @@ func (wg *WG) init(srcport, dstport uint16) { SrcPort: srcport, DstPort: dstport, MTU: uint16(wg.c.MTU), + SpeedLoop: wg.c.SpeedLoop, Mask: wg.c.Mask, })