Use inotify to watch for new containers.

This reduces cAdvisor CPU usage to below 1% on my CoreOS system.

We also reduce global housekeeping to every 60s as a fallback in case we
miss an event.
Commit b63d61ca97 (parent 58e019028d)
Victor Marmol, 2014-09-17 15:29:04 -07:00
6 changed files with 251 additions and 30 deletions
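As a standalone sketch of the idea (using the same code.google.com/p/go.exp/inotify package this change imports; the cgroup path below is an assumption for illustration):

package main

import (
	"log"

	"code.google.com/p/go.exp/inotify"
)

func main() {
	watcher, err := inotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	// Hypothetical cgroup mountpoint; cAdvisor derives the real ones
	// from the mounted cgroup subsystems.
	err = watcher.AddWatch("/sys/fs/cgroup/cpu", inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
	if err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case ev := <-watcher.Event:
			// A directory created or removed here corresponds to a
			// container being added or deleted.
			log.Printf("event: %+v", ev)
		case err := <-watcher.Error:
			log.Printf("watch error: %v", err)
		}
	}
}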


@@ -24,6 +24,20 @@ const (
type ListType int
// SubcontainerEvent types.
const (
SUBCONTAINER_ADD = iota
SUBCONTAINER_DELETE
)
type SubcontainerEvent struct {
// The type of event that occurred.
EventType int
// The container to which the event occurred.
Name string
}
// Interface for container operation handlers.
type ContainerHandler interface {
ContainerReference() (info.ContainerReference, error)
@@ -32,4 +46,5 @@ type ContainerHandler interface {
ListContainers(listType ListType) ([]info.ContainerReference, error)
ListThreads(listType ListType) ([]int, error)
ListProcesses(listType ListType) ([]int, error)
WatchSubcontainers(events chan SubcontainerEvent) error
}
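For callers, the new method is fire-and-forget: pass a channel, then drain it. A hypothetical consumer (sketch, not part of this change; assumes the glog import used elsewhere in this commit):

// watchAndLog is illustrative only.
func watchAndLog(handler ContainerHandler) error {
	events := make(chan SubcontainerEvent, 16)
	if err := handler.WatchSubcontainers(events); err != nil {
		return err
	}
	go func() {
		for e := range events {
			switch e.EventType {
			case SUBCONTAINER_ADD:
				glog.Infof("subcontainer added: %s", e.Name)
			case SUBCONTAINER_DELETE:
				glog.Infof("subcontainer deleted: %s", e.Name)
			}
		}
	}()
	return nil
}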


@@ -262,3 +262,7 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) (
}
return fs.GetPids(c)
}
func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
return fmt.Errorf("watch is unimplemented in the Docker container driver")
}


@@ -62,6 +62,10 @@ func (self *containerListFilter) ListProcesses(listType ListType) ([]int, error)
return self.handler.ListProcesses(listType)
}
func (self *containerListFilter) WatchSubcontainers(events chan SubcontainerEvent) error {
return self.handler.WatchSubcontainers(events)
}
func NewWhiteListFilter(handler ContainerHandler, acceptedPaths ...string) ContainerHandler {
filter := func(p string) bool {
for _, path := range acceptedPaths {


@@ -69,6 +69,11 @@ func (self *MockContainerHandler) ListProcesses(listType ListType) ([]int, error)
return args.Get(0).([]int), args.Error(1)
}
func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEvent) error {
args := self.Called(events)
return args.Error(0)
}
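Assuming MockContainerHandler embeds testify's mock.Mock (as the Called call suggests) and lives in the container package, a test could stub the new method like this (sketch, not part of the change):

package container

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

func TestWatchSubcontainersStub(t *testing.T) {
	h := new(MockContainerHandler)
	h.On("WatchSubcontainers", mock.Anything).Return(nil)

	events := make(chan SubcontainerEvent, 1)
	if err := h.WatchSubcontainers(events); err != nil {
		t.Fatal(err)
	}
	h.AssertExpectations(t)
}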
type FactoryForMockContainerHandler struct {
Name string
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)


@@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"code.google.com/p/go.exp/inotify"
"github.com/docker/libcontainer/cgroups"
"github.com/docker/libcontainer/cgroups/fs"
"github.com/golang/glog"
@@ -35,6 +36,8 @@ type rawContainerHandler struct {
cgroup *cgroups.Cgroup
cgroupSubsystems *cgroupSubsystems
machineInfoFactory info.MachineInfoFactory
watcher *inotify.Watcher
watches map[string]struct{}
}
func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, machineInfoFactory info.MachineInfoFactory) (container.ContainerHandler, error) {
@@ -46,6 +49,7 @@ func newRawContainerHandler(name string, cgroupSubsystems *cgroupSubsystems, mac
},
cgroupSubsystems: cgroupSubsystems,
machineInfoFactory: machineInfoFactory,
watches: make(map[string]struct{}),
}, nil
}
@@ -173,7 +177,7 @@ func listDirectories(dirpath string, parent string, recursive bool, output map[s
}
func (self *rawContainerHandler) ListContainers(listType container.ListType) ([]info.ContainerReference, error) {
-containers := make(map[string]struct{}, 16)
+containers := make(map[string]struct{})
for _, subsystem := range self.cgroupSubsystems.mounts {
err := listDirectories(path.Join(subsystem.Mountpoint, self.name), self.name, listType == container.LIST_RECURSIVE, containers)
if err != nil {
@@ -200,3 +204,130 @@ func (self *rawContainerHandler) ListThreads(listType container.ListType) ([]int
func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]int, error) {
return fs.GetPids(self.cgroup)
}
func (self *rawContainerHandler) watchDirectory(dir string, containerName string) error {
err := self.watcher.AddWatch(dir, inotify.IN_CREATE|inotify.IN_DELETE|inotify.IN_MOVE)
if err != nil {
return err
}
self.watches[containerName] = struct{}{}
// Watch subdirectories as well.
entries, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
if err != nil {
return err
}
}
}
return nil
}
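Note the recursion: when a watch on, say, /docker (name illustrative) later sees a new subdirectory, processEvent below calls watchDirectory on it, so cgroups nested any number of levels deep are also covered.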
func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error {
// Convert the inotify event type to a container create or delete.
var eventType int
switch {
case (event.Mask & inotify.IN_CREATE) > 0:
eventType = container.SUBCONTAINER_ADD
case (event.Mask & inotify.IN_DELETE) > 0:
eventType = container.SUBCONTAINER_DELETE
case (event.Mask & inotify.IN_MOVED_FROM) > 0:
eventType = container.SUBCONTAINER_DELETE
case (event.Mask & inotify.IN_MOVED_TO) > 0:
eventType = container.SUBCONTAINER_ADD
default:
// Ignore other events.
return nil
}
// Derive the container name from the path name.
var containerName string
for _, mount := range self.cgroupSubsystems.mounts {
mountLocation := path.Clean(mount.Mountpoint) + "/"
if strings.HasPrefix(event.Name, mountLocation) {
containerName = event.Name[len(mountLocation)-1:]
break
}
}
if containerName == "" {
return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
}
// Maintain the watch for the new or deleted container.
switch {
case eventType == container.SUBCONTAINER_ADD:
// If we've already seen this event, return.
if _, ok := self.watches[containerName]; ok {
return nil
}
// New container was created, watch it.
err := self.watchDirectory(event.Name, containerName)
if err != nil {
return err
}
case eventType == container.SUBCONTAINER_DELETE:
// If we've already seen this event, return.
if _, ok := self.watches[containerName]; !ok {
return nil
}
delete(self.watches, containerName)
// Container was deleted, stop watching for it.
err := self.watcher.RemoveWatch(event.Name)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown event type %v", eventType)
}
// Deliver the event.
events <- container.SubcontainerEvent{
EventType: eventType,
Name: containerName,
}
return nil
}
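To illustrate the name derivation with hypothetical paths: for an event on /sys/fs/cgroup/cpu/docker/abc with the cpu subsystem mounted at /sys/fs/cgroup/cpu, mountLocation is /sys/fs/cgroup/cpu/ and the slice starting at len(mountLocation)-1 yields /docker/abc, deliberately keeping the leading slash.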
func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
// Lazily initialize the watcher so we don't use it when not asked to.
if self.watcher == nil {
w, err := inotify.NewWatcher()
if err != nil {
return err
}
self.watcher = w
}
// Watch this container (all its cgroups) and all subdirectories.
for _, mnt := range self.cgroupSubsystems.mounts {
err := self.watchDirectory(path.Join(mnt.Mountpoint, self.name), self.name)
if err != nil {
return err
}
}
// Process the events received from the kernel.
go func() {
for {
select {
case event := <-self.watcher.Event:
err := self.processEvent(event, events)
if err != nil {
glog.Warning("Error while processing event (%+v): %v", event, err)
}
case err := <-self.watcher.Error:
glog.Warning("Error while watching %q:", self.name, err)
}
}
}()
return nil
}
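One design note: the goroutine above blocks on the send into events, so a slow consumer stalls event delivery; the manager below compensates by passing a channel buffered to 16 entries.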


@@ -28,7 +28,7 @@ import (
"github.com/google/cadvisor/storage"
)
-var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Second, "Interval between global housekeepings")
+var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
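Operators who still want tighter polling can override the fallback, e.g. -global_housekeeping_interval=10s, though with inotify delivering adds and deletes promptly the slower default should rarely matter.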
type Manager interface {
// Start the manager, blocks forever.
@@ -73,24 +73,22 @@ func New(driver storage.StorageDriver) (Manager, error) {
}
type manager struct {
-containers map[string]*containerData
-containersLock sync.RWMutex
-storageDriver storage.StorageDriver
-machineInfo info.MachineInfo
-versionInfo info.VersionInfo
-globalHousekeepingInterval time.Duration
-containerHousekeepingInterval time.Duration
+containers map[string]*containerData
+containersLock sync.RWMutex
+storageDriver storage.StorageDriver
+machineInfo info.MachineInfo
+versionInfo info.VersionInfo
}
// Start the container manager.
-func (m *manager) Start() error {
+func (self *manager) Start() error {
// Create root and then recover all containers.
-_, err := m.createContainer("/")
+err := self.createContainer("/")
if err != nil {
return err
}
glog.Infof("Starting recovery of all containers")
-err = m.detectContainers()
+err = self.detectSubcontainers("/")
if err != nil {
return err
}
@@ -102,13 +100,16 @@ func (m *manager) Start() error {
longHousekeeping = *globalHousekeepingInterval / 2
}
// Watch for new containers.
go self.watchForNewContainers()
// Look for new containers in the main housekeeping thread.
ticker := time.Tick(*globalHousekeepingInterval)
for t := range ticker {
start := time.Now()
// Check for new containers.
-err = m.detectContainers()
+err = self.detectSubcontainers("/")
if err != nil {
glog.Errorf("Failed to detect containers: %s", err)
}
@@ -119,6 +120,7 @@ func (m *manager) Start() error {
glog.V(1).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
}
}
return nil
}
@@ -218,29 +220,40 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
return &ret, nil
}
-// Create a container. This expects to only be called from the global manager thread.
-func (m *manager) createContainer(containerName string) (*containerData, error) {
+// Create a container.
+func (m *manager) createContainer(containerName string) error {
cont, err := NewContainerData(containerName, m.storageDriver)
if err != nil {
-return nil, err
+return err
}
// Add to the containers map.
-func() {
+alreadyExists := func() bool {
m.containersLock.Lock()
defer m.containersLock.Unlock()
// Check that the container didn't already exist
_, ok := m.containers[containerName]
if ok {
return true
}
// Add the container name and all its aliases.
m.containers[containerName] = cont
for _, alias := range cont.info.Aliases {
m.containers[alias] = cont
}
return false
}()
if alreadyExists {
return nil
}
glog.Infof("Added container: %q (aliases: %s)", containerName, cont.info.Aliases)
// Start the container's housekeeping.
cont.Start()
-return cont, nil
+return nil
}
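The alreadyExists closure exists to scope the lock: its deferred Unlock fires when the closure returns, so the container's housekeeping is started outside the critical section. A minimal, runnable sketch of the idiom (illustrative, not from this change):

package main

import (
	"fmt"
	"sync"
)

var (
	mu   sync.Mutex
	seen = map[string]bool{}
)

// add reports whether name was already present. The deferred Unlock
// runs when the inner closure returns, so callers can safely do slow
// work after add without holding the lock.
func add(name string) bool {
	return func() bool {
		mu.Lock()
		defer mu.Unlock()
		if seen[name] {
			return true
		}
		seen[name] = true
		return false
	}()
}

func main() {
	fmt.Println(add("/docker/abc")) // false: newly added
	fmt.Println(add("/docker/abc")) // true: already present
}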
func (m *manager) destroyContainer(containerName string) error {
@@ -249,7 +262,8 @@ func (m *manager) destroyContainer(containerName string) error {
cont, ok := m.containers[containerName]
if !ok {
return fmt.Errorf("Expected container \"%s\" to exist during destroy", containerName)
// Already destroyed, done.
return nil
}
// Tell the container to stop.
@@ -267,22 +281,21 @@ func (m *manager) destroyContainer(containerName string) error {
return nil
}
-// Detect all containers that have been added or deleted.
-func (m *manager) getContainersDiff() (added []info.ContainerReference, removed []info.ContainerReference, err error) {
+// TODO(vmarmol): We probably don't need to lock around / since it will always be there.
+// Detect all containers that have been added or deleted from the specified container.
+func (m *manager) getContainersDiff(containerName string) (added []info.ContainerReference, removed []info.ContainerReference, err error) {
m.containersLock.RLock()
defer m.containersLock.RUnlock()
-// Get all containers on the system.
-cont, ok := m.containers["/"]
+// Get all subcontainers recursively.
+cont, ok := m.containers[containerName]
if !ok {
-return nil, nil, fmt.Errorf("Failed to find container \"/\" while checking for new containers")
+return nil, nil, fmt.Errorf("Failed to find container %q while checking for new containers", containerName)
}
allContainers, err := cont.handler.ListContainers(container.LIST_RECURSIVE)
if err != nil {
return nil, nil, err
}
-allContainers = append(allContainers, info.ContainerReference{Name: "/"})
+allContainers = append(allContainers, info.ContainerReference{Name: containerName})
// Determine which were added and which were removed.
allContainersSet := make(map[string]*containerData)
@@ -292,6 +305,8 @@ func (m *manager) getContainersDiff() (added []info.ContainerReference, removed
allContainersSet[name] = d
}
}
// Added containers
for _, c := range allContainers {
delete(allContainersSet, c.Name)
_, ok := m.containers[c.Name]
@@ -308,16 +323,16 @@ func (m *manager) getContainersDiff() (added []info.ContainerReference, removed
return
}
-// Detect the existing containers and reflect the setup here.
-func (m *manager) detectContainers() error {
-added, removed, err := m.getContainersDiff()
+// Detect the existing subcontainers and reflect the setup here.
+func (m *manager) detectSubcontainers(containerName string) error {
+added, removed, err := m.getContainersDiff(containerName)
if err != nil {
return err
}
// Add the new containers.
for _, cont := range added {
-_, err = m.createContainer(cont.Name)
+err = m.createContainer(cont.Name)
if err != nil {
glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
}
@@ -333,3 +348,50 @@ func (m *manager) detectContainers() error {
return nil
}
func (self *manager) processEvent(event container.SubcontainerEvent) error {
return nil
}
// Watches for new containers started in the system. Runs forever unless there is a setup error.
func (self *manager) watchForNewContainers() error {
var root *containerData
var ok bool
func() {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
root, ok = self.containers["/"]
}()
if !ok {
return fmt.Errorf("root container does not exist when watching for new containers")
}
// Register for new subcontainers.
events := make(chan container.SubcontainerEvent, 16)
err := root.handler.WatchSubcontainers(events)
if err != nil {
return err
}
// There is a race between starting the watch and new container creation, so we do a detection pass before reading new containers.
err = self.detectSubcontainers("/")
if err != nil {
return err
}
// Listen to events from the container handler.
for event := range events {
switch {
case event.EventType == container.SUBCONTAINER_ADD:
err = self.createContainer(event.Name)
case event.EventType == container.SUBCONTAINER_DELETE:
err = self.destroyContainer(event.Name)
}
if err != nil {
glog.Warning("Failed to process watch event: %v", err)
}
}
return nil
}
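The detection pass after registering the watch is what closes the startup race: anything created before the watch took effect is found by detectSubcontainers, and anything found twice is harmless because createContainer now checks for an existing entry before adding.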