From b63d61ca974d5a8e2c4ebf2d82bc2c90dda1d081 Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Wed, 17 Sep 2014 15:29:04 -0700 Subject: [PATCH] Use inotify to watch for new containers. This reduces cAdvisor CPU usage below 1% in my CoreOS system. We also reduce global housekeeping to every 60s as a fallback in case we miss an event. --- container/container.go | 15 ++++ container/docker/handler.go | 4 ++ container/filter.go | 4 ++ container/mock.go | 5 ++ container/raw/handler.go | 133 +++++++++++++++++++++++++++++++++++- manager/manager.go | 120 ++++++++++++++++++++++++-------- 6 files changed, 251 insertions(+), 30 deletions(-) diff --git a/container/container.go b/container/container.go index de478fd0..3f6bb66f 100644 --- a/container/container.go +++ b/container/container.go @@ -24,6 +24,20 @@ const ( type ListType int +// SubcontainerEvent types. +const ( + SUBCONTAINER_ADD = iota + SUBCONTAINER_DELETE +) + +type SubcontainerEvent struct { + // The type of event that occured. + EventType int + + // The container to which the event occured. + Name string +} + // Interface for container operation handlers. type ContainerHandler interface { ContainerReference() (info.ContainerReference, error) @@ -32,4 +46,5 @@ type ContainerHandler interface { ListContainers(listType ListType) ([]info.ContainerReference, error) ListThreads(listType ListType) ([]int, error) ListProcesses(listType ListType) ([]int, error) + WatchSubcontainers(events chan SubcontainerEvent) error } diff --git a/container/docker/handler.go b/container/docker/handler.go index b19b8a93..296cb7ad 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -262,3 +262,7 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) ( } return fs.GetPids(c) } + +func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { + return fmt.Errorf("watch is unimplemented in the Docker container driver") +} diff --git a/container/filter.go b/container/filter.go index 4d2a8ce1..8624cd49 100644 --- a/container/filter.go +++ b/container/filter.go @@ -62,6 +62,10 @@ func (self *containerListFilter) ListProcesses(listType ListType) ([]int, error) return self.handler.ListProcesses(listType) } +func (self *containerListFilter) WatchSubcontainers(events chan SubcontainerEvent) error { + return self.handler.WatchSubcontainers(events) +} + func NewWhiteListFilter(handler ContainerHandler, acceptedPaths ...string) ContainerHandler { filter := func(p string) bool { for _, path := range acceptedPaths { diff --git a/container/mock.go b/container/mock.go index 90846c54..28b7fcea 100644 --- a/container/mock.go +++ b/container/mock.go @@ -69,6 +69,11 @@ func (self *MockContainerHandler) ListProcesses(listType ListType) ([]int, error return args.Get(0).([]int), args.Error(1) } +func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEvent) error { + args := self.Called(events) + return args.Error(0) +} + type FactoryForMockContainerHandler struct { Name string PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler) diff --git a/container/raw/handler.go b/container/raw/handler.go index da2852ea..3848174c 100644 --- a/container/raw/handler.go +++ b/container/raw/handler.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" + "code.google.com/p/go.exp/inotify" "github.com/docker/libcontainer/cgroups" "github.com/docker/libcontainer/cgroups/fs" "github.com/golang/glog" @@ -35,6 +36,8 @@ type rawContainerHandler struct { cgroup *cgroups.Cgroup cgroupSubsystems *cgroupSubsystems machineInfoFactory info.MachineInfoFactory + watcher *inotify.Watcher + watches map[string]struct{} } func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, machineInfoFactory info.MachineInfoFactory) (container.ContainerHandler, error) { @@ -46,6 +49,7 @@ func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, mac }, cgroupSubsystems: cgroupSubsystems, machineInfoFactory: machineInfoFactory, + watches: make(map[string]struct{}), }, nil } @@ -173,7 +177,7 @@ func listDirectories(dirpath string, parent string, recursive bool, output map[s } func (self *rawContainerHandler) ListContainers(listType container.ListType) ([]info.ContainerReference, error) { - containers := make(map[string]struct{}, 16) + containers := make(map[string]struct{}) for _, subsystem := range self.cgroupSubsystems.mounts { err := listDirectories(path.Join(subsystem.Mountpoint, self.name), self.name, listType == container.LIST_RECURSIVE, containers) if err != nil { @@ -200,3 +204,130 @@ func (self *rawContainerHandler) ListThreads(listType container.ListType) ([]int func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]int, error) { return fs.GetPids(self.cgroup) } + +func (self *rawContainerHandler) watchDirectory(dir string, containerName string) error { + err := self.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE) + if err != nil { + return err + } + self.watches[containerName] = struct{}{} + + // Watch subdirectories as well. + entries, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + if entry.IsDir() { + err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name())) + if err != nil { + return err + } + } + } + return nil +} + +func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error { + // Convert the inotify event type to a container create or delete. + var eventType int + switch { + case (event.Mask & inotify.IN_CREATE) > 0: + eventType = container.SUBCONTAINER_ADD + case (event.Mask & inotify.IN_DELETE) > 0: + eventType = container.SUBCONTAINER_DELETE + case (event.Mask & inotify.IN_MOVED_FROM) > 0: + eventType = container.SUBCONTAINER_DELETE + case (event.Mask & inotify.IN_MOVED_TO) > 0: + eventType = container.SUBCONTAINER_ADD + default: + // Ignore other events. + return nil + } + + // Derive the container name from the path name. + var containerName string + for _, mount := range self.cgroupSubsystems.mounts { + mountLocation := path.Clean(mount.Mountpoint) + "/" + if strings.HasPrefix(event.Name, mountLocation) { + containerName = event.Name[len(mountLocation)-1:] + break + } + } + if containerName == "" { + return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name) + } + + // Maintain the watch for the new or deleted container. + switch { + case eventType == container.SUBCONTAINER_ADD: + // If we've already seen this event, return. + if _, ok := self.watches[containerName]; ok { + return nil + } + + // New container was created, watch it. + err := self.watchDirectory(event.Name, containerName) + if err != nil { + return err + } + case eventType == container.SUBCONTAINER_DELETE: + // If we've already seen this event, return. + if _, ok := self.watches[containerName]; !ok { + return nil + } + delete(self.watches, containerName) + + // Container was deleted, stop watching for it. + err := self.watcher.RemoveWatch(event.Name) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown event type %v", eventType) + } + + // Deliver the event. + events <- container.SubcontainerEvent{ + EventType: eventType, + Name: containerName, + } + + return nil +} + +func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error { + // Lazily initialize the watcher so we don't use it when not asked to. + if self.watcher == nil { + w, err := inotify.NewWatcher() + if err != nil { + return err + } + self.watcher = w + } + + // Watch this container (all its cgroups) and all subdirectories. + for _, mnt := range self.cgroupSubsystems.mounts { + err := self.watchDirectory(path.Join(mnt.Mountpoint, self.name), self.name) + if err != nil { + return err + } + } + + // Process the events received from the kernel. + go func() { + for { + select { + case event := <-self.watcher.Event: + err := self.processEvent(event, events) + if err != nil { + glog.Warning("Error while processing event (%+v): %v", event, err) + } + case err := <-self.watcher.Error: + glog.Warning("Error while watching %q:", self.name, err) + } + } + }() + + return nil +} diff --git a/manager/manager.go b/manager/manager.go index a057bc5e..3817a770 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -28,7 +28,7 @@ import ( "github.com/google/cadvisor/storage" ) -var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Second, "Interval between global housekeepings") +var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") type Manager interface { // Start the manager, blocks forever. @@ -73,24 +73,22 @@ func New(driver storage.StorageDriver) (Manager, error) { } type manager struct { - containers map[string]*containerData - containersLock sync.RWMutex - storageDriver storage.StorageDriver - machineInfo info.MachineInfo - versionInfo info.VersionInfo - globalHousekeepingInterval time.Duration - containerHousekeepingInterval time.Duration + containers map[string]*containerData + containersLock sync.RWMutex + storageDriver storage.StorageDriver + machineInfo info.MachineInfo + versionInfo info.VersionInfo } // Start the container manager. -func (m *manager) Start() error { +func (self *manager) Start() error { // Create root and then recover all containers. - _, err := m.createContainer("/") + err := self.createContainer("/") if err != nil { return err } glog.Infof("Starting recovery of all containers") - err = m.detectContainers() + err = self.detectSubcontainers("/") if err != nil { return err } @@ -102,13 +100,16 @@ func (m *manager) Start() error { longHousekeeping = *globalHousekeepingInterval / 2 } + // Watch for new container. + go self.watchForNewContainers() + // Look for new containers in the main housekeeping thread. ticker := time.Tick(*globalHousekeepingInterval) for t := range ticker { start := time.Now() // Check for new containers. - err = m.detectContainers() + err = self.detectSubcontainers("/") if err != nil { glog.Errorf("Failed to detect containers: %s", err) } @@ -119,6 +120,7 @@ func (m *manager) Start() error { glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration) } } + return nil } @@ -218,29 +220,40 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) { return &ret, nil } -// Create a container. This expects to only be called from the global manager thread. -func (m *manager) createContainer(containerName string) (*containerData, error) { +// Create a container. +func (m *manager) createContainer(containerName string) error { cont, err := NewContainerData(containerName, m.storageDriver) if err != nil { - return nil, err + return err } // Add to the containers map. - func() { + alreadyExists := func() bool { m.containersLock.Lock() defer m.containersLock.Unlock() + // Check that the container didn't already exist + _, ok := m.containers[containerName] + if ok { + return true + } + // Add the container name and all its aliases. m.containers[containerName] = cont for _, alias := range cont.info.Aliases { m.containers[alias] = cont } + + return false }() + if alreadyExists { + return nil + } glog.Infof("Added container: %q (aliases: %s)", containerName, cont.info.Aliases) // Start the container's housekeeping. cont.Start() - return cont, nil + return nil } func (m *manager) destroyContainer(containerName string) error { @@ -249,7 +262,8 @@ func (m *manager) destroyContainer(containerName string) error { cont, ok := m.containers[containerName] if !ok { - return fmt.Errorf("Expected container \"%s\" to exist during destroy", containerName) + // Already destroyed, done. + return nil } // Tell the container to stop. @@ -267,22 +281,21 @@ func (m *manager) destroyContainer(containerName string) error { return nil } -// Detect all containers that have been added or deleted. -func (m *manager) getContainersDiff() (added []info.ContainerReference, removed []info.ContainerReference, err error) { - // TODO(vmarmol): We probably don't need to lock around / since it will always be there. +// Detect all containers that have been added or deleted from the specified container. +func (m *manager) getContainersDiff(containerName string) (added []info.ContainerReference, removed []info.ContainerReference, err error) { m.containersLock.RLock() defer m.containersLock.RUnlock() - // Get all containers on the system. - cont, ok := m.containers["/"] + // Get all subcontainers recursively. + cont, ok := m.containers[containerName] if !ok { - return nil, nil, fmt.Errorf("Failed to find container \"/\" while checking for new containers") + return nil, nil, fmt.Errorf("Failed to find container %q while checking for new containers", containerName) } allContainers, err := cont.handler.ListContainers(container.LIST_RECURSIVE) if err != nil { return nil, nil, err } - allContainers = append(allContainers, info.ContainerReference{Name: "/"}) + allContainers = append(allContainers, info.ContainerReference{Name: containerName}) // Determine which were added and which were removed. allContainersSet := make(map[string]*containerData) @@ -292,6 +305,8 @@ func (m *manager) getContainersDiff() (added []info.ContainerReference, removed allContainersSet[name] = d } } + + // Added containers for _, c := range allContainers { delete(allContainersSet, c.Name) _, ok := m.containers[c.Name] @@ -308,16 +323,16 @@ func (m *manager) getContainersDiff() (added []info.ContainerReference, removed return } -// Detect the existing containers and reflect the setup here. -func (m *manager) detectContainers() error { - added, removed, err := m.getContainersDiff() +// Detect the existing subcontainers and reflect the setup here. +func (m *manager) detectSubcontainers(containerName string) error { + added, removed, err := m.getContainersDiff(containerName) if err != nil { return err } // Add the new containers. for _, cont := range added { - _, err = m.createContainer(cont.Name) + err = m.createContainer(cont.Name) if err != nil { glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err) } @@ -333,3 +348,50 @@ func (m *manager) detectContainers() error { return nil } + +func (self *manager) processEvent(event container.SubcontainerEvent) error { + var err error = nil + return err +} + +// Watches for new containers started in the system. Runs forever unless there is a setup error. +func (self *manager) watchForNewContainers() error { + var root *containerData + var ok bool + func() { + self.containersLock.RLock() + defer self.containersLock.RUnlock() + root, ok = self.containers["/"] + }() + if !ok { + return fmt.Errorf("root container does not exist when watching for new containers") + } + + // Register for new subcontainers. + events := make(chan container.SubcontainerEvent, 16) + err := root.handler.WatchSubcontainers(events) + if err != nil { + return err + } + + // There is a race between starting the watch and new container creation so we do a detection before we read new containers. + err = self.detectSubcontainers("/") + if err != nil { + return err + } + + // Listen to events from the container handler. + for event := range events { + switch { + case event.EventType == container.SUBCONTAINER_ADD: + err = self.createContainer(event.Name) + case event.EventType == container.SUBCONTAINER_DELETE: + err = self.destroyContainer(event.Name) + } + if err != nil { + glog.Warning("Failed to process watch event: %v", err) + } + } + + return nil +}