1
0
mirror of https://git.zx2c4.com/wireguard-go synced 2024-11-15 01:05:15 +01:00

device: prepare for multiple send/receive

This commit is contained in:
Jason A. Donenfeld 2019-07-01 09:39:08 +02:00
parent 5e6eff81b6
commit 647d7b7157
6 changed files with 77 additions and 45 deletions

View File

@ -24,7 +24,8 @@ type Bind interface {
SetMark(value uint32) error
ReceiveIPv6(buff []byte) (int, Endpoint, error)
ReceiveIPv4(buff []byte) (int, Endpoint, error)
Send(buff []byte, end Endpoint) error
Send(buff []byte, end Endpoint, now bool) error
Flush() error
Close() error
}

View File

@ -152,7 +152,7 @@ func (bind *nativeBind) ReceiveIPv6(buff []byte) (int, Endpoint, error) {
return n, (*NativeEndpoint)(endpoint), err
}
func (bind *nativeBind) Send(buff []byte, endpoint Endpoint) error {
func (bind *nativeBind) Send(buff []byte, endpoint Endpoint, now bool) error {
var err error
nend := endpoint.(*NativeEndpoint)
if nend.IP.To4() != nil {
@ -168,3 +168,7 @@ func (bind *nativeBind) Send(buff []byte, endpoint Endpoint) error {
}
return err
}
func (bind *nativeBind) Flush() error {
return nil
}

View File

@ -259,7 +259,7 @@ func (bind *nativeBind) ReceiveIPv4(buff []byte) (int, Endpoint, error) {
return n, &end, err
}
func (bind *nativeBind) Send(buff []byte, end Endpoint) error {
func (bind *nativeBind) Send(buff []byte, end Endpoint, now bool) error {
nend := end.(*NativeEndpoint)
if !nend.isV6 {
if bind.sock4 == -1 {
@ -274,6 +274,10 @@ func (bind *nativeBind) Send(buff []byte, end Endpoint) error {
}
}
func (bind *nativeBind) Flush() error {
return nil
}
func (end *NativeEndpoint) SrcIP() net.IP {
if !end.isV6 {
return net.IPv4(

View File

@ -126,7 +126,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
return peer, nil
}
func (peer *Peer) SendBuffer(buffer []byte) error {
func (peer *Peer) SendBuffer(buffer []byte, now bool) error {
peer.device.net.RLock()
defer peer.device.net.RUnlock()
@ -141,7 +141,7 @@ func (peer *Peer) SendBuffer(buffer []byte) error {
return errors.New("no known endpoint for peer")
}
err := peer.device.net.bind.Send(buffer, peer.endpoint)
err := peer.device.net.bind.Send(buffer, peer.endpoint, now)
if err == nil {
atomic.AddUint64(&peer.stats.txBytes, uint64(len(buffer)))
}

View File

@ -485,7 +485,7 @@ func (device *Device) RoutineHandshake() {
}
}
func (peer *Peer) elementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueInboundElement) {
func (peer *Peer) receiveElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueInboundElement) {
if !*shouldFlush {
select {
case <-peer.routines.stop:
@ -505,9 +505,9 @@ func (peer *Peer) elementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool,
*shouldFlush = false
err := peer.device.tun.device.Flush()
if err != nil {
peer.device.log.Error.Printf("Unable to flush packets: %v", err)
peer.device.log.Error.Printf("Unable to flush receive packets: %v", err)
}
return peer.elementStopOrFlush(shouldFlush)
return peer.receiveElementStopOrFlush(shouldFlush)
}
}
}
@ -549,7 +549,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
elem = nil
}
stop, ok, elem = peer.elementStopOrFlush(&shouldFlush)
stop, ok, elem = peer.receiveElementStopOrFlush(&shouldFlush)
if stop || !ok {
return
}

View File

@ -160,7 +160,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
err = peer.SendBuffer(packet)
err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake initiation", err)
}
@ -198,7 +198,7 @@ func (peer *Peer) SendHandshakeResponse() error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
err = peer.SendBuffer(packet)
err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake response", err)
}
@ -219,7 +219,7 @@ func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement)
var buff [MessageCookieReplySize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, reply)
device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint)
device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint, true)
if err != nil {
device.log.Error.Println("Failed to send cookie reply:", err)
}
@ -541,6 +541,33 @@ func (device *Device) RoutineEncryption() {
}
}
func (peer *Peer) sendElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueOutboundElement) {
if !*shouldFlush {
select {
case <-peer.routines.stop:
stop = true
return
case elem, elemOk = <-peer.queue.outbound:
return
}
} else {
select {
case <-peer.routines.stop:
stop = true
return
case elem, elemOk = <-peer.queue.outbound:
return
default:
*shouldFlush = false
err := peer.device.net.bind.Flush()
if err != nil {
peer.device.log.Error.Printf("Unable to flush send packets: %v", err)
}
return peer.sendElementStopOrFlush(shouldFlush)
}
}
}
/* Sequentially reads packets from queue and sends to endpoint
*
* Obs. Single instance per peer.
@ -577,41 +604,37 @@ func (peer *Peer) RoutineSequentialSender() {
peer.routines.starting.Done()
shouldFlush := false
for {
select {
case <-peer.routines.stop:
stop, ok, elem := peer.sendElementStopOrFlush(&shouldFlush)
if stop || !ok {
return
case elem, ok := <-peer.queue.outbound:
if !ok {
return
}
elem.Lock()
if elem.IsDropped() {
device.PutOutboundElement(elem)
continue
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
// send message and return buffer to pool
err := peer.SendBuffer(elem.packet)
if len(elem.packet) != MessageKeepaliveSize {
peer.timersDataSent()
}
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
if err != nil {
logError.Println(peer, "- Failed to send data packet", err)
continue
}
peer.keepKeyFreshSending()
}
elem.Lock()
if elem.IsDropped() {
device.PutOutboundElement(elem)
continue
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
// send message and return buffer to pool
err := peer.SendBuffer(elem.packet, false)
if len(elem.packet) != MessageKeepaliveSize {
peer.timersDataSent()
}
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
if err != nil {
logError.Println(peer, "- Failed to send data packet", err)
continue
} else {
shouldFlush = true
}
peer.keepKeyFreshSending()
}
}