Refactor storage plugins

This commit is contained in:
Jimmi Dyson 2015-11-27 15:03:27 +00:00
parent 8df37ece9b
commit b0ce5720b0
9 changed files with 174 additions and 110 deletions

View File

@ -15,6 +15,8 @@
package bigquery package bigquery
import ( import (
"os"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage"
"github.com/google/cadvisor/storage/bigquery/client" "github.com/google/cadvisor/storage/bigquery/client"
@ -22,6 +24,10 @@ import (
bigquery "google.golang.org/api/bigquery/v2" bigquery "google.golang.org/api/bigquery/v2"
) )
func init() {
storage.RegisterStorageDriver("bigquery", new)
}
type bigqueryStorage struct { type bigqueryStorage struct {
client *client.Client client *client.Client
machineName string machineName string
@ -68,6 +74,18 @@ const (
colFsUsage = "fs_usage" colFsUsage = "fs_usage"
) )
func new() (storage.StorageDriver, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
return newStorage(
hostname,
*storage.ArgDbTable,
*storage.ArgDbName,
)
}
// TODO(jnagal): Infer schema through reflection. (See bigquery/client/example) // TODO(jnagal): Infer schema through reflection. (See bigquery/client/example)
func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema {
fields := make([]*bigquery.TableFieldSchema, 19) fields := make([]*bigquery.TableFieldSchema, 19)
@ -271,10 +289,7 @@ func (self *bigqueryStorage) Close() error {
// machineName: A unique identifier to identify the host that current cAdvisor // machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on. // instance is running on.
// tableName: BigQuery table used for storing stats. // tableName: BigQuery table used for storing stats.
func New(machineName, func newStorage(machineName, datasetId, tableName string) (storage.StorageDriver, error) {
datasetId,
tableName string,
) (storage.StorageDriver, error) {
bqClient, err := client.NewClient() bqClient, err := client.NewClient()
if err != nil { if err != nil {
return nil, err return nil, err

28
storage/common_flags.go Normal file
View File

@ -0,0 +1,28 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"flag"
"time"
)
var ArgDbUsername = flag.String("storage_driver_user", "root", "database username")
var ArgDbPassword = flag.String("storage_driver_password", "root", "database password")
var ArgDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port")
var ArgDbName = flag.String("storage_driver_db", "cadvisor", "database name")
var ArgDbTable = flag.String("storage_driver_table", "stats", "table name")
var ArgDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database")
var ArgDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*time.Second, "Writes in the storage driver will be buffered for this duration, and committed to the non memory backends as a single transaction")

View File

@ -15,7 +15,9 @@
package elasticsearch package elasticsearch
import ( import (
"flag"
"fmt" "fmt"
"os"
"sync" "sync"
"time" "time"
@ -25,6 +27,10 @@ import (
"gopkg.in/olivere/elastic.v2" "gopkg.in/olivere/elastic.v2"
) )
func init() {
storage.RegisterStorageDriver("elasticsearch", new)
}
type elasticStorage struct { type elasticStorage struct {
client *elastic.Client client *elastic.Client
machineName string machineName string
@ -40,6 +46,27 @@ type detailSpec struct {
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"` ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
} }
var (
argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port")
argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name")
argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name")
argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically")
)
func new() (storage.StorageDriver, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
return newStorage(
hostname,
*argIndexName,
*argTypeName,
*argElasticHost,
*argEnableSniffer,
)
}
func (self *elasticStorage) containerStatsAndDefaultValues( func (self *elasticStorage) containerStatsAndDefaultValues(
ref info.ContainerReference, stats *info.ContainerStats) *detailSpec { ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
timestamp := stats.Timestamp.UnixNano() / 1E3 timestamp := stats.Timestamp.UnixNano() / 1E3
@ -90,7 +117,8 @@ func (self *elasticStorage) Close() error {
// machineName: A unique identifier to identify the host that current cAdvisor // machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on. // instance is running on.
// ElasticHost: The host which runs ElasticSearch. // ElasticHost: The host which runs ElasticSearch.
func New(machineName, func newStorage(
machineName,
indexName, indexName,
typeName, typeName,
elasticHost string, elasticHost string,

View File

@ -16,14 +16,20 @@ package influxdb
import ( import (
"fmt" "fmt"
"os"
"sync" "sync"
"time" "time"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage"
influxdb "github.com/influxdb/influxdb/client" influxdb "github.com/influxdb/influxdb/client"
) )
func init() {
storage.RegisterStorageDriver("influxdb", new)
}
type influxdbStorage struct { type influxdbStorage struct {
client *influxdb.Client client *influxdb.Client
machineName string machineName string
@ -60,6 +66,23 @@ const (
colFsUsage = "fs_usage" colFsUsage = "fs_usage"
) )
func new() (storage.StorageDriver, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
return newStorage(
hostname,
*storage.ArgDbTable,
*storage.ArgDbName,
*storage.ArgDbUsername,
*storage.ArgDbPassword,
*storage.ArgDbHost,
*storage.ArgDbIsSecure,
*storage.ArgDbBufferDuration,
)
}
func (self *influxdbStorage) getSeriesDefaultValues( func (self *influxdbStorage) getSeriesDefaultValues(
ref info.ContainerReference, ref info.ContainerReference,
stats *info.ContainerStats, stats *info.ContainerStats,
@ -196,7 +219,8 @@ func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *
// machineName: A unique identifier to identify the host that current cAdvisor // machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on. // instance is running on.
// influxdbHost: The host which runs influxdb. // influxdbHost: The host which runs influxdb.
func New(machineName, func newStorage(
machineName,
tablename, tablename,
database, database,
username, username,

View File

@ -16,6 +16,7 @@ package redis
import ( import (
"encoding/json" "encoding/json"
"os"
"sync" "sync"
"time" "time"
@ -25,6 +26,10 @@ import (
redis "github.com/garyburd/redigo/redis" redis "github.com/garyburd/redigo/redis"
) )
func init() {
storage.RegisterStorageDriver("redis", new)
}
type redisStorage struct { type redisStorage struct {
conn redis.Conn conn redis.Conn
machineName string machineName string
@ -42,6 +47,19 @@ type detailSpec struct {
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"` ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
} }
func new() (storage.StorageDriver, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
return newStorage(
hostname,
*storage.ArgDbName,
*storage.ArgDbHost,
*storage.ArgDbBufferDuration,
)
}
func (self *redisStorage) defaultReadyToFlush() bool { func (self *redisStorage) defaultReadyToFlush() bool {
return time.Since(self.lastWrite) >= self.bufferDuration return time.Since(self.lastWrite) >= self.bufferDuration
} }
@ -99,7 +117,8 @@ func (self *redisStorage) Close() error {
// instance is running on. // instance is running on.
// redisHost: The host which runs redis. // redisHost: The host which runs redis.
// redisKey: The key for the Data that stored in the redis // redisKey: The key for the Data that stored in the redis
func New(machineName, func newStorage(
machineName,
redisKey, redisKey,
redisHost string, redisHost string,
bufferDuration time.Duration, bufferDuration time.Duration,

View File

@ -16,9 +16,14 @@ package statsd
import ( import (
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage"
client "github.com/google/cadvisor/storage/statsd/client" client "github.com/google/cadvisor/storage/statsd/client"
) )
func init() {
storage.RegisterStorageDriver("statsd", new)
}
type statsdStorage struct { type statsdStorage struct {
client *client.Client client *client.Client
Namespace string Namespace string
@ -46,6 +51,10 @@ const (
colFsUsage = "fs_usage" colFsUsage = "fs_usage"
) )
func new() (storage.StorageDriver, error) {
return newStorage(*storage.ArgDbName, *storage.ArgDbHost)
}
func (self *statsdStorage) containerStatsToValues( func (self *statsdStorage) containerStatsToValues(
stats *info.ContainerStats, stats *info.ContainerStats,
) (series map[string]uint64) { ) (series map[string]uint64) {
@ -114,7 +123,7 @@ func (self *statsdStorage) Close() error {
return nil return nil
} }
func New(namespace, hostPort string) (*statsdStorage, error) { func newStorage(namespace, hostPort string) (*statsdStorage, error) {
statsdClient, err := client.New(hostPort) statsdClient, err := client.New(hostPort)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -19,8 +19,13 @@ import (
"fmt" "fmt"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage"
) )
func init() {
storage.RegisterStorageDriver("stdout", new)
}
type stdoutStorage struct { type stdoutStorage struct {
Namespace string Namespace string
} }
@ -47,6 +52,10 @@ const (
colFsUsage = "fs_usage" colFsUsage = "fs_usage"
) )
func new() (storage.StorageDriver, error) {
return newStorage(*storage.ArgDbHost)
}
func (driver *stdoutStorage) containerStatsToValues(stats *info.ContainerStats) (series map[string]uint64) { func (driver *stdoutStorage) containerStatsToValues(stats *info.ContainerStats) (series map[string]uint64) {
series = make(map[string]uint64) series = make(map[string]uint64)
@ -108,7 +117,7 @@ func (driver *stdoutStorage) Close() error {
return nil return nil
} }
func New(namespace string) (*stdoutStorage, error) { func newStorage(namespace string) (*stdoutStorage, error) {
stdoutStorage := &stdoutStorage{ stdoutStorage := &stdoutStorage{
Namespace: namespace, Namespace: namespace,
} }

View File

@ -14,7 +14,11 @@
package storage package storage
import info "github.com/google/cadvisor/info/v1" import (
"fmt"
info "github.com/google/cadvisor/info/v1"
)
type StorageDriver interface { type StorageDriver interface {
AddStats(ref info.ContainerReference, stats *info.ContainerStats) error AddStats(ref info.ContainerReference, stats *info.ContainerStats) error
@ -24,3 +28,22 @@ type StorageDriver interface {
// on the implementation of the storage driver. // on the implementation of the storage driver.
Close() error Close() error
} }
type StorageDriverFunc func() (StorageDriver, error)
var registeredPlugins = map[string](StorageDriverFunc){}
func RegisterStorageDriver(name string, f StorageDriverFunc) {
registeredPlugins[name] = f
}
func New(name string) (StorageDriver, error) {
if name == "" {
return nil, nil
}
f, ok := registeredPlugins[name]
if !ok {
return nil, fmt.Errorf("unknown backend storage driver: %s", name)
}
return f()
}

View File

@ -16,123 +16,32 @@ package main
import ( import (
"flag" "flag"
"fmt"
"os"
"time" "time"
"github.com/google/cadvisor/cache/memory" "github.com/google/cadvisor/cache/memory"
"github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage"
"github.com/google/cadvisor/storage/bigquery" _ "github.com/google/cadvisor/storage/bigquery"
"github.com/google/cadvisor/storage/elasticsearch" _ "github.com/google/cadvisor/storage/elasticsearch"
"github.com/google/cadvisor/storage/influxdb" _ "github.com/google/cadvisor/storage/influxdb"
"github.com/google/cadvisor/storage/redis" _ "github.com/google/cadvisor/storage/redis"
"github.com/google/cadvisor/storage/statsd" _ "github.com/google/cadvisor/storage/statsd"
"github.com/google/cadvisor/storage/stdout" _ "github.com/google/cadvisor/storage/stdout"
"github.com/golang/glog" "github.com/golang/glog"
) )
var argDbUsername = flag.String("storage_driver_user", "root", "database username")
var argDbPassword = flag.String("storage_driver_password", "root", "database password")
var argDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port")
var argDbName = flag.String("storage_driver_db", "cadvisor", "database name")
var argDbTable = flag.String("storage_driver_table", "stats", "table name")
var argDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database")
var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*time.Second, "Writes in the storage driver will be buffered for this duration, and committed to the non memory backends as a single transaction")
var storageDuration = flag.Duration("storage_duration", 2*time.Minute, "How long to keep data stored (Default: 2min).") var storageDuration = flag.Duration("storage_duration", 2*time.Minute, "How long to keep data stored (Default: 2min).")
var argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port")
var argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name")
var argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name")
var argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically")
// Creates a memory storage with an optional backend storage option. // NewMemoryStorage creates a memory storage with an optional backend storage option.
func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) { func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) {
var storageDriver *memory.InMemoryCache backendStorage, err := storage.New(backendStorageName)
var backendStorage storage.StorageDriver
var err error
switch backendStorageName {
case "":
backendStorage = nil
case "influxdb":
var hostname string
hostname, err = os.Hostname()
if err != nil {
return nil, err
}
backendStorage, err = influxdb.New(
hostname,
*argDbTable,
*argDbName,
*argDbUsername,
*argDbPassword,
*argDbHost,
*argDbIsSecure,
*argDbBufferDuration,
)
case "bigquery":
var hostname string
hostname, err = os.Hostname()
if err != nil {
return nil, err
}
backendStorage, err = bigquery.New(
hostname,
*argDbTable,
*argDbName,
)
case "redis":
//machineName: We use os.Hostname as the machineName (A unique identifier to identify the host that runs the current cAdvisor)
//argDbName: the key for redis's data
//argDbHost: the redis's server host
var machineName string
machineName, err = os.Hostname()
if err != nil {
return nil, err
}
backendStorage, err = redis.New(
machineName,
*argDbName,
*argDbHost,
*argDbBufferDuration,
)
case "elasticsearch":
//argIndexName: the index for elasticsearch
//argTypeName: the type for index
//argElasticHost: the elasticsearch's server host
var machineName string
machineName, err = os.Hostname()
if err != nil {
return nil, err
}
backendStorage, err = elasticsearch.New(
machineName,
*argIndexName,
*argTypeName,
*argElasticHost,
*argEnableSniffer,
)
case "statsd":
backendStorage, err = statsd.New(
*argDbName,
*argDbHost,
)
case "stdout":
backendStorage, err = stdout.New(
*argDbHost,
)
default:
err = fmt.Errorf("unknown backend storage driver: %v", *argDbDriver)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
if backendStorageName != "" { if backendStorageName != "" {
glog.Infof("Using backend storage type %q", backendStorageName) glog.Infof("Using backend storage type %q", backendStorageName)
} else {
glog.Infof("No backend storage selected")
} }
glog.Infof("Caching stats in memory for %v", *storageDuration) glog.Infof("Caching stats in memory for %v", *storageDuration)
storageDriver = memory.New(*storageDuration, backendStorage) storageDriver := memory.New(*storageDuration, backendStorage)
return storageDriver, nil return storageDriver, nil
} }