diff --git a/fs/fs.go b/fs/fs.go index 4309e0db..bf07959e 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -16,6 +16,7 @@ import ( "github.com/docker/docker/pkg/mount" "github.com/golang/glog" + "github.com/google/cadvisor/info" ) type partition struct { @@ -47,14 +48,14 @@ func NewFsInfo() (FsInfo, error) { return &FsInfoImpl{partitions}, nil } -func (self *FsInfoImpl) GetFsStats() ([]FsStats, error) { - filesystems := make([]FsStats, 0) +func (self *FsInfoImpl) GetFsStats() ([]info.FsStats, error) { + filesystems := make([]info.FsStats, 0) for device, partition := range self.partitions { total, free, err := getVfsStats(partition.mountpoint) if err != nil { glog.Errorf("Statvfs failed. Error: %v", err) } else { - fsStat := FsStats{ + fsStat := info.FsStats{ Device: device, Major: uint(partition.major), Minor: uint(partition.minor), diff --git a/fs/types.go b/fs/types.go index 4fabdbc2..8cc78877 100644 --- a/fs/types.go +++ b/fs/types.go @@ -1,14 +1,8 @@ package fs -type FsStats struct { - Device string `json:"device,omitempty"` - Major uint `json:"major"` - Minor uint `json:"minor"` - Capacity uint64 `json:"capacity"` - Free uint64 `json:"free"` -} +import "github.com/google/cadvisor/info" type FsInfo interface { // Returns capacity and free space, in bytes, of all the ext2, ext3, ext4 filesystems on the host. - GetFsStats() ([]FsStats, error) + GetFsStats() ([]info.FsStats, error) } diff --git a/info/container.go b/info/container.go index 22c31e78..28cd1118 100644 --- a/info/container.go +++ b/info/container.go @@ -17,8 +17,6 @@ package info import ( "reflect" "time" - - "github.com/google/cadvisor/fs" ) type CpuSpec struct { @@ -233,6 +231,14 @@ type NetworkStats struct { TxDropped uint64 `json:"tx_dropped"` } +type FsStats struct { + Device string `json:"device,omitempty"` + Major uint `json:"major"` + Minor uint `json:"minor"` + Capacity uint64 `json:"capacity"` + Free uint64 `json:"free"` +} + type ContainerStats struct { // The time of this stat point. Timestamp time.Time `json:"timestamp"` @@ -241,7 +247,7 @@ type ContainerStats struct { Memory *MemoryStats `json:"memory,omitempty"` Network *NetworkStats `json:"network,omitempty"` // Filesystem statistics - Filesystem []fs.FsStats `json:"filesystem,omitempty"` + Filesystem []FsStats `json:"filesystem,omitempty"` } // Makes a deep copy of the ContainerStats and returns a pointer to the new diff --git a/pages/containers.go b/pages/containers.go index 18b86ccd..640e7725 100644 --- a/pages/containers.go +++ b/pages/containers.go @@ -27,7 +27,6 @@ import ( "time" "github.com/golang/glog" - "github.com/google/cadvisor/fs" "github.com/google/cadvisor/info" "github.com/google/cadvisor/manager" ) @@ -256,9 +255,9 @@ func getColdMemoryPercent(spec *info.ContainerSpec, stats []*info.ContainerStats return toMemoryPercent((latestStats.Usage)-(latestStats.WorkingSet), spec, machine) } -func getFsStats(stats []*info.ContainerStats) []fs.FsStats { +func getFsStats(stats []*info.ContainerStats) []info.FsStats { if len(stats) == 0 { - return []fs.FsStats{} + return []info.FsStats{} } return stats[len(stats)-1].Filesystem } diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 4586e4a0..e57dafa8 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -63,6 +63,12 @@ const ( colTxBytes string = "tx_bytes" // Cumulative count of transmit errors encountered. colTxErrors string = "tx_errors" + // Filesystem device. + colFsDevice = "fs_device" + // Filesystem capacity. + colFsCapacity = "fs_capacity" + // Filesystem available space. + colFsFree = "fs_free" ) // TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) @@ -151,16 +157,30 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { Type: typeInteger, Name: colTxErrors, } + i++ + fields[i] = &bigquery.TableFieldSchema{ + Type: typeString, + Name: colFsDevice, + } + i++ + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colFsCapacity, + } + i++ + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colFsFree, + } return &bigquery.TableSchema{ Fields: fields, } } -func (self *bigqueryStorage) containerStatsToValues( +func (self *bigqueryStorage) containerStatsToRows( ref info.ContainerReference, stats *info.ContainerStats, ) (row map[string]interface{}) { - row = make(map[string]interface{}) // Timestamp @@ -216,6 +236,20 @@ func (self *bigqueryStorage) containerStatsToValues( return } +func (self *bigqueryStorage) containerFilesystemStatsToRows( + ref info.ContainerReference, + stats *info.ContainerStats, +) (rows []map[string]interface{}) { + for _, fsStat := range stats.Filesystem { + row := make(map[string]interface{}, 0) + row[colFsDevice] = fsStat.Device + row[colFsCapacity] = fsStat.Capacity + row[colFsFree] = fsStat.Free + rows = append(rows, row) + } + return rows +} + func convertToUint64(v interface{}) (uint64, error) { if v == nil { return 0, nil @@ -254,9 +288,10 @@ func convertToUint64(v interface{}) (uint64, error) { func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { stats := &info.ContainerStats{ - Cpu: &info.CpuStats{}, - Memory: &info.MemoryStats{}, - Network: &info.NetworkStats{}, + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + Network: &info.NetworkStats{}, + Filesystem: make([]info.FsStats, 0), } var err error for i, col := range columns { @@ -309,6 +344,36 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i stats.Network.TxBytes, err = convertToUint64(v) case col == colTxErrors: stats.Network.TxErrors, err = convertToUint64(v) + case col == colFsDevice: + device, ok := v.(string) + if !ok { + return nil, fmt.Errorf("filesystem name field is not a string: %+v", v) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device}) + } else { + stats.Filesystem[0].Device = device + } + case col == colFsCapacity: + capacity, err := convertToUint64(v) + if err != nil { + return nil, fmt.Errorf("filesystem capacity field %+v invalid: %s", v, err) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Capacity: capacity}) + } else { + stats.Filesystem[0].Capacity = capacity + } + case col == colFsFree: + free, err := convertToUint64(v) + if err != nil { + return nil, fmt.Errorf("filesystem free field %+v invalid: %s", v, err) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Free: free}) + } else { + stats.Filesystem[0].Free = free + } } if err != nil { return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) @@ -321,12 +386,14 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C if stats == nil || stats.Cpu == nil || stats.Memory == nil { return nil } - - row := self.containerStatsToValues(ref, stats) - - err := self.client.InsertRow(row) - if err != nil { - return err + rows := make([]map[string]interface{}, 0) + rows = append(rows, self.containerStatsToRows(ref, stats)) + rows = append(rows, self.containerFilesystemStatsToRows(ref, stats)...) + for _, row := range rows { + err := self.client.InsertRow(row) + if err != nil { + return err + } } return nil } diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index ae7b3e6f..fc604e72 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -51,13 +51,19 @@ const ( colTxBytes string = "tx_bytes" // Cumulative count of transmit errors encountered. colTxErrors string = "tx_errors" + // Filesystem device. + colFsDevice = "fs_device" + // Filesystem capacity. + colFsCapacity = "fs_capacity" + // Filesystem available space. + colFsFree = "fs_free" ) -func (self *influxdbStorage) containerStatsToValues( +func (self *influxdbStorage) getSeriesDefaultValues( ref info.ContainerReference, stats *info.ContainerStats, -) (columns []string, values []interface{}) { - + columns []string, + values []interface{}) { // Timestamp columns = append(columns, colTimestamp) values = append(values, stats.Timestamp.UnixNano()/1E3) @@ -73,7 +79,38 @@ func (self *influxdbStorage) containerStatsToValues( } else { values = append(values, ref.Name) } +} +// In order to maintain a fixed column format, we add a new series for each filesystem partition. +func (self *influxdbStorage) containerFilesystemStatsToSeries( + ref info.ContainerReference, + stats *info.ContainerStats) (series []*influxdb.Series) { + if len(stats.Filesystem) == 0 { + return series + } + for _, fsStat := range stats.Filesystem { + columns := make([]string, 0) + values := make([]interface{}, 0) + self.getSeriesDefaultValues(ref, stats, columns, values) + + columns = append(columns, colFsDevice) + values = append(values, fsStat.Device) + + columns = append(columns, colFsCapacity) + values = append(values, fsStat.Capacity) + + columns = append(columns, colFsFree) + values = append(values, fsStat.Free) + series = append(series, self.newSeries(columns, values)) + } + return series +} + +func (self *influxdbStorage) containerStatsToValues( + ref info.ContainerReference, + stats *info.ContainerStats, +) (columns []string, values []interface{}) { + self.getSeriesDefaultValues(ref, stats, columns, values) // Cumulative Cpu Usage columns = append(columns, colCpuCumulativeUsage) values = append(values, stats.Cpu.Usage.Total) @@ -139,9 +176,10 @@ func convertToUint64(v interface{}) (uint64, error) { func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { stats := &info.ContainerStats{ - Cpu: &info.CpuStats{}, - Memory: &info.MemoryStats{}, - Network: &info.NetworkStats{}, + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + Network: &info.NetworkStats{}, + Filesystem: make([]info.FsStats, 0), } var err error for i, col := range columns { @@ -176,6 +214,36 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i stats.Network.TxBytes, err = convertToUint64(v) case col == colTxErrors: stats.Network.TxErrors, err = convertToUint64(v) + case col == colFsDevice: + device, ok := v.(string) + if !ok { + return nil, fmt.Errorf("filesystem name field is not a string: %+v", v) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device}) + } else { + stats.Filesystem[0].Device = device + } + case col == colFsCapacity: + capacity, err := convertToUint64(v) + if err != nil { + return nil, fmt.Errorf("filesystem capacity field %+v invalid: %s", v, err) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Capacity: capacity}) + } else { + stats.Filesystem[0].Capacity = capacity + } + case col == colFsFree: + free, err := convertToUint64(v) + if err != nil { + return nil, fmt.Errorf("filesystem free field %+v invalid: %s", v, err) + } + if len(stats.Filesystem) == 0 { + stats.Filesystem = append(stats.Filesystem, info.FsStats{Free: free}) + } else { + stats.Filesystem[0].Free = free + } } if err != nil { return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) @@ -196,13 +264,14 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C if stats == nil || stats.Cpu == nil || stats.Memory == nil { return nil } - // AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write. var seriesToFlush []*influxdb.Series func() { + // AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write. self.lock.Lock() defer self.lock.Unlock() - series := self.newSeries(self.containerStatsToValues(ref, stats)) - self.series = append(self.series, series) + + self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats))) + self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...) if self.readyToFlush() { seriesToFlush = self.series self.series = make([]*influxdb.Series, 0) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 9f93a921..a50208ff 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -86,6 +86,10 @@ func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { if !reflect.DeepEqual(a.Network, b.Network) { return false } + + if !reflect.DeepEqual(a.Filesystem, b.Filesystem) { + return false + } return true } diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 21385332..6aeff3bc 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -58,6 +58,11 @@ func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStat stats.Network.RxErrors = uint64(rand.Intn(1000)) stats.Network.TxBytes = uint64(rand.Intn(100000)) stats.Network.TxErrors = uint64(rand.Intn(1000)) + + stats.Filesystem = make([]info.FsStats, 1) + stats.Filesystem[0].Device = "/dev/sda1" + stats.Filesystem[0].Capacity = 1024000000 + stats.Filesystem[0].Free = 1024000 ret[i] = stats } return ret @@ -106,6 +111,10 @@ func DefaultStatsEq(a, b *info.ContainerStats) bool { if !reflect.DeepEqual(a.Network, b.Network) { return false } + if !reflect.DeepEqual(a.Filesystem, b.Filesystem) { + return false + } + return true }