diff --git a/cadvisor.go b/cadvisor.go index 363c8cb3..68efb758 100644 --- a/cadvisor.go +++ b/cadvisor.go @@ -19,6 +19,8 @@ import ( "fmt" "net/http" _ "net/http/pprof" + "os" + "os/signal" "runtime" "github.com/golang/glog" @@ -38,6 +40,7 @@ var maxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used sim var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Empty means none. Options are: (default), bigquery, and influxdb") func main() { + defer glog.Flush() flag.Parse() setMaxProcs() @@ -91,25 +94,18 @@ func main() { } }) - defer glog.Flush() + // Start the manager. + if err := containerManager.Start(); err != nil { + glog.Fatalf("Failed to start container manager: %v", err) + } - errChan := make(chan error) - go func() { - errChan <- containerManager.Start() - }() + // Install signal handler. + installSignalHandler(containerManager) - glog.Infof("Starting cAdvisor version: %q", info.VERSION) - glog.Infof("About to serve on port %d", *argPort) + glog.Infof("Starting cAdvisor version: %q on port %d", info.VERSION, *argPort) addr := fmt.Sprintf(":%v", *argPort) - - go func() { - errChan <- http.ListenAndServe(addr, nil) - }() - select { - case err := <-errChan: - glog.Fatal(err) - } + glog.Fatal(http.ListenAndServe(addr, nil)) } func setMaxProcs() { @@ -129,3 +125,18 @@ func setMaxProcs() { glog.Warningf("Specified max procs of %v but using %v", numProcs, actualNumProcs) } } + +func installSignalHandler(containerManager manager.Manager) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + + // Block until a signal is received. 
+ go func() { + sig := <-c + if err := containerManager.Stop(); err != nil { + glog.Errorf("Failed to stop container manager: %v", err) + } + glog.Infof("Exiting given signal: %v", sig) + os.Exit(0) + }() +} diff --git a/container/container.go b/container/container.go index ba28854b..cd24af79 100644 --- a/container/container.go +++ b/container/container.go @@ -47,4 +47,5 @@ type ContainerHandler interface { ListThreads(listType ListType) ([]int, error) ListProcesses(listType ListType) ([]int, error) WatchSubcontainers(events chan SubcontainerEvent) error + StopWatchingSubcontainers() error } diff --git a/container/docker/handler.go b/container/docker/handler.go index d3dff07d..d1be0862 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -268,3 +268,8 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) ( func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { return fmt.Errorf("watch is unimplemented in the Docker container driver") } + +func (self *dockerContainerHandler) StopWatchingSubcontainers() error { + // No-op for Docker driver. 
+ return nil +} diff --git a/container/mock.go b/container/mock.go index 3c683fdb..088f3f9c 100644 --- a/container/mock.go +++ b/container/mock.go @@ -80,6 +80,11 @@ func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEve return args.Error(0) } +func (self *MockContainerHandler) StopWatchingSubcontainers() error { + args := self.Called() + return args.Error(0) +} + type FactoryForMockContainerHandler struct { Name string PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler) diff --git a/container/raw/handler.go b/container/raw/handler.go index b777d9a4..e8f9d285 100644 --- a/container/raw/handler.go +++ b/container/raw/handler.go @@ -37,6 +37,7 @@ type rawContainerHandler struct { cgroupSubsystems *cgroupSubsystems machineInfoFactory info.MachineInfoFactory watcher *inotify.Watcher + stopWatcher chan error watches map[string]struct{} } @@ -49,6 +50,7 @@ func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, mac }, cgroupSubsystems: cgroupSubsystems, machineInfoFactory: machineInfoFactory, + stopWatcher: make(chan error), watches: make(map[string]struct{}), }, nil } @@ -319,13 +321,30 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon case event := <-self.watcher.Event: err := self.processEvent(event, events) if err != nil { - glog.Warning("Error while processing event (%+v): %v", event, err) + glog.Warningf("Error while processing event (%+v): %v", event, err) } case err := <-self.watcher.Error: - glog.Warning("Error while watching %q:", self.name, err) + glog.Warningf("Error while watching %q: %v", self.name, err) + case <-self.stopWatcher: + err := self.watcher.Close() + self.stopWatcher <- err + if err == nil { + self.watcher = nil + return + } } } }() return nil } + +func (self *rawContainerHandler) StopWatchingSubcontainers() error { + if self.watcher == nil { + return fmt.Errorf("can't stop watch that has not started for container %q", self.name) + } + + // 
Rendezvous with the watcher thread. + self.stopWatcher <- nil + return <-self.stopWatcher +} diff --git a/manager/manager.go b/manager/manager.go index b7f44975..4296605e 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -31,9 +31,12 @@ import ( var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") type Manager interface { - // Start the manager, blocks forever. + // Start the manager. Start() error + // Stops the manager. + Stop() error + // Get information about a container. GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) @@ -51,8 +54,11 @@ func New(driver storage.StorageDriver) (Manager, error) { if driver == nil { return nil, fmt.Errorf("nil storage driver!") } - newManager := &manager{} - newManager.containers = make(map[string]*containerData) + newManager := &manager{ + containers: make(map[string]*containerData), + quitChannels: make([]chan error, 0, 2), + storageDriver: driver, + } machineInfo, err := getMachineInfo() if err != nil { @@ -78,6 +84,7 @@ type manager struct { storageDriver storage.StorageDriver machineInfo info.MachineInfo versionInfo info.VersionInfo + quitChannels []chan error } // Start the container manager. @@ -94,34 +101,69 @@ func (self *manager) Start() error { } glog.Infof("Recovery completed") + // Watch for new container. + quitWatcher := make(chan error) + err = self.watchForNewContainers(quitWatcher) + if err != nil { + return err + } + self.quitChannels = append(self.quitChannels, quitWatcher) + + // Look for new containers in the main housekeeping thread. + quitGlobalHousekeeping := make(chan error) + self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping) + go self.globalHousekeeping(quitGlobalHousekeeping) + + return nil +} + +func (self *manager) Stop() error { + // Stop and wait on all quit channels. 
+ for i, c := range self.quitChannels { + // Send the exit signal and wait on the thread to exit (by closing the channel). + c <- nil + err := <-c + if err != nil { + // Remove the channels that quit successfully. + self.quitChannels = self.quitChannels[i:] + return err + } + } + self.quitChannels = make([]chan error, 0, 2) + return nil +} + +func (self *manager) globalHousekeeping(quit chan error) { // Long housekeeping is either 100ms or half of the housekeeping interval. longHousekeeping := 100 * time.Millisecond if *globalHousekeepingInterval/2 < longHousekeeping { longHousekeeping = *globalHousekeepingInterval / 2 } - // Watch for new container. - go self.watchForNewContainers() - - // Look for new containers in the main housekeeping thread. ticker := time.Tick(*globalHousekeepingInterval) - for t := range ticker { - start := time.Now() + for { + select { + case t := <-ticker: + start := time.Now() - // Check for new containers. - err = self.detectSubcontainers("/") - if err != nil { - glog.Errorf("Failed to detect containers: %s", err) - } + // Check for new containers. + err := self.detectSubcontainers("/") + if err != nil { + glog.Errorf("Failed to detect containers: %s", err) + } - // Log if housekeeping took more than 100ms. - duration := time.Since(start) - if duration >= longHousekeeping { - glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration) + // Log if housekeeping took too long. + duration := time.Since(start) + if duration >= longHousekeeping { + glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration) + } + case <-quit: + // Quit if asked to do so. + quit <- nil + glog.Infof("Exiting global housekeeping thread") + return } } - - return nil } // Get a container by name. @@ -359,7 +401,7 @@ func (self *manager) processEvent(event container.SubcontainerEvent) error { } // Watches for new containers started in the system. Runs forever unless there is a setup error. 
-func (self *manager) watchForNewContainers() error { +func (self *manager) watchForNewContainers(quit chan error) error { var root *containerData var ok bool func() { @@ -385,17 +427,29 @@ func (self *manager) watchForNewContainers() error { } // Listen to events from the container handler. - for event := range events { - switch { - case event.EventType == container.SUBCONTAINER_ADD: - err = self.createContainer(event.Name) - case event.EventType == container.SUBCONTAINER_DELETE: - err = self.destroyContainer(event.Name) + go func() { + for { + select { + case event := <-events: + switch { + case event.EventType == container.SUBCONTAINER_ADD: + err = self.createContainer(event.Name) + case event.EventType == container.SUBCONTAINER_DELETE: + err = self.destroyContainer(event.Name) + } + if err != nil { + glog.Warningf("Failed to process watch event: %v", err) + } + case <-quit: + // Stop processing events if asked to quit. + err := root.handler.StopWatchingSubcontainers() + quit <- err + if err == nil { + glog.Infof("Exiting thread watching subcontainers") + return + } + } } - if err != nil { - glog.Warning("Failed to process watch event: %v", err) - } - } - + }() return nil }