Merge pull request #445 from rjnagal/docker

Add task load stats to containers.
This commit is contained in:
Victor Marmol 2015-01-16 16:00:50 -08:00
commit 909fa133e1
11 changed files with 91 additions and 32 deletions

View File

@ -70,6 +70,9 @@ type ContainerHandler interface {
// Stops watching for subcontainer changes.
StopWatchingSubcontainers() error
// Returns absolute cgroup path for the requested resource.
GetCgroupPath(resource string) (string, error)
// Returns whether the container still exists.
Exists() bool
}

View File

@ -341,6 +341,14 @@ func (self *dockerContainerHandler) ListContainers(listType container.ListType)
return ret, nil
}
func (self *dockerContainerHandler) GetCgroupPath(resource string) (string, error) {
path, ok := self.cgroupPaths[resource]
if !ok {
return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name)
}
return path, nil
}
func (self *dockerContainerHandler) ListThreads(listType container.ListType) ([]int, error) {
return nil, nil
}

View File

@ -90,6 +90,11 @@ func (self *MockContainerHandler) Exists() bool {
return args.Get(0).(bool)
}
func (self *MockContainerHandler) GetCgroupPath(path string) (string, error) {
args := self.Called(path)
return args.Get(0).(string), args.Error(1)
}
type FactoryForMockContainerHandler struct {
Name string
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)

View File

@ -322,6 +322,14 @@ func (self *rawContainerHandler) GetStats() (*info.ContainerStats, error) {
return stats, nil
}
func (self *rawContainerHandler) GetCgroupPath(resource string) (string, error) {
path, ok := self.cgroupPaths[resource]
if !ok {
return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name)
}
return path, nil
}
// Lists all directories under "path" and outputs the results as children of "parent".
func listDirectories(dirpath string, parent string, recursive bool, output map[string]struct{}) error {
// Ignore if this hierarchy does not exist.

View File

@ -157,6 +157,24 @@ func (self *ContainerInfo) StatsEndTime() time.Time {
return ret
}
// This mirrors kernel internal structure.
type LoadStats struct {
// Number of sleeping tasks.
NrSleeping uint64 `json:"nr_sleeping"`
// Number of running tasks.
NrRunning uint64 `json:"nr_running"`
// Number of tasks in stopped state
NrStopped uint64 `json:"nr_stopped"`
// Number of tasks in uninterruptible state
NrUinterruptible uint64 `json:"nr_uninterruptible"`
// Number of tasks waiting on IO
NrIoWait uint64 `json:"nr_io_wait"`
}
// All CPU usage metrics are cumulative from the creation of the container
type CpuStats struct {
Usage struct {
@ -310,6 +328,9 @@ type ContainerStats struct {
// Filesystem statistics
Filesystem []FsStats `json:"filesystem,omitempty"`
// Task load stats
TaskStats LoadStats `json:"task_stats,omitempty"`
}
func timeEq(t1, t2 time.Time, tolerance time.Duration) bool {

View File

@ -25,6 +25,7 @@ import (
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage"
"github.com/google/cadvisor/utils/cpuload"
)
// Housekeeping interval.
@ -43,6 +44,7 @@ type containerData struct {
info containerInfo
storageDriver storage.StorageDriver
lock sync.Mutex
loadReader *cpuload.CpuLoadReader
housekeepingInterval time.Duration
lastUpdatedTime time.Time
lastErrorTime time.Time
@ -91,7 +93,7 @@ func (c *containerData) GetInfo() (*containerInfo, error) {
return &c.info, nil
}
func newContainerData(containerName string, driver storage.StorageDriver, handler container.ContainerHandler, logUsage bool) (*containerData, error) {
func newContainerData(containerName string, driver storage.StorageDriver, handler container.ContainerHandler, loadReader *cpuload.CpuLoadReader, logUsage bool) (*containerData, error) {
if driver == nil {
return nil, fmt.Errorf("nil storage driver")
}
@ -107,6 +109,7 @@ func newContainerData(containerName string, driver storage.StorageDriver, handle
handler: handler,
storageDriver: driver,
housekeepingInterval: *HousekeepingInterval,
loadReader: loadReader,
logUsage: logUsage,
stop: make(chan bool, 1),
}
@ -234,6 +237,17 @@ func (c *containerData) updateStats() error {
if stats == nil {
return nil
}
if c.loadReader != nil {
path, err := c.handler.GetCgroupPath("cpu")
if err == nil {
loadStats, err := c.loadReader.GetCpuLoad(path)
if err != nil {
return fmt.Errorf("failed to get load stat for %q - path %q, error %s", c.info.Name, path, err)
} else {
stats.TaskStats = loadStats
}
}
}
ref, err := c.handler.ContainerReference()
if err != nil {
// Ignore errors if the container is dead.

View File

@ -35,7 +35,7 @@ const containerName = "/container"
func newTestContainerData(t *testing.T) (*containerData, *container.MockContainerHandler, *stest.MockStorageDriver) {
mockHandler := container.NewMockContainerHandler(containerName)
mockDriver := &stest.MockStorageDriver{}
ret, err := newContainerData(containerName, mockDriver, mockHandler, false)
ret, err := newContainerData(containerName, mockDriver, mockHandler, nil, false)
if err != nil {
t.Fatal(err)
}

View File

@ -30,6 +30,7 @@ import (
"github.com/google/cadvisor/container/docker"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage"
"github.com/google/cadvisor/utils/cpuload"
"github.com/google/cadvisor/utils/sysfs"
)
@ -119,12 +120,22 @@ type manager struct {
quitChannels []chan error
cadvisorContainer string
dockerContainersRegexp *regexp.Regexp
loadReader *cpuload.CpuLoadReader
}
// Start the container manager.
func (self *manager) Start() error {
// Create cpu load reader.
cpuLoadReader, err := cpuload.New()
if err != nil {
// TODO(rjnagal): Promote to warning once we support cpu load inside namespaces.
glog.Infof("could not initialize cpu load reader: %s", err)
} else {
self.loadReader = cpuLoadReader
}
// Create root and then recover all containers.
err := self.createContainer("/")
err = self.createContainer("/")
if err != nil {
return err
}
@ -164,6 +175,10 @@ func (self *manager) Stop() error {
}
}
self.quitChannels = make([]chan error, 0, 2)
if self.loadReader != nil {
self.loadReader.Close()
self.loadReader = nil
}
return nil
}
@ -357,7 +372,7 @@ func (m *manager) createContainer(containerName string) error {
return err
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
cont, err := newContainerData(containerName, m.storageDriver, handler, logUsage)
cont, err := newContainerData(containerName, m.storageDriver, handler, m.loadReader, logUsage)
if err != nil {
return err
}

View File

@ -50,7 +50,7 @@ func createManagerAndAddContainers(
if ret, ok := mif.(*manager); ok {
for _, name := range containers {
mockHandler := container.NewMockContainerHandler(name)
cont, err := newContainerData(name, driver, mockHandler, false)
cont, err := newContainerData(name, driver, mockHandler, nil, false)
if err != nil {
t.Fatal(err)
}

View File

@ -19,6 +19,7 @@ import (
"os"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
)
type CpuLoadReader struct {
@ -49,41 +50,23 @@ func (self *CpuLoadReader) Close() {
}
}
// This mirrors kernel internal structure.
type LoadStats struct {
// Number of sleeping tasks.
NrSleeping uint64 `json:"nr_sleeping"`
// Number of running tasks.
NrRunning uint64 `json:"nr_running"`
// Number of tasks in stopped state
NrStopped uint64 `json:"nr_stopped"`
// Number of tasks in uninterruptible state
NrUinterruptible uint64 `json:"nr_uninterruptible"`
// Number of tasks waiting on IO
NrIoWait uint64 `json:"nr_io_wait"`
}
// Returns instantaneous number of running tasks in a group.
// Caller can use historical data to calculate cpu load.
// path is an absolute filesystem path for a container under the CPU cgroup hierarchy.
// NOTE: non-hierarchical load is returned. It does not include load for subcontainers.
func (self *CpuLoadReader) GetCpuLoad(path string) (LoadStats, error) {
func (self *CpuLoadReader) GetCpuLoad(path string) (info.LoadStats, error) {
if len(path) == 0 {
return LoadStats{}, fmt.Errorf("cgroup path can not be empty!")
return info.LoadStats{}, fmt.Errorf("cgroup path can not be empty!")
}
cfd, err := os.Open(path)
if err != nil {
return LoadStats{}, fmt.Errorf("failed to open cgroup path %s: %q", path, err)
return info.LoadStats{}, fmt.Errorf("failed to open cgroup path %s: %q", path, err)
}
stats, err := getLoadStats(self.familyId, cfd.Fd(), self.conn)
if err != nil {
return LoadStats{}, err
return info.LoadStats{}, err
}
glog.V(1).Infof("Task stats for %q: %+v", path, stats)
return stats, nil

View File

@ -19,6 +19,8 @@ import (
"encoding/binary"
"fmt"
"syscall"
"github.com/google/cadvisor/info"
)
const (
@ -60,7 +62,7 @@ func (self netlinkMessage) toRawMsg() (rawmsg syscall.NetlinkMessage) {
type loadStatsResp struct {
Header syscall.NlMsghdr
GenHeader genMsghdr
Stats LoadStats
Stats info.LoadStats
}
// Return required padding to align 'size' to 'alignment'.
@ -216,21 +218,21 @@ func verifyHeader(msg syscall.NetlinkMessage) error {
// id: family id for taskstats.
// fd: fd to path to the cgroup directory under cpu hierarchy.
// conn: open netlink connection used to communicate with kernel.
func getLoadStats(id uint16, fd uintptr, conn *Connection) (LoadStats, error) {
func getLoadStats(id uint16, fd uintptr, conn *Connection) (info.LoadStats, error) {
msg := prepareCmdMessage(id, fd)
err := conn.WriteMessage(msg.toRawMsg())
if err != nil {
return LoadStats{}, err
return info.LoadStats{}, err
}
resp, err := conn.ReadMessage()
if err != nil {
return LoadStats{}, err
return info.LoadStats{}, err
}
parsedmsg, err := parseLoadStatsResp(resp)
if err != nil {
return LoadStats{}, err
return info.LoadStats{}, err
}
return parsedmsg.Stats, nil
}