diff --git a/gold/head/packet.go b/gold/head/packet.go index d196d90..ab9342d 100644 --- a/gold/head/packet.go +++ b/gold/head/packet.go @@ -1,19 +1,22 @@ package head type Packet struct { + DataSZ uint32 Proto uint8 + TTL uint8 SrcPort uint16 DstPort uint16 - TTL uint8 + Src string + Dst string Data []byte } func NewPacket(proto uint8, srcPort uint16, dstPort uint16, data []byte) *Packet { return &Packet{ Proto: proto, + TTL: 255, SrcPort: srcPort, DstPort: dstPort, - TTL: 255, Data: data, } } @@ -22,6 +25,6 @@ func (p *Packet) UnMashal(data []byte) { } -func (p *Packet) Mashal() []byte { +func (p *Packet) Mashal(src string, dst string) []byte { return nil } diff --git a/gold/link/crypto.go b/gold/link/crypto.go deleted file mode 100644 index d0c7126..0000000 --- a/gold/link/crypto.go +++ /dev/null @@ -1,23 +0,0 @@ -package link - -import "net" - -var ( - privKey [32]byte - myip string - me net.IP -) - -func SetMyself(privateKey [32]byte, myIP string) { - privKey = privateKey - myip = myIP - me = net.ParseIP(myIP) -} - -func (id *Identity) Encode(b []byte) (n int, err error) { - return 0, nil -} - -func (id *Identity) Decode(b []byte) (n int, err error) { - return 0, nil -} diff --git a/gold/link/identify.go b/gold/link/identify.go deleted file mode 100644 index 97031bf..0000000 --- a/gold/link/identify.go +++ /dev/null @@ -1,9 +0,0 @@ -package link - -type Identity struct { - PubicKey [32]byte - EndPoint string - KeepAlive int64 -} - -var peers = make(map[string]*Identity) diff --git a/gold/link/link.go b/gold/link/link.go index 176c25f..40f8cd0 100644 --- a/gold/link/link.go +++ b/gold/link/link.go @@ -3,6 +3,7 @@ package link import ( "errors" "net" + "sync" "github.com/fumiama/WireGold/gold/head" ) @@ -10,14 +11,32 @@ import ( type Link struct { conn net.Conn peer *Identity + peerip net.IP hasKeepRuning bool } +var ( + connections = make(map[string]*Link) + connmapmu sync.RWMutex +) + func Connect(peer string) (l Link, err error) { - p, ok := peers[peer] + peer = net.ParseIP(peer).String() + p, ok := IsInPeer(peer) if ok { - l.conn, err = net.Dial("udp", peer) + connmapmu.RLock() + lnk, ok := connections[peer] + connmapmu.RUnlock() + if ok { + return *lnk, nil + } + l.conn, err = net.Dial("udp", p.EndPoint) l.peer = p + l.peerip = net.ParseIP(peer) + connmapmu.Lock() + connections[l.peerip.String()] = &l + connmapmu.Unlock() + l.keepAlive() } else { err = errors.New("peer not exist") } @@ -26,23 +45,18 @@ func Connect(peer string) (l Link, err error) { func (l *Link) Close() { l.conn.Close() + connmapmu.Lock() + delete(connections, l.peerip.String()) + connmapmu.Unlock() } -func (l *Link) Read(p *head.Packet) (n int, err error) { - d := make([]byte, 1024) - n, err = l.conn.Read(d) - if err == nil { - n, err = l.peer.Decode(d) - if err == nil { - p.UnMashal(d) - } - } - return +func (l *Link) Read() *head.Packet { + return <-l.peer.pipe } func (l *Link) Write(p *head.Packet) (n int, err error) { - d := p.Mashal() - _, err = l.peer.Encode(d) + d := p.Mashal(me.String(), l.peerip.String()) + d, err = l.peer.Encode(d) if err == nil { n, err = l.conn.Write(d) } diff --git a/gold/link/local.go b/gold/link/local.go new file mode 100644 index 0000000..b00a180 --- /dev/null +++ b/gold/link/local.go @@ -0,0 +1,67 @@ +package link + +import ( + "net" + + "github.com/fumiama/WireGold/gold/head" + "github.com/sirupsen/logrus" +) + +var ( + privKey [32]byte + me net.IP +) + +func SetMyself(privateKey [32]byte, myIP string) { + privKey = privateKey + me = net.ParseIP(myIP) +} + +func (id *Identity) Encode(b []byte) (eb []byte, err error) { + return b, nil +} + +func (id *Identity) Decode(b []byte) (db []byte, err error) { + return b, nil +} + +func Listen(endpoint string) error { + conn, err := net.ListenPacket("udp", endpoint) + if err == nil { + go func() { + listenbuff := make([]byte, 65536) + for { + _, addr, err := conn.ReadFrom(listenbuff) + if err == nil { + p, ok := IsInPeer(addr.String()) + if ok { + packet := head.Packet{} + d, err := p.Decode(listenbuff) + if err == nil { + packet.UnMashal(d) + r := packet.DataSZ - uint32(len(packet.Data)) + if r > 0 { + i := 0 + n := 0 + remain := make([]byte, r) + for r > 0 { + n, _, err = conn.ReadFrom(remain[i:]) + if err == nil { + i += n + r -= uint32(n) + } else { + logrus.Errorln("[link.listen]", err) + return + } + } + packet.Data = append(packet.Data, remain...) + } + p.pipe <- &packet + } + } + } + } + }() + } + return err +} diff --git a/gold/link/nat.go b/gold/link/nat.go index 1bc7d24..9e912da 100644 --- a/gold/link/nat.go +++ b/gold/link/nat.go @@ -8,7 +8,7 @@ import ( "github.com/fumiama/WireGold/gold/head" ) -func (l *Link) KeepAlive() { +func (l *Link) keepAlive() { if l.peer.KeepAlive > 0 && !l.hasKeepRuning { l.hasKeepRuning = true go func() { diff --git a/gold/link/remote.go b/gold/link/remote.go new file mode 100644 index 0000000..8461865 --- /dev/null +++ b/gold/link/remote.go @@ -0,0 +1,48 @@ +package link + +import ( + "net" + "sync" + + "github.com/fumiama/WireGold/gold/head" +) + +type Identity struct { + PubicKey [32]byte + EndPoint string + KeepAlive int64 + pipe chan *head.Packet +} + +var ( + peers = make(map[string]*Identity) + peersmu sync.RWMutex +) + +func AddPeer(peerip string, pubicKey [32]byte, endPoint string, keepAlive int64) (i *Identity) { + peerip = net.ParseIP(peerip).String() + var ok bool + peersmu.RLock() + i, ok = peers[peerip] + peersmu.RUnlock() + if ok { + return + } + i = &Identity{ + PubicKey: pubicKey, + EndPoint: endPoint, + KeepAlive: keepAlive, + pipe: make(chan *head.Packet, 32), + } + peersmu.Lock() + peers[peerip] = i + peersmu.Unlock() + return +} + +func IsInPeer(peer string) (p *Identity, ok bool) { + peersmu.RLock() + p, ok = peers[peer] + peersmu.RUnlock() + return +} diff --git a/upper/data.go b/upper/data.go index 25be846..2d55ba6 100644 --- a/upper/data.go +++ b/upper/data.go @@ -3,5 +3,6 @@ package upper import "io" type Service interface { + Create(peer string, srcport uint16, destport uint16) (Service, error) io.ReadWriteCloser } diff --git a/upper/services/tunnel/tunnel.go b/upper/services/tunnel/tunnel.go index 9fb835a..9792c8a 100644 --- a/upper/services/tunnel/tunnel.go +++ b/upper/services/tunnel/tunnel.go @@ -1,6 +1,8 @@ package tunnel import ( + "errors" + "github.com/sirupsen/logrus" "github.com/fumiama/WireGold/gold/head" @@ -8,11 +10,12 @@ import ( ) type Tunnel struct { - l *link.Link - In *chan []byte - Out *chan []byte - src uint16 - dest uint16 + l *link.Link + in chan []byte + out chan []byte + outcache []byte + src uint16 + dest uint16 } func Create(peer string, srcport uint16, destport uint16) (s Tunnel, err error) { @@ -21,26 +24,70 @@ func Create(peer string, srcport uint16, destport uint16) (s Tunnel, err error) l, err = link.Connect(peer) if err == nil { s.l = &l - s.In = new(chan []byte) - s.Out = new(chan []byte) + s.in = make(chan []byte, 4) + s.out = make(chan []byte, 4) s.src = srcport s.dest = destport go s.handleWrite() + go s.handleRead() } else { logrus.Errorln("[tunnel] create err:", err) } return } +func (s *Tunnel) Write(p []byte) (int, error) { + s.in <- p + return len(p), nil +} + +func (s *Tunnel) Read(p []byte) (int, error) { + var d []byte + if s.outcache != nil { + d = s.outcache + } else { + d = <-s.out + } + if d != nil { + if len(p) >= len(d) { + s.outcache = nil + return copy(p, d), nil + } else { + s.outcache = d[len(p):] + return copy(p, d[:len(p)]), nil + } + } + return 0, errors.New("reading reaches nil") +} + +func (s *Tunnel) Close() error { + s.l.Close() + close(s.in) + return nil +} + func (s *Tunnel) handleWrite() { - for b := range *s.In { + for b := range s.in { + if b == nil { + break + } + logrus.Debugln("[tunnel] writing", len(b), "bytes...") _, err := s.l.Write(head.NewPacket(head.ProtoData, s.src, s.dest, b)) if err != nil { logrus.Errorln("[tunnel] write err:", err) + break + } else { + logrus.Debugln("[tunnel] write succeeded") } } } -func (s *Tunnel) Handle(srcport uint16, destport uint16, data *[]byte) { - +func (s *Tunnel) handleRead() { + for { + p := s.l.Read() + if p == nil { + break + } + s.out <- p.Data + } } diff --git a/upper/services/tunnel/tunnel_test.go b/upper/services/tunnel/tunnel_test.go new file mode 100644 index 0000000..3452ae7 --- /dev/null +++ b/upper/services/tunnel/tunnel_test.go @@ -0,0 +1,23 @@ +package tunnel + +import ( + "testing" + + "github.com/fumiama/WireGold/gold/link" +) + +func TestTunnel(t *testing.T) { + link.SetMyself([32]byte{}, "127.0.0.1:1234") + link.AddPeer("192.168.1.1", [32]byte{}, "127.0.0.1:1235", 0) + link.Listen("127.0.0.1:1234") + link.Listen("127.0.0.1:1235") + tunn, err := Create("192.168.1.1", 1, 1) + if err != nil { + t.Error(err) + } else { + tunn.Write(([]byte)("1234")) + p := make([]byte, 4) + tunn.Read(p) + t.Log(p) + } +}