1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-04 23:40:26 +08:00

repair pipe

This commit is contained in:
fumiama
2021-12-31 20:52:12 +08:00
parent 41b085be20
commit c4de83947b
5 changed files with 13 additions and 24 deletions

View File

@@ -2,6 +2,7 @@ package link
import (
"errors"
"fmt"
"net"
"github.com/sirupsen/logrus"
@@ -143,7 +144,7 @@ func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, h
if peerep == nil {
return 0, errors.New("[link] nil endpoint of " + p.Dst.String())
}
logrus.Infoln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep)
logrus.Infoln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
n, err = l.me.myconn.WriteToUDP(d, peerep)
} else {
logrus.Warnln("[link] drop packet: nil peerlink")

View File

@@ -68,8 +68,8 @@ func (m *Me) listen() (conn *net.UDPConn, err error) {
p.pipe <- packet
logrus.Infoln("[link] deliver to pipe of", p.peerip)
} else {
m.pipe <- packet
logrus.Infoln("[link] deliver to pipe of me")
m.pipe <- packet.Data
logrus.Infoln("[link] deliver", len(packet.Data), "bytes data to pipe of me")
}
default:
logrus.Warnln("[link] recv unknown proto:", packet.Proto)
@@ -105,23 +105,8 @@ func (m *Me) listen() (conn *net.UDPConn, err error) {
// Read 接收所有发送给本机的报文
// 需要开启 nopipe
func (m *Me) Read(p []byte) (n int, err error) {
if len(m.readptr) > 0 {
n = copy(p, m.readptr)
m.readptr = m.readptr[n:]
if n == len(p) {
return
}
p = p[n:]
}
data := (<-m.pipe).Data
c := copy(p, data)
n += c
if c == len(data) {
return
}
m.readptr = data[c:]
return
func (m *Me) Read() []byte {
return <-m.pipe
}
// 从 conn 读取 sz 字节数据

View File

@@ -29,7 +29,7 @@ type Me struct {
// 本机监听的 endpoint
myconn *net.UDPConn
// 不分目的 link 的接收队列
pipe chan *head.Packet
pipe chan []byte
// 本机路由表
router *Router
// 本机未接收完全分片池
@@ -62,7 +62,7 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nopipei
}
m.connections = make(map[string]*Link)
if nopipeinlink {
m.pipe = make(chan *head.Packet, 32)
m.pipe = make(chan []byte, 32)
}
m.router = &Router{
list: make([]*net.IPNet, 1, 16),

View File

@@ -60,6 +60,9 @@ func (m *Me) wait(data []byte) *head.Packet {
ok, err := h.Unmarshal(data)
if err == nil {
if ok {
delete(m.clock, h)
delete(m.recving, hsh)
logrus.Infoln("[recv] all parts of", hex.EncodeToString(hashd), "is reached")
return h
}
m.clock[h] = 0

View File

@@ -2,7 +2,6 @@ package lower
import (
"encoding/binary"
"io"
"os"
"os/exec"
"strconv"
@@ -50,7 +49,8 @@ func (nc *NIC) Start(m *link.Me) {
nc.hasstart = true
go func() { // 接收到 NIC
for nc.hasstart {
n, err := io.Copy(nc.ifce, m)
data := m.Read()
n, err := nc.ifce.Write(data)
if err != nil {
logrus.Errorln("[lower] recv write to nic err:", err)
break