1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-18 17:00:26 +08:00

feat(tcp): set default peers timeout to 5s

This commit is contained in:
源文雨
2024-07-16 22:33:05 +09:00
parent 5d04567ec9
commit c7bbcb9fb7

View File

@@ -3,9 +3,9 @@ package tcp
import ( import (
"errors" "errors"
"io" "io"
"math/rand"
"net" "net"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"time" "time"
@@ -49,45 +49,22 @@ func (ep *EndPoint) Listen() (p2p.Conn, error) {
ep.addr = lstn.Addr().(*net.TCPAddr) ep.addr = lstn.Addr().(*net.TCPAddr)
peerstimeout := ep.peerstimeout peerstimeout := ep.peerstimeout
if peerstimeout < time.Second { if peerstimeout < time.Second {
peerstimeout = time.Second peerstimeout = time.Second * 5
} }
chansz := ep.recvchansize chansz := ep.recvchansize
if chansz < 32 { if chansz < 32 {
chansz = 32 chansz = 32
} }
conn := &Conn{ conn := &Conn{
addr: ep, addr: ep,
lstn: lstn, lstn: lstn,
peers: ttl.NewCacheOn(peerstimeout, [4]func(string, *net.TCPConn){ peers: ttl.NewCache[string, *net.TCPConn](peerstimeout),
nil, recv: make(chan *connrecv, chansz),
nil,
func(s string, t *net.TCPConn) {
err := t.Close()
if err != nil {
logrus.Debugln("[tcp] close conn from", ep, "to", s, "err:", err)
} else {
logrus.Debugln("[tcp] close conn from", ep, "to", s)
}
},
ep.keepAlive,
}),
recv: make(chan *connrecv, chansz),
} }
go conn.accept() go conn.accept()
return conn, nil return conn, nil
} }
func (ep *EndPoint) keepAlive(_ string, t *net.TCPConn) {
_, err := io.Copy(t, &packet{
typ: packetTypeKeepAlive,
len: 1,
dat: []byte{byte(rand.Intn(256))},
})
if err != nil {
logrus.Debugln("[tcp] write keepalive from", ep, "to conn", t.RemoteAddr(), "err:", err)
}
}
type connrecv struct { type connrecv struct {
addr *EndPoint // cast from tcpconn.RemoteAddr() addr *EndPoint // cast from tcpconn.RemoteAddr()
pckt packet pckt packet
@@ -210,7 +187,7 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) {
if dialtimeout < time.Second { if dialtimeout < time.Second {
dialtimeout = time.Second dialtimeout = time.Second
} }
logrus.Infoln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout)
var cn net.Conn var cn net.Conn
// must use another port to send because there's no exsiting conn // must use another port to send because there's no exsiting conn
cn, err = net.DialTimeout(tcpep.Network(), tcpep.addr.String(), dialtimeout) cn, err = net.DialTimeout(tcpep.Network(), tcpep.addr.String(), dialtimeout)
@@ -221,8 +198,18 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) {
if !ok { if !ok {
return 0, errors.New("expect *net.TCPConn but got " + reflect.ValueOf(cn).Type().String()) return 0, errors.New("expect *net.TCPConn but got " + reflect.ValueOf(cn).Type().String())
} }
logrus.Infoln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) runtime.SetFinalizer(tcpconn, func(t *net.TCPConn) {
err := t.CloseWrite()
if err != nil {
logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr(), "err:", err)
} else {
logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr())
}
})
logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr())
conn.peers.Set(tcpep.String(), tcpconn) conn.peers.Set(tcpep.String(), tcpconn)
} else {
logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr())
} }
cnt, err := io.Copy(tcpconn, &packet{ cnt, err := io.Copy(tcpconn, &packet{
typ: packetTypeNormal, typ: packetTypeNormal,