Refactor container watching out of raw handler into its own inteface / package
This commit is contained in:
parent
9a62ee199f
commit
e02632463b
@ -27,23 +27,6 @@ const (
|
|||||||
ListRecursive
|
ListRecursive
|
||||||
)
|
)
|
||||||
|
|
||||||
// SubcontainerEventType indicates an addition or deletion event.
|
|
||||||
type SubcontainerEventType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
SubcontainerAdd SubcontainerEventType = iota
|
|
||||||
SubcontainerDelete
|
|
||||||
)
|
|
||||||
|
|
||||||
// SubcontainerEvent represents a
|
|
||||||
type SubcontainerEvent struct {
|
|
||||||
// The type of event that occurred.
|
|
||||||
EventType SubcontainerEventType
|
|
||||||
|
|
||||||
// The full container name of the container where the event occurred.
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Interface for container operation handlers.
|
// Interface for container operation handlers.
|
||||||
type ContainerHandler interface {
|
type ContainerHandler interface {
|
||||||
// Returns the ContainerReference
|
// Returns the ContainerReference
|
||||||
@ -61,12 +44,6 @@ type ContainerHandler interface {
|
|||||||
// Returns the processes inside this container.
|
// Returns the processes inside this container.
|
||||||
ListProcesses(listType ListType) ([]int, error)
|
ListProcesses(listType ListType) ([]int, error)
|
||||||
|
|
||||||
// Registers a channel to listen for events affecting subcontainers (recursively).
|
|
||||||
WatchSubcontainers(events chan SubcontainerEvent) error
|
|
||||||
|
|
||||||
// Stops watching for subcontainer changes.
|
|
||||||
StopWatchingSubcontainers() error
|
|
||||||
|
|
||||||
// Returns absolute cgroup path for the requested resource.
|
// Returns absolute cgroup path for the requested resource.
|
||||||
GetCgroupPath(resource string) (string, error)
|
GetCgroupPath(resource string) (string, error)
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/google/cadvisor/container/libcontainer"
|
"github.com/google/cadvisor/container/libcontainer"
|
||||||
"github.com/google/cadvisor/fs"
|
"github.com/google/cadvisor/fs"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
docker "github.com/docker/engine-api/client"
|
docker "github.com/docker/engine-api/client"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -198,6 +199,6 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics c
|
|||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
container.RegisterContainerHandlerFactory(f)
|
container.RegisterContainerHandlerFactory(f, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -331,15 +331,6 @@ func (self *dockerContainerHandler) ListProcesses(listType container.ListType) (
|
|||||||
return containerlibcontainer.GetProcesses(self.cgroupManager)
|
return containerlibcontainer.GetProcesses(self.cgroupManager)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *dockerContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
|
|
||||||
return fmt.Errorf("watch is unimplemented in the Docker container driver")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *dockerContainerHandler) StopWatchingSubcontainers() error {
|
|
||||||
// No-op for Docker driver.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *dockerContainerHandler) Exists() bool {
|
func (self *dockerContainerHandler) Exists() bool {
|
||||||
return common.CgroupExists(self.cgroupPaths)
|
return common.CgroupExists(self.cgroupPaths)
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,17 +69,19 @@ func (ms MetricSet) Add(mk MetricKind) {
|
|||||||
// TODO(vmarmol): Consider not making this global.
|
// TODO(vmarmol): Consider not making this global.
|
||||||
// Global list of factories.
|
// Global list of factories.
|
||||||
var (
|
var (
|
||||||
factories []ContainerHandlerFactory
|
factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{}
|
||||||
factoriesLock sync.RWMutex
|
factoriesLock sync.RWMutex
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register a ContainerHandlerFactory. These should be registered from least general to most general
|
// Register a ContainerHandlerFactory. These should be registered from least general to most general
|
||||||
// as they will be asked in order whether they can handle a particular container.
|
// as they will be asked in order whether they can handle a particular container.
|
||||||
func RegisterContainerHandlerFactory(factory ContainerHandlerFactory) {
|
func RegisterContainerHandlerFactory(factory ContainerHandlerFactory, watchTypes []watcher.ContainerWatchSource) {
|
||||||
factoriesLock.Lock()
|
factoriesLock.Lock()
|
||||||
defer factoriesLock.Unlock()
|
defer factoriesLock.Unlock()
|
||||||
|
|
||||||
factories = append(factories, factory)
|
for _, watchType := range watchTypes {
|
||||||
|
factories[watchType] = append(factories[watchType], factory)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns whether there are any container handler factories registered.
|
// Returns whether there are any container handler factories registered.
|
||||||
@ -89,12 +93,12 @@ func HasFactories() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new ContainerHandler for the specified container.
|
// Create a new ContainerHandler for the specified container.
|
||||||
func NewContainerHandler(name string, inHostNamespace bool) (ContainerHandler, bool, error) {
|
func NewContainerHandler(name string, watchType watcher.ContainerWatchSource, inHostNamespace bool) (ContainerHandler, bool, error) {
|
||||||
factoriesLock.RLock()
|
factoriesLock.RLock()
|
||||||
defer factoriesLock.RUnlock()
|
defer factoriesLock.RUnlock()
|
||||||
|
|
||||||
// Create the ContainerHandler with the first factory that supports it.
|
// Create the ContainerHandler with the first factory that supports it.
|
||||||
for _, factory := range factories {
|
for _, factory := range factories[watchType] {
|
||||||
canHandle, canAccept, err := factory.CanHandleAndAccept(name)
|
canHandle, canAccept, err := factory.CanHandleAndAccept(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("Error trying to work out if we can handle %s: %v", name, err)
|
glog.V(4).Infof("Error trying to work out if we can handle %s: %v", name, err)
|
||||||
@ -120,7 +124,7 @@ func ClearContainerHandlerFactories() {
|
|||||||
factoriesLock.Lock()
|
factoriesLock.Lock()
|
||||||
defer factoriesLock.Unlock()
|
defer factoriesLock.Unlock()
|
||||||
|
|
||||||
factories = make([]ContainerHandlerFactory, 0, 4)
|
factories = map[watcher.ContainerWatchSource][]ContainerHandlerFactory{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DebugInfo() map[string][]string {
|
func DebugInfo() map[string][]string {
|
||||||
@ -129,10 +133,12 @@ func DebugInfo() map[string][]string {
|
|||||||
|
|
||||||
// Get debug information for all factories.
|
// Get debug information for all factories.
|
||||||
out := make(map[string][]string)
|
out := make(map[string][]string)
|
||||||
for _, factory := range factories {
|
for _, factoriesSlice := range factories {
|
||||||
|
for _, factory := range factoriesSlice {
|
||||||
for k, v := range factory.DebugInfo() {
|
for k, v := range factory.DebugInfo() {
|
||||||
out[k] = v
|
out[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@ package container
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -57,7 +59,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) {
|
|||||||
CanHandleValue: true,
|
CanHandleValue: true,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(allwaysYes)
|
RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
|
|
||||||
// The yes factory should be asked to create the ContainerHandler.
|
// The yes factory should be asked to create the ContainerHandler.
|
||||||
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
|
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
|
||||||
@ -66,7 +68,7 @@ func TestNewContainerHandler_FirstMatches(t *testing.T) {
|
|||||||
}
|
}
|
||||||
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
|
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
|
||||||
|
|
||||||
cont, _, err := NewContainerHandler(testContainerName, true)
|
cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -84,13 +86,13 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) {
|
|||||||
CanHandleValue: false,
|
CanHandleValue: false,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(allwaysNo)
|
RegisterContainerHandlerFactory(allwaysNo, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
allwaysYes := &mockContainerHandlerFactory{
|
allwaysYes := &mockContainerHandlerFactory{
|
||||||
Name: "yes",
|
Name: "yes",
|
||||||
CanHandleValue: true,
|
CanHandleValue: true,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(allwaysYes)
|
RegisterContainerHandlerFactory(allwaysYes, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
|
|
||||||
// The yes factory should be asked to create the ContainerHandler.
|
// The yes factory should be asked to create the ContainerHandler.
|
||||||
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
|
mockContainer, err := mockFactory.NewContainerHandler(testContainerName, true)
|
||||||
@ -99,7 +101,7 @@ func TestNewContainerHandler_SecondMatches(t *testing.T) {
|
|||||||
}
|
}
|
||||||
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
|
allwaysYes.On("NewContainerHandler", testContainerName).Return(mockContainer, nil)
|
||||||
|
|
||||||
cont, _, err := NewContainerHandler(testContainerName, true)
|
cont, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -117,15 +119,15 @@ func TestNewContainerHandler_NoneMatch(t *testing.T) {
|
|||||||
CanHandleValue: false,
|
CanHandleValue: false,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(allwaysNo1)
|
RegisterContainerHandlerFactory(allwaysNo1, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
allwaysNo2 := &mockContainerHandlerFactory{
|
allwaysNo2 := &mockContainerHandlerFactory{
|
||||||
Name: "no",
|
Name: "no",
|
||||||
CanHandleValue: false,
|
CanHandleValue: false,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(allwaysNo2)
|
RegisterContainerHandlerFactory(allwaysNo2, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
|
|
||||||
_, _, err := NewContainerHandler(testContainerName, true)
|
_, _, err := NewContainerHandler(testContainerName, watcher.Raw, true)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("Expected NewContainerHandler to fail")
|
t.Error("Expected NewContainerHandler to fail")
|
||||||
}
|
}
|
||||||
@ -140,15 +142,15 @@ func TestNewContainerHandler_Accept(t *testing.T) {
|
|||||||
CanHandleValue: false,
|
CanHandleValue: false,
|
||||||
CanAcceptValue: true,
|
CanAcceptValue: true,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(cannotHandle)
|
RegisterContainerHandlerFactory(cannotHandle, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
cannotAccept := &mockContainerHandlerFactory{
|
cannotAccept := &mockContainerHandlerFactory{
|
||||||
Name: "no",
|
Name: "no",
|
||||||
CanHandleValue: true,
|
CanHandleValue: true,
|
||||||
CanAcceptValue: false,
|
CanAcceptValue: false,
|
||||||
}
|
}
|
||||||
RegisterContainerHandlerFactory(cannotAccept)
|
RegisterContainerHandlerFactory(cannotAccept, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
|
|
||||||
_, accept, err := NewContainerHandler(testContainerName, true)
|
_, accept, err := NewContainerHandler(testContainerName, watcher.Raw, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("Expected NewContainerHandler to succeed")
|
t.Error("Expected NewContainerHandler to succeed")
|
||||||
}
|
}
|
||||||
|
@ -77,16 +77,6 @@ func (self *MockContainerHandler) ListProcesses(listType ListType) ([]int, error
|
|||||||
return args.Get(0).([]int), args.Error(1)
|
return args.Get(0).([]int), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *MockContainerHandler) WatchSubcontainers(events chan SubcontainerEvent) error {
|
|
||||||
args := self.Called(events)
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *MockContainerHandler) StopWatchingSubcontainers() error {
|
|
||||||
args := self.Called()
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *MockContainerHandler) Exists() bool {
|
func (self *MockContainerHandler) Exists() bool {
|
||||||
args := self.Called()
|
args := self.Called()
|
||||||
return args.Get(0).(bool)
|
return args.Get(0).(bool)
|
||||||
@ -102,6 +92,11 @@ func (self *MockContainerHandler) GetContainerLabels() map[string]string {
|
|||||||
return args.Get(0).(map[string]string)
|
return args.Get(0).(map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *MockContainerHandler) String() string {
|
||||||
|
args := self.Called()
|
||||||
|
return args.Get(0).(string)
|
||||||
|
}
|
||||||
|
|
||||||
type FactoryForMockContainerHandler struct {
|
type FactoryForMockContainerHandler struct {
|
||||||
Name string
|
Name string
|
||||||
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)
|
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/google/cadvisor/container/libcontainer"
|
"github.com/google/cadvisor/container/libcontainer"
|
||||||
"github.com/google/cadvisor/fs"
|
"github.com/google/cadvisor/fs"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
|
watch "github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -90,6 +91,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno
|
|||||||
watcher: watcher,
|
watcher: watcher,
|
||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
}
|
}
|
||||||
container.RegisterContainerHandlerFactory(factory)
|
container.RegisterContainerHandlerFactory(factory, []watch.ContainerWatchSource{watch.Raw})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -17,9 +17,6 @@ package raw
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/google/cadvisor/container"
|
"github.com/google/cadvisor/container"
|
||||||
"github.com/google/cadvisor/container/common"
|
"github.com/google/cadvisor/container/common"
|
||||||
@ -32,7 +29,6 @@ import (
|
|||||||
"github.com/opencontainers/runc/libcontainer/cgroups"
|
"github.com/opencontainers/runc/libcontainer/cgroups"
|
||||||
cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
|
cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
|
||||||
"github.com/opencontainers/runc/libcontainer/configs"
|
"github.com/opencontainers/runc/libcontainer/configs"
|
||||||
"golang.org/x/exp/inotify"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type rawContainerHandler struct {
|
type rawContainerHandler struct {
|
||||||
@ -41,12 +37,6 @@ type rawContainerHandler struct {
|
|||||||
cgroupSubsystems *libcontainer.CgroupSubsystems
|
cgroupSubsystems *libcontainer.CgroupSubsystems
|
||||||
machineInfoFactory info.MachineInfoFactory
|
machineInfoFactory info.MachineInfoFactory
|
||||||
|
|
||||||
// Inotify event watcher.
|
|
||||||
watcher *common.InotifyWatcher
|
|
||||||
|
|
||||||
// Signal for watcher thread to stop.
|
|
||||||
stopWatcher chan error
|
|
||||||
|
|
||||||
// Absolute path to the cgroup hierarchies of this container.
|
// Absolute path to the cgroup hierarchies of this container.
|
||||||
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
|
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
|
||||||
cgroupPaths map[string]string
|
cgroupPaths map[string]string
|
||||||
@ -102,12 +92,10 @@ func newRawContainerHandler(name string, cgroupSubsystems *libcontainer.CgroupSu
|
|||||||
name: name,
|
name: name,
|
||||||
cgroupSubsystems: cgroupSubsystems,
|
cgroupSubsystems: cgroupSubsystems,
|
||||||
machineInfoFactory: machineInfoFactory,
|
machineInfoFactory: machineInfoFactory,
|
||||||
stopWatcher: make(chan error),
|
|
||||||
cgroupPaths: cgroupPaths,
|
cgroupPaths: cgroupPaths,
|
||||||
cgroupManager: cgroupManager,
|
cgroupManager: cgroupManager,
|
||||||
fsInfo: fsInfo,
|
fsInfo: fsInfo,
|
||||||
externalMounts: externalMounts,
|
externalMounts: externalMounts,
|
||||||
watcher: watcher,
|
|
||||||
rootFs: rootFs,
|
rootFs: rootFs,
|
||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
pid: pid,
|
pid: pid,
|
||||||
@ -276,151 +264,6 @@ func (self *rawContainerHandler) ListProcesses(listType container.ListType) ([]i
|
|||||||
return libcontainer.GetProcesses(self.cgroupManager)
|
return libcontainer.GetProcesses(self.cgroupManager)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watches the specified directory and all subdirectories. Returns whether the path was
|
|
||||||
// already being watched and an error (if any).
|
|
||||||
func (self *rawContainerHandler) watchDirectory(dir string, containerName string) (bool, error) {
|
|
||||||
alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
|
|
||||||
if err != nil {
|
|
||||||
return alreadyWatching, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove the watch if further operations failed.
|
|
||||||
cleanup := true
|
|
||||||
defer func() {
|
|
||||||
if cleanup {
|
|
||||||
_, err := self.watcher.RemoveWatch(containerName, dir)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
|
|
||||||
// Watch subdirectories as well.
|
|
||||||
entries, err := ioutil.ReadDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
return alreadyWatching, err
|
|
||||||
}
|
|
||||||
for _, entry := range entries {
|
|
||||||
if entry.IsDir() {
|
|
||||||
// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
|
|
||||||
_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
|
|
||||||
if err != nil {
|
|
||||||
return alreadyWatching, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup = false
|
|
||||||
return alreadyWatching, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *rawContainerHandler) processEvent(event *inotify.Event, events chan container.SubcontainerEvent) error {
|
|
||||||
// Convert the inotify event type to a container create or delete.
|
|
||||||
var eventType container.SubcontainerEventType
|
|
||||||
switch {
|
|
||||||
case (event.Mask & inotify.IN_CREATE) > 0:
|
|
||||||
eventType = container.SubcontainerAdd
|
|
||||||
case (event.Mask & inotify.IN_DELETE) > 0:
|
|
||||||
eventType = container.SubcontainerDelete
|
|
||||||
case (event.Mask & inotify.IN_MOVED_FROM) > 0:
|
|
||||||
eventType = container.SubcontainerDelete
|
|
||||||
case (event.Mask & inotify.IN_MOVED_TO) > 0:
|
|
||||||
eventType = container.SubcontainerAdd
|
|
||||||
default:
|
|
||||||
// Ignore other events.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Derive the container name from the path name.
|
|
||||||
var containerName string
|
|
||||||
for _, mount := range self.cgroupSubsystems.Mounts {
|
|
||||||
mountLocation := path.Clean(mount.Mountpoint) + "/"
|
|
||||||
if strings.HasPrefix(event.Name, mountLocation) {
|
|
||||||
containerName = event.Name[len(mountLocation)-1:]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if containerName == "" {
|
|
||||||
return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Maintain the watch for the new or deleted container.
|
|
||||||
switch {
|
|
||||||
case eventType == container.SubcontainerAdd:
|
|
||||||
// New container was created, watch it.
|
|
||||||
alreadyWatched, err := self.watchDirectory(event.Name, containerName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only report container creation once.
|
|
||||||
if alreadyWatched {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
case eventType == container.SubcontainerDelete:
|
|
||||||
// Container was deleted, stop watching for it.
|
|
||||||
lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only report container deletion once.
|
|
||||||
if !lastWatched {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unknown event type %v", eventType)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deliver the event.
|
|
||||||
events <- container.SubcontainerEvent{
|
|
||||||
EventType: eventType,
|
|
||||||
Name: containerName,
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *rawContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
|
|
||||||
// Watch this container (all its cgroups) and all subdirectories.
|
|
||||||
for _, cgroupPath := range self.cgroupPaths {
|
|
||||||
_, err := self.watchDirectory(cgroupPath, self.name)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process the events received from the kernel.
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-self.watcher.Event():
|
|
||||||
err := self.processEvent(event, events)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("Error while processing event (%+v): %v", event, err)
|
|
||||||
}
|
|
||||||
case err := <-self.watcher.Error():
|
|
||||||
glog.Warningf("Error while watching %q:", self.name, err)
|
|
||||||
case <-self.stopWatcher:
|
|
||||||
err := self.watcher.Close()
|
|
||||||
if err == nil {
|
|
||||||
self.stopWatcher <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *rawContainerHandler) StopWatchingSubcontainers() error {
|
|
||||||
// Rendezvous with the watcher thread.
|
|
||||||
self.stopWatcher <- nil
|
|
||||||
return <-self.stopWatcher
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *rawContainerHandler) Exists() bool {
|
func (self *rawContainerHandler) Exists() bool {
|
||||||
return common.CgroupExists(self.cgroupPaths)
|
return common.CgroupExists(self.cgroupPaths)
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/google/cadvisor/container/libcontainer"
|
"github.com/google/cadvisor/container/libcontainer"
|
||||||
"github.com/google/cadvisor/fs"
|
"github.com/google/cadvisor/fs"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -99,6 +100,6 @@ func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, igno
|
|||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
rktPath: rktPath,
|
rktPath: rktPath,
|
||||||
}
|
}
|
||||||
container.RegisterContainerHandlerFactory(factory)
|
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -266,15 +266,6 @@ func (handler *rktContainerHandler) ListProcesses(listType container.ListType) (
|
|||||||
return libcontainer.GetProcesses(handler.cgroupManager)
|
return libcontainer.GetProcesses(handler.cgroupManager)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (handler *rktContainerHandler) WatchSubcontainers(events chan container.SubcontainerEvent) error {
|
|
||||||
return fmt.Errorf("watch is unimplemented in the Rkt container driver")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (handler *rktContainerHandler) StopWatchingSubcontainers() error {
|
|
||||||
// No-op for Rkt driver.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (handler *rktContainerHandler) Exists() bool {
|
func (handler *rktContainerHandler) Exists() bool {
|
||||||
return common.CgroupExists(handler.cgroupPaths)
|
return common.CgroupExists(handler.cgroupPaths)
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/google/cadvisor/container"
|
"github.com/google/cadvisor/container"
|
||||||
"github.com/google/cadvisor/fs"
|
"github.com/google/cadvisor/fs"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -52,6 +53,6 @@ func (f *systemdFactory) DebugInfo() map[string][]string {
|
|||||||
func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics container.MetricSet) error {
|
func Register(machineInfoFactory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics container.MetricSet) error {
|
||||||
glog.Infof("Registering systemd factory")
|
glog.Infof("Registering systemd factory")
|
||||||
factory := &systemdFactory{}
|
factory := &systemdFactory{}
|
||||||
container.RegisterContainerHandlerFactory(factory)
|
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,8 @@ import (
|
|||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
"github.com/google/cadvisor/info/v2"
|
"github.com/google/cadvisor/info/v2"
|
||||||
"github.com/google/cadvisor/machine"
|
"github.com/google/cadvisor/machine"
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
rawwatcher "github.com/google/cadvisor/manager/watcher/raw"
|
||||||
"github.com/google/cadvisor/utils/cpuload"
|
"github.com/google/cadvisor/utils/cpuload"
|
||||||
"github.com/google/cadvisor/utils/oomparser"
|
"github.com/google/cadvisor/utils/oomparser"
|
||||||
"github.com/google/cadvisor/utils/sysfs"
|
"github.com/google/cadvisor/utils/sysfs"
|
||||||
@ -163,6 +165,9 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
|
|||||||
inHostNamespace = true
|
inHostNamespace = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register for new subcontainers.
|
||||||
|
eventsChannel := make(chan watcher.ContainerEvent, 16)
|
||||||
|
|
||||||
newManager := &manager{
|
newManager := &manager{
|
||||||
containers: make(map[namespacedContainerName]*containerData),
|
containers: make(map[namespacedContainerName]*containerData),
|
||||||
quitChannels: make([]chan error, 0, 2),
|
quitChannels: make([]chan error, 0, 2),
|
||||||
@ -174,6 +179,8 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
|
|||||||
maxHousekeepingInterval: maxHousekeepingInterval,
|
maxHousekeepingInterval: maxHousekeepingInterval,
|
||||||
allowDynamicHousekeeping: allowDynamicHousekeeping,
|
allowDynamicHousekeeping: allowDynamicHousekeeping,
|
||||||
ignoreMetrics: ignoreMetricsSet,
|
ignoreMetrics: ignoreMetricsSet,
|
||||||
|
containerWatchers: []watcher.ContainerWatcher{},
|
||||||
|
eventsChannel: eventsChannel,
|
||||||
}
|
}
|
||||||
|
|
||||||
machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
|
machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
|
||||||
@ -217,6 +224,8 @@ type manager struct {
|
|||||||
maxHousekeepingInterval time.Duration
|
maxHousekeepingInterval time.Duration
|
||||||
allowDynamicHousekeeping bool
|
allowDynamicHousekeeping bool
|
||||||
ignoreMetrics container.MetricSet
|
ignoreMetrics container.MetricSet
|
||||||
|
containerWatchers []watcher.ContainerWatcher
|
||||||
|
eventsChannel chan watcher.ContainerEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the container manager.
|
// Start the container manager.
|
||||||
@ -241,6 +250,12 @@ func (self *manager) Start() error {
|
|||||||
glog.Errorf("Registration of the raw container factory failed: %v", err)
|
glog.Errorf("Registration of the raw container factory failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rawWatcher, err := rawwatcher.NewRawContainerWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
self.containerWatchers = append(self.containerWatchers, rawWatcher)
|
||||||
|
|
||||||
if *enableLoadReader {
|
if *enableLoadReader {
|
||||||
// Create cpu load reader.
|
// Create cpu load reader.
|
||||||
cpuLoadReader, err := cpuload.New()
|
cpuLoadReader, err := cpuload.New()
|
||||||
@ -269,7 +284,7 @@ func (self *manager) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create root and then recover all containers.
|
// Create root and then recover all containers.
|
||||||
err = self.createContainer("/")
|
err = self.createContainer("/", watcher.Raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -769,10 +784,14 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a container.
|
// Create a container.
|
||||||
func (m *manager) createContainer(containerName string) error {
|
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
|
||||||
m.containersLock.Lock()
|
m.containersLock.Lock()
|
||||||
defer m.containersLock.Unlock()
|
defer m.containersLock.Unlock()
|
||||||
|
|
||||||
|
return m.createContainerLocked(containerName, watchSource)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
|
||||||
namespacedName := namespacedContainerName{
|
namespacedName := namespacedContainerName{
|
||||||
Name: containerName,
|
Name: containerName,
|
||||||
}
|
}
|
||||||
@ -782,7 +801,7 @@ func (m *manager) createContainer(containerName string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
handler, accept, err := container.NewContainerHandler(containerName, m.inHostNamespace)
|
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -849,6 +868,10 @@ func (m *manager) destroyContainer(containerName string) error {
|
|||||||
m.containersLock.Lock()
|
m.containersLock.Lock()
|
||||||
defer m.containersLock.Unlock()
|
defer m.containersLock.Unlock()
|
||||||
|
|
||||||
|
return m.destroyContainerLocked(containerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manager) destroyContainerLocked(containerName string) error {
|
||||||
namespacedName := namespacedContainerName{
|
namespacedName := namespacedContainerName{
|
||||||
Name: containerName,
|
Name: containerName,
|
||||||
}
|
}
|
||||||
@ -946,7 +969,7 @@ func (m *manager) detectSubcontainers(containerName string) error {
|
|||||||
|
|
||||||
// Add the new containers.
|
// Add the new containers.
|
||||||
for _, cont := range added {
|
for _, cont := range added {
|
||||||
err = m.createContainer(cont.Name)
|
err = m.createContainer(cont.Name, watcher.Raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
|
glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
|
||||||
}
|
}
|
||||||
@ -965,28 +988,15 @@ func (m *manager) detectSubcontainers(containerName string) error {
|
|||||||
|
|
||||||
// Watches for new containers started in the system. Runs forever unless there is a setup error.
|
// Watches for new containers started in the system. Runs forever unless there is a setup error.
|
||||||
func (self *manager) watchForNewContainers(quit chan error) error {
|
func (self *manager) watchForNewContainers(quit chan error) error {
|
||||||
var root *containerData
|
for _, watcher := range self.containerWatchers {
|
||||||
var ok bool
|
err := watcher.Start(self.eventsChannel)
|
||||||
func() {
|
|
||||||
self.containersLock.RLock()
|
|
||||||
defer self.containersLock.RUnlock()
|
|
||||||
root, ok = self.containers[namespacedContainerName{
|
|
||||||
Name: "/",
|
|
||||||
}]
|
|
||||||
}()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("root container does not exist when watching for new containers")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register for new subcontainers.
|
|
||||||
eventsChannel := make(chan container.SubcontainerEvent, 16)
|
|
||||||
err := root.handler.WatchSubcontainers(eventsChannel)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// There is a race between starting the watch and new container creation so we do a detection before we read new containers.
|
// There is a race between starting the watch and new container creation so we do a detection before we read new containers.
|
||||||
err = self.detectSubcontainers("/")
|
err := self.detectSubcontainers("/")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -995,21 +1005,32 @@ func (self *manager) watchForNewContainers(quit chan error) error {
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-eventsChannel:
|
case event := <-self.eventsChannel:
|
||||||
switch {
|
switch {
|
||||||
case event.EventType == container.SubcontainerAdd:
|
case event.EventType == watcher.ContainerAdd:
|
||||||
err = self.createContainer(event.Name)
|
err = self.createContainer(event.Name, event.WatchSource)
|
||||||
case event.EventType == container.SubcontainerDelete:
|
case event.EventType == watcher.ContainerDelete:
|
||||||
err = self.destroyContainer(event.Name)
|
err = self.destroyContainer(event.Name)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Failed to process watch event %+v: %v", event, err)
|
glog.Warningf("Failed to process watch event %+v: %v", event, err)
|
||||||
}
|
}
|
||||||
case <-quit:
|
case <-quit:
|
||||||
|
errors := []string{}
|
||||||
|
|
||||||
// Stop processing events if asked to quit.
|
// Stop processing events if asked to quit.
|
||||||
err := root.handler.StopWatchingSubcontainers()
|
for _, watcher := range self.containerWatchers {
|
||||||
quit <- err
|
err := watcher.Stop()
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
errors = append(errors, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errors) > 0 {
|
||||||
|
err_str := strings.Join(errors, ", ")
|
||||||
|
quit <- fmt.Errorf("Error quiting watchers: %v", err_str)
|
||||||
|
} else {
|
||||||
|
quit <- nil
|
||||||
glog.Infof("Exiting thread watching subcontainers")
|
glog.Infof("Exiting thread watching subcontainers")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
214
manager/watcher/raw/raw.go
Normal file
214
manager/watcher/raw/raw.go
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Package container defines types for sub-container events and also
|
||||||
|
// defines an interface for container operation handlers.
|
||||||
|
package raw
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/google/cadvisor/container/common"
|
||||||
|
"github.com/google/cadvisor/container/libcontainer"
|
||||||
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"golang.org/x/exp/inotify"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rawContainerWatcher struct {
|
||||||
|
// Absolute path to the root of the cgroup hierarchies
|
||||||
|
cgroupPaths map[string]string
|
||||||
|
|
||||||
|
cgroupSubsystems *libcontainer.CgroupSubsystems
|
||||||
|
|
||||||
|
// Inotify event watcher.
|
||||||
|
watcher *common.InotifyWatcher
|
||||||
|
|
||||||
|
// Signal for watcher thread to stop.
|
||||||
|
stopWatcher chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRawContainerWatcher() (watcher.ContainerWatcher, error) {
|
||||||
|
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get cgroup subsystems: %v", err)
|
||||||
|
}
|
||||||
|
if len(cgroupSubsystems.Mounts) == 0 {
|
||||||
|
return nil, fmt.Errorf("failed to find supported cgroup mounts for the raw factory")
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher, err := common.NewInotifyWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rawWatcher := &rawContainerWatcher{
|
||||||
|
cgroupPaths: common.MakeCgroupPaths(cgroupSubsystems.MountPoints, "/"),
|
||||||
|
cgroupSubsystems: &cgroupSubsystems,
|
||||||
|
watcher: watcher,
|
||||||
|
stopWatcher: make(chan error),
|
||||||
|
}
|
||||||
|
|
||||||
|
return rawWatcher, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *rawContainerWatcher) Start(events chan watcher.ContainerEvent) error {
|
||||||
|
// Watch this container (all its cgroups) and all subdirectories.
|
||||||
|
for _, cgroupPath := range self.cgroupPaths {
|
||||||
|
_, err := self.watchDirectory(cgroupPath, "/")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process the events received from the kernel.
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-self.watcher.Event():
|
||||||
|
err := self.processEvent(event, events)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Error while processing event (%+v): %v", event, err)
|
||||||
|
}
|
||||||
|
case err := <-self.watcher.Error():
|
||||||
|
glog.Warningf("Error while watching %q:", "/", err)
|
||||||
|
case <-self.stopWatcher:
|
||||||
|
err := self.watcher.Close()
|
||||||
|
if err == nil {
|
||||||
|
self.stopWatcher <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *rawContainerWatcher) Stop() error {
|
||||||
|
// Rendezvous with the watcher thread.
|
||||||
|
self.stopWatcher <- nil
|
||||||
|
return <-self.stopWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watches the specified directory and all subdirectories. Returns whether the path was
|
||||||
|
// already being watched and an error (if any).
|
||||||
|
func (self *rawContainerWatcher) watchDirectory(dir string, containerName string) (bool, error) {
|
||||||
|
alreadyWatching, err := self.watcher.AddWatch(containerName, dir)
|
||||||
|
if err != nil {
|
||||||
|
return alreadyWatching, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the watch if further operations failed.
|
||||||
|
cleanup := true
|
||||||
|
defer func() {
|
||||||
|
if cleanup {
|
||||||
|
_, err := self.watcher.RemoveWatch(containerName, dir)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Failed to remove inotify watch for %q: %v", dir, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
|
||||||
|
// Watch subdirectories as well.
|
||||||
|
entries, err := ioutil.ReadDir(dir)
|
||||||
|
if err != nil {
|
||||||
|
return alreadyWatching, err
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
if entry.IsDir() {
|
||||||
|
// TODO(vmarmol): We don't have to fail here, maybe we can recover and try to get as many registrations as we can.
|
||||||
|
_, err = self.watchDirectory(path.Join(dir, entry.Name()), path.Join(containerName, entry.Name()))
|
||||||
|
if err != nil {
|
||||||
|
return alreadyWatching, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup = false
|
||||||
|
return alreadyWatching, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *rawContainerWatcher) processEvent(event *inotify.Event, events chan watcher.ContainerEvent) error {
|
||||||
|
// Convert the inotify event type to a container create or delete.
|
||||||
|
var eventType watcher.ContainerEventType
|
||||||
|
switch {
|
||||||
|
case (event.Mask & inotify.IN_CREATE) > 0:
|
||||||
|
eventType = watcher.ContainerAdd
|
||||||
|
case (event.Mask & inotify.IN_DELETE) > 0:
|
||||||
|
eventType = watcher.ContainerDelete
|
||||||
|
case (event.Mask & inotify.IN_MOVED_FROM) > 0:
|
||||||
|
eventType = watcher.ContainerDelete
|
||||||
|
case (event.Mask & inotify.IN_MOVED_TO) > 0:
|
||||||
|
eventType = watcher.ContainerAdd
|
||||||
|
default:
|
||||||
|
// Ignore other events.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Derive the container name from the path name.
|
||||||
|
var containerName string
|
||||||
|
for _, mount := range self.cgroupSubsystems.Mounts {
|
||||||
|
mountLocation := path.Clean(mount.Mountpoint) + "/"
|
||||||
|
if strings.HasPrefix(event.Name, mountLocation) {
|
||||||
|
containerName = event.Name[len(mountLocation)-1:]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if containerName == "" {
|
||||||
|
return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maintain the watch for the new or deleted container.
|
||||||
|
switch eventType {
|
||||||
|
case watcher.ContainerAdd:
|
||||||
|
// New container was created, watch it.
|
||||||
|
alreadyWatched, err := self.watchDirectory(event.Name, containerName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only report container creation once.
|
||||||
|
if alreadyWatched {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case watcher.ContainerDelete:
|
||||||
|
// Container was deleted, stop watching for it.
|
||||||
|
lastWatched, err := self.watcher.RemoveWatch(containerName, event.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only report container deletion once.
|
||||||
|
if !lastWatched {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown event type %v", eventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deliver the event.
|
||||||
|
events <- watcher.ContainerEvent{
|
||||||
|
EventType: eventType,
|
||||||
|
Name: containerName,
|
||||||
|
WatchSource: watcher.Raw,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
51
manager/watcher/watcher.go
Normal file
51
manager/watcher/watcher.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Package container defines types for sub-container events and also
|
||||||
|
// defines an interface for container operation handlers.
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
// SubcontainerEventType indicates an addition or deletion event.
|
||||||
|
type ContainerEventType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
ContainerAdd ContainerEventType = iota
|
||||||
|
ContainerDelete
|
||||||
|
)
|
||||||
|
|
||||||
|
type ContainerWatchSource int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Raw ContainerWatchSource = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
// ContainerEvent represents a
|
||||||
|
type ContainerEvent struct {
|
||||||
|
// The type of event that occurred.
|
||||||
|
EventType ContainerEventType
|
||||||
|
|
||||||
|
// The full container name of the container where the event occurred.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// The watcher that detected this change event
|
||||||
|
WatchSource ContainerWatchSource
|
||||||
|
}
|
||||||
|
|
||||||
|
type ContainerWatcher interface {
|
||||||
|
// Registers a channel to listen for events affecting subcontainers (recursively).
|
||||||
|
Start(events chan ContainerEvent) error
|
||||||
|
|
||||||
|
// Stops watching for subcontainer changes.
|
||||||
|
Stop() error
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user