diff --git a/Makefile b/Makefile index 6760bcfa..8c2e25d7 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ test: @echo ">> running tests" @$(GO) test -short -race $(pkgs) -test-integration: test +test-integration: build test @./build/integration.sh format: diff --git a/api/versions.go b/api/versions.go index b567861e..a4b39cda 100644 --- a/api/versions.go +++ b/api/versions.go @@ -467,7 +467,7 @@ func (self *version2_1) Version() string { } func (self *version2_1) SupportedRequestTypes() []string { - return self.baseVersion.SupportedRequestTypes() + return append([]string{machineStatsApi}, self.baseVersion.SupportedRequestTypes()...) } func (self *version2_1) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { @@ -492,9 +492,16 @@ func (self *version2_1) HandleRequest(requestType string, request []string, m ma if err != nil { return err } - contStats := make(map[string][]*v2.ContainerStats, len(conts)) + contStats := make(map[string]v2.ContainerInfo, len(conts)) for name, cont := range conts { - contStats[name] = v2.ContainerStatsFromV1(&cont.Spec, cont.Stats) + if name == "/" { + // Root cgroup stats should be exposed as machine stats + continue + } + contStats[name] = v2.ContainerInfo{ + Spec: v2.ContainerSpecFromV1(&cont.Spec, cont.Aliases, cont.Namespace), + Stats: v2.ContainerStatsFromV1(&cont.Spec, cont.Stats), + } } return writeResult(contStats, w) default: diff --git a/client/v2/client.go b/client/v2/client.go index f3ea6008..b36a6405 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -21,7 +21,9 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "path" + "strconv" "strings" v1 "github.com/google/cadvisor/info/v1" @@ -85,6 +87,23 @@ func (self *Client) Attributes() (attr *v2.Attributes, err error) { return } +// Stats returns stats for the requested container. +func (self *Client) Stats(name string, request *v2.RequestOptions) (map[string]v2.ContainerInfo, error) { + u := self.statsUrl(name) + ret := make(map[string]v2.ContainerInfo) + data := url.Values{ + "type": []string{request.IdType}, + "count": []string{strconv.Itoa(request.Count)}, + "recursive": []string{strconv.FormatBool(request.Recursive)}, + } + + u = fmt.Sprintf("%s?%s", u, data.Encode()) + if err := self.httpGetJsonData(&ret, nil, u, "stats"); err != nil { + return nil, err + } + return ret, nil +} + func (self *Client) machineInfoUrl() string { return self.baseUrl + path.Join("machine") } @@ -101,7 +120,11 @@ func (self *Client) attributesUrl() string { return self.baseUrl + path.Join("attributes") } -func (self *Client) httpGetResponse(postData interface{}, url, infoName string) ([]byte, error) { +func (self *Client) statsUrl(name string) string { + return path.Join(self.baseUrl, "stats", name) +} + +func (self *Client) httpGetResponse(postData interface{}, urlPath, infoName string) ([]byte, error) { var resp *http.Response var err error @@ -110,24 +133,24 @@ func (self *Client) httpGetResponse(postData interface{}, url, infoName string) if marshalErr != nil { return nil, fmt.Errorf("unable to marshal data: %v", marshalErr) } - resp, err = http.Post(url, "application/json", bytes.NewBuffer(data)) + resp, err = http.Post(urlPath, "application/json", bytes.NewBuffer(data)) } else { - resp, err = http.Get(url) + resp, err = http.Get(urlPath) } if err != nil { - return nil, fmt.Errorf("unable to post %q to %q: %v", infoName, url, err) + return nil, fmt.Errorf("unable to post %q to %q: %v", infoName, urlPath, err) } if resp == nil { - return nil, fmt.Errorf("received empty response for %q from %q", infoName, url) + return nil, fmt.Errorf("received empty response for %q from %q", infoName, urlPath) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - err = fmt.Errorf("unable to read all %q from %q: %v", infoName, url, err) + err = fmt.Errorf("unable to read all %q from %q: %v", infoName, urlPath, err) return nil, err } if resp.StatusCode != 200 { - return nil, fmt.Errorf("request %q failed with error: %q", url, strings.TrimSpace(string(body))) + return nil, fmt.Errorf("request %q failed with error: %q", urlPath, strings.TrimSpace(string(body))) } return body, nil } diff --git a/container/docker/factory.go b/container/docker/factory.go index 34ed4b70..a39a2799 100644 --- a/container/docker/factory.go +++ b/container/docker/factory.go @@ -116,6 +116,8 @@ type dockerFactory struct { // Information about mounted filesystems. fsInfo fs.FsInfo + + dockerVersion []int } func (self *dockerFactory) String() string { @@ -140,6 +142,7 @@ func (self *dockerFactory) NewContainerHandler(name string, inHostNamespace bool &self.cgroupSubsystems, inHostNamespace, metadataEnvs, + self.dockerVersion, ) return } @@ -253,20 +256,21 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error { if err != nil { return fmt.Errorf("unable to communicate with docker daemon: %v", err) } + var dockerVersion []int if version, err := client.Version(); err != nil { return fmt.Errorf("unable to communicate with docker daemon: %v", err) } else { expected_version := []int{1, 0, 0} version_string := version.Get("Version") - version, err := parseDockerVersion(version_string) + dockerVersion, err = parseDockerVersion(version_string) if err != nil { return fmt.Errorf("couldn't parse docker version: %v", err) } - for index, number := range version { + for index, number := range dockerVersion { if number > expected_version[index] { break } else if number < expected_version[index] { - return fmt.Errorf("cAdvisor requires docker version %v or above but we have found version %v reported as \"%v\"", expected_version, version, version_string) + return fmt.Errorf("cAdvisor requires docker version %v or above but we have found version %v reported as \"%v\"", expected_version, dockerVersion, version_string) } } } @@ -289,7 +293,8 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error { storageDir, err := getStorageDir(information) if err != nil { - return err + glog.V(2).Infof("failed to detect storage directory from docker. Defaulting to using the value in --docker_root: %q", err, *dockerRootDir) + storageDir = path.Join(*dockerRootDir, sd) } cgroupSubsystems, err := libcontainer.GetCgroupSubsystems() if err != nil { @@ -298,12 +303,13 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error { glog.Infof("Registering Docker factory") f := &dockerFactory{ - machineInfoFactory: factory, + cgroupSubsystems: cgroupSubsystems, client: client, + dockerVersion: dockerVersion, + fsInfo: fsInfo, + machineInfoFactory: factory, storageDriver: storageDriver(sd), storageDriverDir: storageDir, - cgroupSubsystems: cgroupSubsystems, - fsInfo: fsInfo, } container.RegisterContainerHandlerFactory(f) return nil diff --git a/container/docker/handler.go b/container/docker/handler.go index 61f2a571..4ee718c3 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -17,6 +17,7 @@ package docker import ( "fmt" + "io/ioutil" "math" "path" "strings" @@ -82,6 +83,24 @@ type dockerContainerHandler struct { fsHandler fsHandler } +func getRwLayerID(containerID, storageDriverDir string, dockerVersion []int) (string, error) { + const ( + // Docker version >=1.10.0 have a randomized ID for the root fs of a container. + randomizedRWLayerMinorVersion = 10 + // Directory where the file containinig the randomized ID of the root fs of a container is stored in versions >= 1.10.0 + rwLayerIDDir = "../image/aufs/layerdb/mounts/" + rwLayerIDFile = "mount-id" + ) + if (dockerVersion[0] <= 1) && (dockerVersion[1] < randomizedRWLayerMinorVersion) { + return containerID, nil + } + bytes, err := ioutil.ReadFile(path.Join(storageDriverDir, rwLayerIDDir, containerID, rwLayerIDFile)) + if err != nil { + return "", fmt.Errorf("failed to identify the read-write layer ID for container %q. - %v", containerID, err) + } + return string(bytes), err +} + func newDockerContainerHandler( client *docker.Client, name string, @@ -92,6 +111,7 @@ func newDockerContainerHandler( cgroupSubsystems *containerlibcontainer.CgroupSubsystems, inHostNamespace bool, metadataEnvs []string, + dockerVersion []int, ) (container.ContainerHandler, error) { // Create the cgroup paths. cgroupPaths := make(map[string]string, len(cgroupSubsystems.MountPoints)) @@ -116,12 +136,17 @@ func newDockerContainerHandler( // Add the Containers dir where the log files are stored. otherStorageDir := path.Join(path.Dir(storageDriverDir), pathToContainersDir, id) + + rwLayerID, err := getRwLayerID(id, storageDriverDir, dockerVersion) + if err != nil { + return nil, err + } var rootfsStorageDir string switch storageDriver { case aufsStorageDriver: - rootfsStorageDir = path.Join(storageDriverDir, aufsRWLayer, id) + rootfsStorageDir = path.Join(storageDriverDir, aufsRWLayer, rwLayerID) case overlayStorageDriver: - rootfsStorageDir = path.Join(storageDriverDir, id) + rootfsStorageDir = path.Join(storageDriverDir, rwLayerID) } handler := &dockerContainerHandler{ diff --git a/container/docker/handler_test.go b/container/docker/handler_test.go new file mode 100644 index 00000000..2cec4b28 --- /dev/null +++ b/container/docker/handler_test.go @@ -0,0 +1,50 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Handler for Docker containers. +package docker + +import ( + "io/ioutil" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStorageDirDetectionWithOldVersions(t *testing.T) { + as := assert.New(t) + rwLayer, err := getRwLayerID("abcd", "/", []int{1, 9, 0}) + as.Nil(err) + as.Equal(rwLayer, "abcd") +} + +func TestStorageDirDetectionWithNewVersions(t *testing.T) { + as := assert.New(t) + testDir, err := ioutil.TempDir("", "") + as.Nil(err) + containerID := "abcd" + randomizedID := "xyz" + randomIDPath := path.Join(testDir, "image/aufs/layerdb/mounts/", containerID) + as.Nil(os.MkdirAll(randomIDPath, os.ModePerm)) + as.Nil(ioutil.WriteFile(path.Join(randomIDPath, "mount-id"), []byte(randomizedID), os.ModePerm)) + rwLayer, err := getRwLayerID(containerID, path.Join(testDir, "aufs"), []int{1, 10, 0}) + as.Nil(err) + as.Equal(rwLayer, randomizedID) + rwLayer, err = getRwLayerID(containerID, path.Join(testDir, "aufs"), []int{1, 10, 0}) + as.Nil(err) + as.Equal(rwLayer, randomizedID) + +} diff --git a/info/v2/conversion.go b/info/v2/conversion.go index fe241733..56aef777 100644 --- a/info/v2/conversion.go +++ b/info/v2/conversion.go @@ -123,7 +123,7 @@ func ContainerStatsFromV1(spec *v1.ContainerSpec, stats []*v1.ContainerStats) [] } } else if len(val.Filesystem) > 1 { // Cannot handle multiple devices per container. - glog.Errorf("failed to handle multiple devices for container. Skipping Filesystem stats") + glog.V(2).Infof("failed to handle multiple devices for container. Skipping Filesystem stats") } } if spec.HasDiskIo { diff --git a/integration/framework/framework.go b/integration/framework/framework.go index 32501ca1..6df2dc17 100644 --- a/integration/framework/framework.go +++ b/integration/framework/framework.go @@ -98,6 +98,13 @@ func New(t *testing.T) Framework { return fm } +const ( + Aufs string = "aufs" + Overlay string = "overlay" + DeviceMapper string = "devicemapper" + Unknown string = "" +) + type DockerActions interface { // Run the no-op pause Docker container and return its ID. RunPause() string @@ -113,6 +120,9 @@ type DockerActions interface { // -> docker run busybox ping www.google.com Run(args DockerRunArgs, cmd ...string) string RunStress(args DockerRunArgs, cmd ...string) string + + Version() []string + StorageDriver() string } type ShellActions interface { @@ -255,6 +265,44 @@ func (self dockerActions) Run(args DockerRunArgs, cmd ...string) string { return containerId } +func (self dockerActions) Version() []string { + dockerCommand := []string{"docker", "version", "-f", "'{{.Server.Version}}'"} + output, _ := self.fm.Shell().Run("sudo", dockerCommand...) + output = strings.TrimSpace(output) + ret := strings.Split(output, ".") + if len(ret) != 3 { + self.fm.T().Fatalf("invalid version %v", output) + } + return ret +} + +func (self dockerActions) StorageDriver() string { + dockerCommand := []string{"docker", "info"} + output, _ := self.fm.Shell().Run("sudo", dockerCommand...) + if len(output) < 1 { + self.fm.T().Fatalf("failed to find docker storage driver - %v", output) + } + for _, line := range strings.Split(output, "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Storage Driver: ") { + idx := strings.LastIndex(line, ": ") + 2 + driver := line[idx:] + switch driver { + case Aufs: + return Aufs + case Overlay: + return Overlay + case DeviceMapper: + return DeviceMapper + default: + return Unknown + } + } + } + self.fm.T().Fatalf("failed to find docker storage driver from info - %v", output) + return Unknown +} + func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) string { dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M", "-d", "-t", "-i"}, args.Args...), args.Image), args.InnerArgs...), cmd...) diff --git a/integration/runner/runner.go b/integration/runner/runner.go index 708f17be..00177823 100644 --- a/integration/runner/runner.go +++ b/integration/runner/runner.go @@ -181,7 +181,7 @@ func PushAndRunTests(host, testDir string) error { if err != nil { return fmt.Errorf("error reading local log file: %v", err) } - glog.Errorf("%v", string(logs)) + glog.Errorf("----------------------\nLogs from Host: %q\n%v\n", host, string(logs)) err = fmt.Errorf("error on host %s: %v\n%+v", host, err, attributes) } return err diff --git a/integration/tests/api/docker_test.go b/integration/tests/api/docker_test.go index 609130c3..0a0ff9bf 100644 --- a/integration/tests/api/docker_test.go +++ b/integration/tests/api/docker_test.go @@ -22,6 +22,7 @@ import ( "time" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/integration/framework" "github.com/stretchr/testify/assert" @@ -36,6 +37,14 @@ func sanityCheck(alias string, containerInfo info.ContainerInfo, t *testing.T) { assert.NotEmpty(t, containerInfo.Stats, "Expected container to have stats") } +// Sanity check the container by: +// - Checking that the specified alias is a valid one for this container. +// - Verifying that stats are not empty. +func sanityCheckV2(alias string, info v2.ContainerInfo, t *testing.T) { + assert.Contains(t, info.Spec.Aliases, alias, "Alias %q should be in list of aliases %v", alias, info.Spec.Aliases) + assert.NotEmpty(t, info.Stats, "Expected container to have stats") +} + // Waits up to 5s for a container with the specified alias to appear. func waitForContainer(alias string, fm framework.Framework) { err := framework.RetryForDuration(func() error { @@ -54,6 +63,12 @@ func waitForContainer(alias string, fm framework.Framework) { require.NoError(fm.T(), err, "Timed out waiting for container %q to be available in cAdvisor: %v", alias, err) } +func getDockerMinorVersion(fm framework.Framework) int { + val, err := strconv.Atoi(fm.Docker().Version()[1]) + assert.Nil(fm.T(), err) + return val +} + // A Docker container in /docker/ func TestDockerContainerById(t *testing.T) { fm := framework.New(t) @@ -172,11 +187,15 @@ func TestDockerContainerSpec(t *testing.T) { cpuShares := uint64(2048) cpuMask := "0" memoryLimit := uint64(1 << 30) // 1GB + cpusetArg := "--cpuset" + if getDockerMinorVersion(fm) >= 10 { + cpusetArg = "--cpuset-cpus" + } containerId := fm.Docker().Run(framework.DockerRunArgs{ Image: "kubernetes/pause", Args: []string{ "--cpu-shares", strconv.FormatUint(cpuShares, 10), - "--cpuset", cpuMask, + cpusetArg, cpuMask, "--memory", strconv.FormatUint(memoryLimit, 10), }, }) @@ -253,6 +272,7 @@ func TestDockerContainerNetworkStats(t *testing.T) { containerId := fm.Docker().RunBusybox("watch", "-n1", "wget", "https://www.google.com/") waitForContainer(containerId, fm) + time.Sleep(10 * time.Second) request := &info.ContainerInfoRequest{ NumStats: 1, } @@ -270,3 +290,37 @@ func TestDockerContainerNetworkStats(t *testing.T) { assert.NotEqual(stat.Network.RxBytes, stat.Network.TxBytes, "Network tx and rx bytes should not be equal") assert.NotEqual(stat.Network.RxPackets, stat.Network.TxPackets, "Network tx and rx packets should not be equal") } + +func TestDockerFilesystemStats(t *testing.T) { + fm := framework.New(t) + defer fm.Cleanup() + + storageDriver := fm.Docker().StorageDriver() + switch storageDriver { + case framework.Aufs: + case framework.Overlay: + default: + t.Skip("skipping filesystem stats test") + } + // Wait for the container to show up. + containerId := fm.Docker().RunBusybox("/bin/sh", "-c", "dd if=/dev/zero of=/file count=1 bs=1M & ping www.google.com") + waitForContainer(containerId, fm) + time.Sleep(time.Minute) + request := &v2.RequestOptions{ + IdType: v2.TypeDocker, + Count: 1, + } + containerInfo, err := fm.Cadvisor().ClientV2().Stats(containerId, request) + time.Sleep(time.Minute) + require.NoError(t, err) + require.True(t, len(containerInfo) == 1) + var info v2.ContainerInfo + for _, cInfo := range containerInfo { + info = cInfo + } + sanityCheckV2(containerId, info, t) + require.NotNil(t, info.Stats[0].Filesystem.BaseUsageBytes) + assert.True(t, *info.Stats[0].Filesystem.BaseUsageBytes > (1<<6), "expected base fs usage to be greater than 1MB") + require.NotNil(t, info.Stats[0].Filesystem.TotalUsageBytes) + assert.True(t, *info.Stats[0].Filesystem.TotalUsageBytes > (1<<6), "expected total fs usage to be greater than 1MB") +}