From b0ce5720b03b009c4bc48a57857c0737cb2ae07f Mon Sep 17 00:00:00 2001 From: Jimmi Dyson Date: Fri, 27 Nov 2015 15:03:27 +0000 Subject: [PATCH] Refactor storage plugins --- storage/bigquery/bigquery.go | 23 +++++- storage/common_flags.go | 28 +++++++ storage/elasticsearch/elasticsearch.go | 30 ++++++- storage/influxdb/influxdb.go | 26 +++++- storage/redis/redis.go | 21 ++++- storage/statsd/statsd.go | 11 ++- storage/stdout/stdout.go | 11 ++- storage/storage.go | 25 +++++- storagedriver.go | 109 ++----------------------- 9 files changed, 174 insertions(+), 110 deletions(-) create mode 100644 storage/common_flags.go diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 5b198a57..316f8f3c 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -15,6 +15,8 @@ package bigquery import ( + "os" + info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/bigquery/client" @@ -22,6 +24,10 @@ import ( bigquery "google.golang.org/api/bigquery/v2" ) +func init() { + storage.RegisterStorageDriver("bigquery", new) +} + type bigqueryStorage struct { client *client.Client machineName string @@ -68,6 +74,18 @@ const ( colFsUsage = "fs_usage" ) +func new() (storage.StorageDriver, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + return newStorage( + hostname, + *storage.ArgDbTable, + *storage.ArgDbName, + ) +} + // TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { fields := make([]*bigquery.TableFieldSchema, 19) @@ -271,10 +289,7 @@ func (self *bigqueryStorage) Close() error { // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // tableName: BigQuery table used for storing stats. -func New(machineName, - datasetId, - tableName string, -) (storage.StorageDriver, error) { +func newStorage(machineName, datasetId, tableName string) (storage.StorageDriver, error) { bqClient, err := client.NewClient() if err != nil { return nil, err diff --git a/storage/common_flags.go b/storage/common_flags.go new file mode 100644 index 00000000..1214d3e8 --- /dev/null +++ b/storage/common_flags.go @@ -0,0 +1,28 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "flag" + "time" +) + +var ArgDbUsername = flag.String("storage_driver_user", "root", "database username") +var ArgDbPassword = flag.String("storage_driver_password", "root", "database password") +var ArgDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port") +var ArgDbName = flag.String("storage_driver_db", "cadvisor", "database name") +var ArgDbTable = flag.String("storage_driver_table", "stats", "table name") +var ArgDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database") +var ArgDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*time.Second, "Writes in the storage driver will be buffered for this duration, and committed to the non memory backends as a single transaction") diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index 053dcdca..6a3f0c40 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -15,7 +15,9 @@ package elasticsearch import ( + "flag" "fmt" + "os" "sync" "time" @@ -25,6 +27,10 @@ import ( "gopkg.in/olivere/elastic.v2" ) +func init() { + storage.RegisterStorageDriver("elasticsearch", new) +} + type elasticStorage struct { client *elastic.Client machineName string @@ -40,6 +46,27 @@ type detailSpec struct { ContainerStats *info.ContainerStats `json:"container_stats,omitempty"` } +var ( + argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port") + argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name") + argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name") + argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically") +) + +func new() (storage.StorageDriver, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + return newStorage( + hostname, + *argIndexName, + *argTypeName, + *argElasticHost, + *argEnableSniffer, + ) +} + func (self *elasticStorage) containerStatsAndDefaultValues( ref info.ContainerReference, stats *info.ContainerStats) *detailSpec { timestamp := stats.Timestamp.UnixNano() / 1E3 @@ -90,7 +117,8 @@ func (self *elasticStorage) Close() error { // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // ElasticHost: The host which runs ElasticSearch. -func New(machineName, +func newStorage( + machineName, indexName, typeName, elasticHost string, diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index e6967a8c..58d97c62 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -16,14 +16,20 @@ package influxdb import ( "fmt" + "os" "sync" "time" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/storage" influxdb "github.com/influxdb/influxdb/client" ) +func init() { + storage.RegisterStorageDriver("influxdb", new) +} + type influxdbStorage struct { client *influxdb.Client machineName string @@ -60,6 +66,23 @@ const ( colFsUsage = "fs_usage" ) +func new() (storage.StorageDriver, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + return newStorage( + hostname, + *storage.ArgDbTable, + *storage.ArgDbName, + *storage.ArgDbUsername, + *storage.ArgDbPassword, + *storage.ArgDbHost, + *storage.ArgDbIsSecure, + *storage.ArgDbBufferDuration, + ) +} + func (self *influxdbStorage) getSeriesDefaultValues( ref info.ContainerReference, stats *info.ContainerStats, @@ -196,7 +219,8 @@ func (self *influxdbStorage) newSeries(columns []string, points []interface{}) * // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // influxdbHost: The host which runs influxdb. -func New(machineName, +func newStorage( + machineName, tablename, database, username, diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 5aa03575..44f7e342 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -16,6 +16,7 @@ package redis import ( "encoding/json" + "os" "sync" "time" @@ -25,6 +26,10 @@ import ( redis "github.com/garyburd/redigo/redis" ) +func init() { + storage.RegisterStorageDriver("redis", new) +} + type redisStorage struct { conn redis.Conn machineName string @@ -42,6 +47,19 @@ type detailSpec struct { ContainerStats *info.ContainerStats `json:"container_stats,omitempty"` } +func new() (storage.StorageDriver, error) { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + return newStorage( + hostname, + *storage.ArgDbName, + *storage.ArgDbHost, + *storage.ArgDbBufferDuration, + ) +} + func (self *redisStorage) defaultReadyToFlush() bool { return time.Since(self.lastWrite) >= self.bufferDuration } @@ -99,7 +117,8 @@ func (self *redisStorage) Close() error { // instance is running on. // redisHost: The host which runs redis. // redisKey: The key for the Data that stored in the redis -func New(machineName, +func newStorage( + machineName, redisKey, redisHost string, bufferDuration time.Duration, diff --git a/storage/statsd/statsd.go b/storage/statsd/statsd.go index 0b4ce9f4..0b571dcf 100644 --- a/storage/statsd/statsd.go +++ b/storage/statsd/statsd.go @@ -16,9 +16,14 @@ package statsd import ( info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/storage" client "github.com/google/cadvisor/storage/statsd/client" ) +func init() { + storage.RegisterStorageDriver("statsd", new) +} + type statsdStorage struct { client *client.Client Namespace string @@ -46,6 +51,10 @@ const ( colFsUsage = "fs_usage" ) +func new() (storage.StorageDriver, error) { + return newStorage(*storage.ArgDbName, *storage.ArgDbHost) +} + func (self *statsdStorage) containerStatsToValues( stats *info.ContainerStats, ) (series map[string]uint64) { @@ -114,7 +123,7 @@ func (self *statsdStorage) Close() error { return nil } -func New(namespace, hostPort string) (*statsdStorage, error) { +func newStorage(namespace, hostPort string) (*statsdStorage, error) { statsdClient, err := client.New(hostPort) if err != nil { return nil, err diff --git a/storage/stdout/stdout.go b/storage/stdout/stdout.go index af7b58cc..aa8aafe9 100644 --- a/storage/stdout/stdout.go +++ b/storage/stdout/stdout.go @@ -19,8 +19,13 @@ import ( "fmt" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/storage" ) +func init() { + storage.RegisterStorageDriver("stdout", new) +} + type stdoutStorage struct { Namespace string } @@ -47,6 +52,10 @@ const ( colFsUsage = "fs_usage" ) +func new() (storage.StorageDriver, error) { + return newStorage(*storage.ArgDbHost) +} + func (driver *stdoutStorage) containerStatsToValues(stats *info.ContainerStats) (series map[string]uint64) { series = make(map[string]uint64) @@ -108,7 +117,7 @@ func (driver *stdoutStorage) Close() error { return nil } -func New(namespace string) (*stdoutStorage, error) { +func newStorage(namespace string) (*stdoutStorage, error) { stdoutStorage := &stdoutStorage{ Namespace: namespace, } diff --git a/storage/storage.go b/storage/storage.go index a079ebdd..1a0dccf0 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -14,7 +14,11 @@ package storage -import info "github.com/google/cadvisor/info/v1" +import ( + "fmt" + + info "github.com/google/cadvisor/info/v1" +) type StorageDriver interface { AddStats(ref info.ContainerReference, stats *info.ContainerStats) error @@ -24,3 +28,22 @@ type StorageDriver interface { // on the implementation of the storage driver. Close() error } + +type StorageDriverFunc func() (StorageDriver, error) + +var registeredPlugins = map[string](StorageDriverFunc){} + +func RegisterStorageDriver(name string, f StorageDriverFunc) { + registeredPlugins[name] = f +} + +func New(name string) (StorageDriver, error) { + if name == "" { + return nil, nil + } + f, ok := registeredPlugins[name] + if !ok { + return nil, fmt.Errorf("unknown backend storage driver: %s", name) + } + return f() +} diff --git a/storagedriver.go b/storagedriver.go index dff8ba02..43f72987 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -16,123 +16,32 @@ package main import ( "flag" - "fmt" - "os" "time" "github.com/google/cadvisor/cache/memory" "github.com/google/cadvisor/storage" - "github.com/google/cadvisor/storage/bigquery" - "github.com/google/cadvisor/storage/elasticsearch" - "github.com/google/cadvisor/storage/influxdb" - "github.com/google/cadvisor/storage/redis" - "github.com/google/cadvisor/storage/statsd" - "github.com/google/cadvisor/storage/stdout" + _ "github.com/google/cadvisor/storage/bigquery" + _ "github.com/google/cadvisor/storage/elasticsearch" + _ "github.com/google/cadvisor/storage/influxdb" + _ "github.com/google/cadvisor/storage/redis" + _ "github.com/google/cadvisor/storage/statsd" + _ "github.com/google/cadvisor/storage/stdout" "github.com/golang/glog" ) -var argDbUsername = flag.String("storage_driver_user", "root", "database username") -var argDbPassword = flag.String("storage_driver_password", "root", "database password") -var argDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port") -var argDbName = flag.String("storage_driver_db", "cadvisor", "database name") -var argDbTable = flag.String("storage_driver_table", "stats", "table name") -var argDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database") -var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*time.Second, "Writes in the storage driver will be buffered for this duration, and committed to the non memory backends as a single transaction") var storageDuration = flag.Duration("storage_duration", 2*time.Minute, "How long to keep data stored (Default: 2min).") -var argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port") -var argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name") -var argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name") -var argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically") -// Creates a memory storage with an optional backend storage option. +// NewMemoryStorage creates a memory storage with an optional backend storage option. func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) { - var storageDriver *memory.InMemoryCache - var backendStorage storage.StorageDriver - var err error - switch backendStorageName { - case "": - backendStorage = nil - case "influxdb": - var hostname string - hostname, err = os.Hostname() - if err != nil { - return nil, err - } - - backendStorage, err = influxdb.New( - hostname, - *argDbTable, - *argDbName, - *argDbUsername, - *argDbPassword, - *argDbHost, - *argDbIsSecure, - *argDbBufferDuration, - ) - case "bigquery": - var hostname string - hostname, err = os.Hostname() - if err != nil { - return nil, err - } - backendStorage, err = bigquery.New( - hostname, - *argDbTable, - *argDbName, - ) - case "redis": - //machineName: We use os.Hostname as the machineName (A unique identifier to identify the host that runs the current cAdvisor) - //argDbName: the key for redis's data - //argDbHost: the redis's server host - var machineName string - machineName, err = os.Hostname() - if err != nil { - return nil, err - } - backendStorage, err = redis.New( - machineName, - *argDbName, - *argDbHost, - *argDbBufferDuration, - ) - case "elasticsearch": - //argIndexName: the index for elasticsearch - //argTypeName: the type for index - //argElasticHost: the elasticsearch's server host - var machineName string - machineName, err = os.Hostname() - if err != nil { - return nil, err - } - backendStorage, err = elasticsearch.New( - machineName, - *argIndexName, - *argTypeName, - *argElasticHost, - *argEnableSniffer, - ) - case "statsd": - backendStorage, err = statsd.New( - *argDbName, - *argDbHost, - ) - case "stdout": - backendStorage, err = stdout.New( - *argDbHost, - ) - default: - err = fmt.Errorf("unknown backend storage driver: %v", *argDbDriver) - } + backendStorage, err := storage.New(backendStorageName) if err != nil { return nil, err } if backendStorageName != "" { glog.Infof("Using backend storage type %q", backendStorageName) - } else { - glog.Infof("No backend storage selected") } glog.Infof("Caching stats in memory for %v", *storageDuration) - storageDriver = memory.New(*storageDuration, backendStorage) + storageDriver := memory.New(*storageDuration, backendStorage) return storageDriver, nil }