From c4de83947b5e5b86eecf4f7eb83e599e979578ff Mon Sep 17 00:00:00 2001 From: fumiama Date: Fri, 31 Dec 2021 20:52:12 +0800 Subject: [PATCH] repair pipe --- gold/link/link.go | 3 ++- gold/link/listen.go | 23 ++++------------------- gold/link/me.go | 4 ++-- gold/link/recv.go | 3 +++ lower/nic.go | 4 ++-- 5 files changed, 13 insertions(+), 24 deletions(-) diff --git a/gold/link/link.go b/gold/link/link.go index 5df85f0..c565d37 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -2,6 +2,7 @@ package link import ( "errors" + "fmt" "net" "github.com/sirupsen/logrus" @@ -143,7 +144,7 @@ func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, h if peerep == nil { return 0, errors.New("[link] nil endpoint of " + p.Dst.String()) } - logrus.Infoln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep) + logrus.Infoln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) n, err = l.me.myconn.WriteToUDP(d, peerep) } else { logrus.Warnln("[link] drop packet: nil peerlink") diff --git a/gold/link/listen.go b/gold/link/listen.go index bee6386..99d413a 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -68,8 +68,8 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { p.pipe <- packet logrus.Infoln("[link] deliver to pipe of", p.peerip) } else { - m.pipe <- packet - logrus.Infoln("[link] deliver to pipe of me") + m.pipe <- packet.Data + logrus.Infoln("[link] deliver", len(packet.Data), "bytes data to pipe of me") } default: logrus.Warnln("[link] recv unknown proto:", packet.Proto) @@ -105,23 +105,8 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { // Read 接收所有发送给本机的报文 // 需要开启 nopipe -func (m *Me) Read(p []byte) (n int, err error) { - if len(m.readptr) > 0 { - n = copy(p, m.readptr) - m.readptr = m.readptr[n:] - if n == len(p) { - return - } - p = p[n:] - } - data := (<-m.pipe).Data - c := copy(p, data) - n += c - if c == len(data) { - return - } - m.readptr = data[c:] - return +func (m *Me) Read() []byte { + return <-m.pipe } // 从 conn 读取 sz 字节数据 diff --git a/gold/link/me.go b/gold/link/me.go index 5315185..8ea9b96 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -29,7 +29,7 @@ type Me struct { // 本机监听的 endpoint myconn *net.UDPConn // 不分目的 link 的接收队列 - pipe chan *head.Packet + pipe chan []byte // 本机路由表 router *Router // 本机未接收完全分片池 @@ -62,7 +62,7 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nopipei } m.connections = make(map[string]*Link) if nopipeinlink { - m.pipe = make(chan *head.Packet, 32) + m.pipe = make(chan []byte, 32) } m.router = &Router{ list: make([]*net.IPNet, 1, 16), diff --git a/gold/link/recv.go b/gold/link/recv.go index 9c58c26..c37deb9 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -60,6 +60,9 @@ func (m *Me) wait(data []byte) *head.Packet { ok, err := h.Unmarshal(data) if err == nil { if ok { + delete(m.clock, h) + delete(m.recving, hsh) + logrus.Infoln("[recv] all parts of", hex.EncodeToString(hashd), "is reached") return h } m.clock[h] = 0 diff --git a/lower/nic.go b/lower/nic.go index 8d6cd07..a52fff3 100644 --- a/lower/nic.go +++ b/lower/nic.go @@ -2,7 +2,6 @@ package lower import ( "encoding/binary" - "io" "os" "os/exec" "strconv" @@ -50,7 +49,8 @@ func (nc *NIC) Start(m *link.Me) { nc.hasstart = true go func() { // 接收到 NIC for nc.hasstart { - n, err := io.Copy(nc.ifce, m) + data := m.Read() + n, err := nc.ifce.Write(data) if err != nil { logrus.Errorln("[lower] recv write to nic err:", err) break