Merge pull request #96 from vishh/network_stats

Bug fixes and introduces network stats.
This commit is contained in:
Victor Marmol 2014-07-22 13:01:42 -07:00
commit 03def6fff0
11 changed files with 198 additions and 93 deletions

View File

@ -83,11 +83,14 @@ func main() {
}
})
go containerManager.Start()
go func() {
log.Fatal(containerManager.Start())
}()
log.Printf("Starting cAdvisor version: %q", info.VERSION)
log.Print("About to serve on port ", *argPort)
addr := fmt.Sprintf(":%v", *argPort)
log.Fatal(http.ListenAndServe(addr, nil))
}

View File

@ -35,6 +35,8 @@ type dockerFactory struct {
// Whether this system is using systemd.
useSystemd bool
client *docker.Client
}
func (self *dockerFactory) String() string {
@ -56,12 +58,34 @@ func (self *dockerFactory) NewContainerHandler(name string) (handler container.C
}
// Docker handles all containers under /docker
// TODO(vishh): Change the CanHandle interface to be able to return errors.
func (self *dockerFactory) CanHandle(name string) bool {
// In systemd systems the containers are: /docker-{ID}
if self.useSystemd {
return strings.HasPrefix(name, "/docker-")
if !strings.HasPrefix(name, "/docker-") {
return false
}
} else if name == "/" {
return false
} else if name == "/docker" {
// We need the docker driver to handle /docker. Otherwise the aggregation at the API level will break.
return true
} else if !strings.HasPrefix(name, "/docker/") {
return false
}
return strings.HasPrefix(name, "/docker/")
// Check if the container is known to docker and it is active.
_, id, err := splitName(name)
if err != nil {
return false
}
ctnr, err := self.client.InspectContainer(id)
// We assume that if Inspect fails then the container is not known to docker.
// TODO(vishh): Detect lxc containers and avoid handling them.
if err != nil || !ctnr.State.Running {
return false
}
return true
}
func parseDockerVersion(full_version_string string) ([]int, error) {
@ -109,6 +133,7 @@ func Register(factory info.MachineInfoFactory) error {
f := &dockerFactory{
machineInfoFactory: factory,
useSystemd: systemd.UseSystemd(),
client: client,
}
log.Printf("Registering Docker factory")
container.RegisterContainerHandlerFactory(f)

View File

@ -17,25 +17,32 @@ package docker
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/docker/libcontainer"
"github.com/docker/libcontainer/cgroups"
"github.com/fsouza/go-dockerclient"
"github.com/google/cadvisor/container"
containerLibcontainer "github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/utils"
)
// Basepath to all container specific information that libcontainer stores.
const dockerRootDir = "/var/lib/docker/execdriver/native"
var fileNotFound = errors.New("file not found")
type dockerContainerHandler struct {
client *docker.Client
name string
parent string
ID string
aliases []string
machineInfoFactory info.MachineInfoFactory
useSystemd bool
@ -53,16 +60,19 @@ func newDockerContainerHandler(
machineInfoFactory: machineInfoFactory,
useSystemd: useSystemd,
}
if !handler.isDockerContainer() {
if handler.isDockerRoot() {
return handler, nil
}
_, id, err := handler.splitName()
parent, id, err := splitName(name)
if err != nil {
return nil, fmt.Errorf("invalid docker container %v: %v", name, err)
}
handler.parent = parent
handler.ID = id
ctnr, err := client.InspectContainer(id)
// We assume that if Inspect fails then the container is not known to docker.
if err != nil {
return nil, fmt.Errorf("unable to inspect container %v: %v", name, err)
return nil, fmt.Errorf("failed to inspect container %s - %s\n", id, err)
}
handler.aliases = append(handler.aliases, path.Join("/docker", ctnr.Name))
return handler, nil
@ -75,8 +85,12 @@ func (self *dockerContainerHandler) ContainerReference() (info.ContainerReferenc
}, nil
}
func (self *dockerContainerHandler) splitName() (string, string, error) {
parent, id := path.Split(self.name)
func (self *dockerContainerHandler) isDockerRoot() bool {
return self.name == "/docker"
}
func splitName(containerName string) (string, string, error) {
parent, id := path.Split(containerName)
cgroupSelf, err := os.Open("/proc/self/cgroup")
if err != nil {
return "", "", err
@ -111,35 +125,50 @@ func (self *dockerContainerHandler) splitName() (string, string, error) {
return parent, id, nil
}
func (self *dockerContainerHandler) isDockerRoot() bool {
// TODO(dengnan): Should we consider other cases?
return self.name == "/docker"
}
func (self *dockerContainerHandler) isRootContainer() bool {
return self.name == "/"
}
func (self *dockerContainerHandler) isDockerContainer() bool {
return (!self.isDockerRoot()) && (!self.isRootContainer())
}
// TODO(vmarmol): Switch to getting this from libcontainer once we have a solid API.
func readLibcontainerSpec(id string) (spec *libcontainer.Config, err error) {
dir := "/var/lib/docker/execdriver/native"
configPath := path.Join(dir, id, "container.json")
func (self *dockerContainerHandler) readLibcontainerConfig() (config *libcontainer.Config, err error) {
configPath := path.Join(dockerRootDir, self.ID, "container.json")
if !utils.FileExists(configPath) {
// TODO(vishh): Return file name as well once we have a better error interface.
err = fileNotFound
return
}
f, err := os.Open(configPath)
if err != nil {
return
return nil, fmt.Errorf("failed to open %s - %s\n", configPath, err)
}
defer f.Close()
d := json.NewDecoder(f)
ret := new(libcontainer.Config)
err = d.Decode(ret)
retConfig := new(libcontainer.Config)
err = d.Decode(retConfig)
if err != nil {
return
}
spec = ret
config = retConfig
return
}
func (self *dockerContainerHandler) readLibcontainerState() (state *libcontainer.State, err error) {
statePath := path.Join(dockerRootDir, self.ID, "state.json")
if !utils.FileExists(statePath) {
// TODO(vishh): Return file name as well once we have a better error interface.
err = fileNotFound
return
}
f, err := os.Open(statePath)
if err != nil {
return nil, fmt.Errorf("failed to open %s - %s\n", statePath, err)
}
defer f.Close()
d := json.NewDecoder(f)
retState := new(libcontainer.State)
err = d.Decode(retState)
if err != nil {
return
}
state = retState
return
}
@ -171,51 +200,47 @@ func libcontainerConfigToContainerSpec(config *libcontainer.Config, mi *info.Mac
}
func (self *dockerContainerHandler) GetSpec() (spec *info.ContainerSpec, err error) {
if !self.isDockerContainer() {
spec = new(info.ContainerSpec)
return
if self.isDockerRoot() {
return &info.ContainerSpec{}, nil
}
mi, err := self.machineInfoFactory.GetMachineInfo()
if err != nil {
return
}
_, id, err := self.splitName()
if err != nil {
return
}
libcontainerSpec, err := readLibcontainerSpec(id)
libcontainerConfig, err := self.readLibcontainerConfig()
if err != nil {
return
}
spec = libcontainerConfigToContainerSpec(libcontainerSpec, mi)
spec = libcontainerConfigToContainerSpec(libcontainerConfig, mi)
return
}
func (self *dockerContainerHandler) GetStats() (stats *info.ContainerStats, err error) {
if !self.isDockerContainer() {
// Return empty stats for root containers.
stats = new(info.ContainerStats)
stats.Timestamp = time.Now()
return
if self.isDockerRoot() {
return &info.ContainerStats{}, nil
}
parent, id, err := self.splitName()
config, err := self.readLibcontainerConfig()
if err != nil {
if err == fileNotFound {
return &info.ContainerStats{}, nil
}
return
}
cg := &cgroups.Cgroup{
Parent: parent,
Name: id,
state, err := self.readLibcontainerState()
if err != nil {
if err == fileNotFound {
return &info.ContainerStats{}, nil
}
return
}
return containerLibcontainer.GetStats(cg, self.useSystemd)
return containerLibcontainer.GetStats(config, state)
}
func (self *dockerContainerHandler) ListContainers(listType container.ListType) ([]info.ContainerReference, error) {
if self.isDockerContainer() {
return nil, nil
}
if self.isRootContainer() && listType == container.LIST_SELF {
return []info.ContainerReference{info.ContainerReference{Name: "/docker"}}, nil
if self.name != "/docker" {
return []info.ContainerReference{}, nil
}
opt := docker.ListContainersOptions{
All: true,
@ -237,9 +262,7 @@ func (self *dockerContainerHandler) ListContainers(listType container.ListType)
}
ret = append(ret, ref)
}
if self.isRootContainer() {
ret = append(ret, info.ContainerReference{Name: "/docker"})
}
return ret, nil
}

View File

@ -33,8 +33,10 @@ type ContainerHandlerFactory interface {
// TODO(vmarmol): Consider not making this global.
// Global list of factories.
var factories []ContainerHandlerFactory
var factoriesLock sync.RWMutex
var (
factories []ContainerHandlerFactory
factoriesLock sync.RWMutex
)
// Register a ContainerHandlerFactory. These should be registered from least general to most general
// as they will be asked in order whether they can handle a particular container.
@ -58,7 +60,7 @@ func NewContainerHandler(name string) (ContainerHandler, error) {
}
}
return nil, fmt.Errorf("no known factory can handle creation of container %q", name)
return nil, fmt.Errorf("no known factory can handle creation of container")
}
// Clear the known factories.

View File

@ -60,9 +60,9 @@ func TestWhiteListContainerFilter(t *testing.T) {
mockc := &mockContainerHandler{}
mockc.On("ListContainers", LIST_RECURSIVE).Return(
[]info.ContainerReference{
info.ContainerReference{Name: "/docker/ee0103"},
info.ContainerReference{Name: "/container/created/by/lmctfy"},
info.ContainerReference{Name: "/user/something"},
{Name: "/docker/ee0103"},
{Name: "/container/created/by/lmctfy"},
{Name: "/user/something"},
},
nil,
)
@ -95,9 +95,9 @@ func TestBlackListContainerFilter(t *testing.T) {
mockc := &mockContainerHandler{}
mockc.On("ListContainers", LIST_RECURSIVE).Return(
[]info.ContainerReference{
info.ContainerReference{Name: "/docker/ee0103"},
info.ContainerReference{Name: "/container/created/by/lmctfy"},
info.ContainerReference{Name: "/user/something"},
{Name: "/docker/ee0103"},
{Name: "/container/created/by/lmctfy"},
{Name: "/user/something"},
},
nil,
)

View File

@ -3,33 +3,33 @@ package libcontainer
import (
"time"
"github.com/docker/libcontainer"
"github.com/docker/libcontainer/cgroups"
"github.com/docker/libcontainer/cgroups/fs"
"github.com/docker/libcontainer/cgroups/systemd"
"github.com/google/cadvisor/info"
)
// Get stats of the specified cgroup
func GetStats(cgroup *cgroups.Cgroup, useSystemd bool) (*info.ContainerStats, error) {
// Get stats of the specified container
func GetStats(config *libcontainer.Config, state *libcontainer.State) (*info.ContainerStats, error) {
// TODO(vmarmol): Use libcontainer's Stats() in the new API when that is ready.
// Use systemd paths if systemd is being used.
var (
s *cgroups.Stats
err error
)
if useSystemd {
s, err = systemd.GetStats(cgroup)
} else {
s, err = fs.GetStats(cgroup)
}
libcontainerStats, err := libcontainer.GetStats(config, state)
if err != nil {
return nil, err
}
return toContainerStats(s), nil
return toContainerStats(libcontainerStats), nil
}
func GetStatsCgroupOnly(cgroup *cgroups.Cgroup) (*info.ContainerStats, error) {
s, err := fs.GetStats(cgroup)
if err != nil {
return nil, err
}
return toContainerStats(&libcontainer.ContainerStats{CgroupStats: s}), nil
}
// Convert libcontainer stats to info.ContainerStats.
func toContainerStats(s *cgroups.Stats) *info.ContainerStats {
func toContainerStats(libcontainerStats *libcontainer.ContainerStats) *info.ContainerStats {
s := libcontainerStats.CgroupStats
ret := new(info.ContainerStats)
ret.Timestamp = time.Now()
ret.Cpu = new(info.CpuStats)
@ -59,5 +59,8 @@ func toContainerStats(s *cgroups.Stats) *info.ContainerStats {
ret.Memory.WorkingSet -= v
}
}
// TODO(vishh): Perform a deep copy or alias libcontainer network stats.
ret.Network = (*info.NetworkStats)(&libcontainerStats.NetworkStats)
return ret
}

View File

@ -49,6 +49,11 @@ func (self *lmctfyFactory) NewContainerHandler(name string) (container.Container
}
func (self *lmctfyFactory) CanHandle(name string) bool {
// TODO(vmarmol): Try to attach to the container before blindly saying true.
cmd := exec.Command(lmctfyBinary, "stats", "summary", name)
_, err := cmd.Output()
if err != nil {
return false
}
return true
}

View File

@ -58,7 +58,7 @@ func (self *rawContainerHandler) GetStats() (stats *info.ContainerStats, err err
Name: self.name,
}
return libcontainer.GetStats(cgroup, false)
return libcontainer.GetStatsCgroupOnly(cgroup)
}
// Lists all directories under "path" and outputs the results as children of "parent".
@ -96,7 +96,7 @@ func (self *rawContainerHandler) ListContainers(listType container.ListType) ([]
// Make into container references.
ret := make([]info.ContainerReference, 0, len(containers))
for cont, _ := range containers {
for cont := range containers {
ret = append(ret, info.ContainerReference{
Name: cont,
})

View File

@ -239,11 +239,31 @@ type MemoryStatsMemoryData struct {
Pgmajfault uint64 `json:"pgmajfault,omitempty"`
}
type NetworkStats struct {
// Cumulative count of bytes received.
RxBytes uint64 `json:"rx_bytes,omitempty"`
// Cumulative count of packets received.
RxPackets uint64 `json:"rx_packets,omitempty"`
// Cumulative count of receive errors encountered.
RxErrors uint64 `json:"rx_errors,omitempty"`
// Cumulative count of packets dropped while receiving.
RxDropped uint64 `json:"rx_dropped,omitempty"`
// Cumulative count of bytes transmitted.
TxBytes uint64 `json:"tx_bytes,omitempty"`
// Cumulative count of packets transmitted.
TxPackets uint64 `json:"tx_packets,omitempty"`
// Cumulative count of transmit errors encountered.
TxErrors uint64 `json:"tx_errors,omitempty"`
// Cumulative count of packets dropped while transmitting.
TxDropped uint64 `json:"tx_dropped,omitempty"`
}
type ContainerStats struct {
// The time of this stat point.
Timestamp time.Time `json:"timestamp"`
Cpu *CpuStats `json:"cpu,omitempty"`
Memory *MemoryStats `json:"memory,omitempty"`
Timestamp time.Time `json:"timestamp"`
Cpu *CpuStats `json:"cpu,omitempty"`
Memory *MemoryStats `json:"memory,omitempty"`
Network *NetworkStats `json:"network,omitempty"`
}
// Makes a deep copy of the ContainerStats and returns a pointer to the new

View File

@ -281,18 +281,18 @@ func (m *manager) detectContainers() error {
}
// Add the new containers.
for _, container := range added {
_, err = m.createContainer(container.Name)
for _, cont := range added {
_, err = m.createContainer(cont.Name)
if err != nil {
return fmt.Errorf("Failed to create existing container: %s: %s", container.Name, err)
log.Printf("failed to create existing container: %s: %s", cont.Name, err)
}
}
// Remove the old containers.
for _, container := range removed {
err = m.destroyContainer(container.Name)
for _, cont := range removed {
err = m.destroyContainer(cont.Name)
if err != nil {
return fmt.Errorf("Failed to destroy existing container: %s: %s", container.Name, err)
log.Printf("failed to destroy existing container: %s: %s", cont.Name, err)
}
}

24
utils/path.go Normal file
View File

@ -0,0 +1,24 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import "os"
func FileExists(file string) bool {
if _, err := os.Stat(file); err != nil {
return false
}
return true
}