diff --git a/gold/link/link.go b/gold/link/link.go index 8144b48..0a53aff 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -30,6 +30,8 @@ type Link struct { peerip net.IP // peer 的公网 endpoint endpoint p2p.EndPoint + // peer 在设置的原始值 + rawep string // 本机允许接收/发送的 ip 网段 allowedips []*net.IPNet // 连接所用对称加密密钥集 diff --git a/gold/link/listen.go b/gold/link/listen.go index cf389eb..43e8f37 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -109,7 +109,7 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish if m.ep.Network() == "udp" { logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr - } else if !addr.Euqal(p.endpoint) && p.endpoint == nil { // tcp/ws, ep not registered + } else if !addr.Euqal(p.endpoint) && p.rawep == "" { // tcp/ws, ep not registered logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr } diff --git a/gold/link/peer.go b/gold/link/peer.go index ee42562..30ea813 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -38,6 +38,7 @@ func (m *Me) AddPeer(cfg *PeerConfig) (l *Link) { l = &Link{ pubk: cfg.PubicKey, peerip: net.ParseIP(cfg.PeerIP), + rawep: cfg.EndPoint, allowtrans: cfg.AllowTrans, usezstd: cfg.UseZstd, me: m, diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index 372df95..60fb743 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -5,7 +5,6 @@ import ( "io" "net" "reflect" - "runtime" "strconv" "time" @@ -59,10 +58,19 @@ func (ep *EndPoint) Listen() (p2p.Conn, error) { chansz = 32 } conn := &Conn{ - addr: ep, - lstn: lstn, - peers: ttl.NewCache[string, *net.TCPConn](peerstimeout), - recv: make(chan *connrecv, chansz), + addr: ep, + lstn: lstn, + peers: ttl.NewCacheOn(peerstimeout, [4]func(string, *net.TCPConn){ + nil, nil, func(_ string, t *net.TCPConn) { + err := t.CloseWrite() + if err != nil { + logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr(), "err:", err) + } else { + logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr()) + } + }, nil, + }), + recv: make(chan *connrecv, chansz), } go conn.accept() return conn, nil @@ -147,7 +155,8 @@ func (conn *Conn) receive(ep *EndPoint) { select { case <-stopch: logrus.Debugln("[tcp] recv from", ep, "timeout") - continue + _ = tcpconn.CloseRead() + return case <-copych: t.Stop() } @@ -229,14 +238,6 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { if !ok { return 0, errors.New("expect *net.TCPConn but got " + reflect.ValueOf(cn).Type().String()) } - runtime.SetFinalizer(tcpconn, func(t *net.TCPConn) { - err := t.CloseWrite() - if err != nil { - logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr(), "err:", err) - } else { - logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr()) - } - }) logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) conn.peers.Set(tcpep.String(), tcpconn) go conn.receive(tcpep)