diff --git a/gold/link/link.go b/gold/link/link.go index f4b7174..c481af0 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -2,11 +2,8 @@ package link import ( "errors" - "fmt" "net" - "github.com/sirupsen/logrus" - "github.com/fumiama/WireGold/gold/head" "github.com/fumiama/WireGold/helper" base14 "github.com/fumiama/go-base16384" @@ -16,11 +13,6 @@ import ( type Link struct { // peer 的公钥 pubk *[32]byte - // peer 的公网 ip:port - pep string - // 决定本机是否定时向 peer 发送 hello 保持 NAT。 - // 以秒为单位,小于等于 0 不发送 - keepalive int64 // 收到的包的队列 // 没有下层 nic 时 // 包会分发到此 @@ -68,47 +60,6 @@ func (l *Link) Destroy() { l.me.connmapmu.Unlock() } -// Read 从 peer 收包 -func (l *Link) Read() *head.Packet { - return <-l.pipe -} - -// Write 向 peer 发包 -func (l *Link) Write(p *head.Packet, istransfer bool) (n int, err error) { - if len(p.Data) <= int(l.me.mtu) { - if !istransfer { - p.FillHash() - p.Data = l.Encode(p.Data) - } - return l.write(p, uint32(len(p.Data)), 0, istransfer, false) - } - if !istransfer { - p.FillHash() - p.Data = l.Encode(p.Data) - } - data := p.Data - totl := uint32(len(data)) - i := 0 - for ; int(totl)-i > int(l.me.mtu); i += int(l.me.mtu) { - logrus.Debugln("[link] split frag", i, ":", i+int(l.me.mtu), ", remain:", int(totl)-i-int(l.me.mtu)) - packet := *p - packet.Data = data[:int(l.me.mtu)] - cnt, err := l.write(&packet, totl, uint16(uint(i)>>3), istransfer, true) - n += cnt - if err != nil { - return n, err - } - data = data[int(l.me.mtu):] - } - p.Data = data - cnt, err := l.write(p, totl, uint16(uint(i)>>3), istransfer, false) - n += cnt - if err != nil { - return n, err - } - return n, nil -} - func (l *Link) String() (n string) { n = "default" if l.pubk != nil { @@ -121,30 +72,3 @@ func (l *Link) String() (n string) { } return } - -// write 向 peer 发一个包 -func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, hasmore bool) (n int, err error) { - var d []byte - var cl func() - if istransfer { - if p.Flags&0x4000 == 0x4000 && len(p.Data) > int(l.me.mtu) { - return len(p.Data), errors.New("drop dont fragmnet big trans packet") - } - d, cl = p.Marshal(nil, 0, 0, false, false) - } else { - d, cl = p.Marshal(l.me.me, datasz, offset, false, hasmore) - } - if d == nil { - return 0, errors.New("[link] ttl exceeded") - } - if err == nil { - peerep := l.endpoint - if peerep == nil { - return 0, errors.New("[link] nil endpoint of " + p.Dst.String()) - } - logrus.Debugln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) - n, err = l.me.myconn.WriteToUDP(d, peerep) - cl() - } - return -} diff --git a/gold/link/link_test.go b/gold/link/link_test.go deleted file mode 100644 index d3c844b..0000000 --- a/gold/link/link_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package link - -import ( - "net" - "testing" -) - -func TestUDP(t *testing.T) { - t.Log("test start") - lconn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 1234}) - if err == nil { - dconn, err := net.DialUDP("udp", &net.UDPAddr{Port: 1235}, &net.UDPAddr{Port: 1234}) - if err != nil { - t.Fatal(err) - } - _, err = dconn.Write(([]byte)("1234567890")) - t.Log("write succ") - if err != nil { - t.Fatal(err) - } - d := make([]byte, 10) - _, err = lconn.Read(d) - t.Log("read succ") - if err != nil { - t.Fatal(err) - } - t.Log(d) - } else { - t.Fatal(err) - } -} diff --git a/gold/link/listen.go b/gold/link/listen.go index fca69aa..5d8df85 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -33,10 +33,9 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { logrus.Debugln("[link] recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst) // logrus.Debugln("[link] recv:", hex.EncodeToString(lbf)) if ok { - if p.pep == "" || p.pep != addr.String() { + if p.peerip == nil || p.peerip.String() != addr.String() { logrus.Infoln("[link] set endpoint of peer", p.peerip, "to", addr.String()) p.endpoint = addr - p.pep = addr.String() } if p.IsToMe(packet.Dst) { packet.Data = p.Decode(packet.Data) diff --git a/gold/link/me.go b/gold/link/me.go index a7a3762..5c543c9 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -34,7 +34,7 @@ type Me struct { // 读写同步锁 connmapmu sync.RWMutex // 本机监听的 endpoint - myconn *net.UDPConn + myep *net.UDPConn // 本机网卡 nic lower.NICIO // 本机路由表 @@ -43,8 +43,9 @@ type Me struct { writer *helper.Writer // 本机未接收完全分片池 recving map[[32]byte]*head.Packet - recvmu sync.Mutex - // 超时定时器 + // 接收锁 + recvmu sync.Mutex + // 收包超时定时器 clock map[*head.Packet]uint8 // 本机上层配置 srcport, dstport, mtu uint16 @@ -64,7 +65,7 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nic low } m.me = ip m.subnet = *cidr - m.myconn, err = m.listen() + m.myep, err = m.listen() if err != nil { panic(err) } diff --git a/gold/link/nat.go b/gold/link/nat.go index 6d40fee..45b72f5 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -1,18 +1,23 @@ package link import ( + "encoding/json" + "net" "time" "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/helper" ) // 保持 NAT -func (l *Link) keepAlive() { - if l.keepalive > 0 { +// dur 决定本机是否定时向 peer 发送 hello 保持 NAT +// 以秒为单位,小于等于 0 不发送 +func (l *Link) keepAlive(dur int64) { + if dur > 0 { logrus.Infoln("[link.nat] start to keep alive") - t := time.NewTicker(time.Second * time.Duration(l.keepalive)) + t := time.NewTicker(time.Second * time.Duration(dur)) for range t.C { n, err := l.Write(head.NewPacket(head.ProtoHello, l.me.srcport, l.peerip, l.me.dstport, nil), false) if err == nil { @@ -23,3 +28,85 @@ func (l *Link) keepAlive() { } } } + +// 收到通告包的处理函数 +func (l *Link) onNotify(packet []byte) { + // TODO: 完成data解包与endpoint注册 + // 1. Data 解包 + // ---- 使用 head.Notify 解释 packet + notify := make(head.Notify, 32) + err := json.Unmarshal(packet, ¬ify) + if err != nil { + logrus.Errorln("[notify] json unmarshal err:", err) + return + } + // 2. endpoint注册 + // ---- 遍历 Notify,注册对方的 endpoint 到 + // ---- connections,注意使用读写锁connmapmu + for peer, ep := range notify { + addr, err := net.ResolveUDPAddr("udp", ep) + if err == nil { + p, ok := l.me.IsInPeer(peer) + if ok { + if p.endpoint.String() != ep { + p.endpoint = addr + logrus.Infoln("[notify] set ep of peer", peer, "to", ep) + } + continue + } + } + logrus.Debugln("[notify] drop invalid peer:", peer, "ep:", ep) + } +} + +// 收到询问包的处理函数 +func (l *Link) onQuery(packet []byte) { + // 完成data解包与notify分发 + + // 1. Data 解包 + // ---- 使用 head.Query 解释 packet + // ---- 根据 Query 确定需要封装的 Notify + var peers head.Query + err := json.Unmarshal(packet, &peers) + if err != nil { + logrus.Errorln("[qurey] json unmarshal err:", err) + return + } + + // 2. notify分发 + // ---- 封装 Notify 到 新的 packet + // ---- 调用 l.Send 发送到对方 + notify := make(head.Notify, len(peers)) + for _, p := range peers { + lnk, ok := l.me.IsInPeer(p) + if ok { + notify[p] = lnk.endpoint.String() + } + } + if len(notify) > 0 { + logrus.Infoln("[query] wrap", len(notify), "notify") + w := helper.SelectWriter() + json.NewEncoder(w).Encode(¬ify) + l.Write(head.NewPacket(head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, w.Bytes()), false) + helper.PutWriter(w) + } +} + +// sendquery 主动发起查询,询问对方是否可以到达 peers +func (l *Link) sendquery(tick time.Duration, peers ...string) { + if len(peers) == 0 { + return + } + data, err := json.Marshal(peers) + if err != nil { + panic(err) + } + t := time.NewTicker(tick) + for range t.C { + logrus.Infoln("[query] send query to", l.peerip) + _, err = l.Write(head.NewPacket(head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, data), false) + if err != nil { + logrus.Errorln("[query] write err:", err) + } + } +} diff --git a/gold/link/notify.go b/gold/link/notify.go deleted file mode 100644 index 53784e1..0000000 --- a/gold/link/notify.go +++ /dev/null @@ -1,39 +0,0 @@ -package link - -import ( - "encoding/json" - "net" - - "github.com/fumiama/WireGold/gold/head" - "github.com/sirupsen/logrus" -) - -// 收到通告包的处理函数 -func (l *Link) onNotify(packet []byte) { - // TODO: 完成data解包与endpoint注册 - // 1. Data 解包 - // ---- 使用 head.Notify 解释 packet - notify := make(head.Notify, 32) - err := json.Unmarshal(packet, ¬ify) - if err != nil { - logrus.Errorln("[notify] json unmarshal err:", err) - return - } - // 2. endpoint注册 - // ---- 遍历 Notify,注册对方的 endpoint 到 - // ---- connections,注意使用读写锁connmapmu - for peer, ep := range notify { - addr, err := net.ResolveUDPAddr("udp", ep) - if err == nil { - p, ok := l.me.IsInPeer(peer) - if ok { - if p.endpoint.String() != ep { - p.endpoint = addr - logrus.Infoln("[notify] set ep of peer", peer, "to", ep) - } - continue - } - } - logrus.Debugln("[notify] drop invalid peer:", peer, "ep:", ep) - } -} diff --git a/gold/link/peer.go b/gold/link/peer.go index 35c84a6..7626cbc 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -11,7 +11,7 @@ import ( ) // AddPeer 添加一个 peer -func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs, querys []string, keepAlive, queryTick int64, allowTrans, nopipe bool) (l *Link) { +func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs, querys []string, keepAliveDur, queryTick int64, allowTrans, nopipe bool) (l *Link) { peerip = net.ParseIP(peerip).String() var ok bool l, ok = m.IsInPeer(peerip) @@ -20,7 +20,6 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed } l = &Link{ pubk: pubicKey, - keepalive: keepAlive, peerip: net.ParseIP(peerip), allowtrans: allowTrans, me: m, @@ -41,7 +40,6 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed if err != nil { panic(err) } - l.pep = endPoint l.endpoint = e } if allowedIPs != nil { @@ -60,7 +58,7 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed } } logrus.Infoln("[peer] add peer:", peerip, "allow:", allowedIPs) - go l.keepAlive() + go l.keepAlive(keepAliveDur) go l.sendquery(time.Second*time.Duration(queryTick), querys...) return } diff --git a/gold/link/query.go b/gold/link/query.go deleted file mode 100644 index c76cd9b..0000000 --- a/gold/link/query.go +++ /dev/null @@ -1,63 +0,0 @@ -package link - -import ( - "encoding/json" - "time" - - "github.com/sirupsen/logrus" - - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/helper" -) - -// 收到询问包的处理函数 -func (l *Link) onQuery(packet []byte) { - // 完成data解包与notify分发 - - // 1. Data 解包 - // ---- 使用 head.Query 解释 packet - // ---- 根据 Query 确定需要封装的 Notify - var peers head.Query - err := json.Unmarshal(packet, &peers) - if err != nil { - logrus.Errorln("[qurey] json unmarshal err:", err) - return - } - - // 2. notify分发 - // ---- 封装 Notify 到 新的 packet - // ---- 调用 l.Send 发送到对方 - notify := make(head.Notify, len(peers)) - for _, p := range peers { - lnk, ok := l.me.IsInPeer(p) - if ok { - notify[p] = lnk.endpoint.String() - } - } - if len(notify) > 0 { - logrus.Infoln("[query] wrap", len(notify), "notify") - w := helper.SelectWriter() - json.NewEncoder(w).Encode(¬ify) - l.Write(head.NewPacket(head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, w.Bytes()), false) - helper.PutWriter(w) - } -} - -// sendquery 主动发起查询,询问对方是否可以到达 peers -func (l *Link) sendquery(tick time.Duration, peers ...string) { - if len(peers) == 0 { - return - } - data, err := json.Marshal(peers) - if err != nil { - panic(err) - } - t := time.NewTicker(tick) - for range t.C { - logrus.Infoln("[query] send query to", l.peerip) - _, err = l.Write(head.NewPacket(head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, data), false) - if err != nil { - logrus.Errorln("[query] write err:", err) - } - } -} diff --git a/gold/link/recv.go b/gold/link/recv.go index 8efb185..74c7be3 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -10,6 +10,11 @@ import ( "github.com/sirupsen/logrus" ) +// Read 从 peer 收包 +func (l *Link) Read() *head.Packet { + return <-l.pipe +} + func (m *Me) initrecvpool() { if m.recving == nil { m.recving = make(map[[32]byte]*head.Packet, 128) diff --git a/gold/link/send.go b/gold/link/send.go new file mode 100644 index 0000000..556c40b --- /dev/null +++ b/gold/link/send.go @@ -0,0 +1,72 @@ +package link + +import ( + "errors" + "fmt" + + "github.com/fumiama/WireGold/gold/head" + "github.com/sirupsen/logrus" +) + +// Write 向 peer 发包 +func (l *Link) Write(p *head.Packet, istransfer bool) (n int, err error) { + if len(p.Data) <= int(l.me.mtu) { + if !istransfer { + p.FillHash() + p.Data = l.Encode(p.Data) + } + return l.write(p, uint32(len(p.Data)), 0, istransfer, false) + } + if !istransfer { + p.FillHash() + p.Data = l.Encode(p.Data) + } + data := p.Data + totl := uint32(len(data)) + i := 0 + for ; int(totl)-i > int(l.me.mtu); i += int(l.me.mtu) { + logrus.Debugln("[link] split frag", i, ":", i+int(l.me.mtu), ", remain:", int(totl)-i-int(l.me.mtu)) + packet := *p + packet.Data = data[:int(l.me.mtu)] + cnt, err := l.write(&packet, totl, uint16(uint(i)>>3), istransfer, true) + n += cnt + if err != nil { + return n, err + } + data = data[int(l.me.mtu):] + } + p.Data = data + cnt, err := l.write(p, totl, uint16(uint(i)>>3), istransfer, false) + n += cnt + if err != nil { + return n, err + } + return n, nil +} + +// write 向 peer 发一个包 +func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, hasmore bool) (n int, err error) { + var d []byte + var cl func() + if istransfer { + if p.Flags&0x4000 == 0x4000 && len(p.Data) > int(l.me.mtu) { + return len(p.Data), errors.New("drop dont fragmnet big trans packet") + } + d, cl = p.Marshal(nil, 0, 0, false, false) + } else { + d, cl = p.Marshal(l.me.me, datasz, offset, false, hasmore) + } + if d == nil { + return 0, errors.New("[link] ttl exceeded") + } + if err == nil { + peerep := l.endpoint + if peerep == nil { + return 0, errors.New("[link] nil endpoint of " + p.Dst.String()) + } + logrus.Debugln("[link] write", len(d), "bytes data from ep", l.me.myep.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) + n, err = l.me.myep.WriteToUDP(d, peerep) + cl() + } + return +}