diff --git a/container/raw/handler.go b/container/raw/handler.go
index d8ea8524..aced09e0 100644
--- a/container/raw/handler.go
+++ b/container/raw/handler.go
@@ -43,17 +43,11 @@ type rawContainerHandler struct {
 	machineInfoFactory info.MachineInfoFactory
 
 	// Inotify event watcher.
-	watcher *inotify.Watcher
+	watcher *InotifyWatcher
 
 	// Signal for watcher thread to stop.
 	stopWatcher chan error
 
-	// Containers being watched for new subcontainers.
-	watches map[string]struct{}
-
-	// Cgroup paths being watched for new subcontainers
-	cgroupWatches map[string]struct{}
-
 	// Absolute path to the cgroup hierarchies of this container.
 	// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
 	cgroupPaths map[string]string
@@ -107,8 +101,6 @@ func newRawContainerHandler(name string, cgroupSubsystems *libcontainer.CgroupSu
 		cgroupSubsystems:   cgroupSubsystems,
 		machineInfoFactory: machineInfoFactory,
 		stopWatcher:        make(chan error),
-		watches:            make(map[string]struct{}),
-		cgroupWatches:      make(map[string]struct{}),
 		cgroupPaths:        cgroupPaths,
 		cgroupManager:      cgroupManager,
 		fsInfo:             fsInfo,
@@ -402,29 +394,43 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i
 	return libcontainer.GetProcesses(self.cgroupManager)
 }
 
-func (self *rawContainerHandler) watchDirectory(dir string, containerName string) error {
-	err := self.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
+// Watches the specified directory and all subdirectories. Returns whether the container was
+// already being watched and an error (if any).
+func (self *rawContainerHandler) watchDirectory(dir string, containerName string) (bool, error) {
+	alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
 	if err != nil {
-		return err
+		return alreadyWatching, err
 	}
-	self.watches[containerName] = struct{}{}
-	self.cgroupWatches[dir] = struct{}{}
+
+	// Remove the watch again if any of the steps below fail.
+	cleanup := true
+	defer func() {
+		if cleanup {
+			_, err := self.watcher.RemoveWatch(containerName, dir)
+			if err != nil {
+				glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
+			}
+		}
+	}()
 
 	// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
 	// Watch subdirectories as well.
 	entries, err := ioutil.ReadDir(dir)
 	if err != nil {
-		return err
+		return alreadyWatching, err
 	}
 	for _, entry := range entries {
 		if entry.IsDir() {
-			err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
+			// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
+			_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
 			if err != nil {
-				return err
+				return alreadyWatching, err
 			}
 		}
 	}
-	return nil
+
+	cleanup = false
+	return alreadyWatching, nil
 }
 
 func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error {
@@ -460,33 +466,27 @@ func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan
 	// Maintain the watch for the new or deleted container.
 	switch {
 	case eventType == container.SubcontainerAdd:
-		_, alreadyWatched := self.watches[containerName]
-
 		// New container was created, watch it.
-		err := self.watchDirectory(event.Name, containerName)
+		alreadyWatched, err := self.watchDirectory(event.Name, containerName)
 		if err != nil {
 			return err
 		}
 
 		// Only report container creation once.
 		if alreadyWatched {
 			return nil
 		}
 	case eventType == container.SubcontainerDelete:
-		// Container was deleted, stop watching for it. Only delete the event if we registered it.
-		if _, ok := self.cgroupWatches[event.Name]; ok {
-			err := self.watcher.RemoveWatch(event.Name)
-			if err != nil {
-				return err
-			}
-			delete(self.cgroupWatches, event.Name)
+		// Container was deleted, stop watching for it.
+		wasWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
+		if err != nil {
+			return err
 		}
 
 		// Only report container deletion once.
-		if _, ok := self.watches[containerName]; !ok {
+		if !wasWatched {
 			return nil
 		}
-		delete(self.watches, containerName)
 	default:
 		return fmt.Errorf("unknown event type %v", eventType)
 	}
@@ -503,7 +503,7 @@ func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan
 func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
 	// Lazily initialize the watcher so we don't use it when not asked to.
 	if self.watcher == nil {
-		w, err := inotify.NewWatcher()
+		w, err := NewInotifyWatcher()
 		if err != nil {
 			return err
 		}
@@ -512,7 +512,7 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon
 
 	// Watch this container (all its cgroups) and all subdirectories.
 	for _, cgroupPath := range self.cgroupPaths {
-		err := self.watchDirectory(cgroupPath, self.name)
+		_, err := self.watchDirectory(cgroupPath, self.name)
 		if err != nil {
 			return err
 		}
@@ -522,12 +522,12 @@ func (self *rawContainerHandler) WatchSubcontainers(events chan container.Subcon
 	go func() {
 		for {
 			select {
-			case event := <-self.watcher.Event:
+			case event := <-self.watcher.Event():
 				err := self.processEvent(event, events)
 				if err != nil {
 					glog.Warningf("Error while processing event (%+v): %v", event, err)
 				}
-			case err := <-self.watcher.Error:
-				glog.Warningf("Error while watching %q:", self.name, err)
+			case err := <-self.watcher.Error():
+				glog.Warningf("Error while watching %q: %v", self.name, err)
 			case <-self.stopWatcher:
 				err := self.watcher.Close()
diff --git a/container/raw/inotify_watcher.go b/container/raw/inotify_watcher.go
new file mode 100644
index 00000000..76b3d879
--- /dev/null
+++ b/container/raw/inotify_watcher.go
@@ -0,0 +1,133 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raw
+
+import (
+	"sync"
+
+	"golang.org/x/exp/inotify"
+)
+
+// InotifyWatcher watches for container-related inotify events in the cgroup hierarchy.
+//
+// Implementation is thread-safe.
+type InotifyWatcher struct {
+	// Underlying inotify watcher.
+	watcher *inotify.Watcher
+
+	// Containers being watched.
+	containersWatched map[string]bool
+
+	// Full cgroup paths being watched.
+	cgroupsWatched map[string]bool
+
+	// Lock for all datastructure access.
+	lock sync.Mutex
+}
+
+// NewInotifyWatcher returns an InotifyWatcher wrapping a newly created inotify watcher.
+func NewInotifyWatcher() (*InotifyWatcher, error) {
+	w, err := inotify.NewWatcher()
+	if err != nil {
+		return nil, err
+	}
+
+	return &InotifyWatcher{
+		watcher:           w,
+		containersWatched: make(map[string]bool),
+		cgroupsWatched:    make(map[string]bool),
+	}, nil
+}
+
+// AddWatch adds a watch on the specified directory. Returns whether the container was already being watched.
+func (iw *InotifyWatcher) AddWatch(containerName, dir string) (bool, error) {
+	iw.lock.Lock()
+	defer iw.lock.Unlock()
+
+	alreadyWatched := iw.containersWatched[containerName]
+
+	// Register an inotify notification unless this path already has one.
+	if !iw.cgroupsWatched[dir] {
+		err := iw.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
+		if err != nil {
+			return alreadyWatched, err
+		}
+		iw.cgroupsWatched[dir] = true
+	}
+
+	// Record our watching of the container.
+	if !alreadyWatched {
+		iw.containersWatched[containerName] = true
+	}
+	return alreadyWatched, nil
+}
+
+// RemoveWatch removes the watch on the specified directory. Returns whether the container was being watched.
+func (iw *InotifyWatcher) RemoveWatch(containerName, dir string) (bool, error) {
+	iw.lock.Lock()
+	defer iw.lock.Unlock()
+
+	alreadyWatched := iw.containersWatched[containerName]
+
+	// Remove the inotify watch if it exists; on failure the bookkeeping is left untouched.
+	if iw.cgroupsWatched[dir] {
+		err := iw.watcher.RemoveWatch(dir)
+		if err != nil {
+			return alreadyWatched, err
+		}
+		delete(iw.cgroupsWatched, dir)
+	}
+
+	// Record the container as no longer being watched.
+	if alreadyWatched {
+		delete(iw.containersWatched, containerName)
+	}
+
+	return alreadyWatched, nil
+}
+
+// Error returns the error channel of the underlying inotify watcher.
+func (iw *InotifyWatcher) Error() chan error {
+	return iw.watcher.Error
+}
+
+// Event returns the event channel of the underlying inotify watcher.
+func (iw *InotifyWatcher) Event() chan *inotify.Event {
+	return iw.watcher.Event
+}
+
+// Close closes the underlying inotify watcher.
+func (iw *InotifyWatcher) Close() error {
+	return iw.watcher.Close()
+}
+
+// GetWatches returns a list of:
+// - Containers being watched.
+// - Cgroup paths being watched.
+func (iw *InotifyWatcher) GetWatches() ([]string, []string) {
+	iw.lock.Lock()
+	defer iw.lock.Unlock()
+
+	return mapToSlice(iw.containersWatched), mapToSlice(iw.cgroupsWatched)
+}
+
+// mapToSlice returns the keys of m as a slice, in unspecified order.
+func mapToSlice(m map[string]bool) []string {
+	out := make([]string, 0, len(m))
+	for k := range m {
+		out = append(out, k)
+	}
+	return out
+}