1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-12 12:50:28 +08:00

fix(p2p): tcp conn early close

This commit is contained in:
源文雨
2024-08-06 23:40:02 +08:00
parent 1c258fcaa3
commit b0667d5a45
2 changed files with 22 additions and 14 deletions

View File

@@ -88,11 +88,11 @@ func (p *packet) WriteTo(w io.Writer) (n int64, err error) {
return io.Copy(w, &buf) return io.Copy(w, &buf)
} }
func isvalid(tcpconn *net.TCPConn) (issub, ok bool) { func isvalid(tcpconn *net.TCPConn, timeout time.Duration) (issub, ok bool) {
pckt := packet{} pckt := packet{}
stopch := make(chan struct{}) stopch := make(chan struct{})
t := time.AfterFunc(time.Second, func() { t := time.AfterFunc(timeout, func() {
stopch <- struct{}{} stopch <- struct{}{}
}) })

View File

@@ -158,8 +158,14 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) {
issub, ok := false, false issub, ok := false, false
peerstimeout := conn.addr.peerstimeout
if peerstimeout < time.Second*30 {
peerstimeout = time.Second * 30
}
peerstimeout *= 2
if !hasvalidated { if !hasvalidated {
issub, ok = isvalid(tcpconn) issub, ok = isvalid(tcpconn, peerstimeout)
if !ok { if !ok {
return return
} }
@@ -175,11 +181,6 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) {
} }
} }
peerstimeout := conn.addr.peerstimeout
if peerstimeout < time.Second*30 {
peerstimeout = time.Second * 30
}
peerstimeout *= 2
if issub { if issub {
defer conn.peers.Delete(ep.String()) defer conn.peers.Delete(ep.String())
} else { } else {
@@ -233,12 +234,17 @@ func (conn *Conn) receive(tcpconn *net.TCPConn, hasvalidated bool) {
t.Stop() t.Stop()
} }
if conn.addr == nil || conn.lstn == nil || conn.peers == nil || conn.recv == nil {
return
}
if err != nil { if err != nil {
if config.ShowDebugLog { if config.ShowDebugLog {
logrus.Debugln("[tcp] recv from", ep, "err:", err) logrus.Debugln("[tcp] recv from", ep, "err:", err)
} }
_ = tcpconn.CloseRead() // _ = tcpconn.CloseRead()
return // return
continue
} }
if r.pckt.typ >= packetTypeTop { if r.pckt.typ >= packetTypeTop {
if config.ShowDebugLog { if config.ShowDebugLog {
@@ -364,7 +370,7 @@ RECONNECT:
dialtimeout = time.Second dialtimeout = time.Second
} }
if config.ShowDebugLog { if config.ShowDebugLog {
logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout, "issub", issub)
} }
var cn net.Conn var cn net.Conn
// must use another port to send because there's no exsiting conn // must use another port to send because there's no exsiting conn
@@ -385,12 +391,12 @@ RECONNECT:
_, err = io.Copy(tcpconn, pkt) _, err = io.Copy(tcpconn, pkt)
if err != nil { if err != nil {
if config.ShowDebugLog { if config.ShowDebugLog {
logrus.Debugln("[tcp] dial to", tcpep.addr, "success, but write err:", err) logrus.Debugln("[tcp] dial to", tcpep.addr, "issub", issub, "success, but write err:", err)
} }
return 0, err return 0, err
} }
if config.ShowDebugLog { if config.ShowDebugLog {
logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr(), "issub", issub)
} }
if !issub { if !issub {
conn.peers.Set(tcpep.String(), tcpconn) conn.peers.Set(tcpep.String(), tcpconn)
@@ -398,8 +404,8 @@ RECONNECT:
conn.sblk.Lock() conn.sblk.Lock()
conn.subs = append(conn.subs, &subconn{conn: tcpconn}) conn.subs = append(conn.subs, &subconn{conn: tcpconn})
conn.sblk.Unlock() conn.sblk.Unlock()
go conn.receive(tcpconn, true)
} }
go conn.receive(tcpconn, true)
} else if config.ShowDebugLog { } else if config.ShowDebugLog {
logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr()) logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr())
} }
@@ -422,6 +428,7 @@ RECONNECT:
conn.sblk.Unlock() conn.sblk.Unlock()
} }
if !retried { if !retried {
logrus.Warnln("[tcp] reconnect due to write to", tcpconn.RemoteAddr(), "err:", err)
retried = true retried = true
tcpconn = nil tcpconn = nil
goto RECONNECT goto RECONNECT
@@ -442,6 +449,7 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) {
return 0, errors.New("data size " + strconv.Itoa(len(b)) + " is too large") return 0, errors.New("data size " + strconv.Itoa(len(b)) + " is too large")
} }
if !conn.suberr && !conn.cplk.TryLock() { if !conn.suberr && !conn.cplk.TryLock() {
logrus.Infoln("[tcp] try sub write")
n, err = conn.writeToPeer(b, tcpep, true) // try sub write n, err = conn.writeToPeer(b, tcpep, true) // try sub write
if err == nil { if err == nil {
return return