diff --git a/gold/link/listen.go b/gold/link/listen.go index 535f6c8..68b864c 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -9,6 +9,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "unsafe" "github.com/klauspost/compress/zstd" @@ -27,23 +28,36 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { logrus.Infoln("[listen] at", m.myend) var mu sync.Mutex for i := 0; i < runtime.NumCPU()*4; i++ { - go m.listenthread(conn, &mu) + go m.listenthread(conn, &mu, i) } return } -func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { +func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex, index int) { listenbuff := make([]byte, 65536) lbf := listenbuff + recvtotlcnt := 0 + recvloopcnt := 0 + recvlooptime := time.Now().UnixMilli() for { lbf = listenbuff mu.Lock() n, addr, err := conn.ReadFromUDP(lbf) mu.Unlock() if err != nil { - continue + logrus.Errorln("[listen] thread", index, "read from udp err:", err) + return } lbf = lbf[:n] + recvtotlcnt += n + recvloopcnt++ + if recvloopcnt >= 4096 { + now := time.Now().UnixMilli() + logrus.Infof("[listen] thread %d recv speed: %.2f B/s", index, float64(recvtotlcnt*1000)/float64(now-recvlooptime)) + recvtotlcnt = 0 + recvloopcnt = 0 + recvlooptime = now + } packet := m.wait(lbf) if packet == nil { continue @@ -51,19 +65,19 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { sz := packet.TeaTypeDataSZ & 0x0000ffff r := int(sz) - len(packet.Data) if r > 0 { - logrus.Warnln("[listen] packet from endpoint", addr, "is smaller than it declared: drop it") + logrus.Warnln("[listen] thread", index, "packet from endpoint", addr, "is smaller than it declared: drop it") packet.Put() continue } p, ok := m.IsInPeer(packet.Src.String()) - logrus.Debugln("[listen] recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst) + logrus.Debugln("[listen] thread", index, "recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst) if !ok { - logrus.Warnln("[listen] packet from", packet.Src, "to", packet.Dst, "is refused") + logrus.Warnln("[listen] thread", index, "packet from", packet.Src, "to", packet.Dst, "is refused") packet.Put() continue } if p.endpoint == nil || p.endpoint.String() != addr.String() { - logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) + logrus.Infoln("[listen] thread", index, "set endpoint of peer", p.peerip, "to", addr.String()) atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.endpoint)), unsafe.Pointer(addr)) } switch { @@ -73,7 +87,7 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { addt := packet.AdditionalData() packet.Data = p.DecodePreshared(addt, packet.Data) if packet.Data == nil { - logrus.Debugln("[listen] drop invalid preshared packet, addt:", addt) + logrus.Debugln("[listen] thread", index, "drop invalid preshared packet, addt:", addt) packet.Put() continue } @@ -83,13 +97,13 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { packet.Data, err = io.ReadAll(dec) dec.Close() if err != nil { - logrus.Debugln("[listen] drop invalid zstd packet:", err) + logrus.Debugln("[listen] thread", index, "drop invalid zstd packet:", err) packet.Put() continue } } if !packet.IsVaildHash() { - logrus.Debugln("[listen] drop invalid hash packet") + logrus.Debugln("[listen] thread", index, "drop invalid hash packet") packet.Put() continue } @@ -99,10 +113,10 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { case LINK_STATUS_DOWN: n, err = p.WriteAndPut(head.NewPacket(head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), nil), false) if err == nil { - logrus.Debugln("[listen] send", n, "bytes hello ack packet") + logrus.Debugln("[listen] thread", index, "send", n, "bytes hello ack packet") p.status = LINK_STATUS_HALFUP } else { - logrus.Errorln("[listen] send hello ack packet error:", err) + logrus.Errorln("[listen] thread", index, "send hello ack packet error:", err) } case LINK_STATUS_HALFUP: p.status = LINK_STATUS_UP @@ -110,47 +124,47 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { } packet.Put() case head.ProtoNotify: - logrus.Infoln("[listen] recv notify from", packet.Src) + logrus.Infoln("[listen] thread", index, "recv notify from", packet.Src) go p.onNotify(packet.Data) packet.Put() case head.ProtoQuery: - logrus.Infoln("[listen] recv query from", packet.Src) + logrus.Infoln("[listen] thread", index, "recv query from", packet.Src) go p.onQuery(packet.Data) packet.Put() case head.ProtoData: if p.pipe != nil { p.pipe <- packet - logrus.Debugln("[listen] deliver to pipe of", p.peerip) + logrus.Debugln("[listen] thread", index, "deliver to pipe of", p.peerip) } else { m.nic.Write(packet.Data) - logrus.Debugln("[listen] deliver", len(packet.Data), "bytes data to nic") + logrus.Debugln("[listen] thread", index, "deliver", len(packet.Data), "bytes data to nic") packet.Put() } default: - logrus.Warnln("[listen] recv unknown proto:", packet.Proto) + logrus.Warnln("[listen] thread", index, "recv unknown proto:", packet.Proto) packet.Put() } case p.Accept(packet.Dst): if !p.allowtrans { - logrus.Warnln("[listen] refused to trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) + logrus.Warnln("[listen] thread", index, "refused to trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) packet.Put() continue } // 转发 lnk := m.router.NextHop(packet.Dst.String()) if lnk == nil { - logrus.Warnln("[listen] transfer drop packet: nil nexthop") + logrus.Warnln("[listen] thread", index, "transfer drop packet: nil nexthop") packet.Put() continue } n, err = lnk.WriteAndPut(packet, true) if err == nil { - logrus.Debugln("[listen] trans", n, "bytes packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) + logrus.Debugln("[listen] thread", index, "trans", n, "bytes packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) } else { - logrus.Errorln("[listen] trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "err:", err) + logrus.Errorln("[listen] thread", index, "trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "err:", err) } default: - logrus.Warnln("[listen] packet dst", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "is not in peers") + logrus.Warnln("[listen] thread", index, "packet dst", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "is not in peers") packet.Put() } }