Merge pull request #1263 from sjpotter/new-handler-detection

Refactor container watching out of raw handler into its own inteface / package
This commit is contained in:
Tim St. Clair 2016-05-12 15:08:30 -07:00
commit f8c4ccfe8f
14 changed files with 357 additions and 262 deletions

View File

@ -27,23 +27,6 @@ const (
ListRecursive
)
// SubcontainerEventType indicates an addition or deletion event.
type SubcontainerEventType int
const (
SubcontainerAdd SubcontainerEventType = iota
SubcontainerDelete
)
// SubcontainerEvent represents a
type SubcontainerEvent struct {
// The type of event that occurred.
EventType SubcontainerEventType
// The full container name of the container where the event occurred.
Name string
}
// Interface for container operation handlers.
type ContainerHandler interface {
// Returns the ContainerReference
@ -61,12 +44,6 @@ type ContainerHandler interface {
// Returns the processes inside this container.
ListProcesses(listType ListType) ([]int, error)
// Registers a channel to listen for events affecting subcontainers (recursively).
WatchSubcontainers(events chan SubcontainerEvent) error
// Stops watching for subcontainer changes.
StopWatchingSubcontainers() error
// Returns absolute cgroup path for the requested resource.
GetCgroupPath(resource string) (string, error)

View File

@ -26,6 +26,7 @@ import (
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager/watcher"
docker "github.com/docker/engine-api/client"
"github.com/golang/glog"
@ -198,6 +199,6 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics c
ignoreMetrics: ignoreMetrics,
}
container.RegisterContainerHandlerFactory(f)
container.RegisterContainerHandlerFactory(f, []watcher.ContainerWatchSource{watcher.Raw})
return nil
}

View File

@ -331,15 +331,6 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) (
return containerlibcontainer.GetProcesses(self.cgroupManager)
}
func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
return fmt.Errorf("watch is unimplemented in the Docker container driver")
}
func (self *dockerContainerHandler) StopWatchingSubcontainers() error {
// No-op for Docker driver.
return nil
}
func (self *dockerContainerHandler) Exists() bool {
return common.CgroupExists(self.cgroupPaths)
}

View File

@ -18,6 +18,8 @@ import (
"fmt"
"sync"
"github.com/google/cadvisor/manager/watcher"
"github.com/golang/glog"
)
@ -67,17 +69,19 @@ func (ms MetricSet) Add(mk MetricKind) {
// TODO(vmarmol): Consider not making this global.
// Global list of factories.
var (
factories []ContainerHandlerFactory
factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{}
factoriesLock sync.RWMutex
)
// Register a ContainerHandlerFactory. These should be registered from least general to most general
// as they will be asked in order whether they can handle a particular container.
func RegisterContainerHandlerFactory(factory ContainerHandlerFactory) {
func RegisterContainerHandlerFactory(factory ContainerHandlerFactory, watchTypes []watcher.ContainerWatchSource) {
factoriesLock.Lock()
defer factoriesLock.Unlock()
factories = append(factories, factory)
for _, watchType := range watchTypes {
factories[watchType] = append(factories[watchType], factory)
}
}
// Returns whether there are any container handler factories registered.
@ -89,12 +93,12 @@ func HasFactories() bool {
}
// Create a new ContainerHandler for the specified container.
func NewContainerHandler(name string, inHostNamespace bool) (ContainerHandler, bool, error) {
func NewContainerHandler(name string, watchType watcher.ContainerWatchSource, inHostNamespace bool) (ContainerHandler, bool, error) {
factoriesLock.RLock()
defer factoriesLock.RUnlock()
// Create the ContainerHandler with the first factory that supports it.
for _, factory := range factories {
for _, factory := range factories[watchType] {
canHandle, canAccept, err := factory.CanHandleAndAccept(name)
if err != nil {
glog.V(4).Infof("Error trying to work out if we can handle %s: %v", name, err)
@ -120,7 +124,7 @@ func ClearContainerHandlerFactories() {
factoriesLock.Lock()
defer factoriesLock.Unlock()
factories = make([]ContainerHandlerFactory, 0, 4)
factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{}
}
func DebugInfo() map[string][]string {
@ -129,9 +133,11 @@ func DebugInfo() map[string][]string {
// Get debug information for all factories.
out := make(map[string][]string)
for _, factory := range factories {
for k, v := range factory.DebugInfo() {
out[k] = v
for _, factoriesSlice := range factories {
for _, factory := range factoriesSlice {
for k, v := range factory.DebugInfo() {
out[k] = v
}
}
}
return out

View File

@ -17,6 +17,8 @@ package container
import (
"testing"
"github.com/google/cadvisor/manager/watcher"
"github.com/stretchr/testify/mock"
)
@ -57,7 +59,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) {
CanHandleValue: true,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(allwaysYes)
RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw})
// The yes factory should be asked to create the ContainerHandler.
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
@ -66,7 +68,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) {
}
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
cont, _, err := NewContainerHandler(testContainerName, true)
cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
if err != nil {
t.Error(err)
}
@ -84,13 +86,13 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) {
CanHandleValue: false,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(allwaysNo)
RegisterContainerHandlerFactory(allwaysNo, []watcher.ContainerWatchSource{watcher.Raw})
allwaysYes := &mockContainerHandlerFactory{
Name: "yes",
CanHandleValue: true,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(allwaysYes)
RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw})
// The yes factory should be asked to create the ContainerHandler.
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
@ -99,7 +101,7 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) {
}
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
cont, _, err := NewContainerHandler(testContainerName, true)
cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
if err != nil {
t.Error(err)
}
@ -117,15 +119,15 @@ func TestNewContainerHandler_NoneMatch(t *testing.T) {
CanHandleValue: false,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(allwaysNo1)
RegisterContainerHandlerFactory(allwaysNo1, []watcher.ContainerWatchSource{watcher.Raw})
allwaysNo2 := &mockContainerHandlerFactory{
Name: "no",
CanHandleValue: false,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(allwaysNo2)
RegisterContainerHandlerFactory(allwaysNo2, []watcher.ContainerWatchSource{watcher.Raw})
_, _, err := NewContainerHandler(testContainerName, true)
_, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
if err == nil {
t.Error("Expected NewContainerHandler to fail")
}
@ -140,15 +142,15 @@ func TestNewContainerHandler_Accept(t *testing.T) {
CanHandleValue: false,
CanAcceptValue: true,
}
RegisterContainerHandlerFactory(cannotHandle)
RegisterContainerHandlerFactory(cannotHandle, []watcher.ContainerWatchSource{watcher.Raw})
cannotAccept := &mockContainerHandlerFactory{
Name: "no",
CanHandleValue: true,
CanAcceptValue: false,
}
RegisterContainerHandlerFactory(cannotAccept)
RegisterContainerHandlerFactory(cannotAccept, []watcher.ContainerWatchSource{watcher.Raw})
_, accept, err := NewContainerHandler(testContainerName, true)
_, accept, err := NewContainerHandler(testContainerName, watcher.Raw, true)
if err != nil {
t.Error("Expected NewContainerHandler to succeed")
}

View File

@ -77,16 +77,6 @@ func (self *MockContainerHandler) ListProcesses(listType ListType) ([]int, error
return args.Get(0).([]int), args.Error(1)
}
func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEvent) error {
args := self.Called(events)
return args.Error(0)
}
func (self *MockContainerHandler) StopWatchingSubcontainers() error {
args := self.Called()
return args.Error(0)
}
func (self *MockContainerHandler) Exists() bool {
args := self.Called()
return args.Get(0).(bool)
@ -102,6 +92,11 @@ func (self *MockContainerHandler) GetContainerLabels() map[string]string {
return args.Get(0).(map[string]string)
}
func (self *MockContainerHandler) String() string {
args := self.Called()
return args.Get(0).(string)
}
type FactoryForMockContainerHandler struct {
Name string
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)

View File

@ -23,6 +23,7 @@ import (
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
watch "github.com/google/cadvisor/manager/watcher"
"github.com/golang/glog"
)
@ -90,6 +91,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno
watcher: watcher,
ignoreMetrics: ignoreMetrics,
}
container.RegisterContainerHandlerFactory(factory)
container.RegisterContainerHandlerFactory(factory, []watch.ContainerWatchSource{watch.Raw})
return nil
}

View File

@ -17,9 +17,6 @@ package raw
import (
"fmt"
"io/ioutil"
"path"
"strings"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/common"
@ -32,7 +29,6 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
"golang.org/x/exp/inotify"
)
type rawContainerHandler struct {
@ -41,12 +37,6 @@ type rawContainerHandler struct {
cgroupSubsystems *libcontainer.CgroupSubsystems
machineInfoFactory info.MachineInfoFactory
// Inotify event watcher.
watcher *common.InotifyWatcher
// Signal for watcher thread to stop.
stopWatcher chan error
// Absolute path to the cgroup hierarchies of this container.
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
@ -102,12 +92,10 @@ func newRawContainerHandler(name string, cgroupSubsystems *libcontainer.CgroupSu
name: name,
cgroupSubsystems: cgroupSubsystems,
machineInfoFactory: machineInfoFactory,
stopWatcher: make(chan error),
cgroupPaths: cgroupPaths,
cgroupManager: cgroupManager,
fsInfo: fsInfo,
externalMounts: externalMounts,
watcher: watcher,
rootFs: rootFs,
ignoreMetrics: ignoreMetrics,
pid: pid,
@ -276,151 +264,6 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i
return libcontainer.GetProcesses(self.cgroupManager)
}
// Watches the specified directory and all subdirectories. Returns whether the path was
// already being watched and an error (if any).
func (self *rawContainerHandler) watchDirectory(dir string, containerName string) (bool, error) {
alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
if err != nil {
return alreadyWatching, err
}
// Remove the watch if further operations failed.
cleanup := true
defer func() {
if cleanup {
_, err := self.watcher.RemoveWatch(containerName, dir)
if err != nil {
glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
}
}
}()
// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
// Watch subdirectories as well.
entries, err := ioutil.ReadDir(dir)
if err != nil {
return alreadyWatching, err
}
for _, entry := range entries {
if entry.IsDir() {
// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
if err != nil {
return alreadyWatching, err
}
}
}
cleanup = false
return alreadyWatching, nil
}
func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error {
// Convert the inotify event type to a container create or delete.
var eventType container.SubcontainerEventType
switch {
case (event.Mask & inotify.IN_CREATE) > 0:
eventType = container.SubcontainerAdd
case (event.Mask & inotify.IN_DELETE) > 0:
eventType = container.SubcontainerDelete
case (event.Mask & inotify.IN_MOVED_FROM) > 0:
eventType = container.SubcontainerDelete
case (event.Mask & inotify.IN_MOVED_TO) > 0:
eventType = container.SubcontainerAdd
default:
// Ignore other events.
return nil
}
// Derive the container name from the path name.
var containerName string
for _, mount := range self.cgroupSubsystems.Mounts {
mountLocation := path.Clean(mount.Mountpoint) + "/"
if strings.HasPrefix(event.Name, mountLocation) {
containerName = event.Name[len(mountLocation)-1:]
break
}
}
if containerName == "" {
return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
}
// Maintain the watch for the new or deleted container.
switch {
case eventType == container.SubcontainerAdd:
// New container was created, watch it.
alreadyWatched, err := self.watchDirectory(event.Name, containerName)
if err != nil {
return err
}
// Only report container creation once.
if alreadyWatched {
return nil
}
case eventType == container.SubcontainerDelete:
// Container was deleted, stop watching for it.
lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
if err != nil {
return err
}
// Only report container deletion once.
if !lastWatched {
return nil
}
default:
return fmt.Errorf("unknown event type %v", eventType)
}
// Deliver the event.
events <- container.SubcontainerEvent{
EventType: eventType,
Name: containerName,
}
return nil
}
func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
// Watch this container (all its cgroups) and all subdirectories.
for _, cgroupPath := range self.cgroupPaths {
_, err := self.watchDirectory(cgroupPath, self.name)
if err != nil {
return err
}
}
// Process the events received from the kernel.
go func() {
for {
select {
case event := <-self.watcher.Event():
err := self.processEvent(event, events)
if err != nil {
glog.Warningf("Error while processing event (%+v): %v", event, err)
}
case err := <-self.watcher.Error():
glog.Warningf("Error while watching %q:", self.name, err)
case <-self.stopWatcher:
err := self.watcher.Close()
if err == nil {
self.stopWatcher <- err
return
}
}
}
}()
return nil
}
func (self *rawContainerHandler) StopWatchingSubcontainers() error {
// Rendezvous with the watcher thread.
self.stopWatcher <- nil
return <-self.stopWatcher
}
func (self *rawContainerHandler) Exists() bool {
return common.CgroupExists(self.cgroupPaths)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager/watcher"
"github.com/golang/glog"
)
@ -99,6 +100,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno
ignoreMetrics: ignoreMetrics,
rktPath: rktPath,
}
container.RegisterContainerHandlerFactory(factory)
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
return nil
}

View File

@ -266,15 +266,6 @@ func (handler *rktContainerHandler) ListProcesses(listType container.ListType) (
return libcontainer.GetProcesses(handler.cgroupManager)
}
func (handler *rktContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
return fmt.Errorf("watch is unimplemented in the Rkt container driver")
}
func (handler *rktContainerHandler) StopWatchingSubcontainers() error {
// No-op for Rkt driver.
return nil
}
func (handler *rktContainerHandler) Exists() bool {
return common.CgroupExists(handler.cgroupPaths)
}

View File

@ -21,6 +21,7 @@ import (
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager/watcher"
"github.com/golang/glog"
)
@ -52,6 +53,6 @@ func (f *systemdFactory) DebugInfo() map[string][]string {
func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics container.MetricSet) error {
glog.Infof("Registering systemd factory")
factory := &systemdFactory{}
container.RegisterContainerHandlerFactory(factory)
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
return nil
}

View File

@ -37,6 +37,8 @@ import (
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/machine"
"github.com/google/cadvisor/manager/watcher"
rawwatcher "github.com/google/cadvisor/manager/watcher/raw"
"github.com/google/cadvisor/utils/cpuload"
"github.com/google/cadvisor/utils/oomparser"
"github.com/google/cadvisor/utils/sysfs"
@ -163,6 +165,9 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
inHostNamespace = true
}
// Register for new subcontainers.
eventsChannel := make(chan watcher.ContainerEvent, 16)
newManager := &manager{
containers: make(map[namespacedContainerName]*containerData),
quitChannels: make([]chan error, 0, 2),
@ -174,6 +179,8 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
maxHousekeepingInterval: maxHousekeepingInterval,
allowDynamicHousekeeping: allowDynamicHousekeeping,
ignoreMetrics: ignoreMetricsSet,
containerWatchers: []watcher.ContainerWatcher{},
eventsChannel: eventsChannel,
}
machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
@ -217,6 +224,8 @@ type manager struct {
maxHousekeepingInterval time.Duration
allowDynamicHousekeeping bool
ignoreMetrics container.MetricSet
containerWatchers []watcher.ContainerWatcher
eventsChannel chan watcher.ContainerEvent
}
// Start the container manager.
@ -241,6 +250,12 @@ func (self *manager) Start() error {
glog.Errorf("Registration of the raw container factory failed: %v", err)
}
rawWatcher, err := rawwatcher.NewRawContainerWatcher()
if err != nil {
return err
}
self.containerWatchers = append(self.containerWatchers, rawWatcher)
if *enableLoadReader {
// Create cpu load reader.
cpuLoadReader, err := cpuload.New()
@ -269,7 +284,7 @@ func (self *manager) Start() error {
}
// Create root and then recover all containers.
err = self.createContainer("/")
err = self.createContainer("/", watcher.Raw)
if err != nil {
return err
}
@ -769,10 +784,14 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
}
// Create a container.
func (m *manager) createContainer(containerName string) error {
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
defer m.containersLock.Unlock()
return m.createContainerLocked(containerName, watchSource)
}
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
namespacedName := namespacedContainerName{
Name: containerName,
}
@ -782,7 +801,7 @@ func (m *manager) createContainer(containerName string) error {
return nil
}
handler, accept, err := container.NewContainerHandler(containerName, m.inHostNamespace)
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
if err != nil {
return err
}
@ -849,6 +868,10 @@ func (m *manager) destroyContainer(containerName string) error {
m.containersLock.Lock()
defer m.containersLock.Unlock()
return m.destroyContainerLocked(containerName)
}
func (m *manager) destroyContainerLocked(containerName string) error {
namespacedName := namespacedContainerName{
Name: containerName,
}
@ -946,7 +969,7 @@ func (m *manager) detectSubcontainers(containerName string) error {
// Add the new containers.
for _, cont := range added {
err = m.createContainer(cont.Name)
err = m.createContainer(cont.Name, watcher.Raw)
if err != nil {
glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
}
@ -965,28 +988,15 @@ func (m *manager) detectSubcontainers(containerName string) error {
// Watches for new containers started in the system. Runs forever unless there is a setup error.
func (self *manager) watchForNewContainers(quit chan error) error {
var root *containerData
var ok bool
func() {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
root, ok = self.containers[namespacedContainerName{
Name: "/",
}]
}()
if !ok {
return fmt.Errorf("root container does not exist when watching for new containers")
}
// Register for new subcontainers.
eventsChannel := make(chan container.SubcontainerEvent, 16)
err := root.handler.WatchSubcontainers(eventsChannel)
if err != nil {
return err
for _, watcher := range self.containerWatchers {
err := watcher.Start(self.eventsChannel)
if err != nil {
return err
}
}
// There is a race between starting the watch and new container creation so we do a detection before we read new containers.
err = self.detectSubcontainers("/")
err := self.detectSubcontainers("/")
if err != nil {
return err
}
@ -995,21 +1005,32 @@ func (self *manager) watchForNewContainers(quit chan error) error {
go func() {
for {
select {
case event := <-eventsChannel:
case event := <-self.eventsChannel:
switch {
case event.EventType == container.SubcontainerAdd:
err = self.createContainer(event.Name)
case event.EventType == container.SubcontainerDelete:
case event.EventType == watcher.ContainerAdd:
err = self.createContainer(event.Name, event.WatchSource)
case event.EventType == watcher.ContainerDelete:
err = self.destroyContainer(event.Name)
}
if err != nil {
glog.Warningf("Failed to process watch event %+v: %v", event, err)
}
case <-quit:
errors := []string{}
// Stop processing events if asked to quit.
err := root.handler.StopWatchingSubcontainers()
quit <- err
if err == nil {
for _, watcher := range self.containerWatchers {
err := watcher.Stop()
if err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
err_str := strings.Join(errors, ", ")
quit <- fmt.Errorf("Error quiting watchers: %v", err_str)
} else {
quit <- nil
glog.Infof("Exiting thread watching subcontainers")
return
}

214
manager/watcher/raw/raw.go Normal file
View File

@ -0,0 +1,214 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package container defines types for sub-container events and also
// defines an interface for container operation handlers.
package raw
import (
"fmt"
"io/ioutil"
"path"
"strings"
"github.com/google/cadvisor/container/common"
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/manager/watcher"
"github.com/golang/glog"
"golang.org/x/exp/inotify"
)
type rawContainerWatcher struct {
// Absolute path to the root of the cgroup hierarchies
cgroupPaths map[string]string
cgroupSubsystems *libcontainer.CgroupSubsystems
// Inotify event watcher.
watcher *common.InotifyWatcher
// Signal for watcher thread to stop.
stopWatcher chan error
}
func NewRawContainerWatcher() (watcher.ContainerWatcher, error) {
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get cgroup subsystems: %v", err)
}
if len(cgroupSubsystems.Mounts) == 0 {
return nil, fmt.Errorf("failed to find supported cgroup mounts for the raw factory")
}
watcher, err := common.NewInotifyWatcher()
if err != nil {
return nil, err
}
rawWatcher := &rawContainerWatcher{
cgroupPaths: common.MakeCgroupPaths(cgroupSubsystems.MountPoints, "/"),
cgroupSubsystems: &cgroupSubsystems,
watcher: watcher,
stopWatcher: make(chan error),
}
return rawWatcher, nil
}
func (self *rawContainerWatcher) Start(events chan watcher.ContainerEvent) error {
// Watch this container (all its cgroups) and all subdirectories.
for _, cgroupPath := range self.cgroupPaths {
_, err := self.watchDirectory(cgroupPath, "/")
if err != nil {
return err
}
}
// Process the events received from the kernel.
go func() {
for {
select {
case event := <-self.watcher.Event():
err := self.processEvent(event, events)
if err != nil {
glog.Warningf("Error while processing event (%+v): %v", event, err)
}
case err := <-self.watcher.Error():
glog.Warningf("Error while watching %q:", "/", err)
case <-self.stopWatcher:
err := self.watcher.Close()
if err == nil {
self.stopWatcher <- err
return
}
}
}
}()
return nil
}
func (self *rawContainerWatcher) Stop() error {
// Rendezvous with the watcher thread.
self.stopWatcher <- nil
return <-self.stopWatcher
}
// Watches the specified directory and all subdirectories. Returns whether the path was
// already being watched and an error (if any).
func (self *rawContainerWatcher) watchDirectory(dir string, containerName string) (bool, error) {
alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
if err != nil {
return alreadyWatching, err
}
// Remove the watch if further operations failed.
cleanup := true
defer func() {
if cleanup {
_, err := self.watcher.RemoveWatch(containerName, dir)
if err != nil {
glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
}
}
}()
// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
// Watch subdirectories as well.
entries, err := ioutil.ReadDir(dir)
if err != nil {
return alreadyWatching, err
}
for _, entry := range entries {
if entry.IsDir() {
// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
if err != nil {
return alreadyWatching, err
}
}
}
cleanup = false
return alreadyWatching, nil
}
func (self *rawContainerWatcher) processEvent(event *inotify.Event, events chan watcher.ContainerEvent) error {
// Convert the inotify event type to a container create or delete.
var eventType watcher.ContainerEventType
switch {
case (event.Mask & inotify.IN_CREATE) > 0:
eventType = watcher.ContainerAdd
case (event.Mask & inotify.IN_DELETE) > 0:
eventType = watcher.ContainerDelete
case (event.Mask & inotify.IN_MOVED_FROM) > 0:
eventType = watcher.ContainerDelete
case (event.Mask & inotify.IN_MOVED_TO) > 0:
eventType = watcher.ContainerAdd
default:
// Ignore other events.
return nil
}
// Derive the container name from the path name.
var containerName string
for _, mount := range self.cgroupSubsystems.Mounts {
mountLocation := path.Clean(mount.Mountpoint) + "/"
if strings.HasPrefix(event.Name, mountLocation) {
containerName = event.Name[len(mountLocation)-1:]
break
}
}
if containerName == "" {
return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
}
// Maintain the watch for the new or deleted container.
switch eventType {
case watcher.ContainerAdd:
// New container was created, watch it.
alreadyWatched, err := self.watchDirectory(event.Name, containerName)
if err != nil {
return err
}
// Only report container creation once.
if alreadyWatched {
return nil
}
case watcher.ContainerDelete:
// Container was deleted, stop watching for it.
lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
if err != nil {
return err
}
// Only report container deletion once.
if !lastWatched {
return nil
}
default:
return fmt.Errorf("unknown event type %v", eventType)
}
// Deliver the event.
events <- watcher.ContainerEvent{
EventType: eventType,
Name: containerName,
WatchSource: watcher.Raw,
}
return nil
}

View File

@ -0,0 +1,51 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package container defines types for sub-container events and also
// defines an interface for container operation handlers.
package watcher
// SubcontainerEventType indicates an addition or deletion event.
type ContainerEventType int
const (
ContainerAdd ContainerEventType = iota
ContainerDelete
)
type ContainerWatchSource int
const (
Raw ContainerWatchSource = iota
)
// ContainerEvent represents a
type ContainerEvent struct {
// The type of event that occurred.
EventType ContainerEventType
// The full container name of the container where the event occurred.
Name string
// The watcher that detected this change event
WatchSource ContainerWatchSource
}
type ContainerWatcher interface {
// Registers a channel to listen for events affecting subcontainers (recursively).
Start(events chan ContainerEvent) error
// Stops watching for subcontainer changes.
Stop() error
}