diff --git a/container/container.go b/container/container.go index ef64e69d..03465f16 100644 --- a/container/container.go +++ b/container/container.go @@ -27,23 +27,6 @@ const ( ListRecursive ) -// SubcontainerEventType indicates an addition or deletion event. -type SubcontainerEventType int - -const ( - SubcontainerAdd SubcontainerEventType = iota - SubcontainerDelete -) - -// SubcontainerEvent represents a -type SubcontainerEvent struct { - // The type of event that occurred. - EventType SubcontainerEventType - - // The full container name of the container where the event occurred. - Name string -} - // Interface for container operation handlers. type ContainerHandler interface { // Returns the ContainerReference @@ -61,12 +44,6 @@ type ContainerHandler interface { // Returns the processes inside this container. ListProcesses(listType ListType) ([]int, error) - // Registers a channel to listen for events affecting subcontainers (recursively). - WatchSubcontainers(events chan SubcontainerEvent) error - - // Stops watching for subcontainer changes. - StopWatchingSubcontainers() error - // Returns absolute cgroup path for the requested resource. 
GetCgroupPath(resource string) (string, error) diff --git a/container/docker/factory.go b/container/docker/factory.go index 7708aacf..5394b0fb 100644 --- a/container/docker/factory.go +++ b/container/docker/factory.go @@ -26,6 +26,7 @@ import ( "github.com/google/cadvisor/container/libcontainer" "github.com/google/cadvisor/fs" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/manager/watcher" docker "github.com/docker/engine-api/client" "github.com/golang/glog" @@ -198,6 +199,6 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics c ignoreMetrics: ignoreMetrics, } - container.RegisterContainerHandlerFactory(f) + container.RegisterContainerHandlerFactory(f, []watcher.ContainerWatchSource{watcher.Raw}) return nil } diff --git a/container/docker/handler.go b/container/docker/handler.go index 3f95d1c9..ffa1d3e7 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -331,15 +331,6 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) ( return containerlibcontainer.GetProcesses(self.cgroupManager) } -func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { - return fmt.Errorf("watch is unimplemented in the Docker container driver") -} - -func (self *dockerContainerHandler) StopWatchingSubcontainers() error { - // No-op for Docker driver. - return nil -} - func (self *dockerContainerHandler) Exists() bool { return common.CgroupExists(self.cgroupPaths) } diff --git a/container/factory.go b/container/factory.go index 140e7289..5d0e0007 100644 --- a/container/factory.go +++ b/container/factory.go @@ -18,6 +18,8 @@ import ( "fmt" "sync" + "github.com/google/cadvisor/manager/watcher" + "github.com/golang/glog" ) @@ -67,17 +69,19 @@ func (ms MetricSet) Add(mk MetricKind) { // TODO(vmarmol): Consider not making this global. // Global list of factories. 
var ( - factories []ContainerHandlerFactory + factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{} factoriesLock sync.RWMutex ) // Register a ContainerHandlerFactory. These should be registered from least general to most general // as they will be asked in order whether they can handle a particular container. -func RegisterContainerHandlerFactory(factory ContainerHandlerFactory) { +func RegisterContainerHandlerFactory(factory ContainerHandlerFactory, watchTypes []watcher.ContainerWatchSource) { factoriesLock.Lock() defer factoriesLock.Unlock() - factories = append(factories, factory) + for _, watchType := range watchTypes { + factories[watchType] = append(factories[watchType], factory) + } } // Returns whether there are any container handler factories registered. @@ -89,12 +93,12 @@ func HasFactories() bool { } // Create a new ContainerHandler for the specified container. -func NewContainerHandler(name string, inHostNamespace bool) (ContainerHandler, bool, error) { +func NewContainerHandler(name string, watchType watcher.ContainerWatchSource, inHostNamespace bool) (ContainerHandler, bool, error) { factoriesLock.RLock() defer factoriesLock.RUnlock() // Create the ContainerHandler with the first factory that supports it. - for _, factory := range factories { + for _, factory := range factories[watchType] { canHandle, canAccept, err := factory.CanHandleAndAccept(name) if err != nil { glog.V(4).Infof("Error trying to work out if we can handle %s: %v", name, err) @@ -120,7 +124,7 @@ func ClearContainerHandlerFactories() { factoriesLock.Lock() defer factoriesLock.Unlock() - factories = make([]ContainerHandlerFactory, 0, 4) + factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{} } func DebugInfo() map[string][]string { @@ -129,9 +133,11 @@ func DebugInfo() map[string][]string { // Get debug information for all factories. 
out := make(map[string][]string) - for _, factory := range factories { - for k, v := range factory.DebugInfo() { - out[k] = v + for _, factoriesSlice := range factories { + for _, factory := range factoriesSlice { + for k, v := range factory.DebugInfo() { + out[k] = v + } } } return out diff --git a/container/factory_test.go b/container/factory_test.go index 8c4bb042..60ce7a36 100644 --- a/container/factory_test.go +++ b/container/factory_test.go @@ -17,6 +17,8 @@ package container import ( "testing" + "github.com/google/cadvisor/manager/watcher" + "github.com/stretchr/testify/mock" ) @@ -57,7 +59,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) { CanHandleValue: true, CanAcceptValue: true, } - RegisterContainerHandlerFactory(allwaysYes) + RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw}) // The yes factory should be asked to create the ContainerHandler. mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true) @@ -66,7 +68,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) { } allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil) - cont, _, err := NewContainerHandler(testContainerName, true) + cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true) if err != nil { t.Error(err) } @@ -84,13 +86,13 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) { CanHandleValue: false, CanAcceptValue: true, } - RegisterContainerHandlerFactory(allwaysNo) + RegisterContainerHandlerFactory(allwaysNo, []watcher.ContainerWatchSource{watcher.Raw}) allwaysYes := &mockContainerHandlerFactory{ Name: "yes", CanHandleValue: true, CanAcceptValue: true, } - RegisterContainerHandlerFactory(allwaysYes) + RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw}) // The yes factory should be asked to create the ContainerHandler. 
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true) @@ -99,7 +101,7 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) { } allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil) - cont, _, err := NewContainerHandler(testContainerName, true) + cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true) if err != nil { t.Error(err) } @@ -117,15 +119,15 @@ func TestNewContainerHandler_NoneMatch(t *testing.T) { CanHandleValue: false, CanAcceptValue: true, } - RegisterContainerHandlerFactory(allwaysNo1) + RegisterContainerHandlerFactory(allwaysNo1, []watcher.ContainerWatchSource{watcher.Raw}) allwaysNo2 := &mockContainerHandlerFactory{ Name: "no", CanHandleValue: false, CanAcceptValue: true, } - RegisterContainerHandlerFactory(allwaysNo2) + RegisterContainerHandlerFactory(allwaysNo2, []watcher.ContainerWatchSource{watcher.Raw}) - _, _, err := NewContainerHandler(testContainerName, true) + _, _, err := NewContainerHandler(testContainerName, watcher.Raw, true) if err == nil { t.Error("Expected NewContainerHandler to fail") } @@ -140,15 +142,15 @@ func TestNewContainerHandler_Accept(t *testing.T) { CanHandleValue: false, CanAcceptValue: true, } - RegisterContainerHandlerFactory(cannotHandle) + RegisterContainerHandlerFactory(cannotHandle, []watcher.ContainerWatchSource{watcher.Raw}) cannotAccept := &mockContainerHandlerFactory{ Name: "no", CanHandleValue: true, CanAcceptValue: false, } - RegisterContainerHandlerFactory(cannotAccept) + RegisterContainerHandlerFactory(cannotAccept, []watcher.ContainerWatchSource{watcher.Raw}) - _, accept, err := NewContainerHandler(testContainerName, true) + _, accept, err := NewContainerHandler(testContainerName, watcher.Raw, true) if err != nil { t.Error("Expected NewContainerHandler to succeed") } diff --git a/container/mock.go b/container/mock.go index cc79762e..e3668fb0 100644 --- a/container/mock.go +++ b/container/mock.go @@ -77,16 +77,6 @@ func (self 
*MockContainerHandler) ListProcesses(listType ListType) ([]int, error return args.Get(0).([]int), args.Error(1) } -func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEvent) error { - args := self.Called(events) - return args.Error(0) -} - -func (self *MockContainerHandler) StopWatchingSubcontainers() error { - args := self.Called() - return args.Error(0) -} - func (self *MockContainerHandler) Exists() bool { args := self.Called() return args.Get(0).(bool) @@ -102,6 +92,11 @@ func (self *MockContainerHandler) GetContainerLabels() map[string]string { return args.Get(0).(map[string]string) } +func (self *MockContainerHandler) String() string { + args := self.Called() + return args.Get(0).(string) +} + type FactoryForMockContainerHandler struct { Name string PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler) diff --git a/container/raw/factory.go b/container/raw/factory.go index e413207f..36d236c8 100644 --- a/container/raw/factory.go +++ b/container/raw/factory.go @@ -23,6 +23,7 @@ import ( "github.com/google/cadvisor/container/libcontainer" "github.com/google/cadvisor/fs" info "github.com/google/cadvisor/info/v1" + watch "github.com/google/cadvisor/manager/watcher" "github.com/golang/glog" ) @@ -90,6 +91,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno watcher: watcher, ignoreMetrics: ignoreMetrics, } - container.RegisterContainerHandlerFactory(factory) + container.RegisterContainerHandlerFactory(factory, []watch.ContainerWatchSource{watch.Raw}) return nil } diff --git a/container/raw/handler.go b/container/raw/handler.go index 55645316..79c6f780 100644 --- a/container/raw/handler.go +++ b/container/raw/handler.go @@ -17,9 +17,6 @@ package raw import ( "fmt" - "io/ioutil" - "path" - "strings" "github.com/google/cadvisor/container" "github.com/google/cadvisor/container/common" @@ -32,7 +29,6 @@ import ( "github.com/opencontainers/runc/libcontainer/cgroups" cgroupfs 
"github.com/opencontainers/runc/libcontainer/cgroups/fs" "github.com/opencontainers/runc/libcontainer/configs" - "golang.org/x/exp/inotify" ) type rawContainerHandler struct { @@ -41,12 +37,6 @@ type rawContainerHandler struct { cgroupSubsystems *libcontainer.CgroupSubsystems machineInfoFactory info.MachineInfoFactory - // Inotify event watcher. - watcher *common.InotifyWatcher - - // Signal for watcher thread to stop. - stopWatcher chan error - // Absolute path to the cgroup hierarchies of this container. // (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test") cgroupPaths map[string]string @@ -102,12 +92,10 @@ func newRawContainerHandler(name string, cgroupSubsystems *libcontainer.CgroupSu name: name, cgroupSubsystems: cgroupSubsystems, machineInfoFactory: machineInfoFactory, - stopWatcher: make(chan error), cgroupPaths: cgroupPaths, cgroupManager: cgroupManager, fsInfo: fsInfo, externalMounts: externalMounts, - watcher: watcher, rootFs: rootFs, ignoreMetrics: ignoreMetrics, pid: pid, @@ -276,151 +264,6 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i return libcontainer.GetProcesses(self.cgroupManager) } -// Watches the specified directory and all subdirectories. Returns whether the path was -// already being watched and an error (if any). -func (self *rawContainerHandler) watchDirectory(dir string, containerName string) (bool, error) { - alreadyWatching, err := self.watcher.AddWatch(containerName, dir) - if err != nil { - return alreadyWatching, err - } - - // Remove the watch if further operations failed. - cleanup := true - defer func() { - if cleanup { - _, err := self.watcher.RemoveWatch(containerName, dir) - if err != nil { - glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err) - } - } - }() - - // TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime. - // Watch subdirectories as well. 
- entries, err := ioutil.ReadDir(dir) - if err != nil { - return alreadyWatching, err - } - for _, entry := range entries { - if entry.IsDir() { - // TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can. - _, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name())) - if err != nil { - return alreadyWatching, err - } - } - } - - cleanup = false - return alreadyWatching, nil -} - -func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error { - // Convert the inotify event type to a container create or delete. - var eventType container.SubcontainerEventType - switch { - case (event.Mask & inotify.IN_CREATE) > 0: - eventType = container.SubcontainerAdd - case (event.Mask & inotify.IN_DELETE) > 0: - eventType = container.SubcontainerDelete - case (event.Mask & inotify.IN_MOVED_FROM) > 0: - eventType = container.SubcontainerDelete - case (event.Mask & inotify.IN_MOVED_TO) > 0: - eventType = container.SubcontainerAdd - default: - // Ignore other events. - return nil - } - - // Derive the container name from the path name. - var containerName string - for _, mount := range self.cgroupSubsystems.Mounts { - mountLocation := path.Clean(mount.Mountpoint) + "/" - if strings.HasPrefix(event.Name, mountLocation) { - containerName = event.Name[len(mountLocation)-1:] - break - } - } - if containerName == "" { - return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name) - } - - // Maintain the watch for the new or deleted container. - switch { - case eventType == container.SubcontainerAdd: - // New container was created, watch it. - alreadyWatched, err := self.watchDirectory(event.Name, containerName) - if err != nil { - return err - } - - // Only report container creation once. 
- if alreadyWatched { - return nil - } - case eventType == container.SubcontainerDelete: - // Container was deleted, stop watching for it. - lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name) - if err != nil { - return err - } - - // Only report container deletion once. - if !lastWatched { - return nil - } - default: - return fmt.Errorf("unknown event type %v", eventType) - } - - // Deliver the event. - events <- container.SubcontainerEvent{ - EventType: eventType, - Name: containerName, - } - - return nil -} - -func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { - // Watch this container (all its cgroups) and all subdirectories. - for _, cgroupPath := range self.cgroupPaths { - _, err := self.watchDirectory(cgroupPath, self.name) - if err != nil { - return err - } - } - - // Process the events received from the kernel. - go func() { - for { - select { - case event := <-self.watcher.Event(): - err := self.processEvent(event, events) - if err != nil { - glog.Warningf("Error while processing event (%+v): %v", event, err) - } - case err := <-self.watcher.Error(): - glog.Warningf("Error while watching %q:", self.name, err) - case <-self.stopWatcher: - err := self.watcher.Close() - if err == nil { - self.stopWatcher <- err - return - } - } - } - }() - - return nil -} - -func (self *rawContainerHandler) StopWatchingSubcontainers() error { - // Rendezvous with the watcher thread. 
- self.stopWatcher <- nil - return <-self.stopWatcher -} - func (self *rawContainerHandler) Exists() bool { return common.CgroupExists(self.cgroupPaths) } diff --git a/container/rkt/factory.go b/container/rkt/factory.go index b4337b68..324881a4 100644 --- a/container/rkt/factory.go +++ b/container/rkt/factory.go @@ -22,6 +22,7 @@ import ( "github.com/google/cadvisor/container/libcontainer" "github.com/google/cadvisor/fs" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/manager/watcher" "github.com/golang/glog" ) @@ -99,6 +100,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno ignoreMetrics: ignoreMetrics, rktPath: rktPath, } - container.RegisterContainerHandlerFactory(factory) + container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw}) return nil } diff --git a/container/rkt/handler.go b/container/rkt/handler.go index 579763fb..e48f185f 100644 --- a/container/rkt/handler.go +++ b/container/rkt/handler.go @@ -266,15 +266,6 @@ func (handler *rktContainerHandler) ListProcesses(listType container.ListType) ( return libcontainer.GetProcesses(handler.cgroupManager) } -func (handler *rktContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { - return fmt.Errorf("watch is unimplemented in the Rkt container driver") -} - -func (handler *rktContainerHandler) StopWatchingSubcontainers() error { - // No-op for Rkt driver. 
- return nil -} - func (handler *rktContainerHandler) Exists() bool { return common.CgroupExists(handler.cgroupPaths) } diff --git a/container/systemd/factory.go b/container/systemd/factory.go index 635c9c78..4e71d40b 100644 --- a/container/systemd/factory.go +++ b/container/systemd/factory.go @@ -21,6 +21,7 @@ import ( "github.com/google/cadvisor/container" "github.com/google/cadvisor/fs" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/manager/watcher" "github.com/golang/glog" ) @@ -52,6 +53,6 @@ func (f *systemdFactory) DebugInfo() map[string][]string { func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics container.MetricSet) error { glog.Infof("Registering systemd factory") factory := &systemdFactory{} - container.RegisterContainerHandlerFactory(factory) + container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw}) return nil } diff --git a/manager/manager.go b/manager/manager.go index 3b3e4b00..4e5a1a63 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -37,6 +37,8 @@ import ( info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/machine" + "github.com/google/cadvisor/manager/watcher" + rawwatcher "github.com/google/cadvisor/manager/watcher/raw" "github.com/google/cadvisor/utils/cpuload" "github.com/google/cadvisor/utils/oomparser" "github.com/google/cadvisor/utils/sysfs" @@ -163,6 +165,9 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn inHostNamespace = true } + // Register for new subcontainers. 
+ eventsChannel := make(chan watcher.ContainerEvent, 16) + newManager := &manager{ containers: make(map[namespacedContainerName]*containerData), quitChannels: make([]chan error, 0, 2), @@ -174,6 +179,8 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn maxHousekeepingInterval: maxHousekeepingInterval, allowDynamicHousekeeping: allowDynamicHousekeeping, ignoreMetrics: ignoreMetricsSet, + containerWatchers: []watcher.ContainerWatcher{}, + eventsChannel: eventsChannel, } machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace) @@ -217,6 +224,8 @@ type manager struct { maxHousekeepingInterval time.Duration allowDynamicHousekeeping bool ignoreMetrics container.MetricSet + containerWatchers []watcher.ContainerWatcher + eventsChannel chan watcher.ContainerEvent } // Start the container manager. @@ -241,6 +250,12 @@ func (self *manager) Start() error { glog.Errorf("Registration of the raw container factory failed: %v", err) } + rawWatcher, err := rawwatcher.NewRawContainerWatcher() + if err != nil { + return err + } + self.containerWatchers = append(self.containerWatchers, rawWatcher) + if *enableLoadReader { // Create cpu load reader. cpuLoadReader, err := cpuload.New() @@ -269,7 +284,7 @@ func (self *manager) Start() error { } // Create root and then recover all containers. - err = self.createContainer("/") + err = self.createContainer("/", watcher.Raw) if err != nil { return err } @@ -769,10 +784,14 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c } // Create a container. 
-func (m *manager) createContainer(containerName string) error { +func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error { m.containersLock.Lock() defer m.containersLock.Unlock() + return m.createContainerLocked(containerName, watchSource) +} + +func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error { namespacedName := namespacedContainerName{ Name: containerName, } @@ -782,7 +801,7 @@ func (m *manager) createContainer(containerName string) error { return nil } - handler, accept, err := container.NewContainerHandler(containerName, m.inHostNamespace) + handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace) if err != nil { return err } @@ -849,6 +868,10 @@ func (m *manager) destroyContainer(containerName string) error { m.containersLock.Lock() defer m.containersLock.Unlock() + return m.destroyContainerLocked(containerName) +} + +func (m *manager) destroyContainerLocked(containerName string) error { namespacedName := namespacedContainerName{ Name: containerName, } @@ -946,7 +969,7 @@ func (m *manager) detectSubcontainers(containerName string) error { // Add the new containers. for _, cont := range added { - err = m.createContainer(cont.Name) + err = m.createContainer(cont.Name, watcher.Raw) if err != nil { glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err) } @@ -965,28 +988,15 @@ func (m *manager) detectSubcontainers(containerName string) error { // Watches for new containers started in the system. Runs forever unless there is a setup error. 
func (self *manager) watchForNewContainers(quit chan error) error { - var root *containerData - var ok bool - func() { - self.containersLock.RLock() - defer self.containersLock.RUnlock() - root, ok = self.containers[namespacedContainerName{ - Name: "/", - }] - }() - if !ok { - return fmt.Errorf("root container does not exist when watching for new containers") - } - - // Register for new subcontainers. - eventsChannel := make(chan container.SubcontainerEvent, 16) - err := root.handler.WatchSubcontainers(eventsChannel) - if err != nil { - return err + for _, watcher := range self.containerWatchers { + err := watcher.Start(self.eventsChannel) + if err != nil { + return err + } } // There is a race between starting the watch and new container creation so we do a detection before we read new containers. - err = self.detectSubcontainers("/") + err := self.detectSubcontainers("/") if err != nil { return err } @@ -995,21 +1005,32 @@ func (self *manager) watchForNewContainers(quit chan error) error { go func() { for { select { - case event := <-eventsChannel: + case event := <-self.eventsChannel: switch { - case event.EventType == container.SubcontainerAdd: - err = self.createContainer(event.Name) - case event.EventType == container.SubcontainerDelete: + case event.EventType == watcher.ContainerAdd: + err = self.createContainer(event.Name, event.WatchSource) + case event.EventType == watcher.ContainerDelete: err = self.destroyContainer(event.Name) } if err != nil { glog.Warningf("Failed to process watch event %+v: %v", event, err) } case <-quit: + errors := []string{} + // Stop processing events if asked to quit. 
- err := root.handler.StopWatchingSubcontainers() - quit <- err - if err == nil { + for _, watcher := range self.containerWatchers { + err := watcher.Stop() + if err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) > 0 { + err_str := strings.Join(errors, ", ") + quit <- fmt.Errorf("Error quiting watchers: %v", err_str) + } else { + quit <- nil glog.Infof("Exiting thread watching subcontainers") return } diff --git a/manager/watcher/raw/raw.go b/manager/watcher/raw/raw.go new file mode 100644 index 00000000..b6f41539 --- /dev/null +++ b/manager/watcher/raw/raw.go @@ -0,0 +1,214 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package raw implements a container watcher that uses inotify on the +// cgroup hierarchies to detect container creation and deletion. +package raw + +import ( + "fmt" + "io/ioutil" + "path" + "strings" + + "github.com/google/cadvisor/container/common" + "github.com/google/cadvisor/container/libcontainer" + "github.com/google/cadvisor/manager/watcher" + + "github.com/golang/glog" + "golang.org/x/exp/inotify" +) + +type rawContainerWatcher struct { + // Absolute path to the root of the cgroup hierarchies + cgroupPaths map[string]string + + cgroupSubsystems *libcontainer.CgroupSubsystems + + // Inotify event watcher. + watcher *common.InotifyWatcher + + // Signal for watcher thread to stop. 
+ stopWatcher chan error +} + +func NewRawContainerWatcher() (watcher.ContainerWatcher, error) { + cgroupSubsystems, err := libcontainer.GetCgroupSubsystems() + if err != nil { + return nil, fmt.Errorf("failed to get cgroup subsystems: %v", err) + } + if len(cgroupSubsystems.Mounts) == 0 { + return nil, fmt.Errorf("failed to find supported cgroup mounts for the raw factory") + } + + inotifyWatcher, err := common.NewInotifyWatcher() + if err != nil { + return nil, err + } + + rawWatcher := &rawContainerWatcher{ + cgroupPaths: common.MakeCgroupPaths(cgroupSubsystems.MountPoints, "/"), + cgroupSubsystems: &cgroupSubsystems, + watcher: inotifyWatcher, + stopWatcher: make(chan error), + } + + return rawWatcher, nil +} + +func (self *rawContainerWatcher) Start(events chan watcher.ContainerEvent) error { + // Watch the root of every cgroup hierarchy and all of its subdirectories. + for _, cgroupPath := range self.cgroupPaths { + _, err := self.watchDirectory(cgroupPath, "/") + if err != nil { + return err + } + } + + // Process the events received from the kernel until Stop() closes the inotify watcher. + go func() { + for { + select { + case event := <-self.watcher.Event(): + err := self.processEvent(event, events) + if err != nil { + glog.Warningf("Error while processing event (%+v): %v", event, err) + } + case err := <-self.watcher.Error(): + glog.Warningf("Error while watching %q: %v", "/", err) + case <-self.stopWatcher: + err := self.watcher.Close() + self.stopWatcher <- err // Always answer Stop(); it blocks on this reply even when Close() fails. + if err == nil { + return + } + } + } + }() + + return nil +} + +func (self *rawContainerWatcher) Stop() error { + // Rendezvous with the watcher thread: request the stop, then wait for the result of closing the inotify watcher. + self.stopWatcher <- nil + return <-self.stopWatcher +} + +// watchDirectory watches the specified directory and all subdirectories. It returns whether the path was +// already being watched and an error (if any). 
+func (self *rawContainerWatcher) watchDirectory(dir string, containerName string) (bool, error) { + alreadyWatching, err := self.watcher.AddWatch(containerName, dir) + if err != nil { + return alreadyWatching, err + } + + // Remove the watch again if any later step fails; the flag is cleared on success. + cleanup := true + defer func() { + if cleanup { + _, err := self.watcher.RemoveWatch(containerName, dir) + if err != nil { + glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err) + } + } + }() + + // TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime. + // Watch subdirectories as well. + entries, err := ioutil.ReadDir(dir) + if err != nil { + return alreadyWatching, err + } + for _, entry := range entries { + if entry.IsDir() { + // TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can. + _, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name())) + if err != nil { + return alreadyWatching, err + } + } + } + + cleanup = false + return alreadyWatching, nil +} + +func (self *rawContainerWatcher) processEvent(event *inotify.Event, events chan watcher.ContainerEvent) error { + // Map the inotify mask to a container event: create and moved-to are adds, delete and moved-from are deletes. + var eventType watcher.ContainerEventType + switch { + case (event.Mask & inotify.IN_CREATE) > 0: + eventType = watcher.ContainerAdd + case (event.Mask & inotify.IN_DELETE) > 0: + eventType = watcher.ContainerDelete + case (event.Mask & inotify.IN_MOVED_FROM) > 0: + eventType = watcher.ContainerDelete + case (event.Mask & inotify.IN_MOVED_TO) > 0: + eventType = watcher.ContainerAdd + default: + // Ignore other events. + return nil + } + + // Derive the container name by stripping the cgroup mount point from the event path (the leading slash is kept). 
+ var containerName string + for _, mount := range self.cgroupSubsystems.Mounts { + mountLocation := path.Clean(mount.Mountpoint) + "/" + if strings.HasPrefix(event.Name, mountLocation) { + containerName = event.Name[len(mountLocation)-1:] + break + } + } + if containerName == "" { + return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name) + } + + // Keep the inotify watches in sync with the container that was created or deleted. + switch eventType { + case watcher.ContainerAdd: + // New container was created, watch it. + alreadyWatched, err := self.watchDirectory(event.Name, containerName) + if err != nil { + return err + } + + // Only report container creation once: skip it when the directory was already being watched. + if alreadyWatched { + return nil + } + case watcher.ContainerDelete: + // Container was deleted, stop watching for it. + lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name) + if err != nil { + return err + } + + // Only report container deletion once. + if !lastWatched { + return nil + } + default: + return fmt.Errorf("unknown event type %v", eventType) + } + + // Deliver the event, tagged with the Raw watch source. + events <- watcher.ContainerEvent{ + EventType: eventType, + Name: containerName, + WatchSource: watcher.Raw, + } + + return nil +} diff --git a/manager/watcher/watcher.go b/manager/watcher/watcher.go new file mode 100644 index 00000000..e9d20a8e --- /dev/null +++ b/manager/watcher/watcher.go @@ -0,0 +1,51 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Package watcher defines the types for container events and also +// defines the interface implemented by container watchers. +package watcher + +// ContainerEventType indicates an addition or deletion event. +type ContainerEventType int + +const ( + ContainerAdd ContainerEventType = iota + ContainerDelete +) + +type ContainerWatchSource int + +const ( + Raw ContainerWatchSource = iota +) + +// ContainerEvent represents a container addition or deletion reported by a watcher. +type ContainerEvent struct { + // The type of event that occurred. + EventType ContainerEventType + + // The full container name of the container where the event occurred. + Name string + + // The watcher that detected this change event. + WatchSource ContainerWatchSource +} + +type ContainerWatcher interface { + // Registers a channel to listen for events affecting subcontainers (recursively). + Start(events chan ContainerEvent) error + + // Stops watching for subcontainer changes. + Stop() error +}