Format fixes and small refactoring.
Docker-DCO-1.1-Signed-off-by: Rohit Jnagal <jnagal@google.com> (github: rjnagal)
This commit is contained in:
parent
35325ae920
commit
570b145281
@ -58,14 +58,10 @@ const (
|
||||
colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault"
|
||||
// Hierarchical major page fault
|
||||
colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault"
|
||||
// Cumulative per-core usage
|
||||
colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_"
|
||||
// Optional: sample duration. Unit: nanoseconds.
|
||||
colSampleDuration string = "sample_duration"
|
||||
// Optional: Instant cpu usage.
|
||||
colCpuInstantUsage string = "cpu_instant_usage"
|
||||
// Optiona: Instant per-core usage.
|
||||
colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_"
|
||||
// Cumulative count of bytes received.
|
||||
colRxBytes string = "rx_bytes"
|
||||
// Cumulative count of receive errors encountered.
|
||||
@ -85,89 +81,89 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema {
|
||||
Name: colTimestamp,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeString,
|
||||
Name: colMachineName,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeString,
|
||||
Name: colContainerName,
|
||||
Mode: "REQUIRED",
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsage,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsageSystem,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuCumulativeUsageUser,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryUsage,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryWorkingSet,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryContainerPgfault,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryContainerPgmajfault,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryHierarchicalPgfault,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colMemoryHierarchicalPgmajfault,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colSampleDuration,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colCpuInstantUsage,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colRxBytes,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colRxErrors,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colTxBytes,
|
||||
}
|
||||
i = i + 1
|
||||
i++
|
||||
fields[i] = &bigquery.TableFieldSchema{
|
||||
Type: typeInteger,
|
||||
Name: colTxErrors,
|
||||
@ -276,10 +272,10 @@ func convertToUint64(v interface{}) (uint64, error) {
|
||||
case uint32:
|
||||
return uint64(x), nil
|
||||
case string:
|
||||
return strconv.ParseUint(v.(string), 10, 64)
|
||||
return strconv.ParseUint(x, 10, 64)
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("Unknown type")
|
||||
return 0, fmt.Errorf("unknown type")
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
|
||||
@ -299,10 +295,10 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i
|
||||
case col == colMachineName:
|
||||
if m, ok := v.(string); ok {
|
||||
if m != self.machineName {
|
||||
return nil, fmt.Errorf("Different machine")
|
||||
return nil, fmt.Errorf("different machine")
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("Machine name field is not a string: %v", v)
|
||||
return nil, fmt.Errorf("machine name field is not a string: %v", v)
|
||||
}
|
||||
// Cumulative Cpu Usage
|
||||
case col == colCpuCumulativeUsage:
|
||||
@ -360,10 +356,10 @@ func (self *bigqueryStorage) valuesToContainerSample(columns []string, values []
|
||||
case col == colMachineName:
|
||||
if m, ok := v.(string); ok {
|
||||
if m != self.machineName {
|
||||
return nil, fmt.Errorf("Different machine")
|
||||
return nil, fmt.Errorf("different machine")
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("Machine name field is not a string: %v", v)
|
||||
return nil, fmt.Errorf("machine name field is not a string: %v", v)
|
||||
}
|
||||
// Memory Usage
|
||||
case col == colMemoryUsage:
|
||||
@ -404,20 +400,25 @@ func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.C
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) getRecentRows(containerName string, numRows int) ([]string, [][]interface{}, error) {
|
||||
tableName, err := self.client.GetTableName()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName)
|
||||
if numRows > 0 {
|
||||
query = fmt.Sprintf("%v LIMIT %v", query, numRows)
|
||||
}
|
||||
|
||||
return self.client.Query(query)
|
||||
}
|
||||
|
||||
func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
||||
if numStats == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
tableName, err := self.client.GetTableName()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName)
|
||||
if numStats > 0 {
|
||||
query = fmt.Sprintf("%v LIMIT %v", query, numStats)
|
||||
}
|
||||
header, rows, err := self.client.Query(query)
|
||||
header, rows, err := self.getRecentRows(containerName, numStats)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -439,15 +440,7 @@ func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*i
|
||||
if numSamples == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
tableName, err := self.client.GetTableName()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query := fmt.Sprintf("SELECT * FROM %v WHERE %v='%v' and %v='%v'", tableName, colContainerName, containerName, colMachineName, self.machineName)
|
||||
if numSamples > 0 {
|
||||
query = fmt.Sprintf("%v LIMIT %v", query, numSamples)
|
||||
}
|
||||
header, rows, err := self.client.Query(query)
|
||||
header, rows, err := self.getRecentRows(containerName, numSamples)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -15,14 +15,15 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"code.google.com/p/goauth2/oauth"
|
||||
"code.google.com/p/goauth2/oauth/jwt"
|
||||
bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"code.google.com/p/goauth2/oauth"
|
||||
"code.google.com/p/goauth2/oauth/jwt"
|
||||
bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -46,6 +47,7 @@ type Client struct {
|
||||
tableId string
|
||||
}
|
||||
|
||||
// Helper method to create an authenticated connection.
|
||||
func connect() (*oauth.Token, *bigquery.Service, error) {
|
||||
if *clientId == "" {
|
||||
return nil, nil, fmt.Errorf("No client id specified")
|
||||
@ -94,6 +96,7 @@ func connect() (*oauth.Token, *bigquery.Service, error) {
|
||||
return token, service, nil
|
||||
}
|
||||
|
||||
// Creates a new client instance with an authenticated connection to bigquery.
|
||||
func NewClient() (*Client, error) {
|
||||
token, service, err := connect()
|
||||
if err != nil {
|
||||
@ -111,6 +114,8 @@ func (c *Client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper method to return the bigquery service connection.
|
||||
// Expired connection is refreshed.
|
||||
func (c *Client) getService() (*bigquery.Service, error) {
|
||||
if c.token == nil || c.service == nil {
|
||||
return nil, fmt.Errorf("Service not initialized")
|
||||
@ -162,6 +167,8 @@ func (c *Client) CreateDataset(datasetId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a table with provided table ID and schema.
|
||||
// Schema is currently not updated if the table already exists.
|
||||
func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error {
|
||||
if c.service == nil || c.datasetId == "" {
|
||||
return fmt.Errorf("No dataset created")
|
||||
@ -186,6 +193,7 @@ func (c *Client) CreateTable(tableId string, schema *bigquery.TableSchema) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add a row to the connected table.
|
||||
func (c *Client) InsertRow(rowData map[string]interface{}) error {
|
||||
service, _ := c.getService()
|
||||
if service == nil || c.datasetId == "" || c.tableId == "" {
|
||||
@ -215,6 +223,7 @@ func (c *Client) InsertRow(rowData map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns a bigtable table name (format: datasetID.tableID)
|
||||
func (c *Client) GetTableName() (string, error) {
|
||||
if c.service == nil || c.datasetId == "" || c.tableId == "" {
|
||||
return "", fmt.Errorf("Table not setup")
|
||||
@ -222,6 +231,8 @@ func (c *Client) GetTableName() (string, error) {
|
||||
return fmt.Sprintf("%s.%s", c.datasetId, c.tableId), nil
|
||||
}
|
||||
|
||||
// Do a synchronous query on bigtable and return a header and data rows.
|
||||
// Number of rows are capped to queryLimit.
|
||||
func (c *Client) Query(query string) ([]string, [][]interface{}, error) {
|
||||
service, err := c.getService()
|
||||
if err != nil {
|
||||
|
@ -17,9 +17,10 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SeanDolphin/bqschema"
|
||||
"github.com/google/cadvisor/storage/bigquery/client"
|
||||
"time"
|
||||
)
|
||||
|
||||
type container struct {
|
||||
|
Loading…
Reference in New Issue
Block a user