376 lines
10 KiB
Go
376 lines
10 KiB
Go
// Copyright 2014 Google Inc. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package influxdb
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
info "github.com/google/cadvisor/info/v1"
|
|
influxdb "github.com/influxdb/influxdb/client"
|
|
)
|
|
|
|
type influxdbStorage struct {
|
|
client *influxdb.Client
|
|
machineName string
|
|
tableName string
|
|
bufferDuration time.Duration
|
|
lastWrite time.Time
|
|
series []*influxdb.Series
|
|
lock sync.Mutex
|
|
readyToFlush func() bool
|
|
}
|
|
|
|
const (
|
|
colTimestamp string = "time"
|
|
colMachineName string = "machine"
|
|
colContainerName string = "container_name"
|
|
colCpuCumulativeUsage string = "cpu_cumulative_usage"
|
|
// Memory Usage
|
|
colMemoryUsage string = "memory_usage"
|
|
// Working set size
|
|
colMemoryWorkingSet string = "memory_working_set"
|
|
// Cumulative count of bytes received.
|
|
colRxBytes string = "rx_bytes"
|
|
// Cumulative count of receive errors encountered.
|
|
colRxErrors string = "rx_errors"
|
|
// Cumulative count of bytes transmitted.
|
|
colTxBytes string = "tx_bytes"
|
|
// Cumulative count of transmit errors encountered.
|
|
colTxErrors string = "tx_errors"
|
|
// Filesystem device.
|
|
colFsDevice = "fs_device"
|
|
// Filesystem limit.
|
|
colFsLimit = "fs_limit"
|
|
// Filesystem usage.
|
|
colFsUsage = "fs_usage"
|
|
)
|
|
|
|
func (self *influxdbStorage) getSeriesDefaultValues(
|
|
ref info.ContainerReference,
|
|
stats *info.ContainerStats,
|
|
columns *[]string,
|
|
values *[]interface{}) {
|
|
// Timestamp
|
|
*columns = append(*columns, colTimestamp)
|
|
*values = append(*values, stats.Timestamp.UnixNano()/1E3)
|
|
|
|
// Machine name
|
|
*columns = append(*columns, colMachineName)
|
|
*values = append(*values, self.machineName)
|
|
|
|
// Container name
|
|
*columns = append(*columns, colContainerName)
|
|
if len(ref.Aliases) > 0 {
|
|
*values = append(*values, ref.Aliases[0])
|
|
} else {
|
|
*values = append(*values, ref.Name)
|
|
}
|
|
}
|
|
|
|
// In order to maintain a fixed column format, we add a new series for each filesystem partition.
|
|
func (self *influxdbStorage) containerFilesystemStatsToSeries(
|
|
ref info.ContainerReference,
|
|
stats *info.ContainerStats) (series []*influxdb.Series) {
|
|
if len(stats.Filesystem) == 0 {
|
|
return series
|
|
}
|
|
for _, fsStat := range stats.Filesystem {
|
|
columns := make([]string, 0)
|
|
values := make([]interface{}, 0)
|
|
self.getSeriesDefaultValues(ref, stats, &columns, &values)
|
|
|
|
columns = append(columns, colFsDevice)
|
|
values = append(values, fsStat.Device)
|
|
|
|
columns = append(columns, colFsLimit)
|
|
values = append(values, fsStat.Limit)
|
|
|
|
columns = append(columns, colFsUsage)
|
|
values = append(values, fsStat.Usage)
|
|
series = append(series, self.newSeries(columns, values))
|
|
}
|
|
return series
|
|
}
|
|
|
|
func (self *influxdbStorage) containerStatsToValues(
|
|
ref info.ContainerReference,
|
|
stats *info.ContainerStats,
|
|
) (columns []string, values []interface{}) {
|
|
self.getSeriesDefaultValues(ref, stats, &columns, &values)
|
|
// Cumulative Cpu Usage
|
|
columns = append(columns, colCpuCumulativeUsage)
|
|
values = append(values, stats.Cpu.Usage.Total)
|
|
|
|
// Memory Usage
|
|
columns = append(columns, colMemoryUsage)
|
|
values = append(values, stats.Memory.Usage)
|
|
|
|
// Working set size
|
|
columns = append(columns, colMemoryWorkingSet)
|
|
values = append(values, stats.Memory.WorkingSet)
|
|
|
|
// Network stats.
|
|
columns = append(columns, colRxBytes)
|
|
values = append(values, stats.Network.RxBytes)
|
|
|
|
columns = append(columns, colRxErrors)
|
|
values = append(values, stats.Network.RxErrors)
|
|
|
|
columns = append(columns, colTxBytes)
|
|
values = append(values, stats.Network.TxBytes)
|
|
|
|
columns = append(columns, colTxErrors)
|
|
values = append(values, stats.Network.TxErrors)
|
|
|
|
return columns, values
|
|
}
|
|
|
|
func convertToUint64(v interface{}) (uint64, error) {
|
|
if v == nil {
|
|
return 0, nil
|
|
}
|
|
switch x := v.(type) {
|
|
case uint64:
|
|
return x, nil
|
|
case int:
|
|
if x < 0 {
|
|
return 0, fmt.Errorf("negative value: %v", x)
|
|
}
|
|
return uint64(x), nil
|
|
case int32:
|
|
if x < 0 {
|
|
return 0, fmt.Errorf("negative value: %v", x)
|
|
}
|
|
return uint64(x), nil
|
|
case int64:
|
|
if x < 0 {
|
|
return 0, fmt.Errorf("negative value: %v", x)
|
|
}
|
|
return uint64(x), nil
|
|
case float64:
|
|
if x < 0 {
|
|
return 0, fmt.Errorf("negative value: %v", x)
|
|
}
|
|
return uint64(x), nil
|
|
case uint32:
|
|
return uint64(x), nil
|
|
}
|
|
return 0, fmt.Errorf("unknown type")
|
|
}
|
|
|
|
func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
|
|
stats := &info.ContainerStats{
|
|
Filesystem: make([]info.FsStats, 0),
|
|
}
|
|
var err error
|
|
for i, col := range columns {
|
|
v := values[i]
|
|
switch {
|
|
case col == colTimestamp:
|
|
if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() {
|
|
stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6)
|
|
}
|
|
case col == colMachineName:
|
|
if m, ok := v.(string); ok {
|
|
if m != self.machineName {
|
|
return nil, fmt.Errorf("different machine")
|
|
}
|
|
} else {
|
|
return nil, fmt.Errorf("machine name field is not a string: %v", v)
|
|
}
|
|
// Cumulative Cpu Usage
|
|
case col == colCpuCumulativeUsage:
|
|
stats.Cpu.Usage.Total, err = convertToUint64(v)
|
|
// Memory Usage
|
|
case col == colMemoryUsage:
|
|
stats.Memory.Usage, err = convertToUint64(v)
|
|
// Working set size
|
|
case col == colMemoryWorkingSet:
|
|
stats.Memory.WorkingSet, err = convertToUint64(v)
|
|
case col == colRxBytes:
|
|
stats.Network.RxBytes, err = convertToUint64(v)
|
|
case col == colRxErrors:
|
|
stats.Network.RxErrors, err = convertToUint64(v)
|
|
case col == colTxBytes:
|
|
stats.Network.TxBytes, err = convertToUint64(v)
|
|
case col == colTxErrors:
|
|
stats.Network.TxErrors, err = convertToUint64(v)
|
|
case col == colFsDevice:
|
|
device, ok := v.(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("filesystem name field is not a string: %+v", v)
|
|
}
|
|
if len(stats.Filesystem) == 0 {
|
|
stats.Filesystem = append(stats.Filesystem, info.FsStats{Device: device})
|
|
} else {
|
|
stats.Filesystem[0].Device = device
|
|
}
|
|
case col == colFsLimit:
|
|
limit, err := convertToUint64(v)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("filesystem limit field %+v invalid: %s", v, err)
|
|
}
|
|
if len(stats.Filesystem) == 0 {
|
|
stats.Filesystem = append(stats.Filesystem, info.FsStats{Limit: limit})
|
|
} else {
|
|
stats.Filesystem[0].Limit = limit
|
|
}
|
|
case col == colFsUsage:
|
|
usage, err := convertToUint64(v)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("filesystem usage field %+v invalid: %s", v, err)
|
|
}
|
|
if len(stats.Filesystem) == 0 {
|
|
stats.Filesystem = append(stats.Filesystem, info.FsStats{Usage: usage})
|
|
} else {
|
|
stats.Filesystem[0].Usage = usage
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
|
|
}
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
|
|
self.readyToFlush = readyToFlush
|
|
}
|
|
|
|
func (self *influxdbStorage) defaultReadyToFlush() bool {
|
|
return time.Since(self.lastWrite) >= self.bufferDuration
|
|
}
|
|
|
|
func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
|
if stats == nil {
|
|
return nil
|
|
}
|
|
var seriesToFlush []*influxdb.Series
|
|
func() {
|
|
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
|
|
self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats)))
|
|
self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...)
|
|
if self.readyToFlush() {
|
|
seriesToFlush = self.series
|
|
self.series = make([]*influxdb.Series, 0)
|
|
self.lastWrite = time.Now()
|
|
}
|
|
}()
|
|
if len(seriesToFlush) > 0 {
|
|
err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write stats to influxDb - %s", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
|
if numStats == 0 {
|
|
return nil, nil
|
|
}
|
|
// TODO(dengnan): select only columns that we need
|
|
// TODO(dengnan): escape names
|
|
query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName)
|
|
if numStats > 0 {
|
|
query = fmt.Sprintf("%v limit %v", query, numStats)
|
|
}
|
|
series, err := self.client.Query(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
statsList := make([]*info.ContainerStats, 0, len(series))
|
|
// By default, influxDB returns data in time descending order.
|
|
// RecentStats() requires stats in time increasing order,
|
|
// so we need to go through from the last one to the first one.
|
|
for i := len(series) - 1; i >= 0; i-- {
|
|
s := series[i]
|
|
|
|
for j := len(s.Points) - 1; j >= 0; j-- {
|
|
values := s.Points[j]
|
|
stats, err := self.valuesToContainerStats(s.Columns, values)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if stats == nil {
|
|
continue
|
|
}
|
|
statsList = append(statsList, stats)
|
|
}
|
|
}
|
|
return statsList, nil
|
|
}
|
|
|
|
func (self *influxdbStorage) Close() error {
|
|
self.client = nil
|
|
return nil
|
|
}
|
|
|
|
// Returns a new influxdb series.
|
|
func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
|
|
out := &influxdb.Series{
|
|
Name: self.tableName,
|
|
Columns: columns,
|
|
// There's only one point for each stats
|
|
Points: make([][]interface{}, 1),
|
|
}
|
|
out.Points[0] = points
|
|
return out
|
|
}
|
|
|
|
// machineName: A unique identifier to identify the host that current cAdvisor
|
|
// instance is running on.
|
|
// influxdbHost: The host which runs influxdb.
|
|
func New(machineName,
|
|
tablename,
|
|
database,
|
|
username,
|
|
password,
|
|
influxdbHost string,
|
|
isSecure bool,
|
|
bufferDuration time.Duration,
|
|
) (*influxdbStorage, error) {
|
|
config := &influxdb.ClientConfig{
|
|
Host: influxdbHost,
|
|
Username: username,
|
|
Password: password,
|
|
Database: database,
|
|
IsSecure: isSecure,
|
|
}
|
|
client, err := influxdb.NewClient(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// TODO(monnand): With go 1.3, we cannot compress data now.
|
|
client.DisableCompression()
|
|
|
|
ret := &influxdbStorage{
|
|
client: client,
|
|
machineName: machineName,
|
|
tableName: tablename,
|
|
bufferDuration: bufferDuration,
|
|
lastWrite: time.Now(),
|
|
series: make([]*influxdb.Series, 0),
|
|
}
|
|
ret.readyToFlush = ret.defaultReadyToFlush
|
|
return ret, nil
|
|
}
|