diff --git a/gold/link/listen.go b/gold/link/listen.go index 412ab30..53a13e2 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -5,7 +5,6 @@ import ( "net" "runtime" "strconv" - "sync" "sync/atomic" "time" "unsafe" @@ -31,40 +30,10 @@ func (m *Me) listen() (conn p2p.Conn, err error) { m.ep = conn.LocalAddr() logrus.Infoln("[listen] at", m.ep) go func() { - n := uint(runtime.NumCPU()) - if n > 64 { - n = 64 // 只用最多 64 核 - } - logrus.Infoln("[listen] use cpu num:", n) - listenbuf := make([]byte, lstnbufgragsz*n) - hasntfinished := make([]sync.Mutex, n) for { - usenewbuf := false - i := uint(0) - for !hasntfinished[i].TryLock() { - i++ - i %= n - if i == 0 { // looked up a full round, make a new buf - usenewbuf = true - if config.ShowDebugLog { - logrus.Debugln("[listen] use new buf") - } - break - } - } - if config.ShowDebugLog && !usenewbuf { - logrus.Debugln("[listen] lock index", i) - } - var lbf pbuf.Bytes - if usenewbuf { - lbf = pbuf.NewBytes(lstnbufgragsz) - } else { - if config.ShowDebugLog { - logrus.Debugln("[listen] take index", i, "slice", i*lstnbufgragsz, (i+1)*lstnbufgragsz, "cap", lstnbufgragsz) - } - lbf = pbuf.ParseBytes(listenbuf[i*lstnbufgragsz : (i+1)*lstnbufgragsz : (i+1)*lstnbufgragsz]...) - } + lbf := pbuf.NewBytes(lstnbufgragsz) n, addr, err := conn.ReadFromPeer(lbf.Bytes()) + lbf.KeepAlive() if m.connections == nil || errors.Is(err, net.ErrClosed) { logrus.Warnln("[listen] quit listening") return @@ -76,13 +45,6 @@ func (m *Me) listen() (conn p2p.Conn, err error) { logrus.Errorln("[listen] reconnect udp err:", err) return } - if !usenewbuf { - if config.ShowDebugLog { - logrus.Debugln("[listen] unlock index", i) - } - hasntfinished[i].Unlock() - i-- - } continue } if n <= 0 { @@ -91,17 +53,13 @@ func (m *Me) listen() (conn p2p.Conn, err error) { } continue } - index := -1 - if !usenewbuf { - index = int(i) - } - go m.waitordispatch(index, addr, lbf.Trans().SliceTo(n), hasntfinished) + go m.waitordispatch(addr, lbf, n) } }() return } -func (m *Me) waitordispatch(index int, addr p2p.EndPoint, buf pbuf.Bytes, hasntfinished []sync.Mutex) { +func (m *Me) waitordispatch(addr p2p.EndPoint, buf pbuf.Bytes, n int) { recvtotlcnt := atomic.AddUint64(&m.recvtotlcnt, uint64(buf.Len())) recvloopcnt := atomic.AddUintptr(&m.recvloopcnt, 1) recvlooptime := atomic.LoadInt64(&m.recvlooptime) @@ -111,56 +69,41 @@ func (m *Me) waitordispatch(index int, addr p2p.EndPoint, buf pbuf.Bytes, hasntf atomic.StoreUint64(&m.recvtotlcnt, 0) atomic.StoreInt64(&m.recvlooptime, now) } - packet := m.wait(buf.Trans().Bytes()) + packet := m.wait(buf.SliceTo(n).Bytes()) + buf.KeepAlive() if packet == nil { - if index < 0 { - if config.ShowDebugLog { - logrus.Debugln("[listen] queue waiting") - } - return - } if config.ShowDebugLog { - logrus.Debugln("[listen] queue waiting, unlock index", index) + logrus.Debugln("[listen] queue waiting") } - hasntfinished[index].Unlock() return } if config.ShowDebugLog { - logrus.Debugln("[listen] index", index, "dispatch", len(packet.Pointer().UnsafeBody()), "bytes packet") + logrus.Debugln("[listen] dispatch", len(packet.Pointer().UnsafeBody()), "bytes packet") } - if index >= 0 { - defer hasntfinished[index].Unlock() - m.dispatch(packet, addr, index) - return - } - m.dispatch(packet, addr, index) + m.dispatch(packet, addr) } -func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index int) { - if config.ShowDebugLog { - defer logrus.Debugln("[listen] dispatched, unlock index", index) - logrus.Debugln("[listen] start dispatching index", index) - } +func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint) { pp := packet.Pointer r := pp().Len() - pp().BodyLen() if r > 0 { - logrus.Warnln("[listen] @", index, "packet from endpoint", addr, "len", pp().BodyLen(), "is smaller than it declared len", pp().Len(), ", drop it") + logrus.Warnln("[listen] packet from endpoint", addr, "len", pp().BodyLen(), "is smaller than it declared len", pp().Len(), ", drop it") return } p, ok := m.IsInPeer(pp().Src.String()) if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "recv from endpoint", addr, "src", pp().Src, "dst", pp().Dst) + logrus.Debugln("[listen] recv from endpoint", addr, "src", pp().Src, "dst", pp().Dst) } if !ok { - logrus.Warnln("[listen] @", index, "packet from", pp().Src, "to", pp().Dst, "is refused") + logrus.Warnln("[listen] packet from", pp().Src, "to", pp().Dst, "is refused") return } if helper.IsNilInterface(p.endpoint) || !p.endpoint.Euqal(addr) { if m.ep.Network() == "tcp" && !addr.Euqal(p.endpoint) { - logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) + logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr } else { // others are all no status link - logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) + logrus.Infoln("[listen] set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr } } @@ -169,7 +112,7 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index switch { case p.IsToMe(pp().Dst): if !p.Accept(pp().Src) { - logrus.Warnln("[listen] @", index, "refused packet from", pp().Src.String()+":"+strconv.Itoa(int(pp().SrcPort))) + logrus.Warnln("[listen] refused packet from", pp().Src.String()+":"+strconv.Itoa(int(pp().SrcPort))) return } addt := pp().AdditionalData() @@ -177,7 +120,7 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index data, err := p.decode(pp().CipherIndex(), addt, pp().TransBody().Bytes()) if err != nil { if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "drop invalid packet key idx:", pp().CipherIndex(), "addt:", addt, "err:", err) + logrus.Debugln("[listen] drop invalid packet key idx:", pp().CipherIndex(), "addt:", addt, "err:", err) } return } @@ -185,19 +128,19 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index dat, err := decodezstd(data.Trans().Bytes()) if err != nil { if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "drop invalid zstd packet:", err) + logrus.Debugln("[listen] drop invalid zstd packet:", err) } return } if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "zstd decoded len:", dat.Len()) + logrus.Debugln("[listen] zstd decoded len:", dat.Len()) } data = dat } pp().SetBody(data) if !pp().IsVaildHash() { if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "drop invalid hash packet") + logrus.Debugln("[listen] drop invalid hash packet") } return } @@ -205,63 +148,63 @@ func (m *Me) dispatch(packet *orbyte.Item[head.Packet], addr p2p.EndPoint, index case head.ProtoHello: switch { case len(pp().UnsafeBody()) == 0: - logrus.Warnln("[listen] @", index, "recv old hello packet, do nothing") + logrus.Warnln("[listen] recv old hello packet, do nothing") case pp().UnsafeBody()[0] == byte(head.HelloPing): n, err := p.WritePacket(head.NewPacketPartial( head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), pbuf.ParseBytes(byte(head.HelloPong))), false) if err == nil { - logrus.Infoln("[listen] @", index, "recv hello, send", n, "bytes hello ack packet") + logrus.Infoln("[listen] recv hello, send", n, "bytes hello ack packet") } else { - logrus.Errorln("[listen] @", index, "send hello ack packet error:", err) + logrus.Errorln("[listen] send hello ack packet error:", err) } default: - logrus.Infoln("[listen] @", index, "recv hello ack packet, do nothing") + logrus.Infoln("[listen] recv hello ack packet, do nothing") } case head.ProtoNotify: - logrus.Infoln("[listen] @", index, "recv notify from", pp().Src) + logrus.Infoln("[listen] recv notify from", pp().Src) p.onNotify(pp().UnsafeBody()) runtime.KeepAlive(packet) case head.ProtoQuery: - logrus.Infoln("[listen] @", index, "recv query from", pp().Src) + logrus.Infoln("[listen] recv query from", pp().Src) p.onQuery(pp().UnsafeBody()) runtime.KeepAlive(packet) case head.ProtoData: if p.pipe != nil { p.pipe <- packet.Copy() if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "deliver to pipe of", p.peerip) + logrus.Debugln("[listen] deliver to pipe of", p.peerip) } } else { _, err := m.nic.Write(pp().UnsafeBody()) if err != nil { - logrus.Errorln("[listen] @", index, "deliver", pp().BodyLen(), "bytes data to nic err:", err) + logrus.Errorln("[listen] deliver", pp().BodyLen(), "bytes data to nic err:", err) } else if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "deliver", pp().BodyLen(), "bytes data to nic") + logrus.Debugln("[listen] deliver", pp().BodyLen(), "bytes data to nic") } } default: - logrus.Warnln("[listen] @", index, "recv unknown proto:", pp().Proto) + logrus.Warnln("[listen] recv unknown proto:", pp().Proto) } case p.Accept(pp().Dst): if !p.allowtrans { - logrus.Warnln("[listen] @", index, "refused to trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) + logrus.Warnln("[listen] refused to trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) return } // 转发 lnk := m.router.NextHop(pp().Dst.String()) if lnk == nil { - logrus.Warnln("[listen] @", index, "transfer drop packet: nil nexthop") + logrus.Warnln("[listen] transfer drop packet: nil nexthop") return } n, err := lnk.WritePacket(packet, true) if err == nil { if config.ShowDebugLog { - logrus.Debugln("[listen] @", index, "trans", n, "bytes packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) + logrus.Debugln("[listen] trans", n, "bytes packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort))) } } else { - logrus.Errorln("[listen] @", index, "trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "err:", err) + logrus.Errorln("[listen] trans packet to", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "err:", err) } default: - logrus.Warnln("[listen] @", index, "packet dst", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "is not in peers") + logrus.Warnln("[listen] packet dst", pp().Dst.String()+":"+strconv.Itoa(int(pp().DstPort)), "is not in peers") } }