Merge pull request #1084 from vishh/docker-v1.10

Track fs usage for docker versions >= 1.10
This commit is contained in:
Vish Kannan 2016-02-03 18:13:55 -08:00
commit 05068c196a
10 changed files with 236 additions and 23 deletions

View File

@ -20,7 +20,7 @@ test:
@echo ">> running tests" @echo ">> running tests"
@$(GO) test -short -race $(pkgs) @$(GO) test -short -race $(pkgs)
test-integration: test test-integration: build test
@./build/integration.sh @./build/integration.sh
format: format:

View File

@ -467,7 +467,7 @@ func (self *version2_1) Version() string {
} }
func (self *version2_1) SupportedRequestTypes() []string { func (self *version2_1) SupportedRequestTypes() []string {
return self.baseVersion.SupportedRequestTypes() return append([]string{machineStatsApi}, self.baseVersion.SupportedRequestTypes()...)
} }
func (self *version2_1) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error { func (self *version2_1) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
@ -492,9 +492,16 @@ func (self *version2_1) HandleRequest(requestType string, request []string, m ma
if err != nil { if err != nil {
return err return err
} }
contStats := make(map[string][]*v2.ContainerStats, len(conts)) contStats := make(map[string]v2.ContainerInfo, len(conts))
for name, cont := range conts { for name, cont := range conts {
contStats[name] = v2.ContainerStatsFromV1(&cont.Spec, cont.Stats) if name == "/" {
// Root cgroup stats should be exposed as machine stats
continue
}
contStats[name] = v2.ContainerInfo{
Spec: v2.ContainerSpecFromV1(&cont.Spec, cont.Aliases, cont.Namespace),
Stats: v2.ContainerStatsFromV1(&cont.Spec, cont.Stats),
}
} }
return writeResult(contStats, w) return writeResult(contStats, w)
default: default:

View File

@ -21,7 +21,9 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url"
"path" "path"
"strconv"
"strings" "strings"
v1 "github.com/google/cadvisor/info/v1" v1 "github.com/google/cadvisor/info/v1"
@ -85,6 +87,23 @@ func (self *Client) Attributes() (attr *v2.Attributes, err error) {
return return
} }
// Stats returns stats for the requested container.
func (self *Client) Stats(name string, request *v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
u := self.statsUrl(name)
ret := make(map[string]v2.ContainerInfo)
data := url.Values{
"type": []string{request.IdType},
"count": []string{strconv.Itoa(request.Count)},
"recursive": []string{strconv.FormatBool(request.Recursive)},
}
u = fmt.Sprintf("%s?%s", u, data.Encode())
if err := self.httpGetJsonData(&ret, nil, u, "stats"); err != nil {
return nil, err
}
return ret, nil
}
func (self *Client) machineInfoUrl() string { func (self *Client) machineInfoUrl() string {
return self.baseUrl + path.Join("machine") return self.baseUrl + path.Join("machine")
} }
@ -101,7 +120,11 @@ func (self *Client) attributesUrl() string {
return self.baseUrl + path.Join("attributes") return self.baseUrl + path.Join("attributes")
} }
func (self *Client) httpGetResponse(postData interface{}, url, infoName string) ([]byte, error) { func (self *Client) statsUrl(name string) string {
return path.Join(self.baseUrl, "stats", name)
}
func (self *Client) httpGetResponse(postData interface{}, urlPath, infoName string) ([]byte, error) {
var resp *http.Response var resp *http.Response
var err error var err error
@ -110,24 +133,24 @@ func (self *Client) httpGetResponse(postData interface{}, url, infoName string)
if marshalErr != nil { if marshalErr != nil {
return nil, fmt.Errorf("unable to marshal data: %v", marshalErr) return nil, fmt.Errorf("unable to marshal data: %v", marshalErr)
} }
resp, err = http.Post(url, "application/json", bytes.NewBuffer(data)) resp, err = http.Post(urlPath, "application/json", bytes.NewBuffer(data))
} else { } else {
resp, err = http.Get(url) resp, err = http.Get(urlPath)
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to post %q to %q: %v", infoName, url, err) return nil, fmt.Errorf("unable to post %q to %q: %v", infoName, urlPath, err)
} }
if resp == nil { if resp == nil {
return nil, fmt.Errorf("received empty response for %q from %q", infoName, url) return nil, fmt.Errorf("received empty response for %q from %q", infoName, urlPath)
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
err = fmt.Errorf("unable to read all %q from %q: %v", infoName, url, err) err = fmt.Errorf("unable to read all %q from %q: %v", infoName, urlPath, err)
return nil, err return nil, err
} }
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
return nil, fmt.Errorf("request %q failed with error: %q", url, strings.TrimSpace(string(body))) return nil, fmt.Errorf("request %q failed with error: %q", urlPath, strings.TrimSpace(string(body)))
} }
return body, nil return body, nil
} }

View File

@ -116,6 +116,8 @@ type dockerFactory struct {
// Information about mounted filesystems. // Information about mounted filesystems.
fsInfo fs.FsInfo fsInfo fs.FsInfo
dockerVersion []int
} }
func (self *dockerFactory) String() string { func (self *dockerFactory) String() string {
@ -140,6 +142,7 @@ func (self *dockerFactory) NewContainerHandler(name string, inHostNamespace bool
&self.cgroupSubsystems, &self.cgroupSubsystems,
inHostNamespace, inHostNamespace,
metadataEnvs, metadataEnvs,
self.dockerVersion,
) )
return return
} }
@ -253,20 +256,21 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error {
if err != nil { if err != nil {
return fmt.Errorf("unable to communicate with docker daemon: %v", err) return fmt.Errorf("unable to communicate with docker daemon: %v", err)
} }
var dockerVersion []int
if version, err := client.Version(); err != nil { if version, err := client.Version(); err != nil {
return fmt.Errorf("unable to communicate with docker daemon: %v", err) return fmt.Errorf("unable to communicate with docker daemon: %v", err)
} else { } else {
expected_version := []int{1, 0, 0} expected_version := []int{1, 0, 0}
version_string := version.Get("Version") version_string := version.Get("Version")
version, err := parseDockerVersion(version_string) dockerVersion, err = parseDockerVersion(version_string)
if err != nil { if err != nil {
return fmt.Errorf("couldn't parse docker version: %v", err) return fmt.Errorf("couldn't parse docker version: %v", err)
} }
for index, number := range version { for index, number := range dockerVersion {
if number > expected_version[index] { if number > expected_version[index] {
break break
} else if number < expected_version[index] { } else if number < expected_version[index] {
return fmt.Errorf("cAdvisor requires docker version %v or above but we have found version %v reported as \"%v\"", expected_version, version, version_string) return fmt.Errorf("cAdvisor requires docker version %v or above but we have found version %v reported as \"%v\"", expected_version, dockerVersion, version_string)
} }
} }
} }
@ -289,7 +293,8 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error {
storageDir, err := getStorageDir(information) storageDir, err := getStorageDir(information)
if err != nil { if err != nil {
return err glog.V(2).Infof("failed to detect storage directory from docker. Defaulting to using the value in --docker_root: %q", err, *dockerRootDir)
storageDir = path.Join(*dockerRootDir, sd)
} }
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems() cgroupSubsystems, err := libcontainer.GetCgroupSubsystems()
if err != nil { if err != nil {
@ -298,12 +303,13 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo) error {
glog.Infof("Registering Docker factory") glog.Infof("Registering Docker factory")
f := &dockerFactory{ f := &dockerFactory{
machineInfoFactory: factory, cgroupSubsystems: cgroupSubsystems,
client: client, client: client,
dockerVersion: dockerVersion,
fsInfo: fsInfo,
machineInfoFactory: factory,
storageDriver: storageDriver(sd), storageDriver: storageDriver(sd),
storageDriverDir: storageDir, storageDriverDir: storageDir,
cgroupSubsystems: cgroupSubsystems,
fsInfo: fsInfo,
} }
container.RegisterContainerHandlerFactory(f) container.RegisterContainerHandlerFactory(f)
return nil return nil

View File

@ -17,6 +17,7 @@ package docker
import ( import (
"fmt" "fmt"
"io/ioutil"
"math" "math"
"path" "path"
"strings" "strings"
@ -82,6 +83,24 @@ type dockerContainerHandler struct {
fsHandler fsHandler fsHandler fsHandler
} }
func getRwLayerID(containerID, storageDriverDir string, dockerVersion []int) (string, error) {
const (
// Docker version >=1.10.0 have a randomized ID for the root fs of a container.
randomizedRWLayerMinorVersion = 10
// Directory where the file containinig the randomized ID of the root fs of a container is stored in versions >= 1.10.0
rwLayerIDDir = "../image/aufs/layerdb/mounts/"
rwLayerIDFile = "mount-id"
)
if (dockerVersion[0] <= 1) && (dockerVersion[1] < randomizedRWLayerMinorVersion) {
return containerID, nil
}
bytes, err := ioutil.ReadFile(path.Join(storageDriverDir, rwLayerIDDir, containerID, rwLayerIDFile))
if err != nil {
return "", fmt.Errorf("failed to identify the read-write layer ID for container %q. - %v", containerID, err)
}
return string(bytes), err
}
func newDockerContainerHandler( func newDockerContainerHandler(
client *docker.Client, client *docker.Client,
name string, name string,
@ -92,6 +111,7 @@ func newDockerContainerHandler(
cgroupSubsystems *containerlibcontainer.CgroupSubsystems, cgroupSubsystems *containerlibcontainer.CgroupSubsystems,
inHostNamespace bool, inHostNamespace bool,
metadataEnvs []string, metadataEnvs []string,
dockerVersion []int,
) (container.ContainerHandler, error) { ) (container.ContainerHandler, error) {
// Create the cgroup paths. // Create the cgroup paths.
cgroupPaths := make(map[string]string, len(cgroupSubsystems.MountPoints)) cgroupPaths := make(map[string]string, len(cgroupSubsystems.MountPoints))
@ -116,12 +136,17 @@ func newDockerContainerHandler(
// Add the Containers dir where the log files are stored. // Add the Containers dir where the log files are stored.
otherStorageDir := path.Join(path.Dir(storageDriverDir), pathToContainersDir, id) otherStorageDir := path.Join(path.Dir(storageDriverDir), pathToContainersDir, id)
rwLayerID, err := getRwLayerID(id, storageDriverDir, dockerVersion)
if err != nil {
return nil, err
}
var rootfsStorageDir string var rootfsStorageDir string
switch storageDriver { switch storageDriver {
case aufsStorageDriver: case aufsStorageDriver:
rootfsStorageDir = path.Join(storageDriverDir, aufsRWLayer, id) rootfsStorageDir = path.Join(storageDriverDir, aufsRWLayer, rwLayerID)
case overlayStorageDriver: case overlayStorageDriver:
rootfsStorageDir = path.Join(storageDriverDir, id) rootfsStorageDir = path.Join(storageDriverDir, rwLayerID)
} }
handler := &dockerContainerHandler{ handler := &dockerContainerHandler{

View File

@ -0,0 +1,50 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Handler for Docker containers.
package docker
import (
"io/ioutil"
"os"
"path"
"testing"
"github.com/stretchr/testify/assert"
)
func TestStorageDirDetectionWithOldVersions(t *testing.T) {
as := assert.New(t)
rwLayer, err := getRwLayerID("abcd", "/", []int{1, 9, 0})
as.Nil(err)
as.Equal(rwLayer, "abcd")
}
func TestStorageDirDetectionWithNewVersions(t *testing.T) {
as := assert.New(t)
testDir, err := ioutil.TempDir("", "")
as.Nil(err)
containerID := "abcd"
randomizedID := "xyz"
randomIDPath := path.Join(testDir, "image/aufs/layerdb/mounts/", containerID)
as.Nil(os.MkdirAll(randomIDPath, os.ModePerm))
as.Nil(ioutil.WriteFile(path.Join(randomIDPath, "mount-id"), []byte(randomizedID), os.ModePerm))
rwLayer, err := getRwLayerID(containerID, path.Join(testDir, "aufs"), []int{1, 10, 0})
as.Nil(err)
as.Equal(rwLayer, randomizedID)
rwLayer, err = getRwLayerID(containerID, path.Join(testDir, "aufs"), []int{1, 10, 0})
as.Nil(err)
as.Equal(rwLayer, randomizedID)
}

View File

@ -123,7 +123,7 @@ func ContainerStatsFromV1(spec *v1.ContainerSpec, stats []*v1.ContainerStats) []
} }
} else if len(val.Filesystem) > 1 { } else if len(val.Filesystem) > 1 {
// Cannot handle multiple devices per container. // Cannot handle multiple devices per container.
glog.Errorf("failed to handle multiple devices for container. Skipping Filesystem stats") glog.V(2).Infof("failed to handle multiple devices for container. Skipping Filesystem stats")
} }
} }
if spec.HasDiskIo { if spec.HasDiskIo {

View File

@ -98,6 +98,13 @@ func New(t *testing.T) Framework {
return fm return fm
} }
const (
Aufs string = "aufs"
Overlay string = "overlay"
DeviceMapper string = "devicemapper"
Unknown string = ""
)
type DockerActions interface { type DockerActions interface {
// Run the no-op pause Docker container and return its ID. // Run the no-op pause Docker container and return its ID.
RunPause() string RunPause() string
@ -113,6 +120,9 @@ type DockerActions interface {
// -> docker run busybox ping www.google.com // -> docker run busybox ping www.google.com
Run(args DockerRunArgs, cmd ...string) string Run(args DockerRunArgs, cmd ...string) string
RunStress(args DockerRunArgs, cmd ...string) string RunStress(args DockerRunArgs, cmd ...string) string
Version() []string
StorageDriver() string
} }
type ShellActions interface { type ShellActions interface {
@ -255,6 +265,44 @@ func (self dockerActions) Run(args DockerRunArgs, cmd ...string) string {
return containerId return containerId
} }
func (self dockerActions) Version() []string {
dockerCommand := []string{"docker", "version", "-f", "'{{.Server.Version}}'"}
output, _ := self.fm.Shell().Run("sudo", dockerCommand...)
output = strings.TrimSpace(output)
ret := strings.Split(output, ".")
if len(ret) != 3 {
self.fm.T().Fatalf("invalid version %v", output)
}
return ret
}
func (self dockerActions) StorageDriver() string {
dockerCommand := []string{"docker", "info"}
output, _ := self.fm.Shell().Run("sudo", dockerCommand...)
if len(output) < 1 {
self.fm.T().Fatalf("failed to find docker storage driver - %v", output)
}
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "Storage Driver: ") {
idx := strings.LastIndex(line, ": ") + 2
driver := line[idx:]
switch driver {
case Aufs:
return Aufs
case Overlay:
return Overlay
case DeviceMapper:
return DeviceMapper
default:
return Unknown
}
}
}
self.fm.T().Fatalf("failed to find docker storage driver from info - %v", output)
return Unknown
}
func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) string { func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) string {
dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M", "-d", "-t", "-i"}, args.Args...), args.Image), args.InnerArgs...), cmd...) dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M", "-d", "-t", "-i"}, args.Args...), args.Image), args.InnerArgs...), cmd...)

View File

@ -181,7 +181,7 @@ func PushAndRunTests(host, testDir string) error {
if err != nil { if err != nil {
return fmt.Errorf("error reading local log file: %v", err) return fmt.Errorf("error reading local log file: %v", err)
} }
glog.Errorf("%v", string(logs)) glog.Errorf("----------------------\nLogs from Host: %q\n%v\n", host, string(logs))
err = fmt.Errorf("error on host %s: %v\n%+v", host, err, attributes) err = fmt.Errorf("error on host %s: %v\n%+v", host, err, attributes)
} }
return err return err

View File

@ -22,6 +22,7 @@ import (
"time" "time"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/integration/framework" "github.com/google/cadvisor/integration/framework"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -36,6 +37,14 @@ func sanityCheck(alias string, containerInfo info.ContainerInfo, t *testing.T) {
assert.NotEmpty(t, containerInfo.Stats, "Expected container to have stats") assert.NotEmpty(t, containerInfo.Stats, "Expected container to have stats")
} }
// Sanity check the container by:
// - Checking that the specified alias is a valid one for this container.
// - Verifying that stats are not empty.
func sanityCheckV2(alias string, info v2.ContainerInfo, t *testing.T) {
assert.Contains(t, info.Spec.Aliases, alias, "Alias %q should be in list of aliases %v", alias, info.Spec.Aliases)
assert.NotEmpty(t, info.Stats, "Expected container to have stats")
}
// Waits up to 5s for a container with the specified alias to appear. // Waits up to 5s for a container with the specified alias to appear.
func waitForContainer(alias string, fm framework.Framework) { func waitForContainer(alias string, fm framework.Framework) {
err := framework.RetryForDuration(func() error { err := framework.RetryForDuration(func() error {
@ -54,6 +63,12 @@ func waitForContainer(alias string, fm framework.Framework) {
require.NoError(fm.T(), err, "Timed out waiting for container %q to be available in cAdvisor: %v", alias, err) require.NoError(fm.T(), err, "Timed out waiting for container %q to be available in cAdvisor: %v", alias, err)
} }
func getDockerMinorVersion(fm framework.Framework) int {
val, err := strconv.Atoi(fm.Docker().Version()[1])
assert.Nil(fm.T(), err)
return val
}
// A Docker container in /docker/<ID> // A Docker container in /docker/<ID>
func TestDockerContainerById(t *testing.T) { func TestDockerContainerById(t *testing.T) {
fm := framework.New(t) fm := framework.New(t)
@ -172,11 +187,15 @@ func TestDockerContainerSpec(t *testing.T) {
cpuShares := uint64(2048) cpuShares := uint64(2048)
cpuMask := "0" cpuMask := "0"
memoryLimit := uint64(1 << 30) // 1GB memoryLimit := uint64(1 << 30) // 1GB
cpusetArg := "--cpuset"
if getDockerMinorVersion(fm) >= 10 {
cpusetArg = "--cpuset-cpus"
}
containerId := fm.Docker().Run(framework.DockerRunArgs{ containerId := fm.Docker().Run(framework.DockerRunArgs{
Image: "kubernetes/pause", Image: "kubernetes/pause",
Args: []string{ Args: []string{
"--cpu-shares", strconv.FormatUint(cpuShares, 10), "--cpu-shares", strconv.FormatUint(cpuShares, 10),
"--cpuset", cpuMask, cpusetArg, cpuMask,
"--memory", strconv.FormatUint(memoryLimit, 10), "--memory", strconv.FormatUint(memoryLimit, 10),
}, },
}) })
@ -253,6 +272,7 @@ func TestDockerContainerNetworkStats(t *testing.T) {
containerId := fm.Docker().RunBusybox("watch", "-n1", "wget", "https://www.google.com/") containerId := fm.Docker().RunBusybox("watch", "-n1", "wget", "https://www.google.com/")
waitForContainer(containerId, fm) waitForContainer(containerId, fm)
time.Sleep(10 * time.Second)
request := &info.ContainerInfoRequest{ request := &info.ContainerInfoRequest{
NumStats: 1, NumStats: 1,
} }
@ -270,3 +290,37 @@ func TestDockerContainerNetworkStats(t *testing.T) {
assert.NotEqual(stat.Network.RxBytes, stat.Network.TxBytes, "Network tx and rx bytes should not be equal") assert.NotEqual(stat.Network.RxBytes, stat.Network.TxBytes, "Network tx and rx bytes should not be equal")
assert.NotEqual(stat.Network.RxPackets, stat.Network.TxPackets, "Network tx and rx packets should not be equal") assert.NotEqual(stat.Network.RxPackets, stat.Network.TxPackets, "Network tx and rx packets should not be equal")
} }
func TestDockerFilesystemStats(t *testing.T) {
fm := framework.New(t)
defer fm.Cleanup()
storageDriver := fm.Docker().StorageDriver()
switch storageDriver {
case framework.Aufs:
case framework.Overlay:
default:
t.Skip("skipping filesystem stats test")
}
// Wait for the container to show up.
containerId := fm.Docker().RunBusybox("/bin/sh", "-c", "dd if=/dev/zero of=/file count=1 bs=1M & ping www.google.com")
waitForContainer(containerId, fm)
time.Sleep(time.Minute)
request := &v2.RequestOptions{
IdType: v2.TypeDocker,
Count: 1,
}
containerInfo, err := fm.Cadvisor().ClientV2().Stats(containerId, request)
time.Sleep(time.Minute)
require.NoError(t, err)
require.True(t, len(containerInfo) == 1)
var info v2.ContainerInfo
for _, cInfo := range containerInfo {
info = cInfo
}
sanityCheckV2(containerId, info, t)
require.NotNil(t, info.Stats[0].Filesystem.BaseUsageBytes)
assert.True(t, *info.Stats[0].Filesystem.BaseUsageBytes > (1<<6), "expected base fs usage to be greater than 1MB")
require.NotNil(t, info.Stats[0].Filesystem.TotalUsageBytes)
assert.True(t, *info.Stats[0].Filesystem.TotalUsageBytes > (1<<6), "expected total fs usage to be greater than 1MB")
}