Merge pull request #264 from vishh/disk_usage

Adding filesystem stats to db storage drivers.
This commit is contained in:
Victor Marmol 2014-10-01 14:02:18 -07:00
commit 7d011aa6ae
8 changed files with 186 additions and 37 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/docker/docker/pkg/mount" "github.com/docker/docker/pkg/mount"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/google/cadvisor/info"
) )
type partition struct { type partition struct {
@ -47,14 +48,14 @@ func NewFsInfo() (FsInfo, error) {
return &FsInfoImpl{partitions}, nil return &FsInfoImpl{partitions}, nil
} }
func (self *FsInfoImpl) GetFsStats() ([]FsStats, error) { func (self *FsInfoImpl) GetFsStats() ([]info.FsStats, error) {
filesystems := make([]FsStats, 0) filesystems := make([]info.FsStats, 0)
for device, partition := range self.partitions { for device, partition := range self.partitions {
total, free, err := getVfsStats(partition.mountpoint) total, free, err := getVfsStats(partition.mountpoint)
if err != nil { if err != nil {
glog.Errorf("Statvfs failed. Error: %v", err) glog.Errorf("Statvfs failed. Error: %v", err)
} else { } else {
fsStat := FsStats{ fsStat := info.FsStats{
Device: device, Device: device,
Major: uint(partition.major), Major: uint(partition.major),
Minor: uint(partition.minor), Minor: uint(partition.minor),

View File

@ -1,14 +1,8 @@
package fs package fs
type FsStats struct { import "github.com/google/cadvisor/info"
Device string `json:"device,omitempty"`
Major uint `json:"major"`
Minor uint `json:"minor"`
Capacity uint64 `json:"capacity"`
Free uint64 `json:"free"`
}
type FsInfo interface { type FsInfo interface {
// Returns capacity and free space, in bytes, of all the ext2, ext3, ext4 filesystems on the host. // Returns capacity and free space, in bytes, of all the ext2, ext3, ext4 filesystems on the host.
GetFsStats() ([]FsStats, error) GetFsStats() ([]info.FsStats, error)
} }

View File

@ -17,8 +17,6 @@ package info
import ( import (
"reflect" "reflect"
"time" "time"
"github.com/google/cadvisor/fs"
) )
type CpuSpec struct { type CpuSpec struct {
@ -233,6 +231,14 @@ type NetworkStats struct {
TxDropped uint64 `json:"tx_dropped"` TxDropped uint64 `json:"tx_dropped"`
} }
type FsStats struct {
Device string `json:"device,omitempty"`
Major uint `json:"major"`
Minor uint `json:"minor"`
Capacity uint64 `json:"capacity"`
Free uint64 `json:"free"`
}
type ContainerStats struct { type ContainerStats struct {
// The time of this stat point. // The time of this stat point.
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
@ -241,7 +247,7 @@ type ContainerStats struct {
Memory *MemoryStats `json:"memory,omitempty"` Memory *MemoryStats `json:"memory,omitempty"`
Network *NetworkStats `json:"network,omitempty"` Network *NetworkStats `json:"network,omitempty"`
// Filesystem statistics // Filesystem statistics
Filesystem []fs.FsStats `json:"filesystem,omitempty"` Filesystem []FsStats `json:"filesystem,omitempty"`
} }
// Makes a deep copy of the ContainerStats and returns a pointer to the new // Makes a deep copy of the ContainerStats and returns a pointer to the new

View File

@ -27,7 +27,6 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/google/cadvisor/fs"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
"github.com/google/cadvisor/manager" "github.com/google/cadvisor/manager"
) )
@ -256,9 +255,9 @@ func getColdMemoryPercent(spec *info.ContainerSpec, stats []*info.ContainerStats
return toMemoryPercent((latestStats.Usage)-(latestStats.WorkingSet), spec, machine) return toMemoryPercent((latestStats.Usage)-(latestStats.WorkingSet), spec, machine)
} }
func getFsStats(stats []*info.ContainerStats) []fs.FsStats { func getFsStats(stats []*info.ContainerStats) []info.FsStats {
if len(stats) == 0 { if len(stats) == 0 {
return []fs.FsStats{} return []info.FsStats{}
} }
return stats[len(stats)-1].Filesystem return stats[len(stats)-1].Filesystem
} }

View File

@ -63,6 +63,12 @@ const (
colTxBytes string = "tx_bytes" colTxBytes string = "tx_bytes"
// Cumulative count of transmit errors encountered. // Cumulative count of transmit errors encountered.
colTxErrors string = "tx_errors" colTxErrors string = "tx_errors"
// Filesystem device.
colFsDevice = "fs_device"
// Filesystem capacity.
colFsCapacity = "fs_capacity"
// Filesystem available space.
colFsFree = "fs_free"
) )
// TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) // TODO(jnagal): Infer schema through reflection. (See bigquery/client/example)
@ -151,16 +157,30 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema {
Type: typeInteger, Type: typeInteger,
Name: colTxErrors, Name: colTxErrors,
} }
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeString,
Name: colFsDevice,
}
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeInteger,
Name: colFsCapacity,
}
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeInteger,
Name: colFsFree,
}
return &bigquery.TableSchema{ return &bigquery.TableSchema{
Fields: fields, Fields: fields,
} }
} }
func (self *bigqueryStorage) containerStatsToValues( func (self *bigqueryStorage) containerStatsToRows(
ref info.ContainerReference, ref info.ContainerReference,
stats *info.ContainerStats, stats *info.ContainerStats,
) (row map[string]interface{}) { ) (row map[string]interface{}) {
row = make(map[string]interface{}) row = make(map[string]interface{})
// Timestamp // Timestamp
@ -216,6 +236,20 @@ func (self *bigqueryStorage) containerStatsToValues(
return return
} }
func (self *bigqueryStorage) containerFilesystemStatsToRows(
ref info.ContainerReference,
stats *info.ContainerStats,
) (rows []map[string]interface{}) {
for _, fsStat := range stats.Filesystem {
row := make(map[string]interface{}, 0)
row[colFsDevice] = fsStat.Device
row[colFsCapacity] = fsStat.Capacity
row[colFsFree] = fsStat.Free
rows = append(rows, row)
}
return rows
}
func convertToUint64(v interface{}) (uint64, error) { func convertToUint64(v interface{}) (uint64, error) {
if v == nil { if v == nil {
return 0, nil return 0, nil
@ -254,9 +288,10 @@ func convertToUint64(v interface{}) (uint64, error) {
func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{ stats := &info.ContainerStats{
Cpu: &info.CpuStats{}, Cpu: &info.CpuStats{},
Memory: &info.MemoryStats{}, Memory: &info.MemoryStats{},
Network: &info.NetworkStats{}, Network: &info.NetworkStats{},
Filesystem: make([]info.FsStats, 0),
} }
var err error var err error
for i, col := range columns { for i, col := range columns {
@ -309,6 +344,36 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i
stats.Network.TxBytes, err = convertToUint64(v) stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors: case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v) stats.Network.TxErrors, err = convertToUint64(v)
case col == colFsDevice:
device, ok := v.(string)
if !ok {
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
} else {
stats.Filesystem[0].Device = device
}
case col == colFsCapacity:
capacity, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem capacity field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Capacity: capacity})
} else {
stats.Filesystem[0].Capacity = capacity
}
case col == colFsFree:
free, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem free field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Free: free})
} else {
stats.Filesystem[0].Free = free
}
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
@ -321,12 +386,14 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C
if stats == nil || stats.Cpu == nil || stats.Memory == nil { if stats == nil || stats.Cpu == nil || stats.Memory == nil {
return nil return nil
} }
rows := make([]map[string]interface{}, 0)
row := self.containerStatsToValues(ref, stats) rows = append(rows, self.containerStatsToRows(ref, stats))
rows = append(rows, self.containerFilesystemStatsToRows(ref, stats)...)
err := self.client.InsertRow(row) for _, row := range rows {
if err != nil { err := self.client.InsertRow(row)
return err if err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -51,13 +51,19 @@ const (
colTxBytes string = "tx_bytes" colTxBytes string = "tx_bytes"
// Cumulative count of transmit errors encountered. // Cumulative count of transmit errors encountered.
colTxErrors string = "tx_errors" colTxErrors string = "tx_errors"
// Filesystem device.
colFsDevice = "fs_device"
// Filesystem capacity.
colFsCapacity = "fs_capacity"
// Filesystem available space.
colFsFree = "fs_free"
) )
func (self *influxdbStorage) containerStatsToValues( func (self *influxdbStorage) getSeriesDefaultValues(
ref info.ContainerReference, ref info.ContainerReference,
stats *info.ContainerStats, stats *info.ContainerStats,
) (columns []string, values []interface{}) { columns []string,
values []interface{}) {
// Timestamp // Timestamp
columns = append(columns, colTimestamp) columns = append(columns, colTimestamp)
values = append(values, stats.Timestamp.UnixNano()/1E3) values = append(values, stats.Timestamp.UnixNano()/1E3)
@ -73,7 +79,38 @@ func (self *influxdbStorage) containerStatsToValues(
} else { } else {
values = append(values, ref.Name) values = append(values, ref.Name)
} }
}
// In order to maintain a fixed column format, we add a new series for each filesystem partition.
func (self *influxdbStorage) containerFilesystemStatsToSeries(
ref info.ContainerReference,
stats *info.ContainerStats) (series []*influxdb.Series) {
if len(stats.Filesystem) == 0 {
return series
}
for _, fsStat := range stats.Filesystem {
columns := make([]string, 0)
values := make([]interface{}, 0)
self.getSeriesDefaultValues(ref, stats, columns, values)
columns = append(columns, colFsDevice)
values = append(values, fsStat.Device)
columns = append(columns, colFsCapacity)
values = append(values, fsStat.Capacity)
columns = append(columns, colFsFree)
values = append(values, fsStat.Free)
series = append(series, self.newSeries(columns, values))
}
return series
}
func (self *influxdbStorage) containerStatsToValues(
ref info.ContainerReference,
stats *info.ContainerStats,
) (columns []string, values []interface{}) {
self.getSeriesDefaultValues(ref, stats, columns, values)
// Cumulative Cpu Usage // Cumulative Cpu Usage
columns = append(columns, colCpuCumulativeUsage) columns = append(columns, colCpuCumulativeUsage)
values = append(values, stats.Cpu.Usage.Total) values = append(values, stats.Cpu.Usage.Total)
@ -139,9 +176,10 @@ func convertToUint64(v interface{}) (uint64, error) {
func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{ stats := &info.ContainerStats{
Cpu: &info.CpuStats{}, Cpu: &info.CpuStats{},
Memory: &info.MemoryStats{}, Memory: &info.MemoryStats{},
Network: &info.NetworkStats{}, Network: &info.NetworkStats{},
Filesystem: make([]info.FsStats, 0),
} }
var err error var err error
for i, col := range columns { for i, col := range columns {
@ -176,6 +214,36 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
stats.Network.TxBytes, err = convertToUint64(v) stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors: case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v) stats.Network.TxErrors, err = convertToUint64(v)
case col == colFsDevice:
device, ok := v.(string)
if !ok {
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
} else {
stats.Filesystem[0].Device = device
}
case col == colFsCapacity:
capacity, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem capacity field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Capacity: capacity})
} else {
stats.Filesystem[0].Capacity = capacity
}
case col == colFsFree:
free, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem free field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Free: free})
} else {
stats.Filesystem[0].Free = free
}
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
@ -196,13 +264,14 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
if stats == nil || stats.Cpu == nil || stats.Memory == nil { if stats == nil || stats.Cpu == nil || stats.Memory == nil {
return nil return nil
} }
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
var seriesToFlush []*influxdb.Series var seriesToFlush []*influxdb.Series
func() { func() {
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
series := self.newSeries(self.containerStatsToValues(ref, stats))
self.series = append(self.series, series) self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats)))
self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...)
if self.readyToFlush() { if self.readyToFlush() {
seriesToFlush = self.series seriesToFlush = self.series
self.series = make([]*influxdb.Series, 0) self.series = make([]*influxdb.Series, 0)

View File

@ -86,6 +86,10 @@ func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
if !reflect.DeepEqual(a.Network, b.Network) { if !reflect.DeepEqual(a.Network, b.Network) {
return false return false
} }
if !reflect.DeepEqual(a.Filesystem, b.Filesystem) {
return false
}
return true return true
} }

View File

@ -58,6 +58,11 @@ func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStat
stats.Network.RxErrors = uint64(rand.Intn(1000)) stats.Network.RxErrors = uint64(rand.Intn(1000))
stats.Network.TxBytes = uint64(rand.Intn(100000)) stats.Network.TxBytes = uint64(rand.Intn(100000))
stats.Network.TxErrors = uint64(rand.Intn(1000)) stats.Network.TxErrors = uint64(rand.Intn(1000))
stats.Filesystem = make([]info.FsStats, 1)
stats.Filesystem[0].Device = "/dev/sda1"
stats.Filesystem[0].Capacity = 1024000000
stats.Filesystem[0].Free = 1024000
ret[i] = stats ret[i] = stats
} }
return ret return ret
@ -106,6 +111,10 @@ func DefaultStatsEq(a, b *info.ContainerStats) bool {
if !reflect.DeepEqual(a.Network, b.Network) { if !reflect.DeepEqual(a.Network, b.Network) {
return false return false
} }
if !reflect.DeepEqual(a.Filesystem, b.Filesystem) {
return false
}
return true return true
} }