1
0
mirror of https://git.zx2c4.com/wireguard-go synced 2024-11-15 01:05:15 +01:00

Number of fixes in response to code review

This version cannot complete a handshake.
The program will panic upon receiving any message on the UDP socket.
This commit is contained in:
Mathias Hall-Andersen 2017-08-07 15:25:04 +02:00
parent 8c34c4cbb3
commit cba1d6585a
12 changed files with 552 additions and 445 deletions

View File

@ -84,13 +84,47 @@ func ipcGetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
return nil return nil
} }
func updateUDPConn(device *Device) error {
var err error
netc := &device.net
netc.mutex.Lock()
// close existing connection
if netc.conn != nil {
netc.conn.Close()
netc.conn = nil
}
// open new existing connection
conn, err := net.ListenUDP("udp", netc.addr)
if err == nil {
netc.conn = conn
signalSend(device.signal.newUDPConn)
}
netc.mutex.Unlock()
return err
}
func closeUDPConn(device *Device) {
device.net.mutex.Lock()
device.net.conn = nil
device.net.mutex.Unlock()
println("send signal")
signalSend(device.signal.newUDPConn)
}
func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError { func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
scanner := bufio.NewScanner(socket) scanner := bufio.NewScanner(socket)
logInfo := device.log.Info
logError := device.log.Error logError := device.log.Error
logDebug := device.log.Debug logDebug := device.log.Debug
var peer *Peer var peer *Peer
dummy := false
deviceConfig := true deviceConfig := true
for scanner.Scan() { for scanner.Scan() {
@ -135,17 +169,11 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
netc := &device.net netc := &device.net
netc.mutex.Lock() netc.mutex.Lock()
if netc.addr.Port != int(port) { if netc.addr.Port != int(port) {
if netc.conn != nil {
netc.conn.Close()
}
netc.addr.Port = int(port) netc.addr.Port = int(port)
netc.conn, err = net.ListenUDP("udp", netc.addr)
} }
netc.mutex.Unlock() netc.mutex.Unlock()
if err != nil { updateUDPConn(device)
logError.Println("Failed to create UDP listener:", err)
return &IPCError{Code: ipcErrorIO}
}
// TODO: Clear source address of all peers // TODO: Clear source address of all peers
case "fwmark": case "fwmark":
@ -189,17 +217,30 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
device.mutex.RLock() device.mutex.RLock()
if device.publicKey.Equals(pubKey) { if device.publicKey.Equals(pubKey) {
// create dummy instance
peer = &Peer{}
dummy = true
device.mutex.RUnlock() device.mutex.RUnlock()
logError.Println("Public key of peer matches private key of device") logInfo.Println("Ignoring peer with public key of device")
return &IPCError{Code: ipcErrorInvalid}
}
// find peer referenced } else {
// find peer referenced
peer, _ = device.peers[pubKey]
device.mutex.RUnlock()
if peer == nil {
peer, err = device.NewPeer(pubKey)
if err != nil {
logError.Println("Failed to create new peer:", err)
return &IPCError{Code: ipcErrorInvalid}
}
}
signalSend(peer.signal.handshakeReset)
dummy = false
peer, _ = device.peers[pubKey]
device.mutex.RUnlock()
if peer == nil {
peer = device.NewPeer(pubKey)
} }
case "remove": case "remove":
@ -207,16 +248,17 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
logError.Println("Failed to set remove, invalid value:", value) logError.Println("Failed to set remove, invalid value:", value)
return &IPCError{Code: ipcErrorInvalid} return &IPCError{Code: ipcErrorInvalid}
} }
device.RemovePeer(peer.handshake.remoteStatic) if !dummy {
logDebug.Println("Removing", peer.String()) logDebug.Println("Removing", peer.String())
peer = nil device.RemovePeer(peer.handshake.remoteStatic)
}
peer = &Peer{}
dummy = true
case "preshared_key": case "preshared_key":
err := func() error { peer.mutex.Lock()
peer.mutex.Lock() err := peer.handshake.presharedKey.FromHex(value)
defer peer.mutex.Unlock() peer.mutex.Unlock()
return peer.handshake.presharedKey.FromHex(value)
}()
if err != nil { if err != nil {
logError.Println("Failed to set preshared_key:", err) logError.Println("Failed to set preshared_key:", err)
return &IPCError{Code: ipcErrorInvalid} return &IPCError{Code: ipcErrorInvalid}
@ -232,6 +274,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
peer.mutex.Lock() peer.mutex.Lock()
peer.endpoint = addr peer.endpoint = addr
peer.mutex.Unlock() peer.mutex.Unlock()
signalSend(peer.signal.handshakeReset)
case "persistent_keepalive_interval": case "persistent_keepalive_interval":
@ -251,12 +294,11 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
// send immediate keep-alive // send immediate keep-alive
if old == 0 && secs != 0 { if old == 0 && secs != 0 {
up, err := device.tun.IsUp()
if err != nil { if err != nil {
logError.Println("Failed to get tun device status:", err) logError.Println("Failed to get tun device status:", err)
return &IPCError{Code: ipcErrorIO} return &IPCError{Code: ipcErrorIO}
} }
if up { if atomic.LoadInt32(&device.isUp) == AtomicTrue && !dummy {
peer.SendKeepAlive() peer.SendKeepAlive()
} }
} }
@ -266,7 +308,9 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
logError.Println("Failed to set replace_allowed_ips, invalid value:", value) logError.Println("Failed to set replace_allowed_ips, invalid value:", value)
return &IPCError{Code: ipcErrorInvalid} return &IPCError{Code: ipcErrorInvalid}
} }
device.routingTable.RemovePeer(peer) if !dummy {
device.routingTable.RemovePeer(peer)
}
case "allowed_ip": case "allowed_ip":
_, network, err := net.ParseCIDR(value) _, network, err := net.ParseCIDR(value)
@ -275,7 +319,9 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
return &IPCError{Code: ipcErrorInvalid} return &IPCError{Code: ipcErrorInvalid}
} }
ones, _ := network.Mask.Size() ones, _ := network.Mask.Size()
device.routingTable.Insert(network.IP, uint(ones), peer) if !dummy {
device.routingTable.Insert(network.IP, uint(ones), peer)
}
default: default:
logError.Println("Invalid UAPI key (peer configuration):", key) logError.Println("Invalid UAPI key (peer configuration):", key)

View File

@ -7,16 +7,15 @@ import (
/* Specification constants */ /* Specification constants */
const ( const (
RekeyAfterMessages = (1 << 64) - (1 << 16) - 1 RekeyAfterMessages = (1 << 64) - (1 << 16) - 1
RejectAfterMessages = (1 << 64) - (1 << 4) - 1 RejectAfterMessages = (1 << 64) - (1 << 4) - 1
RekeyAfterTime = time.Second * 120 RekeyAfterTime = time.Second * 120
RekeyAttemptTime = time.Second * 90 RekeyAttemptTime = time.Second * 90
RekeyTimeout = time.Second * 5 RekeyTimeout = time.Second * 5
RejectAfterTime = time.Second * 180 RejectAfterTime = time.Second * 180
KeepaliveTimeout = time.Second * 10 KeepaliveTimeout = time.Second * 10
CookieRefreshTime = time.Second * 120 CookieRefreshTime = time.Second * 120
MaxHandshakeAttemptTime = time.Second * 90 PaddingMultiple = 16
PaddingMultiple = 16
) )
const ( const (
@ -33,4 +32,5 @@ const (
QueueHandshakeBusySize = QueueHandshakeSize / 8 QueueHandshakeBusySize = QueueHandshakeSize / 8
MinMessageSize = MessageTransportSize // size of keep-alive MinMessageSize = MessageTransportSize // size of keep-alive
MaxMessageSize = ((1 << 16) - 1) + MessageTransportHeaderSize MaxMessageSize = ((1 << 16) - 1) + MessageTransportHeaderSize
MaxPeers = 1 << 16
) )

View File

@ -7,6 +7,8 @@ import (
/* Daemonizes the process on linux /* Daemonizes the process on linux
* *
* This is done by spawning and releasing a copy with the --foreground flag * This is done by spawning and releasing a copy with the --foreground flag
*
* TODO: Use env variable to spawn in background
*/ */
func Daemonize() error { func Daemonize() error {

View File

@ -1,13 +1,10 @@
package main package main
import ( import (
"errors"
"fmt"
"net" "net"
"runtime" "runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
type Device struct { type Device struct {
@ -34,31 +31,45 @@ type Device struct {
queue struct { queue struct {
encryption chan *QueueOutboundElement encryption chan *QueueOutboundElement
decryption chan *QueueInboundElement decryption chan *QueueInboundElement
inbound chan *QueueInboundElement
handshake chan QueueHandshakeElement handshake chan QueueHandshakeElement
} }
signal struct { signal struct {
stop chan struct{} stop chan struct{} // halts all go routines
newUDPConn chan struct{} // a net.conn was set
} }
underLoad int32 // used as an atomic bool isUp int32 // atomic bool: interface is up
underLoad int32 // atomic bool: device is under load
ratelimiter Ratelimiter ratelimiter Ratelimiter
peers map[NoisePublicKey]*Peer peers map[NoisePublicKey]*Peer
mac MACStateDevice mac MACStateDevice
} }
/* Warning:
* The caller must hold the device mutex (write lock)
*/
func removePeerUnsafe(device *Device, key NoisePublicKey) {
peer, ok := device.peers[key]
if !ok {
return
}
peer.mutex.Lock()
device.routingTable.RemovePeer(peer)
delete(device.peers, key)
peer.Close()
}
func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { func (device *Device) SetPrivateKey(sk NoisePrivateKey) error {
device.mutex.Lock() device.mutex.Lock()
defer device.mutex.Unlock() defer device.mutex.Unlock()
// check if public key is matching any peer // remove peers with matching public keys
publicKey := sk.publicKey() publicKey := sk.publicKey()
for _, peer := range device.peers { for key, peer := range device.peers {
h := &peer.handshake h := &peer.handshake
h.mutex.RLock() h.mutex.RLock()
if h.remoteStatic.Equals(publicKey) { if h.remoteStatic.Equals(publicKey) {
h.mutex.RUnlock() removePeerUnsafe(device, key)
return errors.New("Private key matches public key of peer")
} }
h.mutex.RUnlock() h.mutex.RUnlock()
} }
@ -71,17 +82,19 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error {
// do DH precomputations // do DH precomputations
isZero := device.privateKey.IsZero() rmKey := device.privateKey.IsZero()
for _, peer := range device.peers { for key, peer := range device.peers {
h := &peer.handshake h := &peer.handshake
h.mutex.Lock() h.mutex.Lock()
if isZero { if rmKey {
h.precomputedStaticStatic = [NoisePublicKeySize]byte{} h.precomputedStaticStatic = [NoisePublicKeySize]byte{}
} else { } else {
h.precomputedStaticStatic = device.privateKey.sharedSecret(h.remoteStatic) h.precomputedStaticStatic = device.privateKey.sharedSecret(h.remoteStatic)
if isZero(h.precomputedStaticStatic[:]) {
removePeerUnsafe(device, key)
}
} }
fmt.Println(h.precomputedStaticStatic)
h.mutex.Unlock() h.mutex.Unlock()
} }
@ -130,11 +143,11 @@ func NewDevice(tun TUNDevice, logLevel int) *Device {
device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize) device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize)
device.queue.encryption = make(chan *QueueOutboundElement, QueueOutboundSize) device.queue.encryption = make(chan *QueueOutboundElement, QueueOutboundSize)
device.queue.decryption = make(chan *QueueInboundElement, QueueInboundSize) device.queue.decryption = make(chan *QueueInboundElement, QueueInboundSize)
device.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
// prepare signals // prepare signals
device.signal.stop = make(chan struct{}) device.signal.stop = make(chan struct{})
device.signal.newUDPConn = make(chan struct{}, 1)
// start workers // start workers
@ -145,33 +158,42 @@ func NewDevice(tun TUNDevice, logLevel int) *Device {
} }
go device.RoutineBusyMonitor() go device.RoutineBusyMonitor()
go device.RoutineMTUUpdater()
go device.RoutineWriteToTUN()
go device.RoutineReadFromTUN() go device.RoutineReadFromTUN()
go device.RoutineTUNEventReader()
go device.RoutineReceiveIncomming() go device.RoutineReceiveIncomming()
go device.ratelimiter.RoutineGarbageCollector(device.signal.stop) go device.ratelimiter.RoutineGarbageCollector(device.signal.stop)
return device return device
} }
func (device *Device) RoutineMTUUpdater() { func (device *Device) RoutineTUNEventReader() {
events := device.tun.Events()
logError := device.log.Error logError := device.log.Error
for ; ; time.Sleep(5 * time.Second) {
// load updated MTU for event := range events {
if event&TUNEventMTUUpdate != 0 {
mtu, err := device.tun.MTU() mtu, err := device.tun.MTU()
if err != nil { if err != nil {
logError.Println("Failed to load updated MTU of device:", err) logError.Println("Failed to load updated MTU of device:", err)
continue } else {
if mtu+MessageTransportSize > MaxMessageSize {
mtu = MaxMessageSize - MessageTransportSize
}
atomic.StoreInt32(&device.mtu, int32(mtu))
}
} }
// upper bound of mtu if event&TUNEventUp != 0 {
println("handle 1")
if mtu+MessageTransportSize > MaxMessageSize { atomic.StoreInt32(&device.isUp, AtomicTrue)
mtu = MaxMessageSize - MessageTransportSize updateUDPConn(device)
println("handle 2", device.net.conn)
}
if event&TUNEventDown != 0 {
atomic.StoreInt32(&device.isUp, AtomicFalse)
closeUDPConn(device)
} }
atomic.StoreInt32(&device.mtu, int32(mtu))
} }
} }
@ -184,15 +206,7 @@ func (device *Device) LookupPeer(pk NoisePublicKey) *Peer {
func (device *Device) RemovePeer(key NoisePublicKey) { func (device *Device) RemovePeer(key NoisePublicKey) {
device.mutex.Lock() device.mutex.Lock()
defer device.mutex.Unlock() defer device.mutex.Unlock()
removePeerUnsafe(device, key)
peer, ok := device.peers[key]
if !ok {
return
}
peer.mutex.Lock()
device.routingTable.RemovePeer(peer)
delete(device.peers, key)
peer.Close()
} }
func (device *Device) RemoveAllPeers() { func (device *Device) RemoveAllPeers() {

View File

@ -18,12 +18,13 @@ type MACStateDevice struct {
} }
type MACStatePeer struct { type MACStatePeer struct {
mutex sync.RWMutex mutex sync.RWMutex
cookieSet time.Time cookieSet time.Time
cookie [blake2s.Size128]byte cookie [blake2s.Size128]byte
lastMAC1 [blake2s.Size128]byte // TODO: Check if set lastMAC1Set bool
keyMAC1 [blake2s.Size]byte lastMAC1 [blake2s.Size128]byte
keyMAC2 [blake2s.Size]byte keyMAC1 [blake2s.Size]byte
keyMAC2 [blake2s.Size]byte
} }
/* Methods for verifing MAC fields /* Methods for verifing MAC fields
@ -184,6 +185,10 @@ func (device *Device) ConsumeMessageCookieReply(msg *MessageCookieReply) bool {
state.mutex.Lock() state.mutex.Lock()
defer state.mutex.Unlock() defer state.mutex.Unlock()
if !state.lastMAC1Set {
return false
}
_, err := XChaCha20Poly1305Decrypt( _, err := XChaCha20Poly1305Decrypt(
cookie[:0], cookie[:0],
&msg.Nonce, &msg.Nonce,
@ -246,7 +251,7 @@ func (state *MACStatePeer) AddMacs(msg []byte) {
mac.Sum(mac1[:0]) mac.Sum(mac1[:0])
}() }()
copy(state.lastMAC1[:], mac1) copy(state.lastMAC1[:], mac1)
// TODO: Set lastMac flag state.lastMAC1Set = true
// set mac2 // set mac2

View File

@ -9,16 +9,14 @@ import (
"time" "time"
) )
const ()
type Peer struct { type Peer struct {
id uint id uint
mutex sync.RWMutex mutex sync.RWMutex
endpoint *net.UDPAddr
persistentKeepaliveInterval uint64 persistentKeepaliveInterval uint64
keyPairs KeyPairs keyPairs KeyPairs
handshake Handshake handshake Handshake
device *Device device *Device
endpoint *net.UDPAddr
stats struct { stats struct {
txBytes uint64 // bytes send to peer (endpoint) txBytes uint64 // bytes send to peer (endpoint)
rxBytes uint64 // bytes received from peer rxBytes uint64 // bytes received from peer
@ -34,6 +32,7 @@ type Peer struct {
newKeyPair chan struct{} // (size 1) : a new key pair was generated newKeyPair chan struct{} // (size 1) : a new key pair was generated
handshakeBegin chan struct{} // (size 1) : request that a new handshake be started ("queue handshake") handshakeBegin chan struct{} // (size 1) : request that a new handshake be started ("queue handshake")
handshakeCompleted chan struct{} // (size 1) : handshake completed handshakeCompleted chan struct{} // (size 1) : handshake completed
handshakeReset chan struct{} // (size 1) : reset handshake negotiation state
flushNonceQueue chan struct{} // (size 1) : empty queued packets flushNonceQueue chan struct{} // (size 1) : empty queued packets
messageSend chan struct{} // (size 1) : a message was send to the peer messageSend chan struct{} // (size 1) : a message was send to the peer
messageReceived chan struct{} // (size 1) : an authenticated message was received messageReceived chan struct{} // (size 1) : an authenticated message was received
@ -44,6 +43,7 @@ type Peer struct {
keepalivePassive *time.Timer // set upon recieving messages keepalivePassive *time.Timer // set upon recieving messages
newHandshake *time.Timer // begin a new handshake (after Keepalive + RekeyTimeout) newHandshake *time.Timer // begin a new handshake (after Keepalive + RekeyTimeout)
zeroAllKeys *time.Timer // zero all key material (after RejectAfterTime*3) zeroAllKeys *time.Timer // zero all key material (after RejectAfterTime*3)
handshakeDeadline *time.Timer // Current handshake must be completed
pendingKeepalivePassive bool pendingKeepalivePassive bool
pendingNewHandshake bool pendingNewHandshake bool
@ -59,7 +59,7 @@ type Peer struct {
mac MACStatePeer mac MACStatePeer
} }
func (device *Device) NewPeer(pk NoisePublicKey) *Peer { func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
// create peer // create peer
peer := new(Peer) peer := new(Peer)
@ -80,11 +80,17 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
peer.id = device.idCounter peer.id = device.idCounter
device.idCounter += 1 device.idCounter += 1
// check if over limit
if len(device.peers) >= MaxPeers {
return nil, errors.New("Too many peers")
}
// map public key // map public key
_, ok := device.peers[pk] _, ok := device.peers[pk]
if ok { if ok {
panic(errors.New("bug: adding existing peer")) return nil, errors.New("Adding existing peer")
} }
device.peers[pk] = peer device.peers[pk] = peer
device.mutex.Unlock() device.mutex.Unlock()
@ -108,6 +114,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
peer.signal.stop = make(chan struct{}) peer.signal.stop = make(chan struct{})
peer.signal.newKeyPair = make(chan struct{}, 1) peer.signal.newKeyPair = make(chan struct{}, 1)
peer.signal.handshakeBegin = make(chan struct{}, 1) peer.signal.handshakeBegin = make(chan struct{}, 1)
peer.signal.handshakeReset = make(chan struct{}, 1)
peer.signal.handshakeCompleted = make(chan struct{}, 1) peer.signal.handshakeCompleted = make(chan struct{}, 1)
peer.signal.flushNonceQueue = make(chan struct{}, 1) peer.signal.flushNonceQueue = make(chan struct{}, 1)
@ -117,7 +124,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) *Peer {
go peer.RoutineSequentialSender() go peer.RoutineSequentialSender()
go peer.RoutineSequentialReceiver() go peer.RoutineSequentialReceiver()
return peer return peer, nil
} }
func (peer *Peer) String() string { func (peer *Peer) String() string {

View File

@ -111,113 +111,84 @@ func (device *Device) RoutineBusyMonitor() {
func (device *Device) RoutineReceiveIncomming() { func (device *Device) RoutineReceiveIncomming() {
logInfo := device.log.Info
logDebug := device.log.Debug logDebug := device.log.Debug
logDebug.Println("Routine, receive incomming, started") logDebug.Println("Routine, receive incomming, started")
var buffer *[MaxMessageSize]byte
for { for {
// check if stopped // wait for new conn
var conn *net.UDPConn
select { select {
case <-device.signal.newUDPConn:
device.net.mutex.RLock()
conn = device.net.conn
device.net.mutex.RUnlock()
case <-device.signal.stop: case <-device.signal.stop:
return return
default:
} }
// read next datagram
if buffer == nil {
buffer = device.GetMessageBuffer()
}
// TODO: Take writelock to sleep
device.net.mutex.RLock()
conn := device.net.conn
device.net.mutex.RUnlock()
if conn == nil { if conn == nil {
time.Sleep(time.Second)
continue continue
} }
// TODO: Wait for new conn or message // receive datagrams until closed
conn.SetReadDeadline(time.Now().Add(time.Second))
size, raddr, err := conn.ReadFromUDP(buffer[:]) buffer := device.GetMessageBuffer()
if err != nil || size < MinMessageSize {
continue
}
// handle packet for {
packet := buffer[:size] // read next datagram
msgType := binary.LittleEndian.Uint32(packet[:4])
size, raddr, err := conn.ReadFromUDP(buffer[:]) // TODO: This is broken
if err != nil {
break
}
if size < MinMessageSize {
continue
}
// check size of packet
packet := buffer[:size]
msgType := binary.LittleEndian.Uint32(packet[:4])
var okay bool
func() {
switch msgType { switch msgType {
case MessageInitiationType, MessageResponseType: // check if transport
// TODO: Check size early
// add to handshake queue
device.addToHandshakeQueue(
device.queue.handshake,
QueueHandshakeElement{
msgType: msgType,
buffer: buffer,
packet: packet,
source: raddr,
},
)
buffer = nil
case MessageCookieReplyType:
// TODO: Queue all the things
// verify and update peer cookie state
if len(packet) != MessageCookieReplySize {
return
}
var reply MessageCookieReply
reader := bytes.NewReader(packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
logDebug.Println("Failed to decode cookie reply")
return
}
device.ConsumeMessageCookieReply(&reply)
case MessageTransportType: case MessageTransportType:
// lookup key pair // check size
if len(packet) < MessageTransportSize { if len(packet) < MessageTransportType {
return continue
} }
// lookup key pair
receiver := binary.LittleEndian.Uint32( receiver := binary.LittleEndian.Uint32(
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter], packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
) )
value := device.indices.Lookup(receiver) value := device.indices.Lookup(receiver)
keyPair := value.keyPair keyPair := value.keyPair
if keyPair == nil { if keyPair == nil {
return continue
} }
// check key-pair expiry // check key-pair expiry
if keyPair.created.Add(RejectAfterTime).Before(time.Now()) { if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
return continue
} }
// add to peer queue // create work element
peer := value.peer peer := value.peer
elem := &QueueInboundElement{ elem := &QueueInboundElement{
@ -233,11 +204,33 @@ func (device *Device) RoutineReceiveIncomming() {
device.addToInboundQueue(device.queue.decryption, elem) device.addToInboundQueue(device.queue.decryption, elem)
device.addToInboundQueue(peer.queue.inbound, elem) device.addToInboundQueue(peer.queue.inbound, elem)
buffer = nil buffer = nil
continue
default: // otherwise it is a handshake related packet
logInfo.Println("Got unknown message from:", raddr)
case MessageInitiationType:
okay = len(packet) == MessageInitiationSize
case MessageResponseType:
okay = len(packet) == MessageResponseSize
case MessageCookieReplyType:
okay = len(packet) == MessageCookieReplySize
} }
}()
if okay {
device.addToHandshakeQueue(
device.queue.handshake,
QueueHandshakeElement{
msgType: msgType,
buffer: buffer,
packet: packet,
source: raddr,
},
)
buffer = device.GetMessageBuffer()
}
}
} }
} }
@ -306,154 +299,165 @@ func (device *Device) RoutineHandshake() {
return return
} }
func() { // handle cookie fields and ratelimiting
// verify mac1 switch elem.msgType {
case MessageCookieReplyType:
// verify and update peer cookie state
var reply MessageCookieReply
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
logDebug.Println("Failed to decode cookie reply")
return
}
device.ConsumeMessageCookieReply(&reply)
continue
case MessageInitiationType, MessageResponseType:
// check mac fields and ratelimit
if !device.mac.CheckMAC1(elem.packet) { if !device.mac.CheckMAC1(elem.packet) {
logDebug.Println("Received packet with invalid mac1") logDebug.Println("Received packet with invalid mac1")
return return
} }
// verify mac2
busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue busy := atomic.LoadInt32(&device.underLoad) == AtomicTrue
if busy && !device.mac.CheckMAC2(elem.packet, elem.source) { if busy {
sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type" if !device.mac.CheckMAC2(elem.packet, elem.source) {
reply, err := device.CreateMessageCookieReply(elem.packet, sender, elem.source) sender := binary.LittleEndian.Uint32(elem.packet[4:8]) // "sender" always follows "type"
if err != nil { reply, err := device.CreateMessageCookieReply(elem.packet, sender, elem.source)
logError.Println("Failed to create cookie reply:", err) if err != nil {
return logError.Println("Failed to create cookie reply:", err)
} return
// TODO: Use temp }
writer := bytes.NewBuffer(elem.packet[:0]) writer := bytes.NewBuffer(temp[:0])
binary.Write(writer, binary.LittleEndian, reply) binary.Write(writer, binary.LittleEndian, reply)
elem.packet = writer.Bytes() _, err = device.net.conn.WriteToUDP(
_, err = device.net.conn.WriteToUDP(elem.packet, elem.source) writer.Bytes(),
if err != nil { elem.source,
logDebug.Println("Failed to send cookie reply:", err)
}
return
}
// ratelimit
// TODO: Only ratelimit when busy
if !device.ratelimiter.Allow(elem.source.IP) {
return
}
// handle messages
switch elem.msgType {
case MessageInitiationType:
// unmarshal
if len(elem.packet) != MessageInitiationSize {
return
}
var msg MessageInitiation
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode initiation message")
return
}
// consume initiation
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid initiation message from",
elem.source.IP.String(),
elem.source.Port,
) )
return if err != nil {
logDebug.Println("Failed to send cookie reply:", err)
}
continue
} }
if !device.ratelimiter.Allow(elem.source.IP) {
// update timers continue
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
// update endpoint
// TODO: Add a race condition \s
peer.mutex.Lock()
peer.endpoint = elem.source
peer.mutex.Unlock()
// create response
response, err := device.CreateMessageResponse(peer)
if err != nil {
logError.Println("Failed to create response message:", err)
return
} }
peer.TimerEphemeralKeyCreated()
peer.NewKeyPair()
logDebug.Println("Creating response message for", peer.String())
writer := bytes.NewBuffer(temp[:0])
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.mac.AddMacs(packet)
// send response
peer.SendBuffer(packet)
peer.TimerAnyAuthenticatedPacketTraversal()
case MessageResponseType:
// unmarshal
if len(elem.packet) != MessageResponseSize {
return
}
var msg MessageResponse
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode response message")
return
}
// consume response
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid response message from",
elem.source.IP.String(),
elem.source.Port,
)
return
}
// update timers
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
peer.TimerHandshakeComplete()
// derive key-pair
peer.NewKeyPair()
peer.SendKeepAlive()
default:
logError.Println("Invalid message type in handshake queue")
} }
}()
default:
logError.Println("Invalid packet ended up in the handshake queue")
continue
}
// handle handshake initation/response content
switch elem.msgType {
case MessageInitiationType:
// unmarshal
var msg MessageInitiation
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode initiation message")
continue
}
// consume initiation
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid initiation message from",
elem.source.IP.String(),
elem.source.Port,
)
continue
}
// update timers
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
// update endpoint
// TODO: Discover destination address also, only update on change
peer.mutex.Lock()
peer.endpoint = elem.source
peer.mutex.Unlock()
// create response
response, err := device.CreateMessageResponse(peer)
if err != nil {
logError.Println("Failed to create response message:", err)
continue
}
peer.TimerEphemeralKeyCreated()
peer.NewKeyPair()
logDebug.Println("Creating response message for", peer.String())
writer := bytes.NewBuffer(temp[:0])
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.mac.AddMacs(packet)
// send response
_, err = peer.SendBuffer(packet)
if err == nil {
peer.TimerAnyAuthenticatedPacketTraversal()
}
case MessageResponseType:
// unmarshal
var msg MessageResponse
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode response message")
continue
}
// consume response
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid response message from",
elem.source.IP.String(),
elem.source.Port,
)
continue
}
peer.TimerEphemeralKeyCreated()
// update timers
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
peer.TimerHandshakeComplete()
// derive key-pair
peer.NewKeyPair()
peer.SendKeepAlive()
}
} }
} }
@ -463,6 +467,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
device := peer.device device := peer.device
logInfo := device.log.Info logInfo := device.log.Info
logError := device.log.Error
logDebug := device.log.Debug logDebug := device.log.Debug
logDebug.Println("Routine, sequential receiver, started for peer", peer.id) logDebug.Println("Routine, sequential receiver, started for peer", peer.id)
@ -478,116 +483,104 @@ func (peer *Peer) RoutineSequentialReceiver() {
// process packet // process packet
func() { if elem.IsDropped() {
if elem.IsDropped() { continue
return }
// check for replay
if !elem.keyPair.replayFilter.ValidateCounter(elem.counter) {
continue
}
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
peer.KeepKeyFreshReceiving()
// check if using new key-pair
kp := &peer.keyPairs
kp.mutex.Lock()
if kp.next == elem.keyPair {
peer.TimerHandshakeComplete()
kp.previous = kp.current
kp.current = kp.next
kp.next = nil
}
kp.mutex.Unlock()
// check for keep-alive
if len(elem.packet) == 0 {
logDebug.Println("Received keep-alive from", peer.String())
continue
}
peer.TimerDataReceived()
// verify source and strip padding
switch elem.packet[0] >> 4 {
case ipv4.Version:
// strip padding
if len(elem.packet) < ipv4.HeaderLen {
continue
} }
// check for replay field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
length := binary.BigEndian.Uint16(field)
if !elem.keyPair.replayFilter.ValidateCounter(elem.counter) { if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen {
return continue
} }
peer.TimerAnyAuthenticatedPacketTraversal() elem.packet = elem.packet[:length]
peer.TimerAnyAuthenticatedPacketReceived()
peer.KeepKeyFreshReceiving()
// check if using new key-pair // verify IPv4 source
kp := &peer.keyPairs src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
kp.mutex.Lock() if device.routingTable.LookupIPv4(src) != peer {
if kp.next == elem.keyPair { logInfo.Println("Packet with unallowed source IP from", peer.String())
peer.TimerHandshakeComplete() continue
kp.previous = kp.current
kp.current = kp.next
kp.next = nil
}
kp.mutex.Unlock()
// check for keep-alive
if len(elem.packet) == 0 {
logDebug.Println("Received keep-alive from", peer.String())
return
}
peer.TimerDataReceived()
// verify source and strip padding
switch elem.packet[0] >> 4 {
case ipv4.Version:
// strip padding
if len(elem.packet) < ipv4.HeaderLen {
return
}
field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
length := binary.BigEndian.Uint16(field)
// TODO: check length of packet & NOT TOO SMALL either
elem.packet = elem.packet[:length]
// verify IPv4 source
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.routingTable.LookupIPv4(src) != peer {
logInfo.Println("Packet with unallowed source IP from", peer.String())
return
}
case ipv6.Version:
// strip padding
if len(elem.packet) < ipv6.HeaderLen {
return
}
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
length := binary.BigEndian.Uint16(field)
length += ipv6.HeaderLen
// TODO: check length of packet
elem.packet = elem.packet[:length]
// verify IPv6 source
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.routingTable.LookupIPv6(src) != peer {
logInfo.Println("Packet with unallowed source IP from", peer.String())
return
}
default:
logInfo.Println("Packet with invalid IP version from", peer.String())
return
} }
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) case ipv6.Version:
device.addToInboundQueue(device.queue.inbound, elem)
// TODO: move TUN write into per peer routine // strip padding
}()
}
}
func (device *Device) RoutineWriteToTUN() { if len(elem.packet) < ipv6.HeaderLen {
continue
logError := device.log.Error
logDebug := device.log.Debug
logDebug.Println("Routine, sequential tun writer, started")
for {
select {
case <-device.signal.stop:
return
case elem := <-device.queue.inbound:
_, err := device.tun.Write(elem.packet)
device.PutMessageBuffer(elem.buffer)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)
} }
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
length := binary.BigEndian.Uint16(field)
length += ipv6.HeaderLen
if int(length) > len(elem.packet) {
continue
}
elem.packet = elem.packet[:length]
// verify IPv6 source
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.routingTable.LookupIPv6(src) != peer {
logInfo.Println("Packet with unallowed source IP from", peer.String())
continue
}
default:
logInfo.Println("Packet with invalid IP version from", peer.String())
continue
}
// write to tun
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
_, err := device.tun.Write(elem.packet)
device.PutMessageBuffer(elem.buffer)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)
} }
} }
} }

View File

@ -168,8 +168,6 @@ func (device *Device) RoutineReadFromTUN() {
continue continue
} }
println(size, err)
elem.packet = elem.packet[:size] elem.packet = elem.packet[:size]
// lookup peer // lookup peer
@ -210,6 +208,7 @@ func (device *Device) RoutineReadFromTUN() {
// insert into nonce/pre-handshake queue // insert into nonce/pre-handshake queue
signalSend(peer.signal.handshakeReset)
addToOutboundQueue(peer.queue.nonce, elem) addToOutboundQueue(peer.queue.nonce, elem)
elem = nil elem = nil

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"golang.org/x/crypto/blake2s" "golang.org/x/crypto/blake2s"
"math/rand"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -16,12 +17,11 @@ func (peer *Peer) KeepKeyFreshSending() {
if kp == nil { if kp == nil {
return return
} }
if !kp.isInitiator {
return
}
nonce := atomic.LoadUint64(&kp.sendNonce) nonce := atomic.LoadUint64(&kp.sendNonce)
send := nonce > RekeyAfterMessages || time.Now().Sub(kp.created) > RekeyAfterTime if nonce > RekeyAfterMessages {
if send { signalSend(peer.signal.handshakeBegin)
}
if kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime {
signalSend(peer.signal.handshakeBegin) signalSend(peer.signal.handshakeBegin)
} }
} }
@ -30,6 +30,7 @@ func (peer *Peer) KeepKeyFreshSending() {
* *
*/ */
func (peer *Peer) KeepKeyFreshReceiving() { func (peer *Peer) KeepKeyFreshReceiving() {
// TODO: Add a guard, clear on handshake complete (clear in TimerHandshakeComplete)
kp := peer.keyPairs.Current() kp := peer.keyPairs.Current()
if kp == nil { if kp == nil {
return return
@ -108,7 +109,6 @@ func (peer *Peer) TimerAnyAuthenticatedPacketTraversal() {
* - First transport message under the "next" key * - First transport message under the "next" key
*/ */
func (peer *Peer) TimerHandshakeComplete() { func (peer *Peer) TimerHandshakeComplete() {
timerStop(peer.timer.zeroAllKeys)
atomic.StoreInt64( atomic.StoreInt64(
&peer.stats.lastHandshakeNano, &peer.stats.lastHandshakeNano,
time.Now().UnixNano(), time.Now().UnixNano(),
@ -129,10 +129,7 @@ func (peer *Peer) TimerHandshakeComplete() {
* upon failure to complete a handshake * upon failure to complete a handshake
*/ */
func (peer *Peer) TimerEphemeralKeyCreated() { func (peer *Peer) TimerEphemeralKeyCreated() {
if !peer.timer.pendingZeroAllKeys { peer.timer.zeroAllKeys.Reset(RejectAfterTime * 3)
peer.timer.pendingZeroAllKeys = true
peer.timer.zeroAllKeys.Reset(RejectAfterTime * 3)
}
} }
func (peer *Peer) RoutineTimerHandler() { func (peer *Peer) RoutineTimerHandler() {
@ -154,19 +151,19 @@ func (peer *Peer) RoutineTimerHandler() {
interval := atomic.LoadUint64(&peer.persistentKeepaliveInterval) interval := atomic.LoadUint64(&peer.persistentKeepaliveInterval)
if interval > 0 { if interval > 0 {
logDebug.Println("Sending persistent keep-alive to", peer.String()) logDebug.Println("Sending keep-alive to", peer.String())
peer.SendKeepAlive() peer.SendKeepAlive()
} }
case <-peer.timer.keepalivePassive.C: case <-peer.timer.keepalivePassive.C:
logDebug.Println("Sending passive keep-alive to", peer.String()) logDebug.Println("Sending keep-alive to", peer.String())
peer.SendKeepAlive() peer.SendKeepAlive()
if peer.timer.needAnotherKeepalive { if peer.timer.needAnotherKeepalive {
peer.timer.keepalivePassive.Reset(KeepaliveTimeout) peer.timer.keepalivePassive.Reset(KeepaliveTimeout)
peer.timer.needAnotherKeepalive = true peer.timer.needAnotherKeepalive = false
} }
// unresponsive session // unresponsive session
@ -189,8 +186,6 @@ func (peer *Peer) RoutineTimerHandler() {
kp := &peer.keyPairs kp := &peer.keyPairs
kp.mutex.Lock() kp.mutex.Lock()
peer.timer.pendingZeroAllKeys = false
// unmap indecies // unmap indecies
indices.mutex.Lock() indices.mutex.Lock()
@ -251,40 +246,41 @@ func (peer *Peer) RoutineHandshakeInitiator() {
return return
} }
// wait for handshake // set deadline
deadline := time.Now().Add(MaxHandshakeAttemptTime) BeginHandshakes:
signalClear(peer.signal.handshakeReset)
deadline := time.NewTimer(RekeyAttemptTime)
AttemptHandshakes:
Loop:
for attempts := uint(1); ; attempts++ { for attempts := uint(1); ; attempts++ {
// clear completed signal // check if deadline reached
select { select {
case <-peer.signal.handshakeCompleted: case <-deadline.C:
logInfo.Println("Handshake negotiation timed out for:", peer.String())
signalSend(peer.signal.flushNonceQueue)
timerStop(peer.timer.keepalivePersistent)
break
case <-peer.signal.stop: case <-peer.signal.stop:
return return
default: default:
} }
// check if sufficient time for retry signalClear(peer.signal.handshakeCompleted)
if deadline.Before(time.Now().Add(RekeyTimeout)) {
logInfo.Println("Handshake negotiation timed out for", peer.String())
signalSend(peer.signal.flushNonceQueue)
timerStop(peer.timer.keepalivePersistent)
timerStop(peer.timer.keepalivePassive)
break Loop
}
// create initiation message // create initiation message
msg, err := peer.device.CreateMessageInitiation(peer) msg, err := peer.device.CreateMessageInitiation(peer)
if err != nil { if err != nil {
logError.Println("Failed to create handshake initiation message:", err) logError.Println("Failed to create handshake initiation message:", err)
break Loop break AttemptHandshakes
} }
peer.TimerEphemeralKeyCreated()
jitter := time.Millisecond * time.Duration(rand.Uint32()%334)
// marshal and send // marshal and send
@ -299,14 +295,14 @@ func (peer *Peer) RoutineHandshakeInitiator() {
"Failed to send handshake initiation message to", "Failed to send handshake initiation message to",
peer.String(), ":", err, peer.String(), ":", err,
) )
continue break
} }
peer.TimerAnyAuthenticatedPacketTraversal() peer.TimerAnyAuthenticatedPacketTraversal()
// set timeout // set handshake timeout
timeout := time.NewTimer(RekeyTimeout) timeout := time.NewTimer(RekeyTimeout + jitter)
logDebug.Println( logDebug.Println(
"Handshake initiation attempt", "Handshake initiation attempt",
attempts, "sent to", peer.String(), attempts, "sent to", peer.String(),
@ -321,15 +317,19 @@ func (peer *Peer) RoutineHandshakeInitiator() {
case <-peer.signal.handshakeCompleted: case <-peer.signal.handshakeCompleted:
<-timeout.C <-timeout.C
break Loop break AttemptHandshakes
case <-peer.signal.handshakeReset:
<-timeout.C
goto BeginHandshakes
case <-timeout.C: case <-timeout.C:
// TODO: Clear source address for peer
continue continue
} }
} }
// allow new signal to be set // clear signal set in the meantime
signalClear(peer.signal.handshakeBegin) signalClear(peer.signal.handshakeBegin)
} }

View File

@ -6,10 +6,19 @@ package main
const DefaultMTU = 1420 const DefaultMTU = 1420
type TUNEvent int
const (
TUNEventUp = 1 << iota
TUNEventDown
TUNEventMTUUpdate
)
type TUNDevice interface { type TUNDevice interface {
Read([]byte) (int, error) // read a packet from the device (without any additional headers) Read([]byte) (int, error) // read a packet from the device (without any additional headers)
Write([]byte) (int, error) // writes a packet to the device (without any additional headers) Write([]byte) (int, error) // writes a packet to the device (without any additional headers)
IsUp() (bool, error) // is the interface up?
MTU() (int, error) // returns the MTU of the device MTU() (int, error) // returns the MTU of the device
Name() string // returns the current name Name() string // returns the current name
Events() chan TUNEvent // returns a constant channel of events related to the device
Close() error // stops the device and closes the event channel
} }

View File

@ -16,11 +16,12 @@ import (
const CloneDevicePath = "/dev/net/tun" const CloneDevicePath = "/dev/net/tun"
type NativeTun struct { type NativeTun struct {
fd *os.File fd *os.File
name string name string
events chan TUNEvent
} }
func (tun *NativeTun) IsUp() (bool, error) { func (tun *NativeTun) isUp() (bool, error) {
inter, err := net.InterfaceByName(tun.name) inter, err := net.InterfaceByName(tun.name)
return inter.Flags&net.FlagUp != 0, err return inter.Flags&net.FlagUp != 0, err
} }
@ -111,6 +112,14 @@ func (tun *NativeTun) Read(d []byte) (int, error) {
return tun.fd.Read(d) return tun.fd.Read(d)
} }
func (tun *NativeTun) Events() chan TUNEvent {
return tun.events
}
func (tun *NativeTun) Close() error {
return nil
}
func CreateTUN(name string) (TUNDevice, error) { func CreateTUN(name string) (TUNDevice, error) {
// open clone device // open clone device
@ -146,10 +155,14 @@ func CreateTUN(name string) (TUNDevice, error) {
newName := string(ifr[:]) newName := string(ifr[:])
newName = newName[:strings.Index(newName, "\000")] newName = newName[:strings.Index(newName, "\000")]
device := &NativeTun{ device := &NativeTun{
fd: fd, fd: fd,
name: newName, name: newName,
events: make(chan TUNEvent, 5),
} }
// TODO: Wait for device to be upped
device.events <- TUNEventUp
// set default MTU // set default MTU
err = device.setMTU(DefaultMTU) err = device.setMTU(DefaultMTU)

View File

@ -7,7 +7,6 @@ import (
"net" "net"
"os" "os"
"path" "path"
"time"
) )
const ( const (
@ -26,9 +25,10 @@ const (
*/ */
type UAPIListener struct { type UAPIListener struct {
listener net.Listener // unix socket listener listener net.Listener // unix socket listener
connNew chan net.Conn connNew chan net.Conn
connErr chan error connErr chan error
inotifyFd int
} }
func (l *UAPIListener) Accept() (net.Conn, error) { func (l *UAPIListener) Accept() (net.Conn, error) {
@ -106,9 +106,28 @@ func NewUAPIListener(name string) (net.Listener, error) {
// watch for deletion of socket // watch for deletion of socket
uapi.inotifyFd, err = unix.InotifyInit()
if err != nil {
return nil, err
}
_, err = unix.InotifyAddWatch(
uapi.inotifyFd,
socketPath,
unix.IN_ATTRIB|
unix.IN_DELETE|
unix.IN_DELETE_SELF,
)
if err != nil {
return nil, err
}
go func(l *UAPIListener) { go func(l *UAPIListener) {
for ; ; time.Sleep(time.Second) { var buff [4096]byte
if _, err := os.Stat(socketPath); os.IsNotExist(err) { for {
unix.Read(uapi.inotifyFd, buff[:])
if _, err := os.Lstat(socketPath); os.IsNotExist(err) {
l.connErr <- err l.connErr <- err
return return
} }