Remove RecentStats() from all storage drivers except memory.

We should probably make memory as a cache type rather than storage.
RecentStats() can then be removed from the storage interface.
Will try it out as a separate PR.
This commit is contained in:
Rohit Jnagal 2015-06-01 16:52:11 +00:00
parent e21cab8d43
commit 80fabb3e60
7 changed files with 12 additions and 387 deletions

View File

@ -15,10 +15,6 @@
package bigquery
import (
"fmt"
"strconv"
"time"
bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage"
@ -248,135 +244,6 @@ func (self *bigqueryStorage) containerFilesystemStatsToRows(
return rows
}
func convertToUint64(v interface{}) (uint64, error) {
if v == nil {
return 0, nil
}
switch x := v.(type) {
case uint64:
return x, nil
case int:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int32:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case float64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case uint32:
return uint64(x), nil
case string:
return strconv.ParseUint(x, 10, 64)
}
return 0, fmt.Errorf("unknown type")
}
func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{
Filesystem: make([]info.FsStats, 0),
}
var err error
for i, col := range columns {
v := values[i]
switch {
case col == colTimestamp:
if t, ok := v.(time.Time); ok {
stats.Timestamp = t
}
case col == colMachineName:
if m, ok := v.(string); ok {
if m != self.machineName {
return nil, fmt.Errorf("different machine")
}
} else {
return nil, fmt.Errorf("machine name field is not a string: %v", v)
}
// Cumulative Cpu Usage
case col == colCpuCumulativeUsage:
stats.Cpu.Usage.Total, err = convertToUint64(v)
// Cumulative Cpu used by the system
case col == colCpuCumulativeUsageSystem:
stats.Cpu.Usage.System, err = convertToUint64(v)
// Cumulative Cpu Usage in user mode
case col == colCpuCumulativeUsageUser:
stats.Cpu.Usage.User, err = convertToUint64(v)
// Memory Usage
case col == colMemoryUsage:
stats.Memory.Usage, err = convertToUint64(v)
// Working set size
case col == colMemoryWorkingSet:
stats.Memory.WorkingSet, err = convertToUint64(v)
// container page fault
case col == colMemoryContainerPgfault:
stats.Memory.ContainerData.Pgfault, err = convertToUint64(v)
// container major page fault
case col == colMemoryContainerPgmajfault:
stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v)
// hierarchical page fault
case col == colMemoryHierarchicalPgfault:
stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v)
// hierarchical major page fault
case col == colMemoryHierarchicalPgmajfault:
stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v)
case col == colRxBytes:
stats.Network.RxBytes, err = convertToUint64(v)
case col == colRxErrors:
stats.Network.RxErrors, err = convertToUint64(v)
case col == colTxBytes:
stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v)
case col == colFsDevice:
device, ok := v.(string)
if !ok {
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
} else {
stats.Filesystem[0].Device = device
}
case col == colFsLimit:
limit, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit})
} else {
stats.Filesystem[0].Limit = limit
}
case col == colFsUsage:
usage, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage})
} else {
stats.Filesystem[0].Usage = usage
}
}
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
}
}
return stats, nil
}
func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil {
return nil
@ -393,40 +260,9 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C
return nil
}
func (self *bigqueryStorage) getRecentRows(containerName string, numRows int) ([]string, [][]interface{}, error) {
tableName, err := self.client.GetTableName()
if err != nil {
return nil, nil, err
}
query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName)
if numRows > 0 {
query = fmt.Sprintf("%v LIMIT %v", query, numRows)
}
return self.client.Query(query)
}
// Recent stats is not required to be implemented by any storage driver other than the in-memory cache.
func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
if numStats == 0 {
return nil, nil
}
header, rows, err := self.getRecentRows(containerName, numStats)
if err != nil {
return nil, err
}
statsList := make([]*info.ContainerStats, 0, len(rows))
for _, row := range rows {
stats, err := self.valuesToContainerStats(header, row)
if err != nil {
return nil, err
}
if stats == nil {
continue
}
statsList = append(statsList, stats)
}
return statsList, nil
return nil, nil
}
func (self *bigqueryStorage) Close() error {

View File

@ -232,56 +232,3 @@ func (c *Client) InsertRow(rowData map[string]interface{}) error {
}
return nil
}
// Returns a bigtable table name (format: datasetID.tableID)
func (c *Client) GetTableName() (string, error) {
if c.service == nil || c.datasetId == "" || c.tableId == "" {
return "", fmt.Errorf("table not setup")
}
return fmt.Sprintf("%s.%s", c.datasetId, c.tableId), nil
}
// Do a synchronous query on bigtable and return a header and data rows.
// Number of rows are capped to queryLimit.
func (c *Client) Query(query string) ([]string, [][]interface{}, error) {
service, err := c.getService()
if err != nil {
return nil, nil, err
}
datasetRef := &bigquery.DatasetReference{
DatasetId: c.datasetId,
ProjectId: *projectId,
}
queryRequest := &bigquery.QueryRequest{
DefaultDataset: datasetRef,
MaxResults: queryLimit,
Kind: "json",
Query: query,
}
results, err := service.Jobs.Query(*projectId, queryRequest).Do()
if err != nil {
return nil, nil, err
}
numRows := results.TotalRows
if numRows < 1 {
return nil, nil, fmt.Errorf("query returned no data")
}
headers := []string{}
for _, col := range results.Schema.Fields {
headers = append(headers, col.Name)
}
rows := [][]interface{}{}
numColumns := len(results.Schema.Fields)
for _, data := range results.Rows {
row := make([]interface{}, numColumns)
for c := 0; c < numColumns; c++ {
row[c] = data.F[c].V
}
rows = append(rows, row)
}
return headers, rows, nil
}

View File

@ -84,25 +84,4 @@ func main() {
panic(err)
}
}
// Query
tableName, err := c.GetTableName()
if err != nil {
fmt.Printf("table not set")
panic(err)
}
query := "SELECT * FROM " + tableName + " ORDER BY Timestamp LIMIT 100"
header, rows, err := c.Query(query)
if err != nil {
fmt.Printf("Failed query")
panic(err)
}
fmt.Printf("Headers: %v", header)
for _, row := range rows {
for i, val := range row {
fmt.Printf("%s:%v ", header[i], val)
}
fmt.Printf("\n")
}
}

View File

@ -139,114 +139,6 @@ func (self *influxdbStorage) containerStatsToValues(
return columns, values
}
func convertToUint64(v interface{}) (uint64, error) {
if v == nil {
return 0, nil
}
switch x := v.(type) {
case uint64:
return x, nil
case int:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int32:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case float64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case uint32:
return uint64(x), nil
}
return 0, fmt.Errorf("unknown type")
}
func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{
Filesystem: make([]info.FsStats, 0),
}
var err error
for i, col := range columns {
v := values[i]
switch {
case col == colTimestamp:
if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() {
stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6)
}
case col == colMachineName:
if m, ok := v.(string); ok {
if m != self.machineName {
return nil, fmt.Errorf("different machine")
}
} else {
return nil, fmt.Errorf("machine name field is not a string: %v", v)
}
// Cumulative Cpu Usage
case col == colCpuCumulativeUsage:
stats.Cpu.Usage.Total, err = convertToUint64(v)
// Memory Usage
case col == colMemoryUsage:
stats.Memory.Usage, err = convertToUint64(v)
// Working set size
case col == colMemoryWorkingSet:
stats.Memory.WorkingSet, err = convertToUint64(v)
case col == colRxBytes:
stats.Network.RxBytes, err = convertToUint64(v)
case col == colRxErrors:
stats.Network.RxErrors, err = convertToUint64(v)
case col == colTxBytes:
stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v)
case col == colFsDevice:
device, ok := v.(string)
if !ok {
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
} else {
stats.Filesystem[0].Device = device
}
case col == colFsLimit:
limit, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit})
} else {
stats.Filesystem[0].Limit = limit
}
case col == colFsUsage:
usage, err := convertToUint64(v)
if err != nil {
return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err)
}
if len(stats.Filesystem) == 0 {
stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage})
} else {
stats.Filesystem[0].Usage = usage
}
}
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
}
}
return stats, nil
}
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
self.readyToFlush = readyToFlush
}
@ -283,42 +175,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
return nil
}
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
if numStats == 0 {
return nil, nil
}
// TODO(dengnan): select only columns that we need
// TODO(dengnan): escape names
query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName)
if numStats > 0 {
query = fmt.Sprintf("%v limit %v", query, numStats)
}
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
statsList := make([]*info.ContainerStats, 0, len(series))
// By default, influxDB returns data in time descending order.
// RecentStats() requires stats in time increasing order,
// so we need to go through from the last one to the first one.
for i := len(series) - 1; i >= 0; i-- {
s := series[i]
for j := len(s.Points) - 1; j >= 0; j-- {
values := s.Points[j]
stats, err := self.valuesToContainerStats(s.Columns, values)
if err != nil {
return nil, err
}
if stats == nil {
continue
}
statsList = append(statsList, stats)
}
}
return statsList, nil
}
func (self *influxdbStorage) Close() error {
self.client = nil
return nil
@ -373,3 +229,8 @@ func New(machineName,
ret.readyToFlush = ret.defaultReadyToFlush
return ret, nil
}
// RecentStats is only implemented by in-memory cache storage.
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
return nil, nil
}

View File

@ -51,7 +51,7 @@ func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, sta
}
func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
return self.base.RecentStats(containerName, numStats)
return nil, nil
}
func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {

View File

@ -89,8 +89,7 @@ func (self *redisStorage) AddStats(ref info.ContainerReference, stats *info.Cont
return nil
}
// We just need to push the data to the redis, do not need to pull from the redis,
//so we do not override RecentStats()
// RecentStats is only implemented by in-memory cache storage.
func (self *redisStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
return nil, nil
}

View File

@ -19,6 +19,9 @@ import info "github.com/google/cadvisor/info/v1"
type StorageDriver interface {
AddStats(ref info.ContainerReference, stats *info.ContainerStats) error
// TODO(rjnagal): RecentStats() is only required by in-memory cache
// storage. Refactor and remove from the interface.
//
// Read most recent stats. numStats indicates max number of stats
// returned. The returned stats must be consecutive observed stats. If
// numStats < 0, then return all stats stored in the storage. The