From cbdd96a5546b3fca2ad24c4d12512afd349379d3 Mon Sep 17 00:00:00 2001 From: Rohit Jnagal Date: Fri, 16 Jan 2015 22:27:40 +0000 Subject: [PATCH] Add task load stats to containers. The stats are only populated when cAdvisor is running outside network namespaces. We'll add a different backend to retrieve the same data from within namespaces. --- container/container.go | 3 +++ container/docker/handler.go | 8 ++++++++ container/mock.go | 5 +++++ container/raw/handler.go | 8 ++++++++ info/container.go | 21 +++++++++++++++++++++ manager/container.go | 16 +++++++++++++++- manager/container_test.go | 2 +- manager/manager.go | 19 +++++++++++++++++-- manager/manager_test.go | 2 +- utils/cpuload/cpuload.go | 27 +++++---------------------- utils/cpuload/netlink.go | 12 +++++++----- 11 files changed, 91 insertions(+), 32 deletions(-) diff --git a/container/container.go b/container/container.go index dab0fa21..492430ff 100644 --- a/container/container.go +++ b/container/container.go @@ -70,6 +70,9 @@ type ContainerHandler interface { // Stops watching for subcontainer changes. StopWatchingSubcontainers() error + // Returns absolute cgroup path for the requested resource. + GetCgroupPath(resource string) (string, error) + // Returns whether the container still exists. Exists() bool } diff --git a/container/docker/handler.go b/container/docker/handler.go index 435f7253..f09af734 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -341,6 +341,14 @@ func (self *dockerContainerHandler) ListContainers(listType container.ListType) return ret, nil } +func (self *dockerContainerHandler) GetCgroupPath(resource string) (string, error) { + path, ok := self.cgroupPaths[resource] + if !ok { + return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name) + } + return path, nil +} + func (self *dockerContainerHandler) ListThreads(listType container.ListType) ([]int, error) { return nil, nil } diff --git a/container/mock.go b/container/mock.go index c3308d77..24816d00 100644 --- a/container/mock.go +++ b/container/mock.go @@ -90,6 +90,11 @@ func (self *MockContainerHandler) Exists() bool { return args.Get(0).(bool) } +func (self *MockContainerHandler) GetCgroupPath(path string) (string, error) { + args := self.Called(path) + return args.Get(0).(string), args.Error(1) +} + type FactoryForMockContainerHandler struct { Name string PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler) diff --git a/container/raw/handler.go b/container/raw/handler.go index 83bb314d..77ed73fe 100644 --- a/container/raw/handler.go +++ b/container/raw/handler.go @@ -322,6 +322,14 @@ func (self *rawContainerHandler) GetStats() (*info.ContainerStats, error) { return stats, nil } +func (self *rawContainerHandler) GetCgroupPath(resource string) (string, error) { + path, ok := self.cgroupPaths[resource] + if !ok { + return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name) + } + return path, nil +} + // Lists all directories under "path" and outputs the results as children of "parent". func listDirectories(dirpath string, parent string, recursive bool, output map[string]struct{}) error { // Ignore if this hierarchy does not exist. diff --git a/info/container.go b/info/container.go index 5a4fbbd9..da66b2ed 100644 --- a/info/container.go +++ b/info/container.go @@ -157,6 +157,24 @@ func (self *ContainerInfo) StatsEndTime() time.Time { return ret } +// This mirrors kernel internal structure. +type LoadStats struct { + // Number of sleeping tasks. + NrSleeping uint64 `json:"nr_sleeping"` + + // Number of running tasks. + NrRunning uint64 `json:"nr_running"` + + // Number of tasks in stopped state + NrStopped uint64 `json:"nr_stopped"` + + // Number of tasks in uninterruptible state + NrUinterruptible uint64 `json:"nr_uninterruptible"` + + // Number of tasks waiting on IO + NrIoWait uint64 `json:"nr_io_wait"` +} + // All CPU usage metrics are cumulative from the creation of the container type CpuStats struct { Usage struct { @@ -310,6 +328,9 @@ type ContainerStats struct { // Filesystem statistics Filesystem []FsStats `json:"filesystem,omitempty"` + + // Task load stats + TaskStats LoadStats `json:"task_stats,omitempty"` } func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { diff --git a/manager/container.go b/manager/container.go index 0cafbfef..98a46b7b 100644 --- a/manager/container.go +++ b/manager/container.go @@ -25,6 +25,7 @@ import ( "github.com/google/cadvisor/container" "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/utils/cpuload" ) // Housekeeping interval. @@ -43,6 +44,7 @@ type containerData struct { info containerInfo storageDriver storage.StorageDriver lock sync.Mutex + loadReader *cpuload.CpuLoadReader housekeepingInterval time.Duration lastUpdatedTime time.Time lastErrorTime time.Time @@ -91,7 +93,7 @@ func (c *containerData) GetInfo() (*containerInfo, error) { return &c.info, nil } -func newContainerData(containerName string, driver storage.StorageDriver, handler container.ContainerHandler, logUsage bool) (*containerData, error) { +func newContainerData(containerName string, driver storage.StorageDriver, handler container.ContainerHandler, loadReader *cpuload.CpuLoadReader, logUsage bool) (*containerData, error) { if driver == nil { return nil, fmt.Errorf("nil storage driver") } @@ -107,6 +109,7 @@ func newContainerData(containerName string, driver storage.StorageDriver, handle handler: handler, storageDriver: driver, housekeepingInterval: *HousekeepingInterval, + loadReader: loadReader, logUsage: logUsage, stop: make(chan bool, 1), } @@ -234,6 +237,17 @@ func (c *containerData) updateStats() error { if stats == nil { return nil } + if c.loadReader != nil { + path, err := c.handler.GetCgroupPath("cpu") + if err == nil { + loadStats, err := c.loadReader.GetCpuLoad(path) + if err != nil { + return fmt.Errorf("failed to get load stat for %q - path %q, error %s", c.info.Name, path, err) + } else { + stats.TaskStats = loadStats + } + } + } ref, err := c.handler.ContainerReference() if err != nil { // Ignore errors if the container is dead. diff --git a/manager/container_test.go b/manager/container_test.go index 09e62f69..b1b31d03 100644 --- a/manager/container_test.go +++ b/manager/container_test.go @@ -35,7 +35,7 @@ const containerName = "/container" func newTestContainerData(t *testing.T) (*containerData, *container.MockContainerHandler, *stest.MockStorageDriver) { mockHandler := container.NewMockContainerHandler(containerName) mockDriver := &stest.MockStorageDriver{} - ret, err := newContainerData(containerName, mockDriver, mockHandler, false) + ret, err := newContainerData(containerName, mockDriver, mockHandler, nil, false) if err != nil { t.Fatal(err) } diff --git a/manager/manager.go b/manager/manager.go index 36e7597c..b3ae5ac9 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -30,6 +30,7 @@ import ( "github.com/google/cadvisor/container/docker" "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/utils/cpuload" "github.com/google/cadvisor/utils/sysfs" ) @@ -119,12 +120,22 @@ type manager struct { quitChannels []chan error cadvisorContainer string dockerContainersRegexp *regexp.Regexp + loadReader *cpuload.CpuLoadReader } // Start the container manager. func (self *manager) Start() error { + // Create cpu load reader. + cpuLoadReader, err := cpuload.New() + if err != nil { + // TODO(rjnagal): Promote to warning once we support cpu load inside namespaces. + glog.Infof("could not initialize cpu load reader: %s", err) + } else { + self.loadReader = cpuLoadReader + } + // Create root and then recover all containers. - err := self.createContainer("/") + err = self.createContainer("/") if err != nil { return err } @@ -164,6 +175,10 @@ func (self *manager) Stop() error { } } self.quitChannels = make([]chan error, 0, 2) + if self.loadReader != nil { + self.loadReader.Close() + self.loadReader = nil + } return nil } @@ -357,7 +372,7 @@ func (m *manager) createContainer(containerName string) error { return err } logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer - cont, err := newContainerData(containerName, m.storageDriver, handler, logUsage) + cont, err := newContainerData(containerName, m.storageDriver, handler, m.loadReader, logUsage) if err != nil { return err } diff --git a/manager/manager_test.go b/manager/manager_test.go index 40d6c783..bbd6e1cb 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -50,7 +50,7 @@ func createManagerAndAddContainers( if ret, ok := mif.(*manager); ok { for _, name := range containers { mockHandler := container.NewMockContainerHandler(name) - cont, err := newContainerData(name, driver, mockHandler, false) + cont, err := newContainerData(name, driver, mockHandler, nil, false) if err != nil { t.Fatal(err) } diff --git a/utils/cpuload/cpuload.go b/utils/cpuload/cpuload.go index 29f79ca7..df9eabc7 100644 --- a/utils/cpuload/cpuload.go +++ b/utils/cpuload/cpuload.go @@ -19,6 +19,7 @@ import ( "os" "github.com/golang/glog" + "github.com/google/cadvisor/info" ) type CpuLoadReader struct { @@ -49,41 +50,23 @@ func (self *CpuLoadReader) Close() { } } -// This mirrors kernel internal structure. -type LoadStats struct { - // Number of sleeping tasks. - NrSleeping uint64 `json:"nr_sleeping"` - - // Number of running tasks. - NrRunning uint64 `json:"nr_running"` - - // Number of tasks in stopped state - NrStopped uint64 `json:"nr_stopped"` - - // Number of tasks in uninterruptible state - NrUinterruptible uint64 `json:"nr_uninterruptible"` - - // Number of tasks waiting on IO - NrIoWait uint64 `json:"nr_io_wait"` -} - // Returns instantaneous number of running tasks in a group. // Caller can use historical data to calculate cpu load. // path is an absolute filesystem path for a container under the CPU cgroup hierarchy. // NOTE: non-hierarchical load is returned. It does not include load for subcontainers. -func (self *CpuLoadReader) GetCpuLoad(path string) (LoadStats, error) { +func (self *CpuLoadReader) GetCpuLoad(path string) (info.LoadStats, error) { if len(path) == 0 { - return LoadStats{}, fmt.Errorf("cgroup path can not be empty!") + return info.LoadStats{}, fmt.Errorf("cgroup path can not be empty!") } cfd, err := os.Open(path) if err != nil { - return LoadStats{}, fmt.Errorf("failed to open cgroup path %s: %q", path, err) + return info.LoadStats{}, fmt.Errorf("failed to open cgroup path %s: %q", path, err) } stats, err := getLoadStats(self.familyId, cfd.Fd(), self.conn) if err != nil { - return LoadStats{}, err + return info.LoadStats{}, err } glog.V(1).Infof("Task stats for %q: %+v", path, stats) return stats, nil diff --git a/utils/cpuload/netlink.go b/utils/cpuload/netlink.go index 2d810449..d4655d41 100644 --- a/utils/cpuload/netlink.go +++ b/utils/cpuload/netlink.go @@ -19,6 +19,8 @@ import ( "encoding/binary" "fmt" "syscall" + + "github.com/google/cadvisor/info" ) const ( @@ -60,7 +62,7 @@ func (self netlinkMessage) toRawMsg() (rawmsg syscall.NetlinkMessage) { type loadStatsResp struct { Header syscall.NlMsghdr GenHeader genMsghdr - Stats LoadStats + Stats info.LoadStats } // Return required padding to align 'size' to 'alignment'. @@ -216,21 +218,21 @@ func verifyHeader(msg syscall.NetlinkMessage) error { // id: family id for taskstats. // fd: fd to path to the cgroup directory under cpu hierarchy. // conn: open netlink connection used to communicate with kernel. -func getLoadStats(id uint16, fd uintptr, conn *Connection) (LoadStats, error) { +func getLoadStats(id uint16, fd uintptr, conn *Connection) (info.LoadStats, error) { msg := prepareCmdMessage(id, fd) err := conn.WriteMessage(msg.toRawMsg()) if err != nil { - return LoadStats{}, err + return info.LoadStats{}, err } resp, err := conn.ReadMessage() if err != nil { - return LoadStats{}, err + return info.LoadStats{}, err } parsedmsg, err := parseLoadStatsResp(resp) if err != nil { - return LoadStats{}, err + return info.LoadStats{}, err } return parsedmsg.Stats, nil }