1
0
mirror of https://github.com/fumiama/WireGold.git synced 2026-06-04 23:40:26 +08:00

优化代码结构

This commit is contained in:
fumiama
2022-01-01 23:09:59 +08:00
parent 5dc4bf8cad
commit 7dbf8d8729
10 changed files with 175 additions and 222 deletions

View File

@@ -2,11 +2,8 @@ package link
import (
"errors"
"fmt"
"net"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/helper"
base14 "github.com/fumiama/go-base16384"
@@ -16,11 +13,6 @@ import (
type Link struct {
// peer 的公钥
pubk *[32]byte
// peer 的公网 ip:port
pep string
// 决定本机是否定时向 peer 发送 hello 保持 NAT。
// 以秒为单位,小于等于 0 不发送
keepalive int64
// 收到的包的队列
// 没有下层 nic 时
// 包会分发到此
@@ -68,47 +60,6 @@ func (l *Link) Destroy() {
l.me.connmapmu.Unlock()
}
// Read 从 peer 收包
func (l *Link) Read() *head.Packet {
return <-l.pipe
}
// Write 向 peer 发包
func (l *Link) Write(p *head.Packet, istransfer bool) (n int, err error) {
if len(p.Data) <= int(l.me.mtu) {
if !istransfer {
p.FillHash()
p.Data = l.Encode(p.Data)
}
return l.write(p, uint32(len(p.Data)), 0, istransfer, false)
}
if !istransfer {
p.FillHash()
p.Data = l.Encode(p.Data)
}
data := p.Data
totl := uint32(len(data))
i := 0
for ; int(totl)-i > int(l.me.mtu); i += int(l.me.mtu) {
logrus.Debugln("[link] split frag", i, ":", i+int(l.me.mtu), ", remain:", int(totl)-i-int(l.me.mtu))
packet := *p
packet.Data = data[:int(l.me.mtu)]
cnt, err := l.write(&packet, totl, uint16(uint(i)>>3), istransfer, true)
n += cnt
if err != nil {
return n, err
}
data = data[int(l.me.mtu):]
}
p.Data = data
cnt, err := l.write(p, totl, uint16(uint(i)>>3), istransfer, false)
n += cnt
if err != nil {
return n, err
}
return n, nil
}
func (l *Link) String() (n string) {
n = "default"
if l.pubk != nil {
@@ -121,30 +72,3 @@ func (l *Link) String() (n string) {
}
return
}
// write 向 peer 发一个包
func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, hasmore bool) (n int, err error) {
var d []byte
var cl func()
if istransfer {
if p.Flags&0x4000 == 0x4000 && len(p.Data) > int(l.me.mtu) {
return len(p.Data), errors.New("drop dont fragmnet big trans packet")
}
d, cl = p.Marshal(nil, 0, 0, false, false)
} else {
d, cl = p.Marshal(l.me.me, datasz, offset, false, hasmore)
}
if d == nil {
return 0, errors.New("[link] ttl exceeded")
}
if err == nil {
peerep := l.endpoint
if peerep == nil {
return 0, errors.New("[link] nil endpoint of " + p.Dst.String())
}
logrus.Debugln("[link] write", len(d), "bytes data from ep", l.me.myconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
n, err = l.me.myconn.WriteToUDP(d, peerep)
cl()
}
return
}

View File

@@ -1,31 +0,0 @@
package link
import (
"net"
"testing"
)
func TestUDP(t *testing.T) {
t.Log("test start")
lconn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 1234})
if err == nil {
dconn, err := net.DialUDP("udp", &net.UDPAddr{Port: 1235}, &net.UDPAddr{Port: 1234})
if err != nil {
t.Fatal(err)
}
_, err = dconn.Write(([]byte)("1234567890"))
t.Log("write succ")
if err != nil {
t.Fatal(err)
}
d := make([]byte, 10)
_, err = lconn.Read(d)
t.Log("read succ")
if err != nil {
t.Fatal(err)
}
t.Log(d)
} else {
t.Fatal(err)
}
}

View File

@@ -33,10 +33,9 @@ func (m *Me) listen() (conn *net.UDPConn, err error) {
logrus.Debugln("[link] recv from endpoint", addr, "src", packet.Src, "dst", packet.Dst)
// logrus.Debugln("[link] recv:", hex.EncodeToString(lbf))
if ok {
if p.pep == "" || p.pep != addr.String() {
if p.peerip == nil || p.peerip.String() != addr.String() {
logrus.Infoln("[link] set endpoint of peer", p.peerip, "to", addr.String())
p.endpoint = addr
p.pep = addr.String()
}
if p.IsToMe(packet.Dst) {
packet.Data = p.Decode(packet.Data)

View File

@@ -34,7 +34,7 @@ type Me struct {
// 读写同步锁
connmapmu sync.RWMutex
// 本机监听的 endpoint
myconn *net.UDPConn
myep *net.UDPConn
// 本机网卡
nic lower.NICIO
// 本机路由表
@@ -43,8 +43,9 @@ type Me struct {
writer *helper.Writer
// 本机未接收完全分片池
recving map[[32]byte]*head.Packet
recvmu sync.Mutex
// 超时定时器
// 接收锁
recvmu sync.Mutex
// 收包超时定时器
clock map[*head.Packet]uint8
// 本机上层配置
srcport, dstport, mtu uint16
@@ -64,7 +65,7 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nic low
}
m.me = ip
m.subnet = *cidr
m.myconn, err = m.listen()
m.myep, err = m.listen()
if err != nil {
panic(err)
}

View File

@@ -1,18 +1,23 @@
package link
import (
"encoding/json"
"net"
"time"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/helper"
)
// 保持 NAT
func (l *Link) keepAlive() {
if l.keepalive > 0 {
// dur 决定本机是否定时向 peer 发送 hello 保持 NAT
// 以秒为单位,小于等于 0 不发送
func (l *Link) keepAlive(dur int64) {
if dur > 0 {
logrus.Infoln("[link.nat] start to keep alive")
t := time.NewTicker(time.Second * time.Duration(l.keepalive))
t := time.NewTicker(time.Second * time.Duration(dur))
for range t.C {
n, err := l.Write(head.NewPacket(head.ProtoHello, l.me.srcport, l.peerip, l.me.dstport, nil), false)
if err == nil {
@@ -23,3 +28,85 @@ func (l *Link) keepAlive() {
}
}
}
// 收到通告包的处理函数
func (l *Link) onNotify(packet []byte) {
// TODO: 完成data解包与endpoint注册
// 1. Data 解包
// ---- 使用 head.Notify 解释 packet
notify := make(head.Notify, 32)
err := json.Unmarshal(packet, &notify)
if err != nil {
logrus.Errorln("[notify] json unmarshal err:", err)
return
}
// 2. endpoint注册
// ---- 遍历 Notify注册对方的 endpoint 到
// ---- connections注意使用读写锁connmapmu
for peer, ep := range notify {
addr, err := net.ResolveUDPAddr("udp", ep)
if err == nil {
p, ok := l.me.IsInPeer(peer)
if ok {
if p.endpoint.String() != ep {
p.endpoint = addr
logrus.Infoln("[notify] set ep of peer", peer, "to", ep)
}
continue
}
}
logrus.Debugln("[notify] drop invalid peer:", peer, "ep:", ep)
}
}
// 收到询问包的处理函数
func (l *Link) onQuery(packet []byte) {
// 完成data解包与notify分发
// 1. Data 解包
// ---- 使用 head.Query 解释 packet
// ---- 根据 Query 确定需要封装的 Notify
var peers head.Query
err := json.Unmarshal(packet, &peers)
if err != nil {
logrus.Errorln("[qurey] json unmarshal err:", err)
return
}
// 2. notify分发
// ---- 封装 Notify 到 新的 packet
// ---- 调用 l.Send 发送到对方
notify := make(head.Notify, len(peers))
for _, p := range peers {
lnk, ok := l.me.IsInPeer(p)
if ok {
notify[p] = lnk.endpoint.String()
}
}
if len(notify) > 0 {
logrus.Infoln("[query] wrap", len(notify), "notify")
w := helper.SelectWriter()
json.NewEncoder(w).Encode(&notify)
l.Write(head.NewPacket(head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, w.Bytes()), false)
helper.PutWriter(w)
}
}
// sendquery 主动发起查询,询问对方是否可以到达 peers
func (l *Link) sendquery(tick time.Duration, peers ...string) {
if len(peers) == 0 {
return
}
data, err := json.Marshal(peers)
if err != nil {
panic(err)
}
t := time.NewTicker(tick)
for range t.C {
logrus.Infoln("[query] send query to", l.peerip)
_, err = l.Write(head.NewPacket(head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, data), false)
if err != nil {
logrus.Errorln("[query] write err:", err)
}
}
}

View File

@@ -1,39 +0,0 @@
package link
import (
"encoding/json"
"net"
"github.com/fumiama/WireGold/gold/head"
"github.com/sirupsen/logrus"
)
// 收到通告包的处理函数
func (l *Link) onNotify(packet []byte) {
// TODO: 完成data解包与endpoint注册
// 1. Data 解包
// ---- 使用 head.Notify 解释 packet
notify := make(head.Notify, 32)
err := json.Unmarshal(packet, &notify)
if err != nil {
logrus.Errorln("[notify] json unmarshal err:", err)
return
}
// 2. endpoint注册
// ---- 遍历 Notify注册对方的 endpoint 到
// ---- connections注意使用读写锁connmapmu
for peer, ep := range notify {
addr, err := net.ResolveUDPAddr("udp", ep)
if err == nil {
p, ok := l.me.IsInPeer(peer)
if ok {
if p.endpoint.String() != ep {
p.endpoint = addr
logrus.Infoln("[notify] set ep of peer", peer, "to", ep)
}
continue
}
}
logrus.Debugln("[notify] drop invalid peer:", peer, "ep:", ep)
}
}

View File

@@ -11,7 +11,7 @@ import (
)
// AddPeer 添加一个 peer
func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs, querys []string, keepAlive, queryTick int64, allowTrans, nopipe bool) (l *Link) {
func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs, querys []string, keepAliveDur, queryTick int64, allowTrans, nopipe bool) (l *Link) {
peerip = net.ParseIP(peerip).String()
var ok bool
l, ok = m.IsInPeer(peerip)
@@ -20,7 +20,6 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed
}
l = &Link{
pubk: pubicKey,
keepalive: keepAlive,
peerip: net.ParseIP(peerip),
allowtrans: allowTrans,
me: m,
@@ -41,7 +40,6 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed
if err != nil {
panic(err)
}
l.pep = endPoint
l.endpoint = e
}
if allowedIPs != nil {
@@ -60,7 +58,7 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed
}
}
logrus.Infoln("[peer] add peer:", peerip, "allow:", allowedIPs)
go l.keepAlive()
go l.keepAlive(keepAliveDur)
go l.sendquery(time.Second*time.Duration(queryTick), querys...)
return
}

View File

@@ -1,63 +0,0 @@
package link
import (
"encoding/json"
"time"
"github.com/sirupsen/logrus"
"github.com/fumiama/WireGold/gold/head"
"github.com/fumiama/WireGold/helper"
)
// 收到询问包的处理函数
func (l *Link) onQuery(packet []byte) {
// 完成data解包与notify分发
// 1. Data 解包
// ---- 使用 head.Query 解释 packet
// ---- 根据 Query 确定需要封装的 Notify
var peers head.Query
err := json.Unmarshal(packet, &peers)
if err != nil {
logrus.Errorln("[qurey] json unmarshal err:", err)
return
}
// 2. notify分发
// ---- 封装 Notify 到 新的 packet
// ---- 调用 l.Send 发送到对方
notify := make(head.Notify, len(peers))
for _, p := range peers {
lnk, ok := l.me.IsInPeer(p)
if ok {
notify[p] = lnk.endpoint.String()
}
}
if len(notify) > 0 {
logrus.Infoln("[query] wrap", len(notify), "notify")
w := helper.SelectWriter()
json.NewEncoder(w).Encode(&notify)
l.Write(head.NewPacket(head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, w.Bytes()), false)
helper.PutWriter(w)
}
}
// sendquery 主动发起查询,询问对方是否可以到达 peers
func (l *Link) sendquery(tick time.Duration, peers ...string) {
if len(peers) == 0 {
return
}
data, err := json.Marshal(peers)
if err != nil {
panic(err)
}
t := time.NewTicker(tick)
for range t.C {
logrus.Infoln("[query] send query to", l.peerip)
_, err = l.Write(head.NewPacket(head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, data), false)
if err != nil {
logrus.Errorln("[query] write err:", err)
}
}
}

View File

@@ -10,6 +10,11 @@ import (
"github.com/sirupsen/logrus"
)
// Read 从 peer 收包
func (l *Link) Read() *head.Packet {
return <-l.pipe
}
func (m *Me) initrecvpool() {
if m.recving == nil {
m.recving = make(map[[32]byte]*head.Packet, 128)

72
gold/link/send.go Normal file
View File

@@ -0,0 +1,72 @@
package link
import (
"errors"
"fmt"
"github.com/fumiama/WireGold/gold/head"
"github.com/sirupsen/logrus"
)
// Write 向 peer 发包
func (l *Link) Write(p *head.Packet, istransfer bool) (n int, err error) {
if len(p.Data) <= int(l.me.mtu) {
if !istransfer {
p.FillHash()
p.Data = l.Encode(p.Data)
}
return l.write(p, uint32(len(p.Data)), 0, istransfer, false)
}
if !istransfer {
p.FillHash()
p.Data = l.Encode(p.Data)
}
data := p.Data
totl := uint32(len(data))
i := 0
for ; int(totl)-i > int(l.me.mtu); i += int(l.me.mtu) {
logrus.Debugln("[link] split frag", i, ":", i+int(l.me.mtu), ", remain:", int(totl)-i-int(l.me.mtu))
packet := *p
packet.Data = data[:int(l.me.mtu)]
cnt, err := l.write(&packet, totl, uint16(uint(i)>>3), istransfer, true)
n += cnt
if err != nil {
return n, err
}
data = data[int(l.me.mtu):]
}
p.Data = data
cnt, err := l.write(p, totl, uint16(uint(i)>>3), istransfer, false)
n += cnt
if err != nil {
return n, err
}
return n, nil
}
// write 向 peer 发一个包
func (l *Link) write(p *head.Packet, datasz uint32, offset uint16, istransfer, hasmore bool) (n int, err error) {
var d []byte
var cl func()
if istransfer {
if p.Flags&0x4000 == 0x4000 && len(p.Data) > int(l.me.mtu) {
return len(p.Data), errors.New("drop dont fragmnet big trans packet")
}
d, cl = p.Marshal(nil, 0, 0, false, false)
} else {
d, cl = p.Marshal(l.me.me, datasz, offset, false, hasmore)
}
if d == nil {
return 0, errors.New("[link] ttl exceeded")
}
if err == nil {
peerep := l.endpoint
if peerep == nil {
return 0, errors.New("[link] nil endpoint of " + p.Dst.String())
}
logrus.Debugln("[link] write", len(d), "bytes data from ep", l.me.myep.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
n, err = l.me.myep.WriteToUDP(d, peerep)
cl()
}
return
}