From cfe839a0cd6c619f702bc6ab430becfe98986121 Mon Sep 17 00:00:00 2001 From: Rohit Jnagal Date: Wed, 20 Aug 2014 23:04:05 +0000 Subject: [PATCH] First version of bigquery backend. Lot of TODOs. But it should be good enough for anyone who wants to try it out. Marked experimental. Docker-DCO-1.1-Signed-off-by: Rohit Jnagal (github: rjnagal) --- cadvisor.go | 2 +- storage/bigquery/README.md | 24 + storage/bigquery/bigquery.go | 582 +++++++++++++++++++++ storage/bigquery/client/client.go | 266 ++++++++++ storage/bigquery/client/example/example.go | 107 ++++ storagedriver.go | 16 +- 6 files changed, 995 insertions(+), 2 deletions(-) create mode 100644 storage/bigquery/README.md create mode 100644 storage/bigquery/bigquery.go create mode 100644 storage/bigquery/client/client.go create mode 100644 storage/bigquery/client/example/example.go diff --git a/cadvisor.go b/cadvisor.go index 031f9a04..fa814325 100644 --- a/cadvisor.go +++ b/cadvisor.go @@ -31,7 +31,7 @@ import ( var argPort = flag.Int("port", 8080, "port to listen") -var argDbDriver = flag.String("storage_driver", "memory", "storage driver to use. Options are: memory (default) and influxdb") +var argDbDriver = flag.String("storage_driver", "memory", "storage driver to use. Options are: memory (default), bigquery, and influxdb") func main() { flag.Parse() diff --git a/storage/bigquery/README.md b/storage/bigquery/README.md new file mode 100644 index 00000000..7273a2d5 --- /dev/null +++ b/storage/bigquery/README.md @@ -0,0 +1,24 @@ +BigQuery Storage Driver +======= + +[EXPERIMENTAL] Support for bigquery backend as cAdvisor storage driver. +The current implementation takes bunch of BigQuery specific flags for authentication. +These will be merged into a single backend config. + +To run the current version, following flags need to be specified: +``` + # Storage driver to use. + -storage_driver=bigquery + # Information about server-to-server Oauth token. 
+ # These can be obtained by creating a Service Account client id under `Google Developer API`
+ # service client id
+ -bq_id="XYZ.apps.googleusercontent.com"
+ # service email address
+ -bq_account="ABC@developer.gserviceaccount.com"
+ # path to pem key (converted from p12 file)
+ -bq_credentials_file="/path/to/key.pem"
+ # project id to use for storing datasets.
+ -bq_project_id="awesome_project"
+```
+
+See [Service account Authentication](https://developers.google.com/accounts/docs/OAuth2) for Oauth related details.
diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go
new file mode 100644
index 00000000..e2568e33
--- /dev/null
+++ b/storage/bigquery/bigquery.go
@@ -0,0 +1,582 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigquery
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
+	"github.com/google/cadvisor/info"
+	"github.com/google/cadvisor/storage"
+	"github.com/google/cadvisor/storage/bigquery/client"
+)
+
+type bigqueryStorage struct {
+	client      *client.Client
+	prevStats   *info.ContainerStats
+	machineName string
+	windowLen   time.Duration
+}
+
+const (
+	// Bigquery schema types
+	typeTimestamp string = "TIMESTAMP"
+	typeString    string = "STRING"
+	typeInteger   string = "INTEGER"
+
+	colTimestamp          string = "timestamp"
+	colMachineName        string = "machine"
+	colContainerName      string = "container_name"
+	colCpuCumulativeUsage string = "cpu_cumulative_usage"
+	// Cumulative Cpu usage in system and user mode
+	colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system"
+	colCpuCumulativeUsageUser   string = "cpu_cumulative_usage_user"
+	// Memory usage
+	colMemoryUsage string = "memory_usage"
+	// Working set size
+	colMemoryWorkingSet string = "memory_working_set"
+	// Container page fault
+	colMemoryContainerPgfault string = "memory_container_pgfault"
+	// Container major page fault
+	colMemoryContainerPgmajfault string = "memory_container_pgmajfault"
+	// Hierarchical page fault
+	colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault"
+	// Hierarchical major page fault
+	colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault"
+	// Cumulative per-core usage
+	colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_"
+	// Optional: sample duration. Unit: nanoseconds.
+	colSampleDuration string = "sample_duration"
+	// Optional: Instant cpu usage.
+	colCpuInstantUsage string = "cpu_instant_usage"
+	// Optional: Instant per-core usage.
+	colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_"
+	// Cumulative count of bytes received.
+	colRxBytes string = "rx_bytes"
+	// Cumulative count of receive errors encountered.
+ colRxErrors string = "rx_errors" + // Cumulative count of bytes transmitted. + colTxBytes string = "tx_bytes" + // Cumulative count of transmit errors encountered. + colTxErrors string = "tx_errors" +) + +// TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) +func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { + fields := make([]*bigquery.TableFieldSchema, 18) + i := 0 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeTimestamp, + Name: colTimestamp, + Mode: "REQUIRED", + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeString, + Name: colMachineName, + Mode: "REQUIRED", + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeString, + Name: colContainerName, + Mode: "REQUIRED", + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colCpuCumulativeUsage, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colCpuCumulativeUsageSystem, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colCpuCumulativeUsageUser, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryUsage, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryWorkingSet, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryContainerPgfault, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryContainerPgmajfault, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryHierarchicalPgfault, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colMemoryHierarchicalPgmajfault, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colSampleDuration, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colCpuInstantUsage, + 
} + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colRxBytes, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colRxErrors, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colTxBytes, + } + i = i + 1 + fields[i] = &bigquery.TableFieldSchema{ + Type: typeInteger, + Name: colTxErrors, + } + return &bigquery.TableSchema{ + Fields: fields, + } +} + +func (self *bigqueryStorage) containerStatsToValues( + ref info.ContainerReference, + stats *info.ContainerStats, +) (row map[string]interface{}) { + + row = make(map[string]interface{}) + + // Timestamp + row[colTimestamp] = stats.Timestamp + + // Machine name + row[colMachineName] = self.machineName + + // Container name + name := ref.Name + if len(ref.Aliases) > 0 { + name = ref.Aliases[0] + } + row[colContainerName] = name + + // Cumulative Cpu Usage + row[colCpuCumulativeUsage] = stats.Cpu.Usage.Total + + // Cumulative Cpu Usage in system mode + row[colCpuCumulativeUsageSystem] = stats.Cpu.Usage.System + + // Cumulative Cpu Usage in user mode + row[colCpuCumulativeUsageUser] = stats.Cpu.Usage.User + + // Memory Usage + row[colMemoryUsage] = stats.Memory.Usage + + // Working set size + row[colMemoryWorkingSet] = stats.Memory.WorkingSet + + // container page fault + row[colMemoryContainerPgfault] = stats.Memory.ContainerData.Pgfault + + // container major page fault + row[colMemoryContainerPgmajfault] = stats.Memory.ContainerData.Pgmajfault + + // hierarchical page fault + row[colMemoryHierarchicalPgfault] = stats.Memory.HierarchicalData.Pgfault + + // hierarchical major page fault + row[colMemoryHierarchicalPgmajfault] = stats.Memory.HierarchicalData.Pgmajfault + + // Optional: Network stats. 
+ if stats.Network != nil { + row[colRxBytes] = stats.Network.RxBytes + row[colRxErrors] = stats.Network.RxErrors + row[colTxBytes] = stats.Network.TxBytes + row[colTxErrors] = stats.Network.TxErrors + } + + sample, err := info.NewSample(self.prevStats, stats) + if err != nil || sample == nil { + return + } + // TODO(jnagal): Handle per-cpu stats. + + // Optional: sample duration. Unit: Nanosecond. + row[colSampleDuration] = sample.Duration + // Optional: Instant cpu usage + row[colCpuInstantUsage] = sample.Cpu.Usage + + return +} + +func convertToUint64(v interface{}) (uint64, error) { + if v == nil { + return 0, nil + } + switch x := v.(type) { + case uint64: + return x, nil + case int: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int32: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case float64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case uint32: + return uint64(x), nil + case string: + return strconv.ParseUint(v.(string), 10, 64) + } + + return 0, fmt.Errorf("Unknown type") +} + +func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { + stats := &info.ContainerStats{ + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + Network: &info.NetworkStats{}, + } + var err error + for i, col := range columns { + v := values[i] + switch { + case col == colTimestamp: + if t, ok := v.(time.Time); ok { + stats.Timestamp = t + } + case col == colMachineName: + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("Different machine") + } + } else { + return nil, fmt.Errorf("Machine name field is not a string: %v", v) + } + // Cumulative Cpu Usage + case col == colCpuCumulativeUsage: + stats.Cpu.Usage.Total, err = 
convertToUint64(v) + // Cumulative Cpu used by the system + case col == colCpuCumulativeUsageSystem: + stats.Cpu.Usage.System, err = convertToUint64(v) + // Cumulative Cpu Usage in user mode + case col == colCpuCumulativeUsageUser: + stats.Cpu.Usage.User, err = convertToUint64(v) + // Memory Usage + case col == colMemoryUsage: + stats.Memory.Usage, err = convertToUint64(v) + // Working set size + case col == colMemoryWorkingSet: + stats.Memory.WorkingSet, err = convertToUint64(v) + // container page fault + case col == colMemoryContainerPgfault: + stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) + // container major page fault + case col == colMemoryContainerPgmajfault: + stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) + // hierarchical page fault + case col == colMemoryHierarchicalPgfault: + stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) + // hierarchical major page fault + case col == colMemoryHierarchicalPgmajfault: + stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) + case col == colRxBytes: + stats.Network.RxBytes, err = convertToUint64(v) + case col == colRxErrors: + stats.Network.RxErrors, err = convertToUint64(v) + case col == colTxBytes: + stats.Network.TxBytes, err = convertToUint64(v) + case col == colTxErrors: + stats.Network.TxErrors, err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) + } + } + return stats, nil +} + +func (self *bigqueryStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { + sample := &info.ContainerStatsSample{} + var err error + for i, col := range columns { + v := values[i] + switch { + case col == colTimestamp: + if t, ok := v.(time.Time); ok { + sample.Timestamp = t + } + case col == colMachineName: + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("Different machine") + } + } else { + return nil, 
fmt.Errorf("Machine name field is not a string: %v", v) + } + // Memory Usage + case col == colMemoryUsage: + sample.Memory.Usage, err = convertToUint64(v) + // sample duration. Unit: Nanosecond. + case col == colSampleDuration: + if v == nil { + // this record does not have sample_duration, so it's the first stats. + return nil, nil + } + sample.Duration = time.Duration(v.(int64)) + // Instant cpu usage + case col == colCpuInstantUsage: + sample.Cpu.Usage, err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) + } + } + if sample.Duration.Nanoseconds() == 0 { + return nil, nil + } + return sample, nil +} + +func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + if stats == nil || stats.Cpu == nil || stats.Memory == nil { + return nil + } + + row := self.containerStatsToValues(ref, stats) + self.prevStats = stats.Copy(self.prevStats) + + err := self.client.InsertRow(row) + if err != nil { + return err + } + return nil +} + +func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + if numStats == 0 { + return nil, nil + } + tableName, err := self.client.GetTableName() + if err != nil { + return nil, err + } + + query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) + if numStats > 0 { + query = fmt.Sprintf("%v LIMIT %v", query, numStats) + } + header, rows, err := self.client.Query(query) + if err != nil { + return nil, err + } + statsList := make([]*info.ContainerStats, 0, len(rows)) + for _, row := range rows { + stats, err := self.valuesToContainerStats(header, row) + if err != nil { + return nil, err + } + if stats == nil { + continue + } + statsList = append(statsList, stats) + } + return statsList, nil +} + +func (self *bigqueryStorage) Samples(containerName string, numSamples int) 
([]*info.ContainerStatsSample, error) { + if numSamples == 0 { + return nil, nil + } + tableName, err := self.client.GetTableName() + if err != nil { + return nil, err + } + query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) + if numSamples > 0 { + query = fmt.Sprintf("%v LIMIT %v", query, numSamples) + } + header, rows, err := self.client.Query(query) + if err != nil { + return nil, err + } + sampleList := make([]*info.ContainerStatsSample, 0, len(rows)) + for _, row := range rows { + sample, err := self.valuesToContainerSample(header, row) + if err != nil { + return nil, err + } + if sample == nil { + continue + } + sampleList = append(sampleList, sample) + } + return sampleList, nil +} + +func (self *bigqueryStorage) Close() error { + self.client.Close() + self.client = nil + return nil +} + +func (self *bigqueryStorage) Percentiles( + containerName string, + cpuUsagePercentiles []int, + memUsagePercentiles []int, +) (*info.ContainerStatsPercentiles, error) { + selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) + + selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) + for _, p := range cpuUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) + } + for _, p := range memUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) + } + + tableName, err := self.client.GetTableName() + if err != nil { + return nil, err + } + query := fmt.Sprintf("SELECT %v FROM %v WHERE %v='%v' AND %v='%v' AND timestamp > DATE_ADD(CURRENT_TIMESTAMP(), -%v, 'SECOND')", + strings.Join(selectedCol, ","), + tableName, + colContainerName, + containerName, + colMachineName, + self.machineName, + self.windowLen.Seconds(), + ) + _, rows, err := self.client.Query(query) + if err != nil { + return nil, err + } + + if len(rows) != 1 { + 
return nil, nil + } + + point := rows[0] + + ret := new(info.ContainerStatsPercentiles) + ret.MaxMemoryUsage, err = convertToUint64(point[0]) + if err != nil { + return nil, fmt.Errorf("invalid max memory usage: %v", err) + } + retrievedCpuPercentiles := point[1 : 1+len(cpuUsagePercentiles)] + for i, p := range cpuUsagePercentiles { + v, err := convertToUint64(retrievedCpuPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid cpu usage: %v", err) + } + ret.CpuUsagePercentiles = append( + ret.CpuUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + retrievedMemoryPercentiles := point[1+len(cpuUsagePercentiles):] + for i, p := range memUsagePercentiles { + v, err := convertToUint64(retrievedMemoryPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid memory usage: %v", err) + } + ret.MemoryUsagePercentiles = append( + ret.MemoryUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + return ret, nil +} + +// Create a new bigquery storage driver. +// machineName: A unique identifier to identify the host that current cAdvisor +// instance is running on. +// tableName: BigQuery table used for storing stats. 
+// percentilesDuration: Time window which will be considered when calls Percentiles() +func New(machineName, + datasetId, + tableName string, + percentilesDuration time.Duration, +) (storage.StorageDriver, error) { + bqClient, err := client.NewClient() + if err != nil { + return nil, err + } + err = bqClient.CreateDataset(datasetId) + if err != nil { + return nil, err + } + if percentilesDuration.Seconds() < 1.0 { + percentilesDuration = 5 * time.Minute + } + + ret := &bigqueryStorage{ + client: bqClient, + windowLen: percentilesDuration, + machineName: machineName, + } + schema := ret.GetSchema() + err = bqClient.CreateTable(tableName, schema) + if err != nil { + return nil, err + } + return ret, nil +} diff --git a/storage/bigquery/client/client.go b/storage/bigquery/client/client.go new file mode 100644 index 00000000..17525e9e --- /dev/null +++ b/storage/bigquery/client/client.go @@ -0,0 +1,266 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "code.google.com/p/goauth2/oauth" + "code.google.com/p/goauth2/oauth/jwt" + bigquery "code.google.com/p/google-api-go-client/bigquery/v2" + "flag" + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +var ( + // TODO(jnagal): Condense all flags to an identity file and a pem key file. 
+	clientId       = flag.String("bq_id", "", "Client ID")
+	clientSecret   = flag.String("bq_secret", "notasecret", "Client Secret")
+	projectId      = flag.String("bq_project_id", "", "Bigquery project ID")
+	serviceAccount = flag.String("bq_account", "", "Service account email")
+	pemFile        = flag.String("bq_credentials_file", "", "Credential Key file (pem)")
+)
+
+const (
+	errAlreadyExists string = "Error 409: Already Exists"
+	queryLimit       int64  = 200
+)
+
+type Client struct {
+	service   *bigquery.Service
+	token     *oauth.Token
+	datasetId string
+	tableId   string
+}
+
+func connect() (*oauth.Token, *bigquery.Service, error) {
+	if *clientId == "" {
+		return nil, nil, fmt.Errorf("No client id specified")
+	}
+	if *serviceAccount == "" {
+		return nil, nil, fmt.Errorf("No service account specified")
+	}
+	if *projectId == "" {
+		return nil, nil, fmt.Errorf("No project id specified")
+	}
+	authScope := bigquery.BigqueryScope
+	if *pemFile == "" {
+		return nil, nil, fmt.Errorf("No credentials specified")
+	}
+	pemBytes, err := ioutil.ReadFile(*pemFile)
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not access credential file %v - %v", *pemFile, err)
+	}
+
+	t := jwt.NewToken(*serviceAccount, authScope, pemBytes)
+	token, err := t.Assert(&http.Client{})
+	if err != nil {
+		fmt.Printf("Invalid token: %v\n", err)
+		return nil, nil, err
+	}
+	config := &oauth.Config{
+		ClientId:     *clientId,
+		ClientSecret: *clientSecret,
+		Scope:        authScope,
+		AuthURL:      "https://accounts.google.com/o/oauth2/auth",
+		TokenURL:     "https://accounts.google.com/o/oauth2/token",
+	}
+
+	transport := &oauth.Transport{
+		Token:  token,
+		Config: config,
+	}
+	client := transport.Client()
+
+	service, err := bigquery.New(client)
+	if err != nil {
+		fmt.Printf("Failed to create new service: %v\n", err)
+		return nil, nil, err
+	}
+
+	return token, service, nil
+}
+
+func NewClient() (*Client, error) {
+	token, service, err := connect()
+	if err != nil {
+		return nil, err
+	}
+	c := &Client{
+		token:   token,
+		service: service,
+	}
+	return c, nil
+}
+
+func (c *Client) Close() error {
+	c.service = nil
+	return nil
+}
+
+func (c *Client) getService() (*bigquery.Service, error) {
+	if c.token == nil || c.service == nil {
+		return nil, fmt.Errorf("Service not initialized")
+	}
+
+	// Refresh expired token.
+	if c.token.Expired() {
+		token, service, err := connect()
+		if err != nil {
+			return nil, err
+		}
+		c.token = token
+		c.service = service
+		return service, nil
+	}
+	return c.service, nil
+}
+
+func (c *Client) PrintDatasets() error {
+	datasetList, err := c.service.Datasets.List(*projectId).Do()
+	if err != nil {
+		fmt.Printf("Failed to get list of datasets\n")
+		return err
+	} else {
+		fmt.Printf("Successfully retrieved datasets. Retrieved: %d\n", len(datasetList.Datasets))
+	}
+
+	for _, d := range datasetList.Datasets {
+		fmt.Printf("%s %s\n", d.Id, d.FriendlyName)
+	}
+	return nil
+}
+
+func (c *Client) CreateDataset(datasetId string) error {
+	if c.service == nil {
+		return fmt.Errorf("No service created")
+	}
+	_, err := c.service.Datasets.Insert(*projectId, &bigquery.Dataset{
+		DatasetReference: &bigquery.DatasetReference{
+			DatasetId: datasetId,
+			ProjectId: *projectId,
+		},
+	}).Do()
+	// TODO(jnagal): Do a Get() to verify dataset already exists.
+	if err != nil && !strings.Contains(err.Error(), errAlreadyExists) {
+		return err
+	}
+	c.datasetId = datasetId
+	return nil
+}
+
+func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error {
+	if c.service == nil || c.datasetId == "" {
+		return fmt.Errorf("No dataset created")
+	}
+	_, err := c.service.Tables.Get(*projectId, c.datasetId, tableId).Do()
+	if err != nil {
+		// Create a new table.
+		_, err := c.service.Tables.Insert(*projectId, c.datasetId, &bigquery.Table{
+			Schema: schema,
+			TableReference: &bigquery.TableReference{
+				DatasetId: c.datasetId,
+				ProjectId: *projectId,
+				TableId:   tableId,
+			},
+		}).Do()
+		if err != nil {
+			return err
+		}
+	}
+	// TODO(jnagal): Update schema if it has changed. We can only extend existing schema.
+	c.tableId = tableId
+	return nil
+}
+
+func (c *Client) InsertRow(rowData map[string]interface{}) error {
+	service, _ := c.getService()
+	if service == nil || c.datasetId == "" || c.tableId == "" {
+		return fmt.Errorf("Table not setup to add rows")
+	}
+	jsonRows := make(map[string]bigquery.JsonValue)
+	for key, value := range rowData {
+		jsonRows[key] = bigquery.JsonValue(value)
+	}
+	rows := []*bigquery.TableDataInsertAllRequestRows{
+		{
+			Json: jsonRows,
+		},
+	}
+
+	// TODO(jnagal): Batch insert requests.
+	insertRequest := &bigquery.TableDataInsertAllRequest{Rows: rows}
+
+	result, err := service.Tabledata.InsertAll(*projectId, c.datasetId, c.tableId, insertRequest).Do()
+	if err != nil {
+		return fmt.Errorf("Error inserting row: %v", err)
+	}
+
+	if len(result.InsertErrors) > 0 {
+		return fmt.Errorf("Insertion for %d rows failed", len(result.InsertErrors))
+	}
+	return nil
+}
+
+func (c *Client) GetTableName() (string, error) {
+	if c.service == nil || c.datasetId == "" || c.tableId == "" {
+		return "", fmt.Errorf("Table not setup")
+	}
+	return fmt.Sprintf("%s.%s", c.datasetId, c.tableId), nil
+}
+
+func (c *Client) Query(query string) ([]string, [][]interface{}, error) {
+	service, err := c.getService()
+	if err != nil {
+		return nil, nil, err
+	}
+	datasetRef := &bigquery.DatasetReference{
+		DatasetId: c.datasetId,
+		ProjectId: *projectId,
+	}
+
+	queryRequest := &bigquery.QueryRequest{
+		DefaultDataset: datasetRef,
+		MaxResults:     queryLimit,
+		Kind:           "json",
+		Query:          query,
+	}
+
+	results, err := service.Jobs.Query(*projectId, queryRequest).Do()
+	if err != nil {
+		return nil, nil, err
+	}
+	numRows := results.TotalRows
+	if numRows < 1 {
+		return nil, nil, fmt.Errorf("Query returned no data")
+	}
+
+	headers := []string{}
+	for _, col := range results.Schema.Fields {
+		headers = append(headers, col.Name)
+	}
+
+	rows := [][]interface{}{}
+	numColumns := len(results.Schema.Fields)
+	for _, data := range results.Rows {
+		row := make([]interface{}, numColumns)
+		for c := 0; c < numColumns; c++ {
+			row[c] = data.F[c].V
+		}
+		rows = append(rows, row)
+	}
+	return headers, rows, nil
+}
diff --git a/storage/bigquery/client/example/example.go b/storage/bigquery/client/example/example.go
new file mode 100644
index 00000000..86c72b48
--- /dev/null
+++ b/storage/bigquery/client/example/example.go
@@ -0,0 +1,107 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"github.com/SeanDolphin/bqschema"
+	"github.com/google/cadvisor/storage/bigquery/client"
+	"time"
+)
+
+type container struct {
+	Name         string    `json:"name"`
+	CpuUsage     uint64    `json:"cpuusage,omitempty"`
+	MemoryUsage  uint64    `json:"memoryusage,omitempty"`
+	NetworkUsage uint64    `json:"networkusage,omitempty"`
+	Timestamp    time.Time `json:"timestamp"`
+}
+
+func main() {
+	flag.Parse()
+	c, err := client.NewClient()
+	if err != nil {
+		fmt.Printf("Failed to connect to bigquery\n")
+		panic(err)
+	}
+
+	c.PrintDatasets()
+
+	// Create a new dataset.
+ err = c.CreateDataset("sampledataset") + if err != nil { + fmt.Printf("Failed to create dataset %v\n", err) + panic(err) + } + + // Create a new table + containerData := container{ + Name: "test_container", + CpuUsage: 123456, + MemoryUsage: 1024, + NetworkUsage: 9046, + Timestamp: time.Now(), + } + schema, err := bqschema.ToSchema(containerData) + if err != nil { + fmt.Printf("Failed to create schema") + panic(err) + } + + err = c.CreateTable("sampletable", schema) + if err != nil { + fmt.Printf("Failed to create table") + panic(err) + } + + // Add Data + m := make(map[string]interface{}) + t := time.Now() + for i := 0; i < 10; i++ { + m["Name"] = containerData.Name + m["CpuUsage"] = containerData.CpuUsage + uint64(i*100) + m["MemoryUsage"] = containerData.MemoryUsage - uint64(i*10) + m["NetworkUsage"] = containerData.NetworkUsage + uint64(i*10) + m["Timestamp"] = t.Add(time.Duration(i) * time.Second) + + err = c.InsertRow(m) + if err != nil { + fmt.Printf("Failed to insert row") + panic(err) + } + } + + // Query + tableName, err := c.GetTableName() + if err != nil { + fmt.Printf("table not set") + panic(err) + } + + query := "SELECT * FROM " + tableName + " ORDER BY Timestamp LIMIT 100" + header, rows, err := c.Query(query) + if err != nil { + fmt.Printf("Failed query") + panic(err) + } + fmt.Printf("Headers: %v", header) + for _, row := range rows { + for i, val := range row { + fmt.Printf("%s:%v ", header[i], val) + } + fmt.Printf("\n") + } +} diff --git a/storagedriver.go b/storagedriver.go index a0e74165..d3178aa4 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/storage/bigquery" "github.com/google/cadvisor/storage/cache" "github.com/google/cadvisor/storage/influxdb" "github.com/google/cadvisor/storage/memory" @@ -53,7 +54,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { storageDriver, err = influxdb.New( hostname, - 
"cadvisorTable", + "cadvisor", *argDbName, *argDbUsername, *argDbPassword, @@ -63,6 +64,19 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { 1*time.Hour, ) storageDriver = cache.MemoryCache(*argHistoryDuration, *argHistoryDuration, storageDriver) + case "bigquery": + var hostname string + hostname, err = os.Hostname() + if err != nil { + return nil, err + } + storageDriver, err = bigquery.New( + hostname, + "cadvisor", + *argDbName, + 1*time.Hour, + ) + storageDriver = cache.MemoryCache(*argHistoryDuration, *argHistoryDuration, storageDriver) default: err = fmt.Errorf("Unknown database driver: %v", *argDbDriver) }