From 1e35331848a019bdb8f1f3c71917271ffa5caf0d Mon Sep 17 00:00:00 2001 From: Julien Maitrehenry Date: Sat, 4 Jul 2015 00:00:40 -0400 Subject: [PATCH] Update statsd storage - cpu, memory and network done Add Fs stats to statsd Update import for pointing to official repository Update statsd storage for respecting fmt coding style Create a separate client file Fix gofmt --- storage/statsd/{ => client}/client.go | 43 ++++----- storage/statsd/statsd.go | 127 ++++++++++++++++++++++++++ storagedriver.go | 6 ++ 3 files changed, 150 insertions(+), 26 deletions(-) rename storage/statsd/{ => client}/client.go (61%) create mode 100644 storage/statsd/statsd.go diff --git a/storage/statsd/client.go b/storage/statsd/client/client.go similarity index 61% rename from storage/statsd/client.go rename to storage/statsd/client/client.go index ffef57ff..958468ad 100644 --- a/storage/statsd/client.go +++ b/storage/statsd/client/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package statsd +package client import ( "fmt" @@ -22,8 +22,9 @@ import ( ) type Client struct { - HostPort string - conn net.Conn + HostPort string + Namespace string + conn net.Conn } func (self *Client) Open() error { @@ -36,38 +37,28 @@ func (self *Client) Open() error { return nil } -func (self *Client) Close() { +func (self *Client) Close() error { self.conn.Close() + self.conn = nil + return nil } -func (self *Client) UpdateGauge(name, value string) error { - stats := make(map[string]string) - val := fmt.Sprintf("%s|g", value) - stats[name] = val - if err := self.send(stats); err != nil { +// Simple send to statsd daemon without sampling. +func (self *Client) Send(namespace, containerName, key string, value uint64) error { + // only send counter value + formatted := fmt.Sprintf("%s.%s.%s:%d|g", namespace, containerName, key, value) + _, err := fmt.Fprintf(self.conn, formatted) + if err != nil { + glog.V(3).Infof("failed to send data %q: %v", formatted, err) return err } return nil } -// Simple send to statsd daemon without sampling. -func (self *Client) send(data map[string]string) error { - for k, v := range data { - formatted := fmt.Sprintf("%s:%s", k, v) - _, err := fmt.Fprintf(self.conn, formatted) - if err != nil { - glog.V(3).Infof("failed to send data %q: %v", formatted, err) - // return on first error. - return err - } - } - return nil -} - func New(hostPort string) (*Client, error) { - client := Client{HostPort: hostPort} - if err := client.Open(); err != nil { + Client := Client{HostPort: hostPort} + if err := Client.Open(); err != nil { return nil, err } - return &client, nil + return &Client, nil } diff --git a/storage/statsd/statsd.go b/storage/statsd/statsd.go new file mode 100644 index 00000000..0b4ce9f4 --- /dev/null +++ b/storage/statsd/statsd.go @@ -0,0 +1,127 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsd + +import ( + info "github.com/google/cadvisor/info/v1" + client "github.com/google/cadvisor/storage/statsd/client" +) + +type statsdStorage struct { + client *client.Client + Namespace string +} + +const ( + colCpuCumulativeUsage string = "cpu_cumulative_usage" + // Memory Usage + colMemoryUsage string = "memory_usage" + // Working set size + colMemoryWorkingSet string = "memory_working_set" + // Cumulative count of bytes received. + colRxBytes string = "rx_bytes" + // Cumulative count of receive errors encountered. + colRxErrors string = "rx_errors" + // Cumulative count of bytes transmitted. + colTxBytes string = "tx_bytes" + // Cumulative count of transmit errors encountered. + colTxErrors string = "tx_errors" + // Filesystem summary + colFsSummary = "fs_summary" + // Filesystem limit. + colFsLimit = "fs_limit" + // Filesystem usage. + colFsUsage = "fs_usage" +) + +func (self *statsdStorage) containerStatsToValues( + stats *info.ContainerStats, +) (series map[string]uint64) { + series = make(map[string]uint64) + + // Cumulative Cpu Usage + series[colCpuCumulativeUsage] = stats.Cpu.Usage.Total + + // Memory Usage + series[colMemoryUsage] = stats.Memory.Usage + + // Working set size + series[colMemoryWorkingSet] = stats.Memory.WorkingSet + + // Network stats. + series[colRxBytes] = stats.Network.RxBytes + series[colRxErrors] = stats.Network.RxErrors + series[colTxBytes] = stats.Network.TxBytes + series[colTxErrors] = stats.Network.TxErrors + + return series +} + +func (self *statsdStorage) containerFsStatsToValues( + series *map[string]uint64, + stats *info.ContainerStats, +) { + for _, fsStat := range stats.Filesystem { + // Summary stats. + (*series)[colFsSummary+"."+colFsLimit] += fsStat.Limit + (*series)[colFsSummary+"."+colFsUsage] += fsStat.Usage + + // Per device stats. + (*series)[fsStat.Device+"."+colFsLimit] = fsStat.Limit + (*series)[fsStat.Device+"."+colFsUsage] = fsStat.Usage + } +} + +//Push the data into redis +func (self *statsdStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + if stats == nil { + return nil + } + + var containerName string + if len(ref.Aliases) > 0 { + containerName = ref.Aliases[0] + } else { + containerName = ref.Name + } + + series := self.containerStatsToValues(stats) + self.containerFsStatsToValues(&series, stats) + for key, value := range series { + err := self.client.Send(self.Namespace, containerName, key, value) + if err != nil { + return err + } + } + return nil +} + +func (self *statsdStorage) Close() error { + self.client.Close() + self.client = nil + return nil +} + +func New(namespace, hostPort string) (*statsdStorage, error) { + statsdClient, err := client.New(hostPort) + if err != nil { + return nil, err + } + statsdStorage := &statsdStorage{ + client: statsdClient, + Namespace: namespace, + } + return statsdStorage, nil +} diff --git a/storagedriver.go b/storagedriver.go index 4dc1fe03..c42cda9b 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -26,6 +26,7 @@ import ( "github.com/google/cadvisor/storage/bigquery" "github.com/google/cadvisor/storage/influxdb" "github.com/google/cadvisor/storage/redis" + "github.com/google/cadvisor/storage/statsd" ) var argDbUsername = flag.String("storage_driver_user", "root", "database username") @@ -88,6 +89,11 @@ func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) *argDbHost, *argDbBufferDuration, ) + case "statsd": + backendStorage, err = statsd.New( + *argDbName, + *argDbHost, + ) default: err = fmt.Errorf("unknown backend storage driver: %v", *argDbDriver) }