diff --git a/README.md b/README.md index d2ad754..4a29450 100644 --- a/README.md +++ b/README.md @@ -34,15 +34,17 @@ Peers: IP: "192.168.233.2" SubNet: 192.168.233.0/24 PublicKey: 徯萃嵾爻燸攗窍褃冔蒔犡緇袿屿組待族砇嘀 - EndPoint: that.host2.com:56789 + EndPoint: 1.2.3.4:56789 AllowedIPs: ["192.168.233.2/32"] KeepAliveSeconds: 0 + QueryList: ["192.168.233.3"] + QuerySeconds: 10 AllowTrans: false - IP: "192.168.233.3" SubNet: 192.168.233.0/24 PublicKey: 牢喨粷詸衭譛浾蘹櫠砙杹蟫瑳叩刋橋経挵蘀 - EndPoint: that.host3.com:56789 + EndPoint: "" AllowedIPs: ["192.168.233.3/32"] KeepAliveSeconds: 0 AllowTrans: false diff --git a/config/cfg.go b/config/cfg.go index 117714e..f57ee20 100644 --- a/config/cfg.go +++ b/config/cfg.go @@ -25,6 +25,8 @@ type Peer struct { EndPoint string `yaml:"EndPoint"` AllowedIPs []string `yaml:"AllowedIPs"` KeepAliveSeconds int64 `yaml:"KeepAliveSeconds"` + QueryList []string `yaml:"QueryList"` + QuerySeconds int64 `yaml:"QuerySeconds"` AllowTrans bool `yaml:"AllowTrans"` } diff --git a/gold/link/listen.go b/gold/link/listen.go index 23e8126..bde61f2 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -45,7 +45,7 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { case head.ProtoHello: switch p.status { case LINK_STATUS_DOWN: - n, err = p.Write(head.NewPacket(head.ProtoHello, 0, p.peerip, 0, nil), false) + n, err = p.Write(head.NewPacket(head.ProtoHello, m.SrcPort(), p.peerip, m.DstPort(), nil), false) if err == nil { logrus.Debugln("[link] send", n, "bytes hello ack packet") p.status = LINK_STATUS_HALFUP @@ -58,11 +58,11 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { break } case head.ProtoNotify: - logrus.Debugln("[link] recv notify") - p.onNotify(packet) + logrus.Infoln("[link] recv notify from", packet.Src) + go p.onNotify(packet.Data) case head.ProtoQuery: - logrus.Debugln("[link] recv query") - p.onQuery(packet) + logrus.Infoln("[link] recv query from", packet.Src) + go p.onQuery(packet.Data) case head.ProtoData: if p.pipe != nil { p.pipe <- packet diff --git a/gold/link/me.go b/gold/link/me.go index 3d4547a..a7a3762 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -75,7 +75,7 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nic low table: make(map[string]*Link, 16), } m.router.SetDefault(nil) - m.loop = m.AddPeer(m.me.String(), nil, "127.0.0.1:56789", []string{myipwithmask}, 0, false, nic != nil) + m.loop = m.AddPeer(m.me.String(), nil, "127.0.0.1:56789", []string{myipwithmask}, nil, 0, 0, false, nic != nil) m.srcport = srcport m.dstport = dstport m.mtu = mtu & 0xfff8 diff --git a/gold/link/nat.go b/gold/link/nat.go index e14f0e5..6d40fee 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -14,7 +14,7 @@ func (l *Link) keepAlive() { logrus.Infoln("[link.nat] start to keep alive") t := time.NewTicker(time.Second * time.Duration(l.keepalive)) for range t.C { - n, err := l.Write(head.NewPacket(head.ProtoHello, 0, l.peerip, 0, nil), false) + n, err := l.Write(head.NewPacket(head.ProtoHello, l.me.srcport, l.peerip, l.me.dstport, nil), false) if err == nil { logrus.Infoln("[link] send", n, "bytes keep alive packet") } else { diff --git a/gold/link/notify.go b/gold/link/notify.go index 3b809b8..151ae73 100644 --- a/gold/link/notify.go +++ b/gold/link/notify.go @@ -1,13 +1,36 @@ package link -import "github.com/fumiama/WireGold/gold/head" +import ( + "encoding/json" + "net" + + "github.com/fumiama/WireGold/gold/head" + "github.com/sirupsen/logrus" +) // 收到通告包的处理函数 -func (l *Link) onNotify(packet *head.Packet) { +func (l *Link) onNotify(packet []byte) { // TODO: 完成data解包与endpoint注册 // 1. Data 解包 - // ---- 使用 head.Notify 解释 packet.Data + // ---- 使用 head.Notify 解释 packet + notify := make(head.Notify, 32) + err := json.Unmarshal(packet, ¬ify) + if err != nil { + logrus.Errorln("[notify] json unmarshal err:", err) + return + } // 2. endpoint注册 // ---- 遍历 Notify,注册对方的 endpoint 到 // ---- connections,注意使用读写锁connmapmu + for peer, ep := range notify { + addr, err := net.ResolveUDPAddr("udp", ep) + if err == nil { + p, ok := l.me.IsInPeer(peer) + if ok { + p.endpoint = addr + continue + } + } + logrus.Debugln("[notify] drop invalid peer:", peer, "ep:", ep) + } } diff --git a/gold/link/peer.go b/gold/link/peer.go index 17c49b1..35c84a6 100644 --- a/gold/link/peer.go +++ b/gold/link/peer.go @@ -2,6 +2,7 @@ package link import ( "net" + "time" "unsafe" "github.com/fumiama/WireGold/gold/head" @@ -10,7 +11,7 @@ import ( ) // AddPeer 添加一个 peer -func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs []string, keepAlive int64, allowTrans, nopipe bool) (l *Link) { +func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowedIPs, querys []string, keepAlive, queryTick int64, allowTrans, nopipe bool) (l *Link) { peerip = net.ParseIP(peerip).String() var ok bool l, ok = m.IsInPeer(peerip) @@ -60,6 +61,7 @@ func (m *Me) AddPeer(peerip string, pubicKey *[32]byte, endPoint string, allowed } logrus.Infoln("[peer] add peer:", peerip, "allow:", allowedIPs) go l.keepAlive() + go l.sendquery(time.Second*time.Duration(queryTick), querys...) return } diff --git a/gold/link/query.go b/gold/link/query.go index 6b2b4b2..c76cd9b 100644 --- a/gold/link/query.go +++ b/gold/link/query.go @@ -2,31 +2,62 @@ package link import ( "encoding/json" - "errors" + "time" + + "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/helper" ) // 收到询问包的处理函数 -func (l *Link) onQuery(packet *head.Packet) { - // TODO: 完成data解包与notify分发 +func (l *Link) onQuery(packet []byte) { + // 完成data解包与notify分发 + // 1. Data 解包 - // ---- 使用 head.Query 解释 packet.Data + // ---- 使用 head.Query 解释 packet // ---- 根据 Query 确定需要封装的 Notify + var peers head.Query + err := json.Unmarshal(packet, &peers) + if err != nil { + logrus.Errorln("[qurey] json unmarshal err:", err) + return + } + // 2. notify分发 - // ---- 封装 Notify 到 新的 packet.Data + // ---- 封装 Notify 到 新的 packet // ---- 调用 l.Send 发送到对方 + notify := make(head.Notify, len(peers)) + for _, p := range peers { + lnk, ok := l.me.IsInPeer(p) + if ok { + notify[p] = lnk.endpoint.String() + } + } + if len(notify) > 0 { + logrus.Infoln("[query] wrap", len(notify), "notify") + w := helper.SelectWriter() + json.NewEncoder(w).Encode(¬ify) + l.Write(head.NewPacket(head.ProtoNotify, l.me.srcport, l.peerip, l.me.dstport, w.Bytes()), false) + helper.PutWriter(w) + } } -// SendQuery 主动发起查询,询问对方是否可以到达 peers -func (l *Link) SendQuery(peers ...string) error { +// sendquery 主动发起查询,询问对方是否可以到达 peers +func (l *Link) sendquery(tick time.Duration, peers ...string) { if len(peers) == 0 { - return errors.New("len(peers) is 0") + return } data, err := json.Marshal(peers) if err != nil { - return err + panic(err) + } + t := time.NewTicker(tick) + for range t.C { + logrus.Infoln("[query] send query to", l.peerip) + _, err = l.Write(head.NewPacket(head.ProtoQuery, l.me.srcport, l.peerip, l.me.dstport, data), false) + if err != nil { + logrus.Errorln("[query] write err:", err) + } } - _, err = l.Write(head.NewPacket(head.ProtoQuery, 0, l.peerip, 0, data), false) - return err } diff --git a/upper/services/tunnel/tunnel_test.go b/upper/services/tunnel/tunnel_test.go index 9428400..d6be4e3 100644 --- a/upper/services/tunnel/tunnel_test.go +++ b/upper/services/tunnel/tunnel_test.go @@ -28,9 +28,9 @@ func TestTunnel(t *testing.T) { t.Log("peer publ key:", hex.EncodeToString(peerpk.Public()[:])) m := link.NewMe(selfpk.Private(), "192.168.1.2/32", "127.0.0.1:1236", nil, 1, 1, 4096) - m.AddPeer("192.168.1.3", peerpk.Public(), "127.0.0.1:1237", []string{"192.168.1.3/32"}, 0, false, false) + m.AddPeer("192.168.1.3", peerpk.Public(), "127.0.0.1:1237", []string{"192.168.1.3/32"}, nil, 0, 0, false, false) p := link.NewMe(peerpk.Private(), "192.168.1.3/32", "127.0.0.1:1237", nil, 1, 1, 4096) - p.AddPeer("192.168.1.2", selfpk.Public(), "127.0.0.1:1236", []string{"192.168.1.2/32"}, 0, false, false) + p.AddPeer("192.168.1.2", selfpk.Public(), "127.0.0.1:1236", []string{"192.168.1.2/32"}, nil, 0, 0, false, false) tunnme, err := Create(&m, "192.168.1.3") if err != nil { t.Fatal(err) diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index 47625ad..9eef543 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -107,6 +107,6 @@ func (wg *WG) init(srcport, destport, mtu uint16) { if n != 32 { panic("peer public key length is not 32") } - wg.me.AddPeer(peer.IP, &peerkey, peer.EndPoint, peer.AllowedIPs, peer.KeepAliveSeconds, peer.AllowTrans, true) + wg.me.AddPeer(peer.IP, &peerkey, peer.EndPoint, peer.AllowedIPs, peer.QueryList, peer.KeepAliveSeconds, peer.QuerySeconds, peer.AllowTrans, true) } }