From b0667d5a45b15ded2a97b5f868aa3e74645c00bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 6 Aug 2024 23:40:02 +0800 Subject: [PATCH] fix(p2p): tcp conn early close --- gold/p2p/tcp/pdu.go | 4 ++-- gold/p2p/tcp/tcp.go | 32 ++++++++++++++++++++------------ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/gold/p2p/tcp/pdu.go b/gold/p2p/tcp/pdu.go index a466ea6..c2ecdac 100644 --- a/gold/p2p/tcp/pdu.go +++ b/gold/p2p/tcp/pdu.go @@ -88,11 +88,11 @@ func (p *packet) WriteTo(w io.Writer) (n int64, err error) { return io.Copy(w, &buf) } -func isvalid(tcpconn *net.TCPConn) (issub, ok bool) { +func isvalid(tcpconn *net.TCPConn, timeout time.Duration) (issub, ok bool) { pckt := packet{} stopch := make(chan struct{}) - t := time.AfterFunc(time.Second, func() { + t := time.AfterFunc(timeout, func() { stopch <- struct{}{} }) diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index ad17082..327458f 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -158,8 +158,14 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { issub, ok := false, false + peerstimeout := conn.addr.peerstimeout + if peerstimeout < time.Second*30 { + peerstimeout = time.Second * 30 + } + peerstimeout *= 2 + if !hasvalidated { - issub, ok = isvalid(tcpconn) + issub, ok = isvalid(tcpconn, peerstimeout) if !ok { return } @@ -175,11 +181,6 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { } } - peerstimeout := conn.addr.peerstimeout - if peerstimeout < time.Second*30 { - peerstimeout = time.Second * 30 - } - peerstimeout *= 2 if issub { defer conn.peers.Delete(ep.String()) } else { @@ -233,12 +234,17 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { t.Stop() } + if conn.addr == nil || conn.lstn == nil || conn.peers == nil || conn.recv == nil { + return + } + if err != nil { if config.ShowDebugLog { logrus.Debugln("[tcp] recv from", ep, "err:", err) } - _ = tcpconn.CloseRead() - return + // _ = tcpconn.CloseRead() + // return + continue } if r.pckt.typ >= packetTypeTop { if config.ShowDebugLog { @@ -364,7 +370,7 @@ RECONNECT: dialtimeout = time.Second } if config.ShowDebugLog { - logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) + logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout, "issub", issub) } var cn net.Conn // must use another port to send because there's no exsiting conn @@ -385,12 +391,12 @@ RECONNECT: _, err = io.Copy(tcpconn, pkt) if err != nil { if config.ShowDebugLog { - logrus.Debugln("[tcp] dial to", tcpep.addr, "success, but write err:", err) + logrus.Debugln("[tcp] dial to", tcpep.addr, "issub", issub, "success, but write err:", err) } return 0, err } if config.ShowDebugLog { - logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) + logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr(), "issub", issub) } if !issub { conn.peers.Set(tcpep.String(), tcpconn) @@ -398,8 +404,8 @@ RECONNECT: conn.sblk.Lock() conn.subs = append(conn.subs, &subconn{conn: tcpconn}) conn.sblk.Unlock() - go conn.receive(tcpconn, true) } + go conn.receive(tcpconn, true) } else if config.ShowDebugLog { logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr()) } @@ -422,6 +428,7 @@ RECONNECT: conn.sblk.Unlock() } if !retried { + logrus.Warnln("[tcp] reconnect due to write to", tcpconn.RemoteAddr(), "err:", err) retried = true tcpconn = nil goto RECONNECT @@ -442,6 +449,7 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { return 0, errors.New("data size " + strconv.Itoa(len(b)) + " is too large") } if !conn.suberr && !conn.cplk.TryLock() { + logrus.Infoln("[tcp] try sub write") n, err = conn.writeToPeer(b, tcpep, true) // try sub write if err == nil { return