From 5c65302d67598296d596725eb6c6c67bfdb8b664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Fri, 21 Feb 2025 23:08:41 +0900 Subject: [PATCH] feat(tcp): skip to keep busy sub conn --- gold/p2p/tcp/tcp.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index 5900d8c..764e7fe 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -95,6 +95,7 @@ type connrecv struct { type subconn struct { cplk sync.Mutex + last time.Time // last active time conn *net.TCPConn } @@ -187,7 +188,7 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { } if issub { conn.sblk.Lock() - conn.subs = append(conn.subs, &subconn{conn: tcpconn}) + conn.subs = append(conn.subs, &subconn{conn: tcpconn, last: time.Now()}) conn.sblk.Unlock() } else { if conn.peers == nil { @@ -200,7 +201,8 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { if issub { defer func() { conn.sblk.Lock() - for i, sub := range conn.subs { + subs := conn.subs + for i, sub := range subs { if sub.conn == tcpconn { conn.subs = delsubs(i, conn.subs) break @@ -283,8 +285,8 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) { func (conn *Conn) keep(ep *EndPoint) { keepinterval := ep.keepinterval - if keepinterval < time.Second*4 { - keepinterval = time.Second * 4 + if keepinterval < time.Second*10 { + keepinterval = time.Second * 10 } t := time.NewTicker(keepinterval) defer t.Stop() @@ -299,23 +301,33 @@ func (conn *Conn) keep(ep *EndPoint) { return } if err != nil { - logrus.Warnln("[tcp] keep main conn alive to", ep, "err:", err) + logrus.Warnln("[tcp] keep main conn alive from", conn, "to", ep, "err:", err) conn.peers.Delete(ep.String()) } else if config.ShowDebugLog { - logrus.Debugln("[tcp] keep main conn alive to", ep) + logrus.Debugln("[tcp] keep main conn alive from", conn, "to", ep) } } conn.sblk.RLock() - for i, sub := range conn.subs { + subs := conn.subs + for i, sub := range subs { + if time.Since(sub.last) < keepinterval { + if config.ShowDebugLog { + logrus.Debugln("[tcp] skip to keep busy sub conn from", conn, "to", ep) + } + continue + } _, err := io.Copy(sub.conn, &packet{typ: packetTypeSubKeepAlive}) if conn.addr == nil { return } if err != nil { - logrus.Warnln("[tcp] keep sub conn alive to", sub.conn.RemoteAddr(), "err:", err) + logrus.Warnln("[tcp] keep sub conn alive from", conn, "to", sub.conn.RemoteAddr(), "err:", err) conn.subs = delsubs(i, conn.subs) // del 1 link at once break } + if config.ShowDebugLog { + logrus.Debugln("[tcp] keep sub conn alive from", conn, "to", ep) + } } conn.sblk.RUnlock() } @@ -431,7 +443,7 @@ RECONNECT: conn.peers.Set(tcpep.String(), tcpconn) } else { conn.sblk.Lock() - conn.subs = append(conn.subs, &subconn{conn: tcpconn}) + conn.subs = append(conn.subs, &subconn{conn: tcpconn, last: time.Now()}) conn.sblk.Unlock() } go conn.receive(tcpconn, true) @@ -448,7 +460,8 @@ RECONNECT: conn.peers.Delete(tcpep.String()) } else { conn.sblk.Lock() - for i, sub := range conn.subs { + subs := conn.subs + for i, sub := range subs { if sub == subc { conn.subs = delsubs(i, conn.subs) break @@ -464,6 +477,7 @@ RECONNECT: } } if subc != nil { + subc.last = time.Now() subc.cplk.Unlock() } return int(cnt) - 3, err