1
0
mirror of https://git.zx2c4.com/wireguard-go synced 2024-11-15 01:05:15 +01:00

device: use channel close to shut down and drain outbound channel

This is a similar treatment to the handling of the encryption
channel found a few commits ago: Use the closing of the channel
to manage goroutine lifetime and shutdown.
It is considerably simpler because there is only a single writer.

Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
Josh Bleecher Snyder 2020-12-15 15:54:48 -08:00
parent e739ff71a5
commit 15af3e58ce
2 changed files with 35 additions and 56 deletions

View File

@ -17,7 +17,7 @@ import (
) )
const ( const (
PeerRoutineNumber = 3 PeerRoutineNumber = 2
) )
type Peer struct { type Peer struct {
@ -287,7 +287,6 @@ func (peer *Peer) Stop() {
peer.queue.Lock() peer.queue.Lock()
close(peer.queue.nonce) close(peer.queue.nonce)
close(peer.queue.outbound)
close(peer.queue.inbound) close(peer.queue.inbound)
peer.queue.Unlock() peer.queue.Unlock()

View File

@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() {
logDebug.Println(peer, "- Routine: nonce worker - stopped") logDebug.Println(peer, "- Routine: nonce worker - stopped")
peer.queue.packetInNonceQueueIsAwaitingKey.Set(false) peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
device.queue.encryption.wg.Done() // no more writes from us device.queue.encryption.wg.Done() // no more writes from us
close(peer.queue.outbound) // no more writes to this channel
peer.routines.stopping.Done() peer.routines.stopping.Done()
}() }()
@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug := device.log.Debug logDebug := device.log.Debug
logError := device.log.Error logError := device.log.Error
defer func() { defer logDebug.Println(peer, "- Routine: sequential sender - stopped")
for {
select {
case elem, ok := <-peer.queue.outbound:
if ok {
elem.Lock()
if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer)
elem.Drop()
}
device.PutOutboundElement(elem)
}
default:
goto out
}
}
out:
logDebug.Println(peer, "- Routine: sequential sender - stopped")
peer.routines.stopping.Done()
}()
logDebug.Println(peer, "- Routine: sequential sender - started") logDebug.Println(peer, "- Routine: sequential sender - started")
for { for elem := range peer.queue.outbound {
select { elem.Lock()
if elem.IsDropped() {
case <-peer.routines.stop: device.PutOutboundElement(elem)
return continue
}
case elem, ok := <-peer.queue.outbound: if !peer.isRunning.Get() {
// peer has been stopped; return re-usable elems to the shared pool.
if !ok { // This is an optimization only. It is possible for the peer to be stopped
return // immediately after this check, in which case, elem will get processed.
} // The timers and SendBuffer code are resilient to a few stragglers.
// TODO(josharian): rework peer shutdown order to ensure
elem.Lock() // that we never accidentally keep timers alive longer than necessary.
if elem.IsDropped() {
device.PutOutboundElement(elem)
continue
}
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
// send message and return buffer to pool
err := peer.SendBuffer(elem.packet)
if len(elem.packet) != MessageKeepaliveSize {
peer.timersDataSent()
}
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem) device.PutOutboundElement(elem)
if err != nil { continue
logError.Println(peer, "- Failed to send data packet", err)
continue
}
peer.keepKeyFreshSending()
} }
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
// send message and return buffer to pool
err := peer.SendBuffer(elem.packet)
if len(elem.packet) != MessageKeepaliveSize {
peer.timersDataSent()
}
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
if err != nil {
logError.Println(peer, "- Failed to send data packet", err)
continue
}
peer.keepKeyFreshSending()
} }
} }