Merge pull request #1489 from dashpole/per_container_inode_stats
Cadvisor now publishes per-container inode stats
Commit: e972272aef
@@ -16,6 +16,7 @@
 package common

 import (
+	"fmt"
 	"sync"
 	"time"

@@ -26,15 +27,20 @@ import (

 type FsHandler interface {
 	Start()
-	Usage() (baseUsageBytes uint64, totalUsageBytes uint64)
+	Usage() FsUsage
 	Stop()
 }

+type FsUsage struct {
+	BaseUsageBytes  uint64
+	TotalUsageBytes uint64
+	InodeUsage      uint64
+}
+
 type realFsHandler struct {
 	sync.RWMutex
 	lastUpdate time.Time
-	usageBytes     uint64
-	baseUsageBytes uint64
+	usage      FsUsage
 	period     time.Duration
 	minPeriod  time.Duration
 	rootfs     string
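This signature change is the heart of the PR: Usage() now returns a single FsUsage struct instead of a (base, total) pair, so new stats such as inode counts can be added later without breaking every caller again. A minimal sketch of a migrated call site, using a hypothetical fakeHandler in place of realFsHandler:

```go
package main

import "fmt"

// FsUsage mirrors the struct introduced above.
type FsUsage struct {
	BaseUsageBytes  uint64
	TotalUsageBytes uint64
	InodeUsage      uint64
}

// fakeHandler is illustrative only; it stands in for a realFsHandler.
type fakeHandler struct{ u FsUsage }

func (f fakeHandler) Usage() FsUsage { return f.u }

func main() {
	h := fakeHandler{FsUsage{BaseUsageBytes: 1 << 20, TotalUsageBytes: 2 << 20, InodeUsage: 17}}
	// Before this PR: base, total := h.Usage()
	// After: one struct carries all three stats.
	u := h.Usage()
	fmt.Println(u.BaseUsageBytes, u.TotalUsageBytes, u.InodeUsage)
}
```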
@@ -45,18 +51,19 @@ type realFsHandler struct {
 }

 const (
-	longDu             = time.Second
-	duTimeout          = time.Minute
-	maxDuBackoffFactor = 20
+	longOp           = time.Second
+	timeout          = 2 * time.Minute
+	maxBackoffFactor = 20
 )

+const DefaultPeriod = time.Minute
+
 var _ FsHandler = &realFsHandler{}

 func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {
 	return &realFsHandler{
 		lastUpdate: time.Time{},
-		usageBytes:     0,
-		baseUsageBytes: 0,
+		usage:      FsUsage{},
 		period:     period,
 		minPeriod:  period,
 		rootfs:     rootfs,
@@ -68,29 +75,37 @@ func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {

 func (fh *realFsHandler) update() error {
 	var (
-		baseUsage, extraDirUsage uint64
-		err                      error
+		baseUsage, extraDirUsage, inodeUsage    uint64
+		rootDiskErr, rootInodeErr, extraDiskErr error
 	)
 	// TODO(vishh): Add support for external mounts.
 	if fh.rootfs != "" {
-		baseUsage, err = fh.fsInfo.GetDirUsage(fh.rootfs, duTimeout)
-		if err != nil {
-			return err
-		}
+		baseUsage, rootDiskErr = fh.fsInfo.GetDirDiskUsage(fh.rootfs, timeout)
+		inodeUsage, rootInodeErr = fh.fsInfo.GetDirInodeUsage(fh.rootfs, timeout)
 	}

 	if fh.extraDir != "" {
-		extraDirUsage, err = fh.fsInfo.GetDirUsage(fh.extraDir, duTimeout)
-		if err != nil {
-			return err
-		}
+		extraDirUsage, extraDiskErr = fh.fsInfo.GetDirDiskUsage(fh.extraDir, timeout)
 	}

+	// Wait to handle errors until after all operations are run.
+	// An error in one will not cause an early return, skipping the others.
 	fh.Lock()
 	defer fh.Unlock()
 	fh.lastUpdate = time.Now()
-	fh.usageBytes = baseUsage + extraDirUsage
-	fh.baseUsageBytes = baseUsage
+	if rootDiskErr == nil && fh.rootfs != "" {
+		fh.usage.BaseUsageBytes = baseUsage
+	}
+	if rootInodeErr == nil && fh.rootfs != "" {
+		fh.usage.InodeUsage = inodeUsage
+	}
+	if rootDiskErr == nil && extraDiskErr == nil {
+		fh.usage.TotalUsageBytes = baseUsage + extraDirUsage
+	}
+	// Combine errors into a single error to return.
+	if rootDiskErr != nil || rootInodeErr != nil || extraDiskErr != nil {
+		return fmt.Errorf("rootDiskErr: %v, rootInodeErr: %v, extraDiskErr: %v", rootDiskErr, rootInodeErr, extraDiskErr)
+	}
 	return nil
 }
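The rewritten update() runs all three stat operations before looking at any error, so one failure no longer hides the results of the others, and each field is only overwritten when its own source operation succeeded. The hand-rolled fmt.Errorf aggregation predates errors.Join; on Go 1.20+ the same collapse could be written as follows (a sketch, not part of this diff):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	rootDiskErr := errors.New("du failed")
	var rootInodeErr, extraDiskErr error // nil: these operations succeeded

	// errors.Join drops nil errors and wraps the rest, so callers can
	// still match individual failures with errors.Is / errors.As.
	if err := errors.Join(rootDiskErr, rootInodeErr, extraDiskErr); err != nil {
		fmt.Println(err) // prints only "du failed"
	}
}
```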
@@ -105,15 +120,15 @@ func (fh *realFsHandler) trackUsage() {
 		if err := fh.update(); err != nil {
 			glog.Errorf("failed to collect filesystem stats - %v", err)
 			fh.period = fh.period * 2
-			if fh.period > maxDuBackoffFactor*fh.minPeriod {
-				fh.period = maxDuBackoffFactor * fh.minPeriod
+			if fh.period > maxBackoffFactor*fh.minPeriod {
+				fh.period = maxBackoffFactor * fh.minPeriod
 			}
 		} else {
 			fh.period = fh.minPeriod
 		}
 		duration := time.Since(start)
-		if duration > longDu {
-			glog.V(2).Infof("`du` on following dirs took %v: %v", duration, []string{fh.rootfs, fh.extraDir})
+		if duration > longOp {
+			glog.V(2).Infof("du and find on following dirs took %v: %v", duration, []string{fh.rootfs, fh.extraDir})
 		}
 	}
 }
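For concreteness: with DefaultPeriod (one minute) as the starting interval and maxBackoffFactor of 20, consecutive failures stretch the polling period 1m → 2m → 4m → 8m → 16m → 20m, where it stays capped until an update succeeds and the period snaps back to minPeriod.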
@@ -127,8 +142,8 @@ func (fh *realFsHandler) Stop() {
 	close(fh.stopChan)
 }

-func (fh *realFsHandler) Usage() (baseUsageBytes, totalUsageBytes uint64) {
+func (fh *realFsHandler) Usage() FsUsage {
 	fh.RLock()
 	defer fh.RUnlock()
-	return fh.baseUsageBytes, fh.usageBytes
+	return fh.usage
 }
@@ -243,7 +243,7 @@ func newDockerContainerHandler(

 	if !ignoreMetrics.Has(container.DiskUsageMetrics) {
 		handler.fsHandler = &dockerFsHandler{
-			fsHandler:       common.NewFsHandler(time.Minute, rootfsStorageDir, otherStorageDir, fsInfo),
+			fsHandler:       common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
 			thinPoolWatcher: thinPoolWatcher,
 			deviceID:        handler.deviceID,
 		}
@@ -283,8 +283,8 @@ func (h *dockerFsHandler) Stop() {
 	h.fsHandler.Stop()
 }

-func (h *dockerFsHandler) Usage() (uint64, uint64) {
-	baseUsage, usage := h.fsHandler.Usage()
+func (h *dockerFsHandler) Usage() common.FsUsage {
+	usage := h.fsHandler.Usage()

 	// When devicemapper is the storage driver, the base usage of the container comes from the thin pool.
 	// We still need the result of the fsHandler for any extra storage associated with the container.
@@ -299,12 +299,12 @@ func (h *dockerFsHandler) Usage() (uint64, uint64) {
 			// had at least 1 refresh and we still can't find the device.
 			glog.V(5).Infof("unable to get fs usage from thin pool for device %s: %v", h.deviceID, err)
 		} else {
-			baseUsage = thinPoolUsage
-			usage += thinPoolUsage
+			usage.BaseUsageBytes = thinPoolUsage
+			usage.TotalUsageBytes += thinPoolUsage
 		}
 	}

-	return baseUsage, usage
+	return usage
 }

 func (self *dockerContainerHandler) Start() {
@@ -387,7 +387,10 @@ func (self *dockerContainerHandler) getFsStats(stats *info.ContainerStats) error
 	}

 	fsStat := info.FsStats{Device: device, Type: fsType, Limit: limit}
-	fsStat.BaseUsage, fsStat.Usage = self.fsHandler.Usage()
+	usage := self.fsHandler.Usage()
+	fsStat.BaseUsage = usage.BaseUsageBytes
+	fsStat.Usage = usage.TotalUsageBytes
+	fsStat.Inodes = usage.InodeUsage

 	stats.Filesystem = append(stats.Filesystem, fsStat)
@@ -18,7 +18,6 @@ package rkt
 import (
 	"fmt"
 	"os"
-	"time"

 	rktapi "github.com/coreos/rkt/api/v1alpha"
 	"github.com/google/cadvisor/container"
@@ -150,7 +149,7 @@ func newRktContainerHandler(name string, rktClient rktapi.PublicAPIClient, rktPa
 	}

 	if !ignoreMetrics.Has(container.DiskUsageMetrics) {
-		handler.fsHandler = common.NewFsHandler(time.Minute, rootfsStorageDir, "", fsInfo)
+		handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, "", fsInfo)
 	}

 	return handler, nil
@@ -228,7 +227,10 @@ func (handler *rktContainerHandler) getFsStats(stats *info.ContainerStats) error

 	fsStat := info.FsStats{Device: deviceInfo.Device, Limit: limit}

-	fsStat.BaseUsage, fsStat.Usage = handler.fsHandler.Usage()
+	usage := handler.fsHandler.Usage()
+	fsStat.BaseUsage = usage.BaseUsageBytes
+	fsStat.Usage = usage.TotalUsageBytes
+	fsStat.Inodes = usage.InodeUsage

 	stats.Filesystem = append(stats.Filesystem, fsStat)
fs/fs.go (79 lines changed)
@@ -19,6 +19,7 @@ package fs

 import (
 	"bufio"
+	"bytes"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -44,24 +45,24 @@ const (
 	LabelRktImages = "rkt-images"
 )

-// The maximum number of `du` tasks that can be running at once.
-const maxConsecutiveDus = 20
+// The maximum number of `du` and `find` tasks that can be running at once.
+const maxConcurrentOps = 20

-// A pool for restricting the number of consecutive `du` tasks running.
-var duPool = make(chan struct{}, maxConsecutiveDus)
+// A pool for restricting the number of concurrent `du` and `find` tasks running.
+var pool = make(chan struct{}, maxConcurrentOps)

 func init() {
-	for i := 0; i < maxConsecutiveDus; i++ {
-		releaseDuToken()
+	for i := 0; i < maxConcurrentOps; i++ {
+		releaseToken()
 	}
 }

-func claimDuToken() {
-	<-duPool
+func claimToken() {
+	<-pool
 }

-func releaseDuToken() {
-	duPool <- struct{}{}
+func releaseToken() {
+	pool <- struct{}{}
 }

 type partition struct {
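The renamed pool is a counting semaphore built from a buffered channel that init() pre-fills with 20 tokens. An equivalent formulation (illustrative, not part of this diff) skips the init loop by treating a send as acquire and a receive as release:

```go
package main

import "fmt"

const maxConcurrentOps = 20

// An empty buffered channel also works as a semaphore without pre-filling:
// a send claims one of the 20 slots, a receive releases it.
var pool = make(chan struct{}, maxConcurrentOps)

func claimToken()   { pool <- struct{}{} }
func releaseToken() { <-pool }

func main() {
	claimToken()
	defer releaseToken()
	fmt.Println("holding 1 of", maxConcurrentOps, "tokens")
}
```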
@@ -428,12 +429,12 @@ func (self *RealFsInfo) GetDirFsDevice(dir string) (*DeviceInfo, error) {
 	return nil, fmt.Errorf("could not find device with major: %d, minor: %d in cached partitions map", major, minor)
 }

-func (self *RealFsInfo) GetDirUsage(dir string, timeout time.Duration) (uint64, error) {
+func (self *RealFsInfo) GetDirDiskUsage(dir string, timeout time.Duration) (uint64, error) {
 	if dir == "" {
 		return 0, fmt.Errorf("invalid directory")
 	}
-	claimDuToken()
-	defer releaseDuToken()
+	claimToken()
+	defer releaseToken()
 	cmd := exec.Command("nice", "-n", "19", "du", "-s", dir)
 	stdoutp, err := cmd.StdoutPipe()
 	if err != nil {
@@ -447,21 +448,21 @@ func (self *RealFsInfo) GetDirUsage(dir string, timeout time.Duration) (uint64, error) {
 	if err := cmd.Start(); err != nil {
 		return 0, fmt.Errorf("failed to exec du - %v", err)
 	}
-	stdoutb, souterr := ioutil.ReadAll(stdoutp)
-	stderrb, _ := ioutil.ReadAll(stderrp)
 	timer := time.AfterFunc(timeout, func() {
 		glog.Infof("killing cmd %v due to timeout(%s)", cmd.Args, timeout.String())
 		cmd.Process.Kill()
 	})
+	stdoutb, souterr := ioutil.ReadAll(stdoutp)
+	if souterr != nil {
+		glog.Errorf("failed to read from stdout for cmd %v - %v", cmd.Args, souterr)
+	}
+	stderrb, _ := ioutil.ReadAll(stderrp)
 	err = cmd.Wait()
 	timer.Stop()
 	if err != nil {
 		return 0, fmt.Errorf("du command failed on %s with output stdout: %s, stderr: %s - %v", dir, string(stdoutb), string(stderrb), err)
 	}
 	stdout := string(stdoutb)
-	if souterr != nil {
-		glog.Errorf("failed to read from stdout for cmd %v - %v", cmd.Args, souterr)
-	}
 	usageInKb, err := strconv.ParseUint(strings.Fields(stdout)[0], 10, 64)
 	if err != nil {
 		return 0, fmt.Errorf("cannot parse 'du' output %s - %s", stdout, err)
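The reordering above is the actual bug fix this hunk carries: the old code called ioutil.ReadAll on du's pipes before arming the kill timer, so a hung du blocked the read forever and the timeout never fired. On Go 1.7+, the same guard can be expressed with exec.CommandContext, which kills the process when the context expires; a sketch under that assumption (duWithTimeout is a hypothetical name, not from this PR):

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"
)

// duWithTimeout runs `du -s dir` and lets the context, rather than a
// manually armed time.AfterFunc, kill the process at the deadline.
func duWithTimeout(dir string, timeout time.Duration) (uint64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	out, err := exec.CommandContext(ctx, "nice", "-n", "19", "du", "-s", dir).Output()
	if err != nil {
		return 0, fmt.Errorf("du on %s failed: %v", dir, err)
	}
	fields := strings.Fields(string(out))
	if len(fields) == 0 {
		return 0, fmt.Errorf("unexpected du output: %q", out)
	}
	usageInKb, err := strconv.ParseUint(fields[0], 10, 64)
	if err != nil {
		return 0, err
	}
	return usageInKb * 1024, nil
}

func main() {
	fmt.Println(duWithTimeout("/tmp", time.Minute))
}
```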
@@ -469,6 +470,48 @@ func (self *RealFsInfo) GetDirUsage(dir string, timeout time.Duration) (uint64, error) {
 	return usageInKb * 1024, nil
 }

+func (self *RealFsInfo) GetDirInodeUsage(dir string, timeout time.Duration) (uint64, error) {
+	if dir == "" {
+		return 0, fmt.Errorf("invalid directory")
+	}
+	var stdout, stdwcerr, stdfinderr bytes.Buffer
+	var err error
+	claimToken()
+	defer releaseToken()
+	findCmd := exec.Command("find", dir, "-xdev", "-printf", ".")
+	wcCmd := exec.Command("wc", "-c")
+	if wcCmd.Stdin, err = findCmd.StdoutPipe(); err != nil {
+		return 0, fmt.Errorf("failed to setup stdout for cmd %v - %v", findCmd.Args, err)
+	}
+	wcCmd.Stdout, wcCmd.Stderr, findCmd.Stderr = &stdout, &stdwcerr, &stdfinderr
+	if err = findCmd.Start(); err != nil {
+		return 0, fmt.Errorf("failed to exec cmd %v - %v; stderr: %v", findCmd.Args, err, stdfinderr.String())
+	}
+
+	if err = wcCmd.Start(); err != nil {
+		return 0, fmt.Errorf("failed to exec cmd %v - %v; stderr: %v", wcCmd.Args, err, stdwcerr.String())
+	}
+	timer := time.AfterFunc(timeout, func() {
+		glog.Infof("killing cmd %v, and cmd %v due to timeout(%s)", findCmd.Args, wcCmd.Args, timeout.String())
+		wcCmd.Process.Kill()
+		findCmd.Process.Kill()
+	})
+	err = findCmd.Wait()
+	if err != nil {
+		return 0, fmt.Errorf("cmd %v failed. stderr: %s; err: %v", findCmd.Args, stdfinderr.String(), err)
+	}
+	err = wcCmd.Wait()
+	if err != nil {
+		return 0, fmt.Errorf("cmd %v failed. stderr: %s; err: %v", wcCmd.Args, stdwcerr.String(), err)
+	}
+	timer.Stop()
+	inodeUsage, err := strconv.ParseUint(strings.TrimSpace(stdout.String()), 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("cannot parse cmds: %v, %v output %s - %s", findCmd.Args, wcCmd.Args, stdout.String(), err)
+	}
+	return inodeUsage, nil
+}
+
 func getVfsStats(path string) (total uint64, free uint64, avail uint64, inodes uint64, inodesFree uint64, err error) {
 	var s syscall.Statfs_t
 	if err = syscall.Statfs(path, &s); err != nil {
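GetDirInodeUsage shells out to `find dir -xdev -printf "."` piped into `wc -c`: one byte per directory entry, so the byte count equals the number of paths visited (hard-linked files are counted once per link, and the directory itself contributes one). For comparison, a dependency-free in-process sketch (countInodes is hypothetical; filepath.Walk has no direct -xdev equivalent, so the stay-on-one-filesystem restriction is omitted):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// countInodes counts one entry per file or directory under dir,
// mirroring find's one "." per path. Like the pipeline, it counts
// hard links once per link rather than per unique inode.
func countInodes(dir string) (uint64, error) {
	var count uint64
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		count++
		return nil
	})
	return count, err
}

func main() {
	n, err := countInodes(os.TempDir())
	fmt.Println(n, err)
}
```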
@@ -24,6 +24,7 @@ import (

 	"github.com/docker/docker/pkg/mount"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )

 func TestGetDiskStatsMap(t *testing.T) {
@@ -85,7 +86,7 @@ func TestFileNotExist(t *testing.T) {
 	}
 }

-func TestDirUsage(t *testing.T) {
+func TestDirDiskUsage(t *testing.T) {
 	as := assert.New(t)
 	fsInfo, err := NewFsInfo(Context{})
 	as.NoError(err)
@@ -100,11 +101,63 @@ func TestDirUsage(t *testing.T) {
 	fi, err := f.Stat()
 	as.NoError(err)
 	expectedSize := uint64(fi.Size())
-	size, err := fsInfo.GetDirUsage(dir, time.Minute)
+	size, err := fsInfo.GetDirDiskUsage(dir, time.Minute)
 	as.NoError(err)
 	as.True(expectedSize <= size, "expected dir size to be at-least %d; got size: %d", expectedSize, size)
 }

+// Make sure that the timeout is actually being triggered (found this bug in PR#1489).
+func TestDirDiskUsageTimeout(t *testing.T) {
+	as := assert.New(t)
+	fsInfo, err := NewFsInfo(Context{})
+	as.NoError(err)
+	dir, err := ioutil.TempDir(os.TempDir(), "")
+	as.NoError(err)
+	defer os.RemoveAll(dir)
+	dataSize := 1024 * 10000 // 10000 KB; large enough that `du` outlives the timeout
+	b := make([]byte, dataSize)
+	f, err := ioutil.TempFile(dir, "")
+	as.NoError(err)
+	as.NoError(ioutil.WriteFile(f.Name(), b, 0700))
+	_, err = fsInfo.GetDirDiskUsage(dir, time.Nanosecond)
+	as.Error(err)
+}
+
+func TestDirInodeUsage(t *testing.T) {
+	as := assert.New(t)
+	fsInfo, err := NewFsInfo(Context{})
+	as.NoError(err)
+	dir, err := ioutil.TempDir(os.TempDir(), "")
+	as.NoError(err)
+	defer os.RemoveAll(dir)
+	numFiles := 1000
+	for i := 0; i < numFiles; i++ {
+		_, err := ioutil.TempFile(dir, "")
+		require.NoError(t, err)
+	}
+	inodes, err := fsInfo.GetDirInodeUsage(dir, time.Minute)
+	as.NoError(err)
+	// We should get numFiles+1 inodes: one for each file, plus one for the directory itself.
+	as.True(uint64(numFiles+1) == inodes, "expected inodes in dir to be %d; got inodes: %d", numFiles+1, inodes)
+}
+
+// Make sure that the timeout is actually being triggered.
+func TestDirInodeUsageTimeout(t *testing.T) {
+	as := assert.New(t)
+	fsInfo, err := NewFsInfo(Context{})
+	as.NoError(err)
+	dir, err := ioutil.TempDir(os.TempDir(), "")
+	as.NoError(err)
+	defer os.RemoveAll(dir)
+	numFiles := 100000 // enough entries that `find` outlives the timeout
+	for i := 0; i < numFiles; i++ {
+		_, err := ioutil.TempFile(dir, "")
+		require.NoError(t, err)
+	}
+	_, err = fsInfo.GetDirInodeUsage(dir, time.Nanosecond)
+	as.Error(err)
+}
+
 var dmStatusTests = []struct {
 	dmStatus string
 	used     uint64
@@ -67,7 +67,10 @@ type FsInfo interface {
 	GetFsInfoForPath(mountSet map[string]struct{}) ([]Fs, error)

 	// Returns number of bytes occupied by 'dir'.
-	GetDirUsage(dir string, timeout time.Duration) (uint64, error)
+	GetDirDiskUsage(dir string, timeout time.Duration) (uint64, error)
+
+	// Returns number of inodes used by 'dir'.
+	GetDirInodeUsage(dir string, timeout time.Duration) (uint64, error)

 	// Returns the block device info of the filesystem on which 'dir' resides.
 	GetDirFsDevice(dir string) (*DeviceInfo, error)
@@ -301,4 +301,8 @@ type FilesystemStats struct {
 	TotalUsageBytes *uint64 `json:"totalUsageBytes,omitempty"`
 	// Number of bytes consumed by a container through its root filesystem.
 	BaseUsageBytes *uint64 `json:"baseUsageBytes,omitempty"`
+	// Number of inodes used within the container's root filesystem.
+	// This only accounts for inodes that are not shared across containers,
+	// and does not include inodes used in mounted directories.
+	InodeUsage *uint64 `json:"container_inode_usage,omitempty"`
 }
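A note on the pointer fields: with a plain uint64 and omitempty, a legitimate value of 0 would be dropped from the JSON; *uint64 lets the API distinguish "zero" from "not collected". A small illustration (field set reduced for brevity):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type FilesystemStats struct {
	TotalUsageBytes *uint64 `json:"totalUsageBytes,omitempty"`
	BaseUsageBytes  *uint64 `json:"baseUsageBytes,omitempty"`
	InodeUsage      *uint64 `json:"container_inode_usage,omitempty"`
}

func main() {
	inodes := uint64(42)
	// Only the set pointer is serialized; nil fields are omitted entirely.
	b, _ := json.Marshal(FilesystemStats{InodeUsage: &inodes})
	fmt.Println(string(b)) // {"container_inode_usage":42}
}
```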
@@ -127,6 +127,7 @@ func ContainerStatsFromV1(spec *v1.ContainerSpec, stats []*v1.ContainerStats) []*ContainerStats {
 		stat.Filesystem = &FilesystemStats{
 			TotalUsageBytes: &val.Filesystem[0].Usage,
 			BaseUsageBytes:  &val.Filesystem[0].BaseUsage,
+			InodeUsage:      &val.Filesystem[0].Inodes,
 		}
 	} else if len(val.Filesystem) > 1 {
 		// Cannot handle multiple devices per container.
@@ -185,6 +185,7 @@ func TestContainerStatsFromV1(t *testing.T) {
 		Filesystem: &FilesystemStats{
 			TotalUsageBytes: &v1Stats.Filesystem[0].Usage,
 			BaseUsageBytes:  &v1Stats.Filesystem[0].BaseUsage,
+			InodeUsage:      &v1Stats.Filesystem[0].Inodes,
 		},
 	}