From 570b1452815717939a3f3a37ff7b45a4e00a7df2 Mon Sep 17 00:00:00 2001 From: Rohit Jnagal Date: Mon, 25 Aug 2014 22:59:10 +0000 Subject: [PATCH] Format fixes and small refactoring. Docker-DCO-1.1-Signed-off-by: Rohit Jnagal (github: rjnagal) --- storage/bigquery/bigquery.go | 85 ++++++++++------------ storage/bigquery/client/client.go | 17 ++++- storage/bigquery/client/example/example.go | 3 +- 3 files changed, 55 insertions(+), 50 deletions(-) diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index e2568e33..76ed8966 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -58,14 +58,10 @@ const ( colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" // Hierarchical major page fault colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" - // Cumulative per-core usage - colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_" // Optional: sample duration. Unit: nanoseconds. colSampleDuration string = "sample_duration" // Optional: Instant cpu usage. colCpuInstantUsage string = "cpu_instant_usage" - // Optiona: Instant per-core usage. - colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_" // Cumulative count of bytes received. colRxBytes string = "rx_bytes" // Cumulative count of receive errors encountered. @@ -85,89 +81,89 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { Name: colTimestamp, Mode: "REQUIRED", } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeString, Name: colMachineName, Mode: "REQUIRED", } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeString, Name: colContainerName, Mode: "REQUIRED", } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsage, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsageSystem, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuCumulativeUsageUser, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryUsage, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryWorkingSet, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryContainerPgfault, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryContainerPgmajfault, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryHierarchicalPgfault, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colMemoryHierarchicalPgmajfault, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colSampleDuration, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colCpuInstantUsage, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colRxBytes, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colRxErrors, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colTxBytes, } - i = i + 1 + i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colTxErrors, @@ -276,10 +272,10 @@ func convertToUint64(v interface{}) (uint64, error) { case uint32: return uint64(x), nil case string: - return strconv.ParseUint(v.(string), 10, 64) + return strconv.ParseUint(x, 10, 64) } - return 0, fmt.Errorf("Unknown type") + return 0, fmt.Errorf("unknown type") } func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { @@ -299,10 +295,10 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i case col == colMachineName: if m, ok := v.(string); ok { if m != self.machineName { - return nil, fmt.Errorf("Different machine") + return nil, fmt.Errorf("different machine") } } else { - return nil, fmt.Errorf("Machine name field is not a string: %v", v) + return nil, fmt.Errorf("machine name field is not a string: %v", v) } // Cumulative Cpu Usage case col == colCpuCumulativeUsage: @@ -360,10 +356,10 @@ func (self *bigqueryStorage) valuesToContainerSample(columns []string, values [] case col == colMachineName: if m, ok := v.(string); ok { if m != self.machineName { - return nil, fmt.Errorf("Different machine") + return nil, fmt.Errorf("different machine") } } else { - return nil, fmt.Errorf("Machine name field is not a string: %v", v) + return nil, fmt.Errorf("machine name field is not a string: %v", v) } // Memory Usage case col == colMemoryUsage: @@ -404,20 +400,25 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C return nil } +func (self *bigqueryStorage) getRecentRows(containerName string, numRows int) ([]string, [][]interface{}, error) { + tableName, err := self.client.GetTableName() + if err != nil { + return nil, nil, err + } + + query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) + if numRows > 0 { + query = fmt.Sprintf("%v LIMIT %v", query, numRows) + } + + return self.client.Query(query) +} + func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { if numStats == 0 { return nil, nil } - tableName, err := self.client.GetTableName() - if err != nil { - return nil, err - } - - query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) - if numStats > 0 { - query = fmt.Sprintf("%v LIMIT %v", query, numStats) - } - header, rows, err := self.client.Query(query) + header, rows, err := self.getRecentRows(containerName, numStats) if err != nil { return nil, err } @@ -439,15 +440,7 @@ func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*i if numSamples == 0 { return nil, nil } - tableName, err := self.client.GetTableName() - if err != nil { - return nil, err - } - query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName) - if numSamples > 0 { - query = fmt.Sprintf("%v LIMIT %v", query, numSamples) - } - header, rows, err := self.client.Query(query) + header, rows, err := self.getRecentRows(containerName, numSamples) if err != nil { return nil, err } diff --git a/storage/bigquery/client/client.go b/storage/bigquery/client/client.go index 17525e9e..4e807632 100644 --- a/storage/bigquery/client/client.go +++ b/storage/bigquery/client/client.go @@ -15,14 +15,15 @@ package client import ( - "code.google.com/p/goauth2/oauth" - "code.google.com/p/goauth2/oauth/jwt" - bigquery "code.google.com/p/google-api-go-client/bigquery/v2" "flag" "fmt" "io/ioutil" "net/http" "strings" + + "code.google.com/p/goauth2/oauth" + "code.google.com/p/goauth2/oauth/jwt" + bigquery "code.google.com/p/google-api-go-client/bigquery/v2" ) var ( @@ -46,6 +47,7 @@ type Client struct { tableId string } +// Helper method to create an authenticated connection. func connect() (*oauth.Token, *bigquery.Service, error) { if *clientId == "" { return nil, nil, fmt.Errorf("No client id specified") @@ -94,6 +96,7 @@ func connect() (*oauth.Token, *bigquery.Service, error) { return token, service, nil } +// Creates a new client instance with an authenticated connection to bigquery. func NewClient() (*Client, error) { token, service, err := connect() if err != nil { @@ -111,6 +114,8 @@ func (c *Client) Close() error { return nil } +// Helper method to return the bigquery service connection. +// Expired connection is refreshed. func (c *Client) getService() (*bigquery.Service, error) { if c.token == nil || c.service == nil { return nil, fmt.Errorf("Service not initialized") @@ -162,6 +167,8 @@ func (c *Client) CreateDataset(datasetId string) error { return nil } +// Create a table with provided table ID and schema. +// Schema is currently not updated if the table already exists. func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error { if c.service == nil || c.datasetId == "" { return fmt.Errorf("No dataset created") @@ -186,6 +193,7 @@ func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error return nil } +// Add a row to the connected table. func (c *Client) InsertRow(rowData map[string]interface{}) error { service, _ := c.getService() if service == nil || c.datasetId == "" || c.tableId == "" { @@ -215,6 +223,7 @@ func (c *Client) InsertRow(rowData map[string]interface{}) error { return nil } +// Returns a bigtable table name (format: datasetID.tableID) func (c *Client) GetTableName() (string, error) { if c.service == nil || c.datasetId == "" || c.tableId == "" { return "", fmt.Errorf("Table not setup") @@ -222,6 +231,8 @@ func (c *Client) GetTableName() (string, error) { return fmt.Sprintf("%s.%s", c.datasetId, c.tableId), nil } +// Do a synchronous query on bigtable and return a header and data rows. +// Number of rows are capped to queryLimit. func (c *Client) Query(query string) ([]string, [][]interface{}, error) { service, err := c.getService() if err != nil { diff --git a/storage/bigquery/client/example/example.go b/storage/bigquery/client/example/example.go index 86c72b48..08eaffd3 100644 --- a/storage/bigquery/client/example/example.go +++ b/storage/bigquery/client/example/example.go @@ -17,9 +17,10 @@ package main import ( "flag" "fmt" + "time" + "github.com/SeanDolphin/bqschema" "github.com/google/cadvisor/storage/bigquery/client" - "time" ) type container struct {