// Copyright 2014 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package bigquery import ( "fmt" "strconv" "strings" "time" bigquery "code.google.com/p/google-api-go-client/bigquery/v2" "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/bigquery/client" ) type bigqueryStorage struct { client *client.Client prevStats *info.ContainerStats machineName string windowLen time.Duration } const ( // Bigquery schema types typeTimestamp string = "TIMESTAMP" typeString string = "STRING" typeInteger string = "INTEGER" colTimestamp string = "timestamp" colMachineName string = "machine" colContainerName string = "container_name" colCpuCumulativeUsage string = "cpu_cumulative_usage" // Cumulative Cpu usage in system and user mode colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system" colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user" // Memory usage colMemoryUsage string = "memory_usage" // Working set size colMemoryWorkingSet string = "memory_working_set" // Container page fault colMemoryContainerPgfault string = "memory_container_pgfault" // Constainer major page fault colMemoryContainerPgmajfault string = "memory_container_pgmajfault" // Hierarchical page fault colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" // Hierarchical major page fault colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" // Cumulative per-core usage colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_" // Optional: sample duration. Unit: nanoseconds. colSampleDuration string = "sample_duration" // Optional: Instant cpu usage. colCpuInstantUsage string = "cpu_instant_usage" // Optiona: Instant per-core usage. colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_" // Cumulative count of bytes received. colRxBytes string = "rx_bytes" // Cumulative count of receive errors encountered. colRxErrors string = "rx_errors" // Cumulative count of bytes transmitted. colTxBytes string = "tx_bytes" // Cumulative count of transmit errors encountered. colTxErrors string = "tx_errors" ) // TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { fields := make([]*bigquery.TableFieldSchema, 18) i := 0 fields[i] = &bigquery.TableFieldSchema{ Type: typeTimestamp, Name: colTimestamp, Mode: "REQUIRED", } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeString, Name: colMachineName, Mode: "REQUIRED", } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeString, Name: colContainerName, Mode: "REQUIRED", } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsage, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsageSystem, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsageUser, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryUsage, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryWorkingSet, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryContainerPgfault, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryContainerPgmajfault, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryHierarchicalPgfault, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryHierarchicalPgmajfault, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colSampleDuration, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuInstantUsage, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colRxBytes, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colRxErrors, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colTxBytes, } i = i + 1 fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colTxErrors, } return &bigquery.TableSchema{ Fields: fields, } } func (self *bigqueryStorage) containerStatsToValues( ref info.ContainerReference, stats *info.ContainerStats, ) (row map[string]interface{}) { row = make(map[string]interface{}) // Timestamp row[colTimestamp] = stats.Timestamp // Machine name row[colMachineName] = self.machineName // Container name name := ref.Name if len(ref.Aliases) > 0 { name = ref.Aliases[0] } row[colContainerName] = name // Cumulative Cpu Usage row[colCpuCumulativeUsage] = stats.Cpu.Usage.Total // Cumulative Cpu Usage in system mode row[colCpuCumulativeUsageSystem] = stats.Cpu.Usage.System // Cumulative Cpu Usage in user mode row[colCpuCumulativeUsageUser] = stats.Cpu.Usage.User // Memory Usage row[colMemoryUsage] = stats.Memory.Usage // Working set size row[colMemoryWorkingSet] = stats.Memory.WorkingSet // container page fault row[colMemoryContainerPgfault] = stats.Memory.ContainerData.Pgfault // container major page fault row[colMemoryContainerPgmajfault] = stats.Memory.ContainerData.Pgmajfault // hierarchical page fault row[colMemoryHierarchicalPgfault] = stats.Memory.HierarchicalData.Pgfault // hierarchical major page fault row[colMemoryHierarchicalPgmajfault] = stats.Memory.HierarchicalData.Pgmajfault // Optional: Network stats. if stats.Network != nil { row[colRxBytes] = stats.Network.RxBytes row[colRxErrors] = stats.Network.RxErrors row[colTxBytes] = stats.Network.TxBytes row[colTxErrors] = stats.Network.TxErrors } sample, err := info.NewSample(self.prevStats, stats) if err != nil || sample == nil { return } // TODO(jnagal): Handle per-cpu stats. // Optional: sample duration. Unit: Nanosecond. row[colSampleDuration] = sample.Duration // Optional: Instant cpu usage row[colCpuInstantUsage] = sample.Cpu.Usage return } func convertToUint64(v interface{}) (uint64, error) { if v == nil { return 0, nil } switch x := v.(type) { case uint64: return x, nil case int: if x < 0 { return 0, fmt.Errorf("negative value: %v", x) } return uint64(x), nil case int32: if x < 0 { return 0, fmt.Errorf("negative value: %v", x) } return uint64(x), nil case int64: if x < 0 { return 0, fmt.Errorf("negative value: %v", x) } return uint64(x), nil case float64: if x < 0 { return 0, fmt.Errorf("negative value: %v", x) } return uint64(x), nil case uint32: return uint64(x), nil case string: return strconv.ParseUint(v.(string), 10, 64) } return 0, fmt.Errorf("Unknown type") } func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { stats := &info.ContainerStats{ Cpu: &info.CpuStats{}, Memory: &info.MemoryStats{}, Network: &info.NetworkStats{}, } var err error for i, col := range columns { v := values[i] switch { case col == colTimestamp: if t, ok := v.(time.Time); ok { stats.Timestamp = t } case col == colMachineName: if m, ok := v.(string); ok { if m != self.machineName { return nil, fmt.Errorf("Different machine") } } else { return nil, fmt.Errorf("Machine name field is not a string: %v", v) } // Cumulative Cpu Usage case col == colCpuCumulativeUsage: stats.Cpu.Usage.Total, err = convertToUint64(v) // Cumulative Cpu used by the system case col == colCpuCumulativeUsageSystem: stats.Cpu.Usage.System, err = convertToUint64(v) // Cumulative Cpu Usage in user mode case col == colCpuCumulativeUsageUser: stats.Cpu.Usage.User, err = convertToUint64(v) // Memory Usage case col == colMemoryUsage: stats.Memory.Usage, err = convertToUint64(v) // Working set size case col == colMemoryWorkingSet: stats.Memory.WorkingSet, err = convertToUint64(v) // container page fault case col == colMemoryContainerPgfault: stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) // container major page fault case col == colMemoryContainerPgmajfault: stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) // hierarchical page fault case col == colMemoryHierarchicalPgfault: stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) // hierarchical major page fault case col == colMemoryHierarchicalPgmajfault: stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) case col == colRxBytes: stats.Network.RxBytes, err = convertToUint64(v) case col == colRxErrors: stats.Network.RxErrors, err = convertToUint64(v) case col == colTxBytes: stats.Network.TxBytes, err = convertToUint64(v) case col == colTxErrors: stats.Network.TxErrors, err = convertToUint64(v) } if err != nil { return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } return stats, nil } func (self *bigqueryStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { sample := &info.ContainerStatsSample{} var err error for i, col := range columns { v := values[i] switch { case col == colTimestamp: if t, ok := v.(time.Time); ok { sample.Timestamp = t } case col == colMachineName: if m, ok := v.(string); ok { if m != self.machineName { return nil, fmt.Errorf("Different machine") } } else { return nil, fmt.Errorf("Machine name field is not a string: %v", v) } // Memory Usage case col == colMemoryUsage: sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. case col == colSampleDuration: if v == nil { // this record does not have sample_duration, so it's the first stats. return nil, nil } sample.Duration = time.Duration(v.(int64)) // Instant cpu usage case col == colCpuInstantUsage: sample.Cpu.Usage, err = convertToUint64(v) } if err != nil { return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } if sample.Duration.Nanoseconds() == 0 { return nil, nil } return sample, nil } func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { if stats == nil || stats.Cpu == nil || stats.Memory == nil { return nil } row := self.containerStatsToValues(ref, stats) self.prevStats = stats.Copy(self.prevStats) err := self.client.InsertRow(row) if err != nil { return err } return nil } func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { if numStats == 0 { return nil, nil } tableName, err := self.client.GetTableName() if err != nil { return nil, err } query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v LIMIT %v", query, numStats) } header, rows, err := self.client.Query(query) if err != nil { return nil, err } statsList := make([]*info.ContainerStats, 0, len(rows)) for _, row := range rows { stats, err := self.valuesToContainerStats(header, row) if err != nil { return nil, err } if stats == nil { continue } statsList = append(statsList, stats) } return statsList, nil } func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { if numSamples == 0 { return nil, nil } tableName, err := self.client.GetTableName() if err != nil { return nil, err } query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v LIMIT %v", query, numSamples) } header, rows, err := self.client.Query(query) if err != nil { return nil, err } sampleList := make([]*info.ContainerStatsSample, 0, len(rows)) for _, row := range rows { sample, err := self.valuesToContainerSample(header, row) if err != nil { return nil, err } if sample == nil { continue } sampleList = append(sampleList, sample) } return sampleList, nil } func (self *bigqueryStorage) Close() error { self.client.Close() self.client = nil return nil } func (self *bigqueryStorage) Percentiles( containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) for _, p := range cpuUsagePercentiles { selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) } for _, p := range memUsagePercentiles { selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) } tableName, err := self.client.GetTableName() if err != nil { return nil, err } query := fmt.Sprintf("SELECT %v FROM %v WHERE %v='%v' AND %v='%v' AND timestamp > DATE_ADD(CURRENT_TIMESTAMP(), -%v, 'SECOND')", strings.Join(selectedCol, ","), tableName, colContainerName, containerName, colMachineName, self.machineName, self.windowLen.Seconds(), ) _, rows, err := self.client.Query(query) if err != nil { return nil, err } if len(rows) != 1 { return nil, nil } point := rows[0] ret := new(info.ContainerStatsPercentiles) ret.MaxMemoryUsage, err = convertToUint64(point[0]) if err != nil { return nil, fmt.Errorf("invalid max memory usage: %v", err) } retrievedCpuPercentiles := point[1 : 1+len(cpuUsagePercentiles)] for i, p := range cpuUsagePercentiles { v, err := convertToUint64(retrievedCpuPercentiles[i]) if err != nil { return nil, fmt.Errorf("invalid cpu usage: %v", err) } ret.CpuUsagePercentiles = append( ret.CpuUsagePercentiles, info.Percentile{ Percentage: p, Value: v, }, ) } retrievedMemoryPercentiles := point[1+len(cpuUsagePercentiles):] for i, p := range memUsagePercentiles { v, err := convertToUint64(retrievedMemoryPercentiles[i]) if err != nil { return nil, fmt.Errorf("invalid memory usage: %v", err) } ret.MemoryUsagePercentiles = append( ret.MemoryUsagePercentiles, info.Percentile{ Percentage: p, Value: v, }, ) } return ret, nil } // Create a new bigquery storage driver. // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // tableName: BigQuery table used for storing stats. // percentilesDuration: Time window which will be considered when calls Percentiles() func New(machineName, datasetId, tableName string, percentilesDuration time.Duration, ) (storage.StorageDriver, error) { bqClient, err := client.NewClient() if err != nil { return nil, err } err = bqClient.CreateDataset(datasetId) if err != nil { return nil, err } if percentilesDuration.Seconds() < 1.0 { percentilesDuration = 5 * time.Minute } ret := &bigqueryStorage{ client: bqClient, windowLen: percentilesDuration, machineName: machineName, } schema := ret.GetSchema() err = bqClient.CreateTable(tableName, schema) if err != nil { return nil, err } return ret, nil }