mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:24:18 -04:00
Compare commits
19 Commits
nmap/compa
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f07671cf49 | ||
|
|
5a504ee6be | ||
|
|
660b2542d2 | ||
|
|
d0ad53b247 | ||
|
|
2cffe6526a | ||
|
|
dded91235e | ||
|
|
314f34f916 | ||
|
|
eaf985624d | ||
|
|
48b7c6ec3c | ||
|
|
acf271bf25 | ||
|
|
f49c299d77 | ||
|
|
73b5f8d63b | ||
|
|
6653894691 | ||
|
|
a7facc2d72 | ||
|
|
0721b87c56 | ||
|
|
2829cce644 | ||
|
|
9350c5f8d8 | ||
|
|
2ae4c204af | ||
|
|
f5e974c04c |
98
client/hhhh.go
Normal file
98
client/hhhh.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package main
|
||||
|
||||
/*
|
||||
import (
|
||||
"flag"
|
||||
"github.com/netbirdio/netbird/iface"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"time"
|
||||
)
|
||||
|
||||
var name = flag.String("name", "wg0", "WireGuard interface name")
|
||||
var addr = flag.String("addr", "100.64.0.1/24", "interface WireGuard IP addr")
|
||||
var key = flag.String("key", "100.64.0.1/24", "WireGuard private key")
|
||||
var port = flag.Int("port", 51820, "WireGuard port")
|
||||
|
||||
var remoteKey = flag.String("remote-key", "", "remote WireGuard public key")
|
||||
var remoteAddr = flag.String("remote-addr", "100.64.0.2/32", "remote WireGuard IP addr")
|
||||
var remoteEndpoint = flag.String("remote-endpoint", "127.0.0.1:51820", "remote WireGuard endpoint")
|
||||
|
||||
func fff() {
|
||||
|
||||
flag.Parse()
|
||||
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe("localhost:6060", nil))
|
||||
}()
|
||||
|
||||
myKey, err := wgtypes.ParseKey(*key)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("public key and addr [%s] [%s] ", myKey.PublicKey().String(), *addr)
|
||||
|
||||
wgIFace, err := iface.NewWGIFace(*name, *addr, 1280)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
defer wgIFace.Close()
|
||||
|
||||
// todo wrap into UDPMux
|
||||
sharedSock, _, err := listenNet("udp4", *port)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
defer sharedSock.Close()
|
||||
|
||||
// err = wgIFace.Create()
|
||||
err = wgIFace.CreateNew(sharedSock)
|
||||
if err != nil {
|
||||
log.Errorf("failed to create interface %s %v", *name, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = wgIFace.Configure(*key, *port)
|
||||
if err != nil {
|
||||
log.Errorf("failed to configure interface %s %v", *name, err)
|
||||
return
|
||||
}
|
||||
|
||||
ip, err := net.ResolveUDPAddr("udp4", *remoteEndpoint)
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
|
||||
err = wgIFace.UpdatePeer(*remoteKey, *remoteAddr, 20*time.Second, ip, nil)
|
||||
if err != nil {
|
||||
log.Errorf("failed to configure remote peer %s %v", *remoteKey, err)
|
||||
return
|
||||
}
|
||||
|
||||
select {}
|
||||
|
||||
}
|
||||
|
||||
func listenNet(network string, port int) (*net.UDPConn, int, error) {
|
||||
conn, err := net.ListenUDP(network, &net.UDPAddr{Port: port})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Retrieve port.
|
||||
laddr := conn.LocalAddr()
|
||||
uaddr, err := net.ResolveUDPAddr(
|
||||
laddr.Network(),
|
||||
laddr.String(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return conn, uaddr.Port, nil
|
||||
}*/
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
@@ -89,10 +88,7 @@ type Engine struct {
|
||||
|
||||
wgInterface *iface.WGIface
|
||||
|
||||
udpMux ice.UDPMux
|
||||
udpMuxSrflx ice.UniversalUDPMux
|
||||
udpMuxConn *net.UDPConn
|
||||
udpMuxConnSrflx *net.UDPConn
|
||||
iceMux ice.UniversalUDPMux
|
||||
|
||||
// networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service
|
||||
networkSerial uint64
|
||||
@@ -155,30 +151,6 @@ func (e *Engine) Stop() error {
|
||||
}
|
||||
}
|
||||
|
||||
if e.udpMux != nil {
|
||||
if err := e.udpMux.Close(); err != nil {
|
||||
log.Debugf("close udp mux: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if e.udpMuxSrflx != nil {
|
||||
if err := e.udpMuxSrflx.Close(); err != nil {
|
||||
log.Debugf("close server reflexive udp mux: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if e.udpMuxConn != nil {
|
||||
if err := e.udpMuxConn.Close(); err != nil {
|
||||
log.Debugf("close udp mux connection: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if e.udpMuxConnSrflx != nil {
|
||||
if err := e.udpMuxConnSrflx.Close(); err != nil {
|
||||
log.Debugf("close server reflexive udp mux connection: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !isNil(e.sshServer) {
|
||||
err := e.sshServer.Stop()
|
||||
if err != nil {
|
||||
@@ -213,34 +185,34 @@ func (e *Engine) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
e.udpMuxConn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: e.config.UDPMuxPort})
|
||||
if err != nil {
|
||||
log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxPort, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
e.udpMuxConnSrflx, err = net.ListenUDP("udp4", &net.UDPAddr{Port: e.config.UDPMuxSrflxPort})
|
||||
if err != nil {
|
||||
log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxSrflxPort, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
e.udpMux = ice.NewUDPMuxDefault(ice.UDPMuxParams{UDPConn: e.udpMuxConn})
|
||||
e.udpMuxSrflx = ice.NewUniversalUDPMuxDefault(ice.UniversalUDPMuxParams{UDPConn: e.udpMuxConnSrflx})
|
||||
|
||||
err = e.wgInterface.Create()
|
||||
bind := &iface.ICEBind{}
|
||||
err = e.wgInterface.CreateNew(bind)
|
||||
if err != nil {
|
||||
log.Errorf("failed creating tunnel interface %s: [%s]", wgIfaceName, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.wgInterface.Configure(myPrivateKey.String(), e.config.WgPort)
|
||||
port, err := e.wgInterface.GetListenPort()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.wgInterface.Configure(myPrivateKey.String(), *port)
|
||||
if err != nil {
|
||||
log.Errorf("failed configuring Wireguard interface [%s]: %s", wgIfaceName, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
iceMux, err := bind.GetICEMux()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.iceMux = iceMux
|
||||
|
||||
log.Infof("NetBird Engine started listening on WireGuard port %d", *port)
|
||||
|
||||
e.routeManager = routemanager.NewManager(e.ctx, e.config.WgPrivateKey.PublicKey().String(), e.wgInterface, e.statusRecorder)
|
||||
e.config.WgPort = *port
|
||||
|
||||
e.receiveSignalEvents()
|
||||
e.receiveManagementEvents()
|
||||
@@ -760,8 +732,8 @@ func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, er
|
||||
StunTurn: stunTurn,
|
||||
InterfaceBlackList: e.config.IFaceBlackList,
|
||||
Timeout: timeout,
|
||||
UDPMux: e.udpMux,
|
||||
UDPMuxSrflx: e.udpMuxSrflx,
|
||||
UDPMux: e.iceMux,
|
||||
UDPMuxSrflx: e.iceMux,
|
||||
ProxyConfig: proxyConfig,
|
||||
LocalWgPort: e.config.WgPort,
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ func (conn *Conn) reCreateAgent() error {
|
||||
MulticastDNSMode: ice.MulticastDNSModeDisabled,
|
||||
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
||||
Urls: conn.config.StunTurn,
|
||||
CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay},
|
||||
CandidateTypes: []ice.CandidateType{ice.CandidateTypeServerReflexive, ice.CandidateTypeHost, ice.CandidateTypeRelay},
|
||||
FailedTimeout: &failedTimeout,
|
||||
InterfaceFilter: interfaceFilter(conn.config.InterfaceBlackList),
|
||||
UDPMux: conn.config.UDPMux,
|
||||
@@ -280,14 +280,7 @@ func (conn *Conn) Open() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if conn.proxy.Type() == proxy.TypeNoProxy {
|
||||
host, _, _ := net.SplitHostPort(remoteConn.LocalAddr().String())
|
||||
rhost, _, _ := net.SplitHostPort(remoteConn.RemoteAddr().String())
|
||||
// direct Wireguard connection
|
||||
log.Infof("directly connected to peer %s [laddr <-> raddr] [%s:%d <-> %s:%d]", conn.config.Key, host, iface.DefaultWgPort, rhost, iface.DefaultWgPort)
|
||||
} else {
|
||||
log.Infof("connected to peer %s [laddr <-> raddr] [%s <-> %s]", conn.config.Key, remoteConn.LocalAddr().String(), remoteConn.RemoteAddr().String())
|
||||
}
|
||||
log.Infof("connected to peer %s [laddr <-> raddr] [%s <-> %s]", conn.config.Key, remoteConn.LocalAddr().String(), remoteConn.RemoteAddr().String())
|
||||
|
||||
// wait until connection disconnected or has been closed externally (upper layer, e.g. engine)
|
||||
select {
|
||||
@@ -351,15 +344,16 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error {
|
||||
}
|
||||
|
||||
peerState := nbStatus.PeerState{PubKey: conn.config.Key}
|
||||
useProxy := shouldUseProxy(pair)
|
||||
|
||||
var p proxy.Proxy
|
||||
if useProxy {
|
||||
if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
|
||||
p = proxy.NewWireguardProxy(conn.config.ProxyConfig)
|
||||
peerState.Direct = false
|
||||
} else {
|
||||
p = proxy.NewNoProxy(conn.config.ProxyConfig, remoteWgPort)
|
||||
peerState.Direct = true
|
||||
}
|
||||
|
||||
conn.proxy = p
|
||||
err = p.Start(remoteConn)
|
||||
if err != nil {
|
||||
|
||||
@@ -39,7 +39,6 @@ func (p *NoProxy) Start(remoteConn net.Conn) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addr.Port = p.RemoteWgListenPort
|
||||
err = p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive,
|
||||
addr, p.config.PreSharedKey)
|
||||
|
||||
|
||||
9
go.mod
9
go.mod
@@ -39,9 +39,13 @@ require (
|
||||
github.com/libp2p/go-netroute v0.2.0
|
||||
github.com/magiconair/properties v1.8.5
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pion/logging v0.2.2
|
||||
github.com/pion/stun v0.3.5
|
||||
github.com/pion/transport v0.13.0
|
||||
github.com/rs/xid v1.3.0
|
||||
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
|
||||
github.com/stretchr/testify v1.7.1
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/net v0.0.0-20220513224357-95641704303c
|
||||
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467
|
||||
)
|
||||
@@ -81,11 +85,8 @@ require (
|
||||
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
|
||||
github.com/pegasus-kv/thrift v0.13.0 // indirect
|
||||
github.com/pion/dtls/v2 v2.1.2 // indirect
|
||||
github.com/pion/logging v0.2.2 // indirect
|
||||
github.com/pion/mdns v0.0.5 // indirect
|
||||
github.com/pion/randutil v0.1.0 // indirect
|
||||
github.com/pion/stun v0.3.5 // indirect
|
||||
github.com/pion/transport v0.13.0 // indirect
|
||||
github.com/pion/turn/v2 v2.0.7 // indirect
|
||||
github.com/pion/udp v0.1.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
@@ -99,6 +100,8 @@ require (
|
||||
github.com/srwiley/rasterx v0.0.0-20200120212402-85cb7272f5e9 // indirect
|
||||
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
|
||||
github.com/yuin/goldmark v1.4.1 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.6.0 // indirect
|
||||
golang.org/x/image v0.0.0-20200430140353-33d19683fad8 // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
|
||||
3
go.sum
3
go.sum
@@ -646,8 +646,11 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
|
||||
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
|
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
|
||||
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
||||
185
iface/bind.go
Normal file
185
iface/bind.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package iface
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/pion/stun"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/conn"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type ICEBind struct {
|
||||
sharedConn net.PacketConn
|
||||
udpMux *UniversalUDPMuxDefault
|
||||
iceHostMux *UDPMuxDefault
|
||||
|
||||
mu sync.Mutex // protects following fields
|
||||
}
|
||||
|
||||
func (b *ICEBind) GetICEMux() (UniversalUDPMux, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.udpMux == nil {
|
||||
return nil, fmt.Errorf("ICEBind has not been initialized yet")
|
||||
}
|
||||
|
||||
return b.udpMux, nil
|
||||
}
|
||||
|
||||
func (b *ICEBind) GetICEHostMux() (UDPMux, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.iceHostMux == nil {
|
||||
return nil, fmt.Errorf("ICEBind has not been initialized yet")
|
||||
}
|
||||
|
||||
return b.iceHostMux, nil
|
||||
}
|
||||
|
||||
func (b *ICEBind) Open(uport uint16) ([]conn.ReceiveFunc, uint16, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.sharedConn != nil {
|
||||
return nil, 0, conn.ErrBindAlreadyOpen
|
||||
}
|
||||
|
||||
port := int(uport)
|
||||
ipv4Conn, port, err := listenNet("udp4", port)
|
||||
if err != nil && !errors.Is(err, syscall.EAFNOSUPPORT) {
|
||||
return nil, 0, err
|
||||
}
|
||||
b.sharedConn = ipv4Conn
|
||||
b.udpMux = NewUniversalUDPMuxDefault(UniversalUDPMuxParams{UDPConn: b.sharedConn})
|
||||
|
||||
portAddr1, err := netip.ParseAddrPort(ipv4Conn.LocalAddr().String())
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
log.Infof("opened ICEBind on %s", ipv4Conn.LocalAddr().String())
|
||||
|
||||
return []conn.ReceiveFunc{
|
||||
b.makeReceiveIPv4(b.sharedConn),
|
||||
},
|
||||
portAddr1.Port(), nil
|
||||
}
|
||||
|
||||
func listenNet(network string, port int) (*net.UDPConn, int, error) {
|
||||
conn, err := net.ListenUDP(network, &net.UDPAddr{Port: port})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Retrieve port.
|
||||
laddr := conn.LocalAddr()
|
||||
uaddr, err := net.ResolveUDPAddr(
|
||||
laddr.Network(),
|
||||
laddr.String(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return conn, uaddr.Port, nil
|
||||
}
|
||||
|
||||
func parseSTUNMessage(raw []byte) (*stun.Message, error) {
|
||||
msg := &stun.Message{
|
||||
Raw: append([]byte{}, raw...),
|
||||
}
|
||||
if err := msg.Decode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
func (b *ICEBind) makeReceiveIPv4(c net.PacketConn) conn.ReceiveFunc {
|
||||
return func(buff []byte) (int, conn.Endpoint, error) {
|
||||
n, endpoint, err := c.ReadFrom(buff)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
e, err := netip.ParseAddrPort(endpoint.String())
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if !stun.IsMessage(buff[:20]) {
|
||||
// WireGuard traffic
|
||||
return n, (*conn.StdNetEndpoint)(&net.UDPAddr{
|
||||
IP: e.Addr().AsSlice(),
|
||||
Port: int(e.Port()),
|
||||
Zone: e.Addr().Zone(),
|
||||
}), nil
|
||||
}
|
||||
|
||||
msg, err := parseSTUNMessage(buff[:n])
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
err = b.udpMux.HandleSTUNMessage(msg, endpoint)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if err != nil {
|
||||
log.Warnf("failed to handle packet")
|
||||
}
|
||||
|
||||
// discard packets because they are STUN related
|
||||
return 0, nil, nil //todo proper return
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ICEBind) Close() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
var err1, err2 error
|
||||
if b.sharedConn != nil {
|
||||
c := b.sharedConn
|
||||
b.sharedConn = nil
|
||||
err1 = c.Close()
|
||||
}
|
||||
|
||||
if b.udpMux != nil {
|
||||
m := b.udpMux
|
||||
b.udpMux = nil
|
||||
err2 = m.Close()
|
||||
}
|
||||
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
|
||||
return err2
|
||||
}
|
||||
|
||||
// SetMark sets the mark for each packet sent through this Bind.
|
||||
// This mark is passed to the kernel as the socket option SO_MARK.
|
||||
func (b *ICEBind) SetMark(mark uint32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *ICEBind) Send(buff []byte, endpoint conn.Endpoint) error {
|
||||
nend, ok := endpoint.(*conn.StdNetEndpoint)
|
||||
if !ok {
|
||||
return conn.ErrWrongEndpointType
|
||||
}
|
||||
_, err := b.sharedConn.WriteTo(buff, (*net.UDPAddr)(nend))
|
||||
return err
|
||||
}
|
||||
|
||||
// ParseEndpoint creates a new endpoint from a string.
|
||||
func (b *ICEBind) ParseEndpoint(s string) (ep conn.Endpoint, err error) {
|
||||
e, err := netip.ParseAddrPort(s)
|
||||
return (*conn.StdNetEndpoint)(&net.UDPAddr{
|
||||
IP: e.Addr().AsSlice(),
|
||||
Port: int(e.Port()),
|
||||
Zone: e.Addr().Zone(),
|
||||
}), err
|
||||
}
|
||||
@@ -55,7 +55,6 @@ func (w *WGIface) Configure(privateKey string, port int) error {
|
||||
PrivateKey: &key,
|
||||
ReplacePeers: true,
|
||||
FirewallMark: &fwmark,
|
||||
ListenPort: &port,
|
||||
}
|
||||
|
||||
err = w.configureDevice(config)
|
||||
|
||||
@@ -2,6 +2,10 @@ package iface
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/conn"
|
||||
"golang.zx2c4.com/wireguard/device"
|
||||
"golang.zx2c4.com/wireguard/tun"
|
||||
"net"
|
||||
"os"
|
||||
"runtime"
|
||||
@@ -21,6 +25,7 @@ type WGIface struct {
|
||||
Address WGAddress
|
||||
Interface NetInterface
|
||||
mu sync.Mutex
|
||||
Bind *ICEBind
|
||||
}
|
||||
|
||||
// WGAddress Wireguard parsed address
|
||||
@@ -91,3 +96,49 @@ func (w *WGIface) Close() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WGIface) CreateNew(bind conn.Bind) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
return w.createWithUserspaceNew(bind)
|
||||
}
|
||||
|
||||
func (w *WGIface) createWithUserspaceNew(bind conn.Bind) error {
|
||||
tunIface, err := tun.CreateTUN(w.Name, w.MTU)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Interface = tunIface
|
||||
|
||||
// We need to create a wireguard-go device and listen to configuration requests
|
||||
tunDevice := device.NewDevice(tunIface, bind, device.NewLogger(device.LogLevelSilent, "[wiretrustee] "))
|
||||
err = tunDevice.Up()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uapi, err := getUAPI(w.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
uapiConn, uapiErr := uapi.Accept()
|
||||
if uapiErr != nil {
|
||||
log.Traceln("uapi Accept failed with error: ", uapiErr)
|
||||
continue
|
||||
}
|
||||
go tunDevice.IpcHandle(uapiConn)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debugln("UAPI listener started")
|
||||
|
||||
err = w.assignAddr()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -39,13 +39,7 @@ func (w *WGIface) Create() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if WireguardModExists() {
|
||||
log.Info("using kernel WireGuard")
|
||||
return w.createWithKernel()
|
||||
} else {
|
||||
log.Info("using userspace WireGuard")
|
||||
return w.createWithUserspace()
|
||||
}
|
||||
return w.createWithUserspace()
|
||||
}
|
||||
|
||||
// createWithKernel Creates a new Wireguard interface using kernel Wireguard module.
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/windows"
|
||||
"golang.zx2c4.com/wireguard/ipc"
|
||||
"golang.zx2c4.com/wireguard/windows/driver"
|
||||
"golang.zx2c4.com/wireguard/windows/tunnel/winipcfg"
|
||||
"net"
|
||||
@@ -62,3 +63,8 @@ func (w *WGIface) UpdateAddr(newAddr string) error {
|
||||
func WireguardModExists() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// getUAPI returns a Listener
|
||||
func getUAPI(iface string) (net.Listener, error) {
|
||||
return ipc.UAPIListen(iface)
|
||||
}
|
||||
|
||||
288
iface/udp_mux.go
Normal file
288
iface/udp_mux.go
Normal file
@@ -0,0 +1,288 @@
|
||||
package iface
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pion/logging"
|
||||
"github.com/pion/stun"
|
||||
)
|
||||
|
||||
const receiveMTU = 8192
|
||||
|
||||
// UDPMux allows multiple connections to go over a single UDP port
|
||||
type UDPMux interface {
|
||||
io.Closer
|
||||
GetConn(ufrag string) (net.PacketConn, error)
|
||||
RemoveConnByUfrag(ufrag string)
|
||||
}
|
||||
|
||||
// UDPMuxDefault is an implementation of the interface
|
||||
type UDPMuxDefault struct {
|
||||
params UDPMuxParams
|
||||
|
||||
closedChan chan struct{}
|
||||
closeOnce sync.Once
|
||||
|
||||
// conns is a map of all udpMuxedConn indexed by ufrag|network|candidateType
|
||||
conns map[string]*udpMuxedConn
|
||||
|
||||
addressMapMu sync.RWMutex
|
||||
addressMap map[string][]*udpMuxedConn
|
||||
|
||||
// buffer pool to recycle buffers for net.UDPAddr encodes/decodes
|
||||
pool *sync.Pool
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
const maxAddrSize = 512
|
||||
|
||||
// UDPMuxParams are parameters for UDPMux.
|
||||
type UDPMuxParams struct {
|
||||
Logger logging.LeveledLogger
|
||||
UDPConn net.PacketConn
|
||||
}
|
||||
|
||||
// NewUDPMuxDefault creates an implementation of UDPMux
|
||||
func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault {
|
||||
if params.Logger == nil {
|
||||
params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
|
||||
}
|
||||
|
||||
return &UDPMuxDefault{
|
||||
addressMap: map[string][]*udpMuxedConn{},
|
||||
params: params,
|
||||
conns: make(map[string]*udpMuxedConn),
|
||||
closedChan: make(chan struct{}, 1),
|
||||
pool: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
// big enough buffer to fit both packet and address
|
||||
return newBufferHolder(receiveMTU + maxAddrSize)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *UDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.Addr) error {
|
||||
|
||||
remoteAddr, ok := addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
return fmt.Errorf("underlying PacketConn did not return a UDPAddr")
|
||||
}
|
||||
|
||||
// If we have already seen this address dispatch to the appropriate destination
|
||||
// If you are using the same socket for the Host and SRFLX candidates, it might be that there are more than one
|
||||
// muxed connection - one for the SRFLX candidate and the other one for the HOST one.
|
||||
// We will then forward STUN packets to each of these connections.
|
||||
m.addressMapMu.Lock()
|
||||
var destinationConnList []*udpMuxedConn
|
||||
if storedConns, ok := m.addressMap[addr.String()]; ok {
|
||||
for _, conn := range storedConns {
|
||||
destinationConnList = append(destinationConnList, conn)
|
||||
}
|
||||
}
|
||||
m.addressMapMu.Unlock()
|
||||
|
||||
// This block is needed to discover Peer Reflexive Candidates for which we don't know the Endpoint upfront.
|
||||
// However, we can take a username attribute from the STUN message which contains ufrag.
|
||||
// We can use ufrag to identify the destination conn to route packet to.
|
||||
attr, stunAttrErr := msg.Get(stun.AttrUsername)
|
||||
if stunAttrErr == nil {
|
||||
ufrag := strings.Split(string(attr), ":")[0]
|
||||
|
||||
m.mu.Lock()
|
||||
if destinationConn, ok := m.conns[ufrag]; ok {
|
||||
exists := false
|
||||
for _, conn := range destinationConnList {
|
||||
if conn.params.Key == destinationConn.params.Key {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !exists {
|
||||
destinationConnList = append(destinationConnList, destinationConn)
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Forward STUN packets to each destination connections even thought the STUN packet might not belong there.
|
||||
// It will be discarded by the further ICE candidate logic if so.
|
||||
for _, conn := range destinationConnList {
|
||||
if err := conn.writePacket(msg.Raw, remoteAddr); err != nil {
|
||||
log.Errorf("could not write packet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalAddr returns the listening address of this UDPMuxDefault
|
||||
func (m *UDPMuxDefault) LocalAddr() net.Addr {
|
||||
return m.params.UDPConn.LocalAddr()
|
||||
}
|
||||
|
||||
// GetConn returns a PacketConn given the connection's ufrag and network
|
||||
// creates the connection if an existing one can't be found
|
||||
func (m *UDPMuxDefault) GetConn(ufrag string) (net.PacketConn, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
log.Debugf("ICE: getting muxed connection for %s", ufrag)
|
||||
|
||||
if m.IsClosed() {
|
||||
return nil, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
if c, ok := m.conns[ufrag]; ok {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
c := m.createMuxedConn(ufrag)
|
||||
go func() {
|
||||
<-c.CloseChannel()
|
||||
m.removeConn(ufrag)
|
||||
}()
|
||||
m.conns[ufrag] = c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// RemoveConnByUfrag stops and removes the muxed packet connection
|
||||
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
|
||||
m.mu.Lock()
|
||||
removedConns := make([]*udpMuxedConn, 0)
|
||||
for key := range m.conns {
|
||||
if key != ufrag {
|
||||
continue
|
||||
}
|
||||
|
||||
c := m.conns[key]
|
||||
delete(m.conns, key)
|
||||
if c != nil {
|
||||
removedConns = append(removedConns, c)
|
||||
}
|
||||
}
|
||||
// keep lock section small to avoid deadlock with conn lock
|
||||
m.mu.Unlock()
|
||||
|
||||
m.addressMapMu.Lock()
|
||||
defer m.addressMapMu.Unlock()
|
||||
|
||||
for _, c := range removedConns {
|
||||
addresses := c.getAddresses()
|
||||
for _, addr := range addresses {
|
||||
if connList, ok := m.addressMap[addr]; ok {
|
||||
var newList []*udpMuxedConn
|
||||
for _, conn := range connList {
|
||||
if conn.params.Key != ufrag {
|
||||
newList = append(newList, conn)
|
||||
}
|
||||
}
|
||||
m.addressMap[addr] = newList
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// IsClosed returns true if the mux had been closed
|
||||
func (m *UDPMuxDefault) IsClosed() bool {
|
||||
select {
|
||||
case <-m.closedChan:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Close the mux, no further connections could be created
|
||||
func (m *UDPMuxDefault) Close() error {
|
||||
var err error
|
||||
m.closeOnce.Do(func() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
for _, c := range m.conns {
|
||||
_ = c.Close()
|
||||
}
|
||||
m.conns = make(map[string]*udpMuxedConn)
|
||||
close(m.closedChan)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *UDPMuxDefault) removeConn(key string) {
|
||||
m.mu.Lock()
|
||||
c := m.conns[key]
|
||||
delete(m.conns, key)
|
||||
// keep lock section small to avoid deadlock with conn lock
|
||||
m.mu.Unlock()
|
||||
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.addressMapMu.Lock()
|
||||
defer m.addressMapMu.Unlock()
|
||||
|
||||
addresses := c.getAddresses()
|
||||
for _, addr := range addresses {
|
||||
if connList, ok := m.addressMap[addr]; ok {
|
||||
var newList []*udpMuxedConn
|
||||
for _, conn := range connList {
|
||||
if conn.params.Key != key {
|
||||
newList = append(newList, conn)
|
||||
}
|
||||
}
|
||||
m.addressMap[addr] = newList
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *UDPMuxDefault) writeTo(buf []byte, raddr net.Addr) (n int, err error) {
|
||||
return m.params.UDPConn.WriteTo(buf, raddr)
|
||||
}
|
||||
|
||||
func (m *UDPMuxDefault) registerConnForAddress(conn *udpMuxedConn, addr string) {
|
||||
if m.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
m.addressMapMu.Lock()
|
||||
defer m.addressMapMu.Unlock()
|
||||
|
||||
existing, ok := m.addressMap[addr]
|
||||
if !ok {
|
||||
existing = []*udpMuxedConn{}
|
||||
}
|
||||
existing = append(existing, conn)
|
||||
m.addressMap[addr] = existing
|
||||
|
||||
log.Debugf("ICE: registered %s for %s", addr, conn.params.Key)
|
||||
}
|
||||
|
||||
func (m *UDPMuxDefault) createMuxedConn(key string) *udpMuxedConn {
|
||||
c := newUDPMuxedConn(&udpMuxedConnParams{
|
||||
Mux: m,
|
||||
Key: key,
|
||||
AddrPool: m.pool,
|
||||
LocalAddr: m.LocalAddr(),
|
||||
Logger: m.params.Logger,
|
||||
})
|
||||
log.Debugf("ICE: created muxed connection %s for %s", c.LocalAddr().String(), key)
|
||||
return c
|
||||
}
|
||||
|
||||
type bufferHolder struct {
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
func newBufferHolder(size int) *bufferHolder {
|
||||
return &bufferHolder{
|
||||
buffer: make([]byte, size),
|
||||
}
|
||||
}
|
||||
235
iface/udp_mux_universal.go
Normal file
235
iface/udp_mux_universal.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package iface
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/pion/logging"
|
||||
"github.com/pion/stun"
|
||||
)
|
||||
|
||||
// UniversalUDPMux allows multiple connections to go over a single UDP port for
|
||||
// host, server reflexive and relayed candidates.
|
||||
// Actual connection muxing is happening in the UDPMux.
|
||||
type UniversalUDPMux interface {
|
||||
UDPMux
|
||||
GetXORMappedAddr(stunAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error)
|
||||
GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error)
|
||||
GetConnForURL(ufrag string, url string) (net.PacketConn, error)
|
||||
}
|
||||
|
||||
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn overriding ReadFrom.
|
||||
// It the passes packets to the UDPMux that does the actual connection muxing.
|
||||
type UniversalUDPMuxDefault struct {
|
||||
*UDPMuxDefault
|
||||
params UniversalUDPMuxParams
|
||||
|
||||
// since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents
|
||||
// stun.XORMappedAddress indexed by the STUN server addr
|
||||
xorMappedMap map[string]*xorMapped
|
||||
}
|
||||
|
||||
// UniversalUDPMuxParams are parameters for UniversalUDPMux server reflexive.
|
||||
type UniversalUDPMuxParams struct {
|
||||
Logger logging.LeveledLogger
|
||||
UDPConn net.PacketConn
|
||||
XORMappedAddrCacheTTL time.Duration
|
||||
}
|
||||
|
||||
// NewUniversalUDPMuxDefault creates an implementation of UniversalUDPMux embedding UDPMux
|
||||
func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDefault {
|
||||
if params.Logger == nil {
|
||||
params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
|
||||
}
|
||||
if params.XORMappedAddrCacheTTL == 0 {
|
||||
params.XORMappedAddrCacheTTL = time.Second * 25
|
||||
}
|
||||
|
||||
m := &UniversalUDPMuxDefault{
|
||||
params: params,
|
||||
xorMappedMap: make(map[string]*xorMapped),
|
||||
}
|
||||
|
||||
// embed UDPMux
|
||||
udpMuxParams := UDPMuxParams{
|
||||
Logger: params.Logger,
|
||||
UDPConn: m.params.UDPConn,
|
||||
}
|
||||
m.UDPMuxDefault = NewUDPMuxDefault(udpMuxParams)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// GetRelayedAddr creates relayed connection to the given TURN service and returns the relayed addr.
|
||||
// Not implemented yet.
|
||||
func (m *UniversalUDPMuxDefault) GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) {
|
||||
return nil, errors.New("not implemented yet")
|
||||
}
|
||||
|
||||
// GetConnForURL add uniques to the muxed connection by concatenating ufrag and URL (e.g. STUN URL) to be able to support multiple STUN/TURN servers
|
||||
// and return a unique connection per server.
|
||||
func (m *UniversalUDPMuxDefault) GetConnForURL(ufrag string, url string) (net.PacketConn, error) {
|
||||
return m.UDPMuxDefault.GetConn(fmt.Sprintf("%s%s", ufrag, url))
|
||||
}
|
||||
|
||||
func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.Addr) error {
|
||||
|
||||
udpAddr, ok := addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
// message about this err will be logged in the UDPMux
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.isXORMappedResponse(msg, udpAddr.String()) {
|
||||
err := m.handleXORMappedResponse(udpAddr, msg)
|
||||
if err != nil {
|
||||
log.Debugf("%w: %v", errors.New("failed to get XOR-MAPPED-ADDRESS response"), err)
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return m.UDPMuxDefault.HandleSTUNMessage(msg, addr)
|
||||
}
|
||||
|
||||
// isXORMappedResponse indicates whether the message is a XORMappedAddress and is coming from the known STUN server.
|
||||
func (m *UniversalUDPMuxDefault) isXORMappedResponse(msg *stun.Message, stunAddr string) bool {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
// check first if it is a STUN server address because remote peer can also send similar messages but as a BindingSuccess
|
||||
_, ok := m.xorMappedMap[stunAddr]
|
||||
_, err := msg.Get(stun.AttrXORMappedAddress)
|
||||
return err == nil && ok
|
||||
}
|
||||
|
||||
// handleXORMappedResponse parses response from the STUN server, extracts XORMappedAddress attribute
|
||||
// and set the mapped address for the server
|
||||
func (m *UniversalUDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun.Message) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
mappedAddr, ok := m.xorMappedMap[stunAddr.String()]
|
||||
if !ok {
|
||||
return errors.New("no address mapping")
|
||||
}
|
||||
|
||||
var addr stun.XORMappedAddress
|
||||
if err := addr.GetFrom(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.xorMappedMap[stunAddr.String()] = mappedAddr
|
||||
mappedAddr.SetAddr(&addr)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetXORMappedAddr returns *stun.XORMappedAddress if already present for a given STUN server.
|
||||
// Makes a STUN binding request to discover mapped address otherwise.
|
||||
// Blocks until the stun.XORMappedAddress has been discovered or deadline.
|
||||
// Method is safe for concurrent use.
|
||||
func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) {
|
||||
m.mu.Lock()
|
||||
mappedAddr, ok := m.xorMappedMap[serverAddr.String()]
|
||||
// if we already have a mapping for this STUN server (address already received)
|
||||
// and if it is not too old we return it without making a new request to STUN server
|
||||
if ok {
|
||||
if mappedAddr.expired() {
|
||||
mappedAddr.closeWaiters()
|
||||
delete(m.xorMappedMap, serverAddr.String())
|
||||
ok = false
|
||||
} else if mappedAddr.pending() {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
if ok {
|
||||
return mappedAddr.addr, nil
|
||||
}
|
||||
|
||||
// otherwise, make a STUN request to discover the address
|
||||
// or wait for already sent request to complete
|
||||
waitAddrReceived, err := m.sendStun(serverAddr)
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to send STUN packet")
|
||||
}
|
||||
|
||||
// block until response was handled by the connWorker routine and XORMappedAddress was updated
|
||||
select {
|
||||
case <-waitAddrReceived:
|
||||
// when channel closed, addr was obtained
|
||||
m.mu.Lock()
|
||||
mappedAddr := *m.xorMappedMap[serverAddr.String()]
|
||||
m.mu.Unlock()
|
||||
if mappedAddr.addr == nil {
|
||||
return nil, errors.New("no address mapping")
|
||||
}
|
||||
return mappedAddr.addr, nil
|
||||
case <-time.After(deadline):
|
||||
return nil, errors.New("timeout while waiting for XORMappedAddr")
|
||||
}
|
||||
}
|
||||
|
||||
// sendStun sends a STUN request via UDP conn.
|
||||
//
|
||||
// The returned channel is closed when the STUN response has been received.
|
||||
// Method is safe for concurrent use.
|
||||
func (m *UniversalUDPMuxDefault) sendStun(serverAddr net.Addr) (chan struct{}, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// if record present in the map, we already sent a STUN request,
|
||||
// just wait when waitAddrReceived will be closed
|
||||
addrMap, ok := m.xorMappedMap[serverAddr.String()]
|
||||
if !ok {
|
||||
addrMap = &xorMapped{
|
||||
expiresAt: time.Now().Add(m.params.XORMappedAddrCacheTTL),
|
||||
waitAddrReceived: make(chan struct{}),
|
||||
}
|
||||
m.xorMappedMap[serverAddr.String()] = addrMap
|
||||
}
|
||||
|
||||
req, err := stun.Build(stun.BindingRequest, stun.TransactionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = m.params.UDPConn.WriteTo(req.Raw, serverAddr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return addrMap.waitAddrReceived, nil
|
||||
}
|
||||
|
||||
type xorMapped struct {
|
||||
addr *stun.XORMappedAddress
|
||||
waitAddrReceived chan struct{}
|
||||
expiresAt time.Time
|
||||
}
|
||||
|
||||
func (a *xorMapped) closeWaiters() {
|
||||
select {
|
||||
case <-a.waitAddrReceived:
|
||||
// notify was close, ok, that means we received duplicate response
|
||||
// just exit
|
||||
break
|
||||
default:
|
||||
// notify tha twe have a new addr
|
||||
close(a.waitAddrReceived)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *xorMapped) pending() bool {
|
||||
return a.addr == nil
|
||||
}
|
||||
|
||||
func (a *xorMapped) expired() bool {
|
||||
return a.expiresAt.Before(time.Now())
|
||||
}
|
||||
|
||||
func (a *xorMapped) SetAddr(addr *stun.XORMappedAddress) {
|
||||
a.addr = addr
|
||||
a.closeWaiters()
|
||||
}
|
||||
246
iface/udp_muxed_conn.go
Normal file
246
iface/udp_muxed_conn.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package iface
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/logging"
|
||||
"github.com/pion/transport/packetio"
|
||||
)
|
||||
|
||||
type udpMuxedConnParams struct {
|
||||
Mux *UDPMuxDefault
|
||||
AddrPool *sync.Pool
|
||||
Key string
|
||||
LocalAddr net.Addr
|
||||
Logger logging.LeveledLogger
|
||||
}
|
||||
|
||||
// udpMuxedConn represents a logical packet conn for a single remote as identified by ufrag
|
||||
type udpMuxedConn struct {
|
||||
params *udpMuxedConnParams
|
||||
// remote addresses that we have sent to on this conn
|
||||
addresses []string
|
||||
|
||||
// channel holding incoming packets
|
||||
buffer *packetio.Buffer
|
||||
closedChan chan struct{}
|
||||
closeOnce sync.Once
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn {
|
||||
p := &udpMuxedConn{
|
||||
params: params,
|
||||
buffer: packetio.NewBuffer(),
|
||||
closedChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) ReadFrom(b []byte) (n int, raddr net.Addr, err error) {
|
||||
buf := c.params.AddrPool.Get().(*bufferHolder)
|
||||
defer c.params.AddrPool.Put(buf)
|
||||
|
||||
// read address
|
||||
total, err := c.buffer.Read(buf.buffer)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
dataLen := int(binary.LittleEndian.Uint16(buf.buffer[:2]))
|
||||
if dataLen > total || dataLen > len(b) {
|
||||
return 0, nil, io.ErrShortBuffer
|
||||
}
|
||||
|
||||
// read data and then address
|
||||
offset := 2
|
||||
copy(b, buf.buffer[offset:offset+dataLen])
|
||||
offset += dataLen
|
||||
|
||||
// read address len & decode address
|
||||
addrLen := int(binary.LittleEndian.Uint16(buf.buffer[offset : offset+2]))
|
||||
offset += 2
|
||||
|
||||
if raddr, err = decodeUDPAddr(buf.buffer[offset : offset+addrLen]); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
return dataLen, raddr, nil
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) WriteTo(buf []byte, raddr net.Addr) (n int, err error) {
|
||||
if c.isClosed() {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
// each time we write to a new address, we'll register it with the mux
|
||||
addr := raddr.String()
|
||||
if !c.containsAddress(addr) {
|
||||
c.addAddress(addr)
|
||||
}
|
||||
|
||||
return c.params.Mux.writeTo(buf, raddr)
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) LocalAddr() net.Addr {
|
||||
return c.params.LocalAddr
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) SetDeadline(tm time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) SetReadDeadline(tm time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) SetWriteDeadline(tm time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) CloseChannel() <-chan struct{} {
|
||||
return c.closedChan
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) Close() error {
|
||||
var err error
|
||||
c.closeOnce.Do(func() {
|
||||
err = c.buffer.Close()
|
||||
close(c.closedChan)
|
||||
})
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.addresses = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) isClosed() bool {
|
||||
select {
|
||||
case <-c.closedChan:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) getAddresses() []string {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
addresses := make([]string, len(c.addresses))
|
||||
copy(addresses, c.addresses)
|
||||
return addresses
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) addAddress(addr string) {
|
||||
c.mu.Lock()
|
||||
c.addresses = append(c.addresses, addr)
|
||||
c.mu.Unlock()
|
||||
|
||||
// map it on mux
|
||||
c.params.Mux.registerConnForAddress(c, addr)
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) removeAddress(addr string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
newAddresses := make([]string, 0, len(c.addresses))
|
||||
for _, a := range c.addresses {
|
||||
if a != addr {
|
||||
newAddresses = append(newAddresses, a)
|
||||
}
|
||||
}
|
||||
|
||||
c.addresses = newAddresses
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) containsAddress(addr string) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for _, a := range c.addresses {
|
||||
if addr == a {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *udpMuxedConn) writePacket(data []byte, addr *net.UDPAddr) error {
|
||||
// write two packets, address and data
|
||||
buf := c.params.AddrPool.Get().(*bufferHolder)
|
||||
defer c.params.AddrPool.Put(buf)
|
||||
|
||||
// format of buffer | data len | data bytes | addr len | addr bytes |
|
||||
if len(buf.buffer) < len(data)+maxAddrSize {
|
||||
return io.ErrShortBuffer
|
||||
}
|
||||
// data len
|
||||
binary.LittleEndian.PutUint16(buf.buffer, uint16(len(data)))
|
||||
offset := 2
|
||||
|
||||
// data
|
||||
copy(buf.buffer[offset:], data)
|
||||
offset += len(data)
|
||||
|
||||
// write address first, leaving room for its length
|
||||
n, err := encodeUDPAddr(addr, buf.buffer[offset+2:])
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
total := offset + n + 2
|
||||
|
||||
// address len
|
||||
binary.LittleEndian.PutUint16(buf.buffer[offset:], uint16(n))
|
||||
|
||||
if _, err := c.buffer.Write(buf.buffer[:total]); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func encodeUDPAddr(addr *net.UDPAddr, buf []byte) (int, error) {
|
||||
ipdata, err := addr.IP.MarshalText()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
total := 2 + len(ipdata) + 2 + len(addr.Zone)
|
||||
if total > len(buf) {
|
||||
return 0, io.ErrShortBuffer
|
||||
}
|
||||
|
||||
binary.LittleEndian.PutUint16(buf, uint16(len(ipdata)))
|
||||
offset := 2
|
||||
n := copy(buf[offset:], ipdata)
|
||||
offset += n
|
||||
binary.LittleEndian.PutUint16(buf[offset:], uint16(addr.Port))
|
||||
offset += 2
|
||||
copy(buf[offset:], addr.Zone)
|
||||
return total, nil
|
||||
}
|
||||
|
||||
func decodeUDPAddr(buf []byte) (*net.UDPAddr, error) {
|
||||
addr := net.UDPAddr{}
|
||||
|
||||
offset := 0
|
||||
ipLen := int(binary.LittleEndian.Uint16(buf[:2]))
|
||||
offset += 2
|
||||
// basic bounds checking
|
||||
if ipLen+offset > len(buf) {
|
||||
return nil, io.ErrShortBuffer
|
||||
}
|
||||
if err := addr.IP.UnmarshalText(buf[offset : offset+ipLen]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += ipLen
|
||||
addr.Port = int(binary.LittleEndian.Uint16(buf[offset : offset+2]))
|
||||
offset += 2
|
||||
zone := make([]byte, len(buf[offset:]))
|
||||
copy(zone, buf[offset:])
|
||||
addr.Zone = string(zone)
|
||||
|
||||
return &addr, nil
|
||||
}
|
||||
@@ -109,7 +109,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
|
||||
return err
|
||||
}
|
||||
|
||||
stream, err := c.connectToStream(*serverPubKey)
|
||||
cancel, stream, err := c.connectToStream(*serverPubKey)
|
||||
if err != nil {
|
||||
log.Debugf("failed to open Management Service stream: %s", err)
|
||||
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
||||
@@ -117,6 +117,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
log.Infof("connected to the Management Service stream")
|
||||
|
||||
@@ -145,7 +146,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) connectToStream(serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) {
|
||||
func (c *GrpcClient) connectToStream(serverPubKey wgtypes.Key) (context.CancelFunc, proto.ManagementService_SyncClient, error) {
|
||||
req := &proto.SyncRequest{}
|
||||
|
||||
myPrivateKey := c.key
|
||||
@@ -154,11 +155,16 @@ func (c *GrpcClient) connectToStream(serverPubKey wgtypes.Key) (proto.Management
|
||||
encryptedReq, err := encryption.EncryptMessage(serverPubKey, myPrivateKey, req)
|
||||
if err != nil {
|
||||
log.Errorf("failed encrypting message: %s", err)
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(c.ctx)
|
||||
syncReq := &proto.EncryptedMessage{WgPubKey: myPublicKey.String(), Body: encryptedReq}
|
||||
return c.realClient.Sync(c.ctx, syncReq)
|
||||
sync, err := c.realClient.Sync(ctx, syncReq)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
}
|
||||
return cancel, sync, nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
|
||||
Reference in New Issue
Block a user