1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-12 04:43:22 +08:00

feat: add speed log

This commit is contained in:
源文雨
2023-08-05 09:16:00 +08:00
parent 803a18dd4d
commit 3c5b4ad058

View File

@@ -9,6 +9,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"unsafe" "unsafe"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
@@ -27,23 +28,36 @@ func (m *Me) listen() (conn *net.UDPConn, err error) {
logrus.Infoln("[listen] at", m.myend) logrus.Infoln("[listen] at", m.myend)
var mu sync.Mutex var mu sync.Mutex
for i := 0; i < runtime.NumCPU()*4; i++ { for i := 0; i < runtime.NumCPU()*4; i++ {
go m.listenthread(conn, &mu) go m.listenthread(conn, &mu, i)
} }
return return
} }
func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) { func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex, index int) {
listenbuff := make([]byte, 65536) listenbuff := make([]byte, 65536)
lbf := listenbuff lbf := listenbuff
recvtotlcnt := 0
recvloopcnt := 0
recvlooptime := time.Now().UnixMilli()
for { for {
lbf = listenbuff lbf = listenbuff
mu.Lock() mu.Lock()
n, addr, err := conn.ReadFromUDP(lbf) n, addr, err := conn.ReadFromUDP(lbf)
mu.Unlock() mu.Unlock()
if err != nil { if err != nil {
continue logrus.Errorln("[listen] thread", index, "read from udp err:", err)
return
} }
lbf = lbf[:n] lbf = lbf[:n]
recvtotlcnt += n
recvloopcnt++
if recvloopcnt >= 4096 {
now := time.Now().UnixMilli()
logrus.Infof("[listen] thread %d recv speed: %.2f B/s", index, float64(recvtotlcnt*1000)/float64(now-recvlooptime))
recvtotlcnt = 0
recvloopcnt = 0
recvlooptime = now
}
packet := m.wait(lbf) packet := m.wait(lbf)
if packet == nil { if packet == nil {
continue continue
@@ -51,19 +65,19 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) {
sz := packet.TeaTypeDataSZ & 0x0000ffff sz := packet.TeaTypeDataSZ & 0x0000ffff
r := int(sz) - len(packet.Data) r := int(sz) - len(packet.Data)
if r > 0 { if r > 0 {
logrus.Warnln("[listen] packet from endpoint", addr, "is smaller than it declared: drop it") logrus.Warnln("[listen] thread", index, "packet from endpoint", addr, "is smaller than it declared: drop it")
packet.Put() packet.Put()
continue continue
} }
p, ok := m.IsInPeer(packet.Src.String()) p, ok := m.IsInPeer(packet.Src.String())
logrus.Debugln("[listen] recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst) logrus.Debugln("[listen] thread", index, "recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst)
if !ok { if !ok {
logrus.Warnln("[listen] packet from", packet.Src, "to", packet.Dst, "is refused") logrus.Warnln("[listen] thread", index, "packet from", packet.Src, "to", packet.Dst, "is refused")
packet.Put() packet.Put()
continue continue
} }
if p.endpoint == nil || p.endpoint.String() != addr.String() { if p.endpoint == nil || p.endpoint.String() != addr.String() {
logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) logrus.Infoln("[listen] thread", index, "set endpoint of peer", p.peerip, "to", addr.String())
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.endpoint)), unsafe.Pointer(addr)) atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.endpoint)), unsafe.Pointer(addr))
} }
switch { switch {
@@ -73,7 +87,7 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) {
addt := packet.AdditionalData() addt := packet.AdditionalData()
packet.Data = p.DecodePreshared(addt, packet.Data) packet.Data = p.DecodePreshared(addt, packet.Data)
if packet.Data == nil { if packet.Data == nil {
logrus.Debugln("[listen] drop invalid preshared packet, addt:", addt) logrus.Debugln("[listen] thread", index, "drop invalid preshared packet, addt:", addt)
packet.Put() packet.Put()
continue continue
} }
@@ -83,13 +97,13 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) {
packet.Data, err = io.ReadAll(dec) packet.Data, err = io.ReadAll(dec)
dec.Close() dec.Close()
if err != nil { if err != nil {
logrus.Debugln("[listen] drop invalid zstd packet:", err) logrus.Debugln("[listen] thread", index, "drop invalid zstd packet:", err)
packet.Put() packet.Put()
continue continue
} }
} }
if !packet.IsVaildHash() { if !packet.IsVaildHash() {
logrus.Debugln("[listen] drop invalid hash packet") logrus.Debugln("[listen] thread", index, "drop invalid hash packet")
packet.Put() packet.Put()
continue continue
} }
@@ -99,10 +113,10 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) {
case LINK_STATUS_DOWN: case LINK_STATUS_DOWN:
n, err = p.WriteAndPut(head.NewPacket(head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), nil), false) n, err = p.WriteAndPut(head.NewPacket(head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), nil), false)
if err == nil { if err == nil {
logrus.Debugln("[listen] send", n, "bytes hello ack packet") logrus.Debugln("[listen] thread", index, "send", n, "bytes hello ack packet")
p.status = LINK_STATUS_HALFUP p.status = LINK_STATUS_HALFUP
} else { } else {
logrus.Errorln("[listen] send hello ack packet error:", err) logrus.Errorln("[listen] thread", index, "send hello ack packet error:", err)
} }
case LINK_STATUS_HALFUP: case LINK_STATUS_HALFUP:
p.status = LINK_STATUS_UP p.status = LINK_STATUS_UP
@@ -110,47 +124,47 @@ func (m *Me) listenthread(conn *net.UDPConn, mu *sync.Mutex) {
} }
packet.Put() packet.Put()
case head.ProtoNotify: case head.ProtoNotify:
logrus.Infoln("[listen] recv notify from", packet.Src) logrus.Infoln("[listen] thread", index, "recv notify from", packet.Src)
go p.onNotify(packet.Data) go p.onNotify(packet.Data)
packet.Put() packet.Put()
case head.ProtoQuery: case head.ProtoQuery:
logrus.Infoln("[listen] recv query from", packet.Src) logrus.Infoln("[listen] thread", index, "recv query from", packet.Src)
go p.onQuery(packet.Data) go p.onQuery(packet.Data)
packet.Put() packet.Put()
case head.ProtoData: case head.ProtoData:
if p.pipe != nil { if p.pipe != nil {
p.pipe <- packet p.pipe <- packet
logrus.Debugln("[listen] deliver to pipe of", p.peerip) logrus.Debugln("[listen] thread", index, "deliver to pipe of", p.peerip)
} else { } else {
m.nic.Write(packet.Data) m.nic.Write(packet.Data)
logrus.Debugln("[listen] deliver", len(packet.Data), "bytes data to nic") logrus.Debugln("[listen] thread", index, "deliver", len(packet.Data), "bytes data to nic")
packet.Put() packet.Put()
} }
default: default:
logrus.Warnln("[listen] recv unknown proto:", packet.Proto) logrus.Warnln("[listen] thread", index, "recv unknown proto:", packet.Proto)
packet.Put() packet.Put()
} }
case p.Accept(packet.Dst): case p.Accept(packet.Dst):
if !p.allowtrans { if !p.allowtrans {
logrus.Warnln("[listen] refused to trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) logrus.Warnln("[listen] thread", index, "refused to trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)))
packet.Put() packet.Put()
continue continue
} }
// 转发 // 转发
lnk := m.router.NextHop(packet.Dst.String()) lnk := m.router.NextHop(packet.Dst.String())
if lnk == nil { if lnk == nil {
logrus.Warnln("[listen] transfer drop packet: nil nexthop") logrus.Warnln("[listen] thread", index, "transfer drop packet: nil nexthop")
packet.Put() packet.Put()
continue continue
} }
n, err = lnk.WriteAndPut(packet, true) n, err = lnk.WriteAndPut(packet, true)
if err == nil { if err == nil {
logrus.Debugln("[listen] trans", n, "bytes packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort))) logrus.Debugln("[listen] thread", index, "trans", n, "bytes packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)))
} else { } else {
logrus.Errorln("[listen] trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "err:", err) logrus.Errorln("[listen] thread", index, "trans packet to", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "err:", err)
} }
default: default:
logrus.Warnln("[listen] packet dst", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "is not in peers") logrus.Warnln("[listen] thread", index, "packet dst", packet.Dst.String()+":"+strconv.Itoa(int(packet.DstPort)), "is not in peers")
packet.Put() packet.Put()
} }
} }