Supporting graceful signal shutdown.

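Adds a signal handler that stops the container manager, and with it the manager's background goroutines (subcontainer watcher and global housekeeping), before the process exits.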
This commit is contained in:
Victor Marmol 2014-09-23 11:00:19 -07:00
parent 195772a1bc
commit 6f35cf8e6c
6 changed files with 144 additions and 49 deletions

View File

@ -19,6 +19,8 @@ import (
"fmt" "fmt"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os"
"os/signal"
"runtime" "runtime"
"github.com/golang/glog" "github.com/golang/glog"
@ -38,6 +40,7 @@ var maxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used sim
var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Empty means none. Options are: <empty> (default), bigquery, and influxdb") var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Empty means none. Options are: <empty> (default), bigquery, and influxdb")
func main() { func main() {
defer glog.Flush()
flag.Parse() flag.Parse()
setMaxProcs() setMaxProcs()
@ -91,25 +94,18 @@ func main() {
} }
}) })
defer glog.Flush() // Start the manager.
if err := containerManager.Start(); err != nil {
glog.Fatalf("Failed to start container manager: %v", err)
}
errChan := make(chan error) // Install signal handler.
go func() { installSignalHandler(containerManager)
errChan <- containerManager.Start()
}()
glog.Infof("Starting cAdvisor version: %q", info.VERSION) glog.Infof("Starting cAdvisor version: %q on port %d", info.VERSION, *argPort)
glog.Infof("About to serve on port %d", *argPort)
addr := fmt.Sprintf(":%v", *argPort) addr := fmt.Sprintf(":%v", *argPort)
glog.Fatal(http.ListenAndServe(addr, nil))
go func() {
errChan <- http.ListenAndServe(addr, nil)
}()
select {
case err := <-errChan:
glog.Fatal(err)
}
} }
func setMaxProcs() { func setMaxProcs() {
@ -129,3 +125,18 @@ func setMaxProcs() {
glog.Warningf("Specified max procs of %v but using %v", numProcs, actualNumProcs) glog.Warningf("Specified max procs of %v but using %v", numProcs, actualNumProcs)
} }
} }
// installSignalHandler arranges for a clean shutdown when the process is
// interrupted. It registers for os.Interrupt and returns immediately; a
// background goroutine waits for the signal, stops containerManager, logs the
// outcome, flushes glog, and exits the process with status 0.
func installSignalHandler(containerManager manager.Manager) {
	// Buffered with capacity 1: signal.Notify does not block when sending, so
	// an unbuffered channel could drop a signal that arrives before the
	// goroutine below is receiving.
	c := make(chan os.Signal, 1)

	// Only os.Interrupt is requested. os.Kill (SIGKILL) cannot be caught by a
	// program, so asking to be notified of it has no effect.
	// NOTE(review): SIGTERM is not handled here; adding syscall.SIGTERM would
	// need the syscall import.
	signal.Notify(c, os.Interrupt)

	// Wait for the signal off the caller's goroutine so that main can go on to
	// serve HTTP.
	go func() {
		sig := <-c
		if err := containerManager.Stop(); err != nil {
			glog.Errorf("Failed to stop container manager: %v", err)
		}
		glog.Infof("Exiting given signal: %v", sig)

		// os.Exit does not run deferred calls, so the deferred glog.Flush in
		// main never fires on this path. Flush explicitly so the messages
		// above are not lost.
		glog.Flush()
		os.Exit(0)
	}()
}

View File

@ -47,4 +47,5 @@ type ContainerHandler interface {
ListThreads(listType ListType) ([]int, error) ListThreads(listType ListType) ([]int, error)
ListProcesses(listType ListType) ([]int, error) ListProcesses(listType ListType) ([]int, error)
WatchSubcontainers(events chan SubcontainerEvent) error WatchSubcontainers(events chan SubcontainerEvent) error
StopWatchingSubcontainers() error
} }

View File

@ -268,3 +268,8 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) (
func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
return fmt.Errorf("watch is unimplemented in the Docker container driver") return fmt.Errorf("watch is unimplemented in the Docker container driver")
} }
// StopWatchingSubcontainers always succeeds for the Docker driver: its
// WatchSubcontainers is unimplemented, so there is never a watch to tear down.
func (self *dockerContainerHandler) StopWatchingSubcontainers() error {
	return nil
}

View File

@ -80,6 +80,11 @@ func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEve
return args.Error(0) return args.Error(0)
} }
// StopWatchingSubcontainers records the call on the mock and returns the
// first configured return value as an error.
func (self *MockContainerHandler) StopWatchingSubcontainers() error {
	return self.Called().Error(0)
}
type FactoryForMockContainerHandler struct { type FactoryForMockContainerHandler struct {
Name string Name string
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler) PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)

View File

@ -37,6 +37,7 @@ type rawContainerHandler struct {
cgroupSubsystems *cgroupSubsystems cgroupSubsystems *cgroupSubsystems
machineInfoFactory info.MachineInfoFactory machineInfoFactory info.MachineInfoFactory
watcher *inotify.Watcher watcher *inotify.Watcher
stopWatcher chan error
watches map[string]struct{} watches map[string]struct{}
} }
@ -49,6 +50,7 @@ func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, mac
}, },
cgroupSubsystems: cgroupSubsystems, cgroupSubsystems: cgroupSubsystems,
machineInfoFactory: machineInfoFactory, machineInfoFactory: machineInfoFactory,
stopWatcher: make(chan error),
watches: make(map[string]struct{}), watches: make(map[string]struct{}),
}, nil }, nil
} }
@ -319,13 +321,30 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon
case event := <-self.watcher.Event: case event := <-self.watcher.Event:
err := self.processEvent(event, events) err := self.processEvent(event, events)
if err != nil { if err != nil {
glog.Warning("Error while processing event (%+v): %v", event, err) glog.Warningf("Error while processing event (%+v): %v", event, err)
} }
case err := <-self.watcher.Error: case err := <-self.watcher.Error:
glog.Warning("Error while watching %q:", self.name, err) glog.Warningf("Error while watching %q:", self.name, err)
case <-self.stopWatcher:
err := self.watcher.Close()
if err == nil {
self.stopWatcher <- err
self.watcher = nil
return
}
} }
} }
}() }()
return nil return nil
} }
// StopWatchingSubcontainers asks the watcher goroutine to shut down and
// returns whatever result it reports back.
//
// It returns an error without touching the channel when no watcher is set.
// Otherwise it performs a two-step handshake over the unbuffered stopWatcher
// channel: send the stop request, then block until the goroutine replies.
//
// NOTE(review): the watcher goroutine appears to reply only when closing the
// inotify watcher succeeds; if that close fails, this call would block
// indefinitely waiting for the reply. Confirm against WatchSubcontainers.
func (self *rawContainerHandler) StopWatchingSubcontainers() error {
	if self.watcher != nil {
		// Request the stop, then wait for the goroutine's answer.
		self.stopWatcher <- nil
		result := <-self.stopWatcher
		return result
	}
	return fmt.Errorf("can't stop watch that has not started for container %q", self.name)
}

View File

@ -31,9 +31,12 @@ import (
var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
type Manager interface { type Manager interface {
// Start the manager, blocks forever. // Start the manager.
Start() error Start() error
// Stops the manager.
Stop() error
// Get information about a container. // Get information about a container.
GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
@ -51,8 +54,11 @@ func New(driver storage.StorageDriver) (Manager, error) {
if driver == nil { if driver == nil {
return nil, fmt.Errorf("nil storage driver!") return nil, fmt.Errorf("nil storage driver!")
} }
newManager := &manager{} newManager := &manager{
newManager.containers = make(map[string]*containerData) containers: make(map[string]*containerData),
quitChannels: make([]chan error, 0, 2),
storageDriver: driver,
}
machineInfo, err := getMachineInfo() machineInfo, err := getMachineInfo()
if err != nil { if err != nil {
@ -78,6 +84,7 @@ type manager struct {
storageDriver storage.StorageDriver storageDriver storage.StorageDriver
machineInfo info.MachineInfo machineInfo info.MachineInfo
versionInfo info.VersionInfo versionInfo info.VersionInfo
quitChannels []chan error
} }
// Start the container manager. // Start the container manager.
@ -94,34 +101,69 @@ func (self *manager) Start() error {
} }
glog.Infof("Recovery completed") glog.Infof("Recovery completed")
// Watch for new container.
quitWatcher := make(chan error)
err = self.watchForNewContainers(quitWatcher)
if err != nil {
return err
}
self.quitChannels = append(self.quitChannels, quitWatcher)
// Look for new containers in the main housekeeping thread.
quitGlobalHousekeeping := make(chan error)
self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping)
go self.globalHousekeeping(quitGlobalHousekeeping)
return nil
}
// Stop shuts down the manager's background threads one at a time, in the
// order their quit channels were registered.
//
// For each channel it sends a nil stop request and then blocks for the
// thread's reply on the same channel. A nil reply means that thread has
// stopped. On the first non-nil reply Stop returns that error and keeps the
// failing channel and every channel after it, so only the threads that have
// not yet stopped remain registered. When every thread stops cleanly the
// list is reset to empty.
func (self *manager) Stop() error {
	pending := self.quitChannels
	for idx := range pending {
		quit := pending[idx]

		// Handshake: request the stop, then wait for the answer.
		quit <- nil
		if err := <-quit; err != nil {
			// Drop the channels that already stopped; keep the rest.
			self.quitChannels = pending[idx:]
			return err
		}
	}

	self.quitChannels = make([]chan error, 0, 2)
	return nil
}
func (self *manager) globalHousekeeping(quit chan error) {
// Long housekeeping is either 100ms or half of the housekeeping interval. // Long housekeeping is either 100ms or half of the housekeeping interval.
longHousekeeping := 100 * time.Millisecond longHousekeeping := 100 * time.Millisecond
if *globalHousekeepingInterval/2 < longHousekeeping { if *globalHousekeepingInterval/2 < longHousekeeping {
longHousekeeping = *globalHousekeepingInterval / 2 longHousekeeping = *globalHousekeepingInterval / 2
} }
// Watch for new container.
go self.watchForNewContainers()
// Look for new containers in the main housekeeping thread.
ticker := time.Tick(*globalHousekeepingInterval) ticker := time.Tick(*globalHousekeepingInterval)
for t := range ticker { for {
start := time.Now() select {
case t := <-ticker:
start := time.Now()
// Check for new containers. // Check for new containers.
err = self.detectSubcontainers("/") err := self.detectSubcontainers("/")
if err != nil { if err != nil {
glog.Errorf("Failed to detect containers: %s", err) glog.Errorf("Failed to detect containers: %s", err)
} }
// Log if housekeeping took more than 100ms. // Log if housekeeping took too long.
duration := time.Since(start) duration := time.Since(start)
if duration >= longHousekeeping { if duration >= longHousekeeping {
glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration) glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
}
case <-quit:
// Quit if asked to do so.
quit <- nil
glog.Infof("Exiting global housekeeping thread")
return
} }
} }
return nil
} }
// Get a container by name. // Get a container by name.
@ -359,7 +401,7 @@ func (self *manager) processEvent(event container.SubcontainerEvent) error {
} }
// Watches for new containers started in the system. Runs forever unless there is a setup error. // Watches for new containers started in the system. Runs forever unless there is a setup error.
func (self *manager) watchForNewContainers() error { func (self *manager) watchForNewContainers(quit chan error) error {
var root *containerData var root *containerData
var ok bool var ok bool
func() { func() {
@ -385,17 +427,29 @@ func (self *manager) watchForNewContainers() error {
} }
// Listen to events from the container handler. // Listen to events from the container handler.
for event := range events { go func() {
switch { for {
case event.EventType == container.SUBCONTAINER_ADD: select {
err = self.createContainer(event.Name) case event := <-events:
case event.EventType == container.SUBCONTAINER_DELETE: switch {
err = self.destroyContainer(event.Name) case event.EventType == container.SUBCONTAINER_ADD:
err = self.createContainer(event.Name)
case event.EventType == container.SUBCONTAINER_DELETE:
err = self.destroyContainer(event.Name)
}
if err != nil {
glog.Warning("Failed to process watch event: %v", err)
}
case <-quit:
// Stop processing events if asked to quit.
err := root.handler.StopWatchingSubcontainers()
quit <- err
if err == nil {
glog.Infof("Exiting thread watching subcontainers")
return
}
}
} }
if err != nil { }()
glog.Warning("Failed to process watch event: %v", err)
}
}
return nil return nil
} }