From 80fabb3e607489288c282f6c2aacb13b3d97a406 Mon Sep 17 00:00:00 2001 From: Rohit Jnagal Date: Mon, 1 Jun 2015 16:52:11 +0000 Subject: [PATCH] Remove RecentStats() from all storage drivers except memory. We should probably make memory as a cache type rather than storage. RecentStats() can then be removed from the storage interface. Will try it out as a separate PR. --- storage/bigquery/bigquery.go | 168 +-------------------- storage/bigquery/client/client.go | 53 ------- storage/bigquery/client/example/example.go | 21 --- storage/influxdb/influxdb.go | 149 +----------------- storage/influxdb/influxdb_test.go | 2 +- storage/redis/redis.go | 3 +- storage/storage.go | 3 + 7 files changed, 12 insertions(+), 387 deletions(-) diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 5f5761ef..57a40c5d 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -15,10 +15,6 @@ package bigquery import ( - "fmt" - "strconv" - "time" - bigquery "code.google.com/p/google-api-go-client/bigquery/v2" info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/storage" @@ -248,135 +244,6 @@ func (self *bigqueryStorage) containerFilesystemStatsToRows( return rows } -func convertToUint64(v interface{}) (uint64, error) { - if v == nil { - return 0, nil - } - switch x := v.(type) { - case uint64: - return x, nil - case int: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case int32: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case int64: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case float64: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case uint32: - return uint64(x), nil - case string: - return strconv.ParseUint(x, 10, 64) - } - - return 0, fmt.Errorf("unknown type") -} - -func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { - stats := &info.ContainerStats{ - Filesystem: make([]info.FsStats, 0), - } - var err error - for i, col := range columns { - v := values[i] - switch { - case col == colTimestamp: - if t, ok := v.(time.Time); ok { - stats.Timestamp = t - } - case col == colMachineName: - if m, ok := v.(string); ok { - if m != self.machineName { - return nil, fmt.Errorf("different machine") - } - } else { - return nil, fmt.Errorf("machine name field is not a string: %v", v) - } - // Cumulative Cpu Usage - case col == colCpuCumulativeUsage: - stats.Cpu.Usage.Total, err = convertToUint64(v) - // Cumulative Cpu used by the system - case col == colCpuCumulativeUsageSystem: - stats.Cpu.Usage.System, err = convertToUint64(v) - // Cumulative Cpu Usage in user mode - case col == colCpuCumulativeUsageUser: - stats.Cpu.Usage.User, err = convertToUint64(v) - // Memory Usage - case col == colMemoryUsage: - stats.Memory.Usage, err = convertToUint64(v) - // Working set size - case col == colMemoryWorkingSet: - stats.Memory.WorkingSet, err = convertToUint64(v) - // container page fault - case col == colMemoryContainerPgfault: - stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) - // container major page fault - case col == colMemoryContainerPgmajfault: - stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) - // hierarchical page fault - case col == colMemoryHierarchicalPgfault: - stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) - // hierarchical major page fault - case col == colMemoryHierarchicalPgmajfault: - stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) - case col == colRxBytes: - stats.Network.RxBytes, err = convertToUint64(v) - case col == colRxErrors: - stats.Network.RxErrors, err = convertToUint64(v) - case col == colTxBytes: - stats.Network.TxBytes, err = convertToUint64(v) - case col == colTxErrors: - stats.Network.TxErrors, err = convertToUint64(v) - case col == colFsDevice: - device, ok := v.(string) - if !ok { - return nil, fmt.Errorf("filesystem name field is not a string: %+v", v) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device}) - } else { - stats.Filesystem[0].Device = device - } - case col == colFsLimit: - limit, err := convertToUint64(v) - if err != nil { - return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit}) - } else { - stats.Filesystem[0].Limit = limit - } - case col == colFsUsage: - usage, err := convertToUint64(v) - if err != nil { - return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage}) - } else { - stats.Filesystem[0].Usage = usage - } - } - if err != nil { - return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) - } - } - return stats, nil -} - func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { if stats == nil { return nil @@ -393,40 +260,9 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C return nil } -func (self *bigqueryStorage) getRecentRows(containerName string, numRows int) ([]string, [][]interface{}, error) { - tableName, err := self.client.GetTableName() - if err != nil { - return nil, nil, err - } - - query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) - if numRows > 0 { - query = fmt.Sprintf("%v LIMIT %v", query, numRows) - } - - return self.client.Query(query) -} - +// Recent stats is not required to be implemented by any storage driver other than the in-memory cache. func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - if numStats == 0 { - return nil, nil - } - header, rows, err := self.getRecentRows(containerName, numStats) - if err != nil { - return nil, err - } - statsList := make([]*info.ContainerStats, 0, len(rows)) - for _, row := range rows { - stats, err := self.valuesToContainerStats(header, row) - if err != nil { - return nil, err - } - if stats == nil { - continue - } - statsList = append(statsList, stats) - } - return statsList, nil + return nil, nil } func (self *bigqueryStorage) Close() error { diff --git a/storage/bigquery/client/client.go b/storage/bigquery/client/client.go index 4187db7d..af7736b5 100644 --- a/storage/bigquery/client/client.go +++ b/storage/bigquery/client/client.go @@ -232,56 +232,3 @@ func (c *Client) InsertRow(rowData map[string]interface{}) error { } return nil } - -// Returns a bigtable table name (format: datasetID.tableID) -func (c *Client) GetTableName() (string, error) { - if c.service == nil || c.datasetId == "" || c.tableId == "" { - return "", fmt.Errorf("table not setup") - } - return fmt.Sprintf("%s.%s", c.datasetId, c.tableId), nil -} - -// Do a synchronous query on bigtable and return a header and data rows. -// Number of rows are capped to queryLimit. -func (c *Client) Query(query string) ([]string, [][]interface{}, error) { - service, err := c.getService() - if err != nil { - return nil, nil, err - } - datasetRef := &bigquery.DatasetReference{ - DatasetId: c.datasetId, - ProjectId: *projectId, - } - - queryRequest := &bigquery.QueryRequest{ - DefaultDataset: datasetRef, - MaxResults: queryLimit, - Kind: "json", - Query: query, - } - - results, err := service.Jobs.Query(*projectId, queryRequest).Do() - if err != nil { - return nil, nil, err - } - numRows := results.TotalRows - if numRows < 1 { - return nil, nil, fmt.Errorf("query returned no data") - } - - headers := []string{} - for _, col := range results.Schema.Fields { - headers = append(headers, col.Name) - } - - rows := [][]interface{}{} - numColumns := len(results.Schema.Fields) - for _, data := range results.Rows { - row := make([]interface{}, numColumns) - for c := 0; c < numColumns; c++ { - row[c] = data.F[c].V - } - rows = append(rows, row) - } - return headers, rows, nil -} diff --git a/storage/bigquery/client/example/example.go b/storage/bigquery/client/example/example.go index 08eaffd3..f21ad0b7 100644 --- a/storage/bigquery/client/example/example.go +++ b/storage/bigquery/client/example/example.go @@ -84,25 +84,4 @@ func main() { panic(err) } } - - // Query - tableName, err := c.GetTableName() - if err != nil { - fmt.Printf("table not set") - panic(err) - } - - query := "SELECT * FROM " + tableName + " ORDER BY Timestamp LIMIT 100" - header, rows, err := c.Query(query) - if err != nil { - fmt.Printf("Failed query") - panic(err) - } - fmt.Printf("Headers: %v", header) - for _, row := range rows { - for i, val := range row { - fmt.Printf("%s:%v ", header[i], val) - } - fmt.Printf("\n") - } } diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 00aafcd8..3eac8ef2 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -139,114 +139,6 @@ func (self *influxdbStorage) containerStatsToValues( return columns, values } -func convertToUint64(v interface{}) (uint64, error) { - if v == nil { - return 0, nil - } - switch x := v.(type) { - case uint64: - return x, nil - case int: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case int32: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case int64: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case float64: - if x < 0 { - return 0, fmt.Errorf("negative value: %v", x) - } - return uint64(x), nil - case uint32: - return uint64(x), nil - } - return 0, fmt.Errorf("unknown type") -} - -func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { - stats := &info.ContainerStats{ - Filesystem: make([]info.FsStats, 0), - } - var err error - for i, col := range columns { - v := values[i] - switch { - case col == colTimestamp: - if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { - stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6) - } - case col == colMachineName: - if m, ok := v.(string); ok { - if m != self.machineName { - return nil, fmt.Errorf("different machine") - } - } else { - return nil, fmt.Errorf("machine name field is not a string: %v", v) - } - // Cumulative Cpu Usage - case col == colCpuCumulativeUsage: - stats.Cpu.Usage.Total, err = convertToUint64(v) - // Memory Usage - case col == colMemoryUsage: - stats.Memory.Usage, err = convertToUint64(v) - // Working set size - case col == colMemoryWorkingSet: - stats.Memory.WorkingSet, err = convertToUint64(v) - case col == colRxBytes: - stats.Network.RxBytes, err = convertToUint64(v) - case col == colRxErrors: - stats.Network.RxErrors, err = convertToUint64(v) - case col == colTxBytes: - stats.Network.TxBytes, err = convertToUint64(v) - case col == colTxErrors: - stats.Network.TxErrors, err = convertToUint64(v) - case col == colFsDevice: - device, ok := v.(string) - if !ok { - return nil, fmt.Errorf("filesystem name field is not a string: %+v", v) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device}) - } else { - stats.Filesystem[0].Device = device - } - case col == colFsLimit: - limit, err := convertToUint64(v) - if err != nil { - return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit}) - } else { - stats.Filesystem[0].Limit = limit - } - case col == colFsUsage: - usage, err := convertToUint64(v) - if err != nil { - return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err) - } - if len(stats.Filesystem) == 0 { - stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage}) - } else { - stats.Filesystem[0].Usage = usage - } - } - if err != nil { - return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) - } - } - return stats, nil -} - func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) { self.readyToFlush = readyToFlush } @@ -283,42 +175,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C return nil } -func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - if numStats == 0 { - return nil, nil - } - // TODO(dengnan): select only columns that we need - // TODO(dengnan): escape names - query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) - if numStats > 0 { - query = fmt.Sprintf("%v limit %v", query, numStats) - } - series, err := self.client.Query(query) - if err != nil { - return nil, err - } - statsList := make([]*info.ContainerStats, 0, len(series)) - // By default, influxDB returns data in time descending order. - // RecentStats() requires stats in time increasing order, - // so we need to go through from the last one to the first one. - for i := len(series) - 1; i >= 0; i-- { - s := series[i] - - for j := len(s.Points) - 1; j >= 0; j-- { - values := s.Points[j] - stats, err := self.valuesToContainerStats(s.Columns, values) - if err != nil { - return nil, err - } - if stats == nil { - continue - } - statsList = append(statsList, stats) - } - } - return statsList, nil -} - func (self *influxdbStorage) Close() error { self.client = nil return nil @@ -373,3 +229,8 @@ func New(machineName, ret.readyToFlush = ret.defaultReadyToFlush return ret, nil } + +// RecentStats is only implemented by in-memory cache storage. +func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + return nil, nil +} diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 63a7dd4b..d7261be7 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -51,7 +51,7 @@ func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, sta } func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - return self.base.RecentStats(containerName, numStats) + return nil, nil } func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { diff --git a/storage/redis/redis.go b/storage/redis/redis.go index e4c8fd58..117f83f1 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -89,8 +89,7 @@ func (self *redisStorage) AddStats(ref info.ContainerReference, stats *info.Cont return nil } -// We just need to push the data to the redis, do not need to pull from the redis, -//so we do not override RecentStats() +// RecentStats is only implemented by in-memory cache storage. func (self *redisStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { return nil, nil } diff --git a/storage/storage.go b/storage/storage.go index 3404b9a1..a3e9c822 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -19,6 +19,9 @@ import info "github.com/google/cadvisor/info/v1" type StorageDriver interface { AddStats(ref info.ContainerReference, stats *info.ContainerStats) error + // TODO(rjnagal): RecentStats() is only required by in-memory cache + // storage. Refactor and remove from the interface. + // // Read most recent stats. numStats indicates max number of stats // returned. The returned stats must be consecutive observed stats. If // numStats < 0, then return all stats stored in the storage. The