diff --git a/gold/link/link.go b/gold/link/link.go index 25957d7..3f93198 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -22,6 +22,8 @@ type Link struct { // 以秒为单位,小于等于 0 不发送 keepalive int64 // 收到的包的队列 + // 没有下层 nic 时 + // 包会分发到此 pipe chan *head.Packet // peer 的虚拟 ip peerip net.IP diff --git a/gold/link/listen.go b/gold/link/listen.go index aa0ac42..088ac26 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -68,7 +68,7 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { p.pipe <- packet logrus.Debugln("[link] deliver to pipe of", p.peerip) } else { - m.pipe <- packet.Data + m.nic.Write(packet.Data) logrus.Debugln("[link] deliver", len(packet.Data), "bytes data to pipe of me") } default: @@ -103,12 +103,6 @@ func (m *Me) listen() (conn *net.UDPConn, err error) { return } -// Read 接收所有发送给本机的报文 -// 需要开启 nopipe -func (m *Me) Read() []byte { - return <-m.pipe -} - // 从 conn 读取 sz 字节数据 func readAll(conn *net.UDPConn, sz int) ([]byte, error) { i := 0 diff --git a/gold/link/me.go b/gold/link/me.go index 8ea9b96..b0242de 100644 --- a/gold/link/me.go +++ b/gold/link/me.go @@ -1,10 +1,16 @@ package link import ( + "encoding/binary" + "io" "net" + "strconv" "sync" "github.com/fumiama/WireGold/gold/head" + "github.com/fumiama/WireGold/lower" + "github.com/fumiama/water/waterutil" + "github.com/sirupsen/logrus" ) // Me 是本机的抽象 @@ -28,8 +34,8 @@ type Me struct { connmapmu sync.RWMutex // 本机监听的 endpoint myconn *net.UDPConn - // 不分目的 link 的接收队列 - pipe chan []byte + // 本机网卡 + nic lower.NICIO // 本机路由表 router *Router // 本机未接收完全分片池 @@ -39,11 +45,10 @@ type Me struct { clock map[*head.Packet]uint8 // 本机上层配置 srcport, dstport, mtu uint16 - readptr []byte } // NewMe 设置本机参数 -func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nopipeinlink bool, srcport, dstport, mtu uint16) (m Me) { +func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nic lower.NICIO, srcport, dstport, mtu uint16) (m Me) { m.privKey = *privateKey var err error m.myend, err = net.ResolveUDPAddr("udp", myEndpoint) @@ -61,15 +66,13 @@ func NewMe(privateKey *[32]byte, myipwithmask string, myEndpoint string, nopipei panic(err) } m.connections = make(map[string]*Link) - if nopipeinlink { - m.pipe = make(chan []byte, 32) - } + m.nic = nic m.router = &Router{ list: make([]*net.IPNet, 1, 16), table: make(map[string]*Link, 16), } m.router.SetDefault(nil) - m.loop = m.AddPeer(m.me.String(), nil, "127.0.0.1:56789", []string{myipwithmask}, 0, false, nopipeinlink) + m.loop = m.AddPeer(m.me.String(), nil, "127.0.0.1:56789", []string{myipwithmask}, 0, false, nic != nil) m.srcport = srcport m.dstport = dstport m.mtu = mtu & 0xfff8 @@ -88,3 +91,91 @@ func (m *Me) DstPort() uint16 { func (m *Me) MTU() uint16 { return m.mtu } + +func (m *Me) ListenFromNIC() { + // 双缓冲区 + buf := make([]byte, m.MTU()+68) // 增加报头长度与 TEA 冗余 + buf2 := make([]byte, m.MTU()+68) // 增加报头长度与 TEA 冗余 + + off := 0 + isrev := false + for { // 从 NIC 发送 + var packet []byte + if off > 0 && !isrev { + packet = buf2 + } else { + packet = buf + } + n, err := m.nic.Read(packet[off:]) + if isrev { + off = 0 + } + if err != nil { + logrus.Errorln("[lower] send read from nic err:", err) + break + } + if n == 0 { + continue + } + packet = packet[:n] + n, rem := m.send(m.nic, packet) + for len(rem) > 20 && n > 0 { + n, rem = m.send(m.nic, rem) + } + if len(rem) > 0 { + logrus.Debugln("[lower] remain", len(rem), "bytes to send") + if off > 0 { + off = copy(buf, rem) + isrev = true + } else { + off = copy(buf2, rem) + } + } else { + off = 0 + } + } +} + +func (m *Me) send(nc io.Reader, packet []byte) (n int, rem []byte) { + if !waterutil.IsIPv4(packet) { + if waterutil.IsIPv6(packet) { + n = int(binary.BigEndian.Uint16(packet[4:6])) + 40 + if n > len(packet) { + rem = packet + logrus.Warnln("[lower] skip to send", len(packet), "bytes ipv6 packet head") + } else { + rem = packet[n:] + logrus.Warnln("[lower] skip to send", n, "bytes ipv6 packet") + } + return + } + logrus.Warnln("[lower] skip to send", len(packet), "bytes non-ipv4/v6 packet") + return len(packet), nil + } + totl := waterutil.IPv4TotalLength(packet) + if int(totl) > len(packet) { + buf := make([]byte, int(totl)) + copy(buf, packet) + cnt, err := m.nic.Read(buf[len(packet):]) + if err != nil { + rem = packet + return + } + packet = buf[:cnt+len(packet)] + } + rem = packet[totl:] + packet = packet[:totl] + n = int(totl) + dst := waterutil.IPv4Destination(packet) + logrus.Debugln("[lower] sending", len(packet), "bytes packet from :"+strconv.Itoa(int(m.SrcPort())), "to", dst.String()+":"+strconv.Itoa(int(m.DstPort()))) + lnk, err := m.Connect(dst.String()) + if err != nil { + logrus.Warnln("[lower] connect to peer", dst.String(), "err:", err) + return + } + _, err = lnk.Write(head.NewPacket(head.ProtoData, m.SrcPort(), dst, m.DstPort(), packet), false) + if err != nil { + logrus.Warnln("[lower] write to peer", dst.String(), "err:", err) + } + return +} diff --git a/lower/nic.go b/lower/nic.go index e9417ec..676f970 100644 --- a/lower/nic.go +++ b/lower/nic.go @@ -1,111 +1,57 @@ package lower import ( - "encoding/binary" + "io" "os" "os/exec" - "strconv" "github.com/fumiama/water" - "github.com/fumiama/water/waterutil" "github.com/sirupsen/logrus" - - "github.com/fumiama/WireGold/gold/head" - "github.com/fumiama/WireGold/gold/link" ) +type NICIO interface { + io.ReadWriteCloser + Up() + Down() +} + // NIC 虚拟网卡 type NIC struct { - ifce *water.Interface - ip string - subnet string - cidrs []string - hasstart bool + ifce *water.Interface + ip string + subnet string + cidrs []string } // NewNIC 新建 TUN 网络接口卡 // 网卡地址为 ip, 所属子网为 subnet -// 所有路由为 cidrs -func NewNIC(ip, subnet string, cidrs ...string) (n *NIC) { +// 以本网卡为下一跳的所有子网为 cidrs +// cidrs 不包括本网卡 subnet +func NewNIC(ip, subnet string, cidrs ...string) NICIO { ifce, err := water.New(water.Config{DeviceType: water.TUN}) if err != nil { panic(err) } - n = &NIC{ + n := &NIC{ ifce: ifce, ip: ip, cidrs: cidrs, subnet: subnet, } - n.prepare() - return + return n } -// Start 开始处理网卡消息,阻塞 -func (nc *NIC) Start(m *link.Me) { - if nc.hasstart { - return - } - nc.hasstart = true - go func() { // 接收到 NIC - for nc.hasstart { - data := m.Read() - n, err := nc.ifce.Write(data) - if err != nil { - logrus.Errorln("[lower] recv write to nic err:", err) - break - } - logrus.Debugln("[lower] recv write", n, "bytes packet to nic") - } - }() - buf := make([]byte, (m.MTU()+68)*4096) // 增加报头长度与 TEA 冗余 - buf2 := make([]byte, (m.MTU()+68)*4096) // 增加报头长度与 TEA 冗余 - off := 0 - isrev := false - for nc.hasstart { // 从 NIC 发送 - var packet []byte - if off > 0 && !isrev { - packet = buf2 - } else { - packet = buf - } - n, err := nc.ifce.Read(packet[off:]) - if isrev { - off = 0 - } - if err != nil { - logrus.Errorln("[lower] send read from nic err:", err) - break - } - if n == 0 { - continue - } - packet = packet[:n] - n, rem := nc.send(m, packet) - for len(rem) > 20 && n > 0 { - n, rem = nc.send(m, rem) - } - if len(rem) > 0 { - logrus.Debugln("[lower] remain", len(rem), "bytes to send") - if off > 0 { - off = copy(buf, rem) - isrev = true - } else { - off = copy(buf2, rem) - } - } else { - off = 0 - } - } +// Read 匹配 PacketsIO Interface +func (nc *NIC) Read(buf []byte) (int, error) { + return nc.ifce.Read(buf) } -// Stop 停止处理 -func (n *NIC) Stop() { - n.hasstart = false +func (nc *NIC) Write(packet []byte) (int, error) { + return nc.ifce.Write(packet) } -// Destroy 关闭网卡 -func (n *NIC) Destroy() error { +// Close 关闭网卡 +func (n *NIC) Close() error { return n.ifce.Close() } @@ -120,47 +66,3 @@ func execute(c string, args ...string) { logrus.Panicln("[lower] failed to exec cmd:", err) } } - -func (nc *NIC) send(m *link.Me, packet []byte) (n int, rem []byte) { - if !waterutil.IsIPv4(packet) { - if waterutil.IsIPv6(packet) { - n = int(binary.BigEndian.Uint16(packet[4:6])) + 40 - if n > len(packet) { - rem = packet - logrus.Warnln("[lower] skip to send", len(packet), "bytes ipv6 packet head") - } else { - rem = packet[n:] - logrus.Warnln("[lower] skip to send", n, "bytes ipv6 packet") - } - return - } - logrus.Warnln("[lower] skip to send", len(packet), "bytes non-ipv4/v6 packet") - return len(packet), nil - } - totl := waterutil.IPv4TotalLength(packet) - if int(totl) > len(packet) { - buf := make([]byte, int(totl)) - copy(buf, packet) - cnt, err := nc.ifce.Read(buf[len(packet):]) - if err != nil { - rem = packet - return - } - packet = buf[:cnt+len(packet)] - } - rem = packet[totl:] - packet = packet[:totl] - n = int(totl) - dst := waterutil.IPv4Destination(packet) - logrus.Debugln("[lower] sending", len(packet), "bytes packet from :"+strconv.Itoa(int(m.SrcPort())), "to", dst.String()+":"+strconv.Itoa(int(m.DstPort()))) - lnk, err := m.Connect(dst.String()) - if err != nil { - logrus.Warnln("[lower] connect to peer", dst.String(), "err:", err) - return - } - _, err = lnk.Write(head.NewPacket(head.ProtoData, m.SrcPort(), dst, m.DstPort(), packet), false) - if err != nil { - logrus.Warnln("[lower] write to peer", dst.String(), "err:", err) - } - return -} diff --git a/lower/tun_darwin.go b/lower/tun_darwin.go index 497c2ab..b1f4649 100644 --- a/lower/tun_darwin.go +++ b/lower/tun_darwin.go @@ -3,7 +3,7 @@ package lower -func (n *NIC) prepare() { +func (n *NIC) Up() { execute("ifconfig", n.ifce.Name(), "inet", n.ip, n.ip, "up") execute("route", "add", n.subnet, "-interface", n.ifce.Name()) for _, c := range n.cidrs { @@ -11,10 +11,10 @@ func (n *NIC) prepare() { } } -func (n *NIC) Up() { - execute("ifconfig", n.ifce.Name(), "inet", n.ip, n.ip, "up") -} - func (n *NIC) Down() { execute("ifconfig", n.ifce.Name(), "down") + execute("route", "delete", n.subnet, "-interface", n.ifce.Name()) + for _, c := range n.cidrs { + execute("route", "delete", c, "-interface", n.ifce.Name()) + } } diff --git a/lower/tun_linux.go b/lower/tun_linux.go index 5a27dce..f77e7ce 100644 --- a/lower/tun_linux.go +++ b/lower/tun_linux.go @@ -3,7 +3,7 @@ package lower -func (n *NIC) prepare() { +func (n *NIC) Up() { execute("/sbin/ip", "link", "set", "dev", n.ifce.Name(), "mtu", "1500") execute("/sbin/ip", "addr", "add", n.ip, "dev", n.ifce.Name()) execute("/sbin/ip", "link", "set", "dev", n.ifce.Name(), "up") @@ -13,10 +13,10 @@ func (n *NIC) prepare() { } } -func (n *NIC) Up() { - execute("/sbin/ip", "link", "set", "dev", n.ifce.Name(), "up") -} - func (n *NIC) Down() { execute("/sbin/ip", "link", "set", "dev", n.ifce.Name(), "down") + execute("/sbin/ip", "route", "del", n.subnet, "dev", n.ifce.Name()) + for _, c := range n.cidrs { + execute("/sbin/ip", "route", "del", c, "dev", n.ifce.Name()) + } } diff --git a/lower/tun_stub.go b/lower/tun_stub.go index 4f523da..362bfb2 100644 --- a/lower/tun_stub.go +++ b/lower/tun_stub.go @@ -3,14 +3,10 @@ package lower -func (n *NIC) prepare() { - panic("not support this os now") -} - func (n *NIC) Up() { - panic("not support this os now") + panic("not support lower on this os now") } func (n *NIC) Down() { - panic("not support this os now") + panic("not support lower on this os now") } diff --git a/lower/tun_windows.go b/lower/tun_windows.go index 829dbc6..514c56f 100644 --- a/lower/tun_windows.go +++ b/lower/tun_windows.go @@ -5,7 +5,8 @@ package lower import "net" -func (n *NIC) prepare() { +func (n *NIC) Up() { + // execute("netsh", "interface", "set", "interface", n.ifce.Name(), "enabled") _, ipn, err := net.ParseCIDR(n.subnet) if err != nil { panic(err) @@ -20,12 +21,13 @@ func (n *NIC) prepare() { } } -func (n *NIC) Up() { - // execute("netsh", "interface", "set", "interface", n.ifce.Name(), "enabled") - // don't need to bring up the device by hand -} - func (n *NIC) Down() { // execute("netsh", "interface", "set", "interface", n.ifce.Name(), "disabled") - // don't need to bring up the device by hand + for _, c := range n.cidrs { + ip, _, err := net.ParseCIDR(c) + if err != nil { + panic(err) + } + execute("cmd", "/c", "route DELETE "+ip.String()) + } } diff --git a/upper/services/tunnel/tunnel_test.go b/upper/services/tunnel/tunnel_test.go index 2fa3e4b..9428400 100644 --- a/upper/services/tunnel/tunnel_test.go +++ b/upper/services/tunnel/tunnel_test.go @@ -27,9 +27,9 @@ func TestTunnel(t *testing.T) { t.Log("peer priv key:", hex.EncodeToString(peerpk.Private()[:])) t.Log("peer publ key:", hex.EncodeToString(peerpk.Public()[:])) - m := link.NewMe(selfpk.Private(), "192.168.1.2/32", "127.0.0.1:1236", false, 1, 1, 4096) + m := link.NewMe(selfpk.Private(), "192.168.1.2/32", "127.0.0.1:1236", nil, 1, 1, 4096) m.AddPeer("192.168.1.3", peerpk.Public(), "127.0.0.1:1237", []string{"192.168.1.3/32"}, 0, false, false) - p := link.NewMe(peerpk.Private(), "192.168.1.3/32", "127.0.0.1:1237", false, 1, 1, 4096) + p := link.NewMe(peerpk.Private(), "192.168.1.3/32", "127.0.0.1:1237", nil, 1, 1, 4096) p.AddPeer("192.168.1.2", selfpk.Public(), "127.0.0.1:1236", []string{"192.168.1.2/32"}, 0, false, false) tunnme, err := Create(&m, "192.168.1.3") if err != nil { diff --git a/upper/services/wg/wg.go b/upper/services/wg/wg.go index 4bd0b80..ccc986f 100644 --- a/upper/services/wg/wg.go +++ b/upper/services/wg/wg.go @@ -19,7 +19,7 @@ type WG struct { c *config.Config key [32]byte PublicKey string - nic *lower.NIC + nic lower.NICIO me link.Me } @@ -50,19 +50,18 @@ func NewWireGold(c *config.Config) (wg WG, err error) { func (wg *WG) Start(srcport, destport, mtu uint16) { wg.init(srcport, destport, mtu) wg.nic.Up() - go wg.nic.Start(&wg.me) + go wg.me.ListenFromNIC() } func (wg *WG) Run(srcport, destport, mtu uint16) { wg.init(srcport, destport, mtu) wg.nic.Up() - wg.nic.Start(&wg.me) + wg.me.ListenFromNIC() } func (wg *WG) Stop() { - wg.nic.Stop() + wg.nic.Close() wg.nic.Down() - wg.nic.Destroy() } func (wg *WG) init(srcport, destport, mtu uint16) { @@ -89,8 +88,13 @@ func (wg *WG) init(srcport, destport, mtu uint16) { i++ } - wg.nic = lower.NewNIC(wg.c.IP, wg.c.SubNet, cidrs...) - wg.me = link.NewMe(&wg.key, wg.c.IP+"/32", wg.c.EndPoint, true, srcport, destport, mtu) + wg.me = link.NewMe( + &wg.key, + wg.c.IP+"/32", + wg.c.EndPoint, + lower.NewNIC(wg.c.IP, wg.c.SubNet, cidrs...), + srcport, destport, mtu, + ) for _, peer := range wg.c.Peers { var peerkey [32]byte