Upgrade InfluxDB storage to InfluxDB 0.9
- Fix #743 - Rewrite InfluxDB storage for the new InfluxDB API data structures.
- Store each measurement separately instead of storing all measurements in a single big "table" with many columns/fields.
- Use tags to add metadata to points, such as the container name. Tags are a new feature in InfluxDB 0.9.
parent 38e05a6ad4
commit ada6e3d354
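Not part of the commit itself: a minimal sketch of the write path the commit message describes, with one point per measurement and the container metadata carried in tags. The host, credentials, database, and container names below are assumed placeholders; the client calls mirror those used in the diff that follows.

// Minimal sketch, assuming a local InfluxDB 0.9 instance and example names.
// One point per measurement; metadata is attached as tags, not columns.
package main

import (
	"fmt"
	"net/url"
	"time"

	influxdb "github.com/influxdb/influxdb/client"
)

func main() {
	u := url.URL{Scheme: "http", Host: "localhost:8086"} // assumed host:port
	client, err := influxdb.NewClient(influxdb.Config{URL: u, Username: "root", Password: "root"})
	if err != nil {
		fmt.Println("client error:", err)
		return
	}

	// A single measurement with the machine and container names as tags.
	point := influxdb.Point{
		Measurement: "memory_usage", // serMemoryUsage in this commit
		Tags: map[string]string{
			"machine":        "machineA",    // tagMachineName
			"container_name": "myContainer", // tagContainerName
		},
		Fields: map[string]interface{}{"value": int64(123456)},
		Time:   time.Now(),
	}

	// Points are flushed in batches against a database.
	bp := influxdb.BatchPoints{
		Points:   []influxdb.Point{point},
		Database: "cadvisor", // assumed database name
	}
	if _, err := client.Write(bp); err != nil {
		fmt.Println("write failed:", err)
	}
}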
@@ -16,12 +16,14 @@ package influxdb
 
 import (
 	"fmt"
+	"net/url"
 	"os"
 	"sync"
 	"time"
 
 	info "github.com/google/cadvisor/info/v1"
 	"github.com/google/cadvisor/storage"
+	"github.com/google/cadvisor/version"
 
 	influxdb "github.com/influxdb/influxdb/client"
 )
@@ -31,39 +33,44 @@ func init() {
 }
 
 type influxdbStorage struct {
 	client *influxdb.Client
 	machineName string
-	tableName string
-	bufferDuration time.Duration
-	lastWrite time.Time
-	series []*influxdb.Series
-	lock sync.Mutex
-	readyToFlush func() bool
+	database string
+	retentionPolicy string
+	bufferDuration time.Duration
+	lastWrite time.Time
+	points []*influxdb.Point
+	lock sync.Mutex
+	readyToFlush func() bool
 }
 
+// Series names
 const (
-	colTimestamp string = "time"
-	colMachineName string = "machine"
-	colContainerName string = "container_name"
-	colCpuCumulativeUsage string = "cpu_cumulative_usage"
+	// Cumulative CPU usage
+	serCpuUsageTotal string = "cpu_usage_total"
+	serCpuUsageSystem string = "cpu_usage_system"
+	serCpuUsageUser string = "cpu_usage_user"
+	serCpuUsagePerCpu string = "cpu_usage_per_cpu"
+	// Smoothed average of number of runnable threads x 1000.
+	serLoadAverage string = "load_average"
 	// Memory Usage
-	colMemoryUsage string = "memory_usage"
+	serMemoryUsage string = "memory_usage"
 	// Working set size
-	colMemoryWorkingSet string = "memory_working_set"
+	serMemoryWorkingSet string = "memory_working_set"
 	// Cumulative count of bytes received.
-	colRxBytes string = "rx_bytes"
+	serRxBytes string = "rx_bytes"
 	// Cumulative count of receive errors encountered.
-	colRxErrors string = "rx_errors"
+	serRxErrors string = "rx_errors"
 	// Cumulative count of bytes transmitted.
-	colTxBytes string = "tx_bytes"
+	serTxBytes string = "tx_bytes"
 	// Cumulative count of transmit errors encountered.
-	colTxErrors string = "tx_errors"
+	serTxErrors string = "tx_errors"
 	// Filesystem device.
-	colFsDevice = "fs_device"
+	serFsDevice string = "fs_device"
 	// Filesystem limit.
-	colFsLimit = "fs_limit"
+	serFsLimit string = "fs_limit"
 	// Filesystem usage.
-	colFsUsage = "fs_usage"
+	serFsUsage string = "fs_usage"
 )
 
 func new() (storage.StorageDriver, error) {
@@ -83,84 +90,122 @@ func new() (storage.StorageDriver, error) {
 	)
 }
 
-func (self *influxdbStorage) getSeriesDefaultValues(
+// Field names
+const (
+	fieldValue string = "value"
+	fieldType string = "type"
+	fieldInstance string = "instance"
+)
+
+// Tag names
+const (
+	tagMachineName string = "machine"
+	tagContainerName string = "container_name"
+)
+
+func (self *influxdbStorage) containerFilesystemStatsToPoints(
 	ref info.ContainerReference,
-	stats *info.ContainerStats,
-	columns *[]string,
-	values *[]interface{}) {
-	// Timestamp
-	*columns = append(*columns, colTimestamp)
-	*values = append(*values, stats.Timestamp.UnixNano()/1E3)
-
-	// Machine name
-	*columns = append(*columns, colMachineName)
-	*values = append(*values, self.machineName)
-
-	// Container name
-	*columns = append(*columns, colContainerName)
-	if len(ref.Aliases) > 0 {
-		*values = append(*values, ref.Aliases[0])
-	} else {
-		*values = append(*values, ref.Name)
-	}
-}
-
-// In order to maintain a fixed column format, we add a new series for each filesystem partition.
-func (self *influxdbStorage) containerFilesystemStatsToSeries(
-	ref info.ContainerReference,
-	stats *info.ContainerStats) (series []*influxdb.Series) {
+	stats *info.ContainerStats) (points []*influxdb.Point) {
 	if len(stats.Filesystem) == 0 {
-		return series
+		return points
 	}
 	for _, fsStat := range stats.Filesystem {
-		columns := make([]string, 0)
-		values := make([]interface{}, 0)
-		self.getSeriesDefaultValues(ref, stats, &columns, &values)
+		tagsFsUsage := map[string]string{
+			fieldInstance: fsStat.Device,
+			fieldType: "usage",
+		}
+		fieldsFsUsage := map[string]interface{}{
+			fieldValue: int64(fsStat.Usage),
+		}
+		pointFsUsage := &influxdb.Point{
+			Measurement: serFsUsage,
+			Tags: tagsFsUsage,
+			Fields: fieldsFsUsage,
+		}
 
-		columns = append(columns, colFsDevice)
-		values = append(values, fsStat.Device)
+		tagsFsLimit := map[string]string{
+			fieldInstance: fsStat.Device,
+			fieldType: "limit",
+		}
+		fieldsFsLimit := map[string]interface{}{
+			fieldValue: int64(fsStat.Limit),
+		}
+		pointFsLimit := &influxdb.Point{
+			Measurement: serFsLimit,
+			Tags: tagsFsLimit,
+			Fields: fieldsFsLimit,
+		}
 
-		columns = append(columns, colFsLimit)
-		values = append(values, fsStat.Limit)
-
-		columns = append(columns, colFsUsage)
-		values = append(values, fsStat.Usage)
-		series = append(series, self.newSeries(columns, values))
+		points = append(points, pointFsUsage, pointFsLimit)
 	}
-	return series
+	self.tagPoints(ref, stats, points)
+
+	return points
 }
 
-func (self *influxdbStorage) containerStatsToValues(
+// Set tags and timestamp for all points of the batch.
+// Points should inherit the tags that are set for BatchPoints, but that does not seem to work.
+func (self *influxdbStorage) tagPoints(ref info.ContainerReference, stats *info.ContainerStats, points []*influxdb.Point) {
+	// Use container alias if possible
+	var containerName string
+	if len(ref.Aliases) > 0 {
+		containerName = ref.Aliases[0]
+	} else {
+		containerName = ref.Name
+	}
+
+	commonTags := map[string]string{
+		tagMachineName: self.machineName,
+		tagContainerName: containerName,
+	}
+	for i := 0; i < len(points); i++ {
+		// merge with existing tags if any
+		addTagsToPoint(points[i], commonTags)
+		points[i].Time = stats.Timestamp
+	}
+}
+
+func (self *influxdbStorage) containerStatsToPoints(
 	ref info.ContainerReference,
 	stats *info.ContainerStats,
-) (columns []string, values []interface{}) {
-	self.getSeriesDefaultValues(ref, stats, &columns, &values)
-	// Cumulative Cpu Usage
-	columns = append(columns, colCpuCumulativeUsage)
-	values = append(values, stats.Cpu.Usage.Total)
+) (points []*influxdb.Point) {
+	// CPU usage: Total usage in nanoseconds
+	points = append(points, makePoint(serCpuUsageTotal, stats.Cpu.Usage.Total))
+
+	// CPU usage: Time spend in system space (in nanoseconds)
+	points = append(points, makePoint(serCpuUsageSystem, stats.Cpu.Usage.System))
+
+	// CPU usage: Time spent in user space (in nanoseconds)
+	points = append(points, makePoint(serCpuUsageUser, stats.Cpu.Usage.User))
+
+	// CPU usage per CPU
+	for i := 0; i < len(stats.Cpu.Usage.PerCpu); i++ {
+		point := makePoint(serCpuUsagePerCpu, stats.Cpu.Usage.PerCpu[i])
+		tags := map[string]string{"instance": fmt.Sprintf("%v", i)}
+		addTagsToPoint(point, tags)
+		points = append(points, point)
+	}
+
+	// Load Average
+	points = append(points, makePoint(serLoadAverage, stats.Cpu.LoadAverage))
 
 	// Memory Usage
-	columns = append(columns, colMemoryUsage)
-	values = append(values, stats.Memory.Usage)
+	points = append(points, makePoint(serMemoryUsage, stats.Memory.Usage))
 
-	// Working set size
-	columns = append(columns, colMemoryWorkingSet)
-	values = append(values, stats.Memory.WorkingSet)
+	// Working Set Size
+	points = append(points, makePoint(serMemoryWorkingSet, stats.Memory.WorkingSet))
 
-	// Network stats.
-	columns = append(columns, colRxBytes)
-	values = append(values, stats.Network.RxBytes)
-
-	columns = append(columns, colRxErrors)
-	values = append(values, stats.Network.RxErrors)
-
-	columns = append(columns, colTxBytes)
-	values = append(values, stats.Network.TxBytes)
-
-	columns = append(columns, colTxErrors)
-	values = append(values, stats.Network.TxErrors)
-
-	return columns, values
+	// Network Stats
+	points = append(points, makePoint(serRxBytes, stats.Network.RxBytes))
+	points = append(points, makePoint(serRxErrors, stats.Network.RxErrors))
+	points = append(points, makePoint(serTxBytes, stats.Network.TxBytes))
+	points = append(points, makePoint(serTxErrors, stats.Network.TxErrors))
+
+	self.tagPoints(ref, stats, points)
+
+	return points
 }
 
 func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
@@ -175,27 +220,38 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
 	if stats == nil {
 		return nil
 	}
-	var seriesToFlush []*influxdb.Series
+	var pointsToFlush []*influxdb.Point
 	func() {
 		// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
 		self.lock.Lock()
 		defer self.lock.Unlock()
 
-		self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats)))
-		self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...)
+		self.points = append(self.points, self.containerStatsToPoints(ref, stats)...)
+		self.points = append(self.points, self.containerFilesystemStatsToPoints(ref, stats)...)
 		if self.readyToFlush() {
-			seriesToFlush = self.series
-			self.series = make([]*influxdb.Series, 0)
+			pointsToFlush = self.points
+			self.points = make([]*influxdb.Point, 0)
 			self.lastWrite = time.Now()
 		}
 	}()
-	if len(seriesToFlush) > 0 {
-		err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond)
-		if err != nil {
+	if len(pointsToFlush) > 0 {
+		points := make([]influxdb.Point, len(pointsToFlush))
+		for i, p := range pointsToFlush {
+			points[i] = *p
+		}
+
+		batchTags := map[string]string{tagMachineName: self.machineName}
+		bp := influxdb.BatchPoints{
+			Points: points,
+			Database: self.database,
+			Tags: batchTags,
+			Time: stats.Timestamp,
+		}
+		response, err := self.client.Write(bp)
+		if err != nil || checkResponseForErrors(response) != nil {
 			return fmt.Errorf("failed to write stats to influxDb - %s", err)
 		}
 	}
 
 	return nil
 }
 
@@ -204,21 +260,9 @@ func (self *influxdbStorage) Close() error {
 	return nil
 }
 
-// Returns a new influxdb series.
-func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
-	out := &influxdb.Series{
-		Name: self.tableName,
-		Columns: columns,
-		// There's only one point for each stats
-		Points: make([][]interface{}, 1),
-	}
-	out.Points[0] = points
-	return out
-}
-
 // machineName: A unique identifier to identify the host that current cAdvisor
 // instance is running on.
-// influxdbHost: The host which runs influxdb.
+// influxdbHost: The host which runs influxdb (host:port)
 func newStorage(
 	machineName,
 	tablename,
@@ -229,28 +273,107 @@ func newStorage(
 	isSecure bool,
 	bufferDuration time.Duration,
 ) (*influxdbStorage, error) {
-	config := &influxdb.ClientConfig{
-		Host: influxdbHost,
-		Username: username,
-		Password: password,
-		Database: database,
-		IsSecure: isSecure,
+	url := &url.URL{
+		Scheme: "http",
+		Host: influxdbHost,
 	}
-	client, err := influxdb.NewClient(config)
+	if isSecure {
+		url.Scheme = "https"
+	}
+
+	config := &influxdb.Config{
+		URL: *url,
+		Username: username,
+		Password: password,
+		UserAgent: fmt.Sprintf("%v/%v", "cAdvisor", version.Info["version"]),
+	}
+	client, err := influxdb.NewClient(*config)
 	if err != nil {
 		return nil, err
 	}
-	// TODO(monnand): With go 1.3, we cannot compress data now.
-	client.DisableCompression()
 
 	ret := &influxdbStorage{
 		client: client,
 		machineName: machineName,
-		tableName: tablename,
+		database: database,
 		bufferDuration: bufferDuration,
 		lastWrite: time.Now(),
-		series: make([]*influxdb.Series, 0),
+		points: make([]*influxdb.Point, 0),
 	}
 	ret.readyToFlush = ret.defaultReadyToFlush
 	return ret, nil
 }
+
+// Creates a measurement point with a single value field
+func makePoint(name string, value interface{}) *influxdb.Point {
+	fields := map[string]interface{}{
+		fieldValue: toSignedIfUnsigned(value),
+	}
+
+	return &influxdb.Point{
+		Measurement: name,
+		Fields: fields,
+	}
+}
+
+// Adds additional tags to the existing tags of a point
+func addTagsToPoint(point *influxdb.Point, tags map[string]string) {
+	if point.Tags == nil {
+		point.Tags = tags
+	} else {
+		for k, v := range tags {
+			point.Tags[k] = v
+		}
+	}
+}
+
+// Checks response for possible errors
+func checkResponseForErrors(response *influxdb.Response) error {
+	const msg = "failed to write stats to influxDb - %s"
+
+	if response != nil && response.Err != nil {
+		return fmt.Errorf(msg, response.Err)
+	}
+	if response != nil && response.Results != nil {
+		for _, result := range response.Results {
+			if result.Err != nil {
+				return fmt.Errorf(msg, result.Err)
+			}
+			if result.Series != nil {
+				for _, row := range result.Series {
+					if row.Err != nil {
+						return fmt.Errorf(msg, row.Err)
+					}
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// Some stats have type unsigned integer, but the InfluxDB client accepts only signed integers.
+func toSignedIfUnsigned(value interface{}) interface{} {
+	switch value.(type) {
+	case uint64:
+		if v, ok := value.(uint64); ok {
+			return int64(v)
+		}
+	case uint32:
+		if v, ok := value.(uint32); ok {
+			return int32(v)
+		}
+	case uint16:
+		if v, ok := value.(uint16); ok {
+			return int16(v)
+		}
+	case uint8:
+		if v, ok := value.(uint8); ok {
+			return int8(v)
+		}
+	case uint:
+		if v, ok := value.(uint); ok {
+			return int(v)
+		}
+	}
+	return value
+}
@@ -19,6 +19,8 @@ package influxdb
 
 import (
 	"fmt"
+	"math/rand"
+	"net/url"
 	"reflect"
 	"testing"
 	"time"
@@ -28,6 +30,8 @@ import (
 	"github.com/google/cadvisor/storage/test"
 
 	influxdb "github.com/influxdb/influxdb/client"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 // The duration in seconds for which stats will be buffered in the influxdb driver.
@@ -40,10 +44,7 @@ type influxDbTestStorageDriver struct {
 }
 
 func (self *influxDbTestStorageDriver) readyToFlush() bool {
-	if self.count >= self.buffer {
-		return true
-	}
-	return false
+	return self.count >= self.buffer
 }
 
 func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
@@ -51,18 +52,6 @@ func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, sta
 	return self.base.AddStats(ref, stats)
 }
 
-func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
-	return nil, nil
-}
-
-func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
-	return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
-}
-
-func (self *influxDbTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
-	return self.base.Samples(containerName, numSamples)
-}
-
 func (self *influxDbTestStorageDriver) Close() error {
 	return self.base.Close()
 }
@@ -75,6 +64,28 @@ func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
 	if a.Cpu.Usage.Total != b.Cpu.Usage.Total {
 		return false
 	}
+	if a.Cpu.Usage.System != b.Cpu.Usage.System {
+		return false
+	}
+	if a.Cpu.Usage.User != b.Cpu.Usage.User {
+		return false
+	}
+
+	// TODO simpler way to check if arrays are equal?
+	if a.Cpu.Usage.PerCpu == nil && b.Cpu.Usage.PerCpu != nil {
+		return false
+	}
+	if a.Cpu.Usage.PerCpu != nil && b.Cpu.Usage.PerCpu == nil {
+		return false
+	}
+	if len(a.Cpu.Usage.PerCpu) != len(b.Cpu.Usage.PerCpu) {
+		return false
+	}
+	for i, usage := range a.Cpu.Usage.PerCpu {
+		if usage != b.Cpu.Usage.PerCpu[i] {
+			return false
+		}
+	}
+
 	if a.Memory.Usage != b.Memory.Usage {
 		return false
@@ -96,73 +107,56 @@ func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
 
 func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bufferCount int) {
 	machineName := "machineA"
-	tablename := "t"
-	database := "cadvisor"
+	database := "cadvisor_test"
 	username := "root"
 	password := "root"
 	hostname := "localhost:8086"
-	percentilesDuration := 10 * time.Minute
-	rootConfig := &influxdb.ClientConfig{
-		Host: hostname,
+	//percentilesDuration := 10 * time.Minute
+	config := influxdb.Config{
+		URL: url.URL{Scheme: "http", Host: hostname},
 		Username: username,
 		Password: password,
-		IsSecure: false,
-	}
-	rootClient, err := influxdb.NewClient(rootConfig)
-	if err != nil {
-		t.Fatal(err)
-	}
-	// create the data base first.
-	rootClient.CreateDatabase(database)
-	config := &influxdb.ClientConfig{
-		Host: hostname,
-		Username: username,
-		Password: password,
-		Database: database,
-		IsSecure: false,
 	}
 	client, err := influxdb.NewClient(config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.DisableCompression()
-	deleteAll := fmt.Sprintf("drop series %v", tablename)
-	_, err = client.Query(deleteAll)
-	if err != nil {
+	// Re-create the database first.
+	if err := prepareDatabase(client, database); err != nil {
 		t.Fatal(err)
 	}
-	// delete all data by the end of the call
-	defer client.Query(deleteAll)
+	// Delete all data by the end of the call.
+	//defer client.Query(influxdb.Query{Command: fmt.Sprintf("drop database \"%v\"", database)})
+
 	driver, err := New(machineName,
-		tablename,
 		database,
 		username,
 		password,
 		hostname,
 		false,
-		time.Duration(bufferCount),
-		percentilesDuration)
+		time.Duration(bufferCount))
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer driver.Close()
 	testDriver := &influxDbTestStorageDriver{buffer: bufferCount}
 	driver.OverrideReadyToFlush(testDriver.readyToFlush)
 	testDriver.base = driver
 
-	// generate another container's data on same machine.
+	// Generate another container's data on same machine.
 	test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t)
 
-	// generate another container's data on another machine.
+	// Generate another container's data on another machine.
 	driverForAnotherMachine, err := New("machineB",
-		tablename,
 		database,
 		username,
 		password,
 		hostname,
 		false,
-		time.Duration(bufferCount),
-		percentilesDuration)
+		time.Duration(bufferCount))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -175,14 +169,138 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
 	f(testDriver, t)
 }
 
-func TestRetrievePartialRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
+func prepareDatabase(client *influxdb.Client, database string) error {
+	dropDbQuery := influxdb.Query{
+		Command: fmt.Sprintf("drop database \"%v\"", database),
+	}
+	createDbQuery := influxdb.Query{
+		Command: fmt.Sprintf("create database \"%v\"", database),
+	}
+	// A default retention policy must always be present.
+	// Depending on the InfluxDB configuration it may be created automatically with the database or not.
+	// TODO create ret. policy only if not present
+	createPolicyQuery := influxdb.Query{
+		Command: fmt.Sprintf("create retention policy \"default\" on \"%v\" duration 1h replication 1 default", database),
+	}
+	_, err := client.Query(dropDbQuery)
+	if err != nil {
+		return err
+	}
+	_, err = client.Query(createDbQuery)
+	if err != nil {
+		return err
+	}
+	_, err = client.Query(createPolicyQuery)
+	return err
 }
 
-func TestRetrieveAllRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
+func TestContainerFileSystemStatsToPoints(t *testing.T) {
+	assert := assert.New(t)
+
+	machineName := "testMachine"
+	database := "cadvisor_test"
+	username := "root"
+	password := "root"
+	influxdbHost := "localhost:8086"
+
+	storage, err := New(machineName,
+		database,
+		username,
+		password,
+		influxdbHost,
+		false, 2*time.Minute)
+	assert.Nil(err)
+
+	ref := info.ContainerReference{
+		Name: "containerName",
+	}
+	stats := &info.ContainerStats{}
+	points := storage.containerFilesystemStatsToPoints(ref, stats)
+
+	// stats.Filesystem is always nil, not sure why
+	assert.Nil(points)
 }
 
-func TestNoRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
+func TestContainerStatsToPoints(t *testing.T) {
+	// Given
+	storage, err := createTestStorage()
+	require.Nil(t, err)
+	require.NotNil(t, storage)
+
+	ref, stats := createTestStats()
+	require.Nil(t, err)
+	require.NotNil(t, stats)
+
+	// When
+	points := storage.containerStatsToPoints(*ref, stats)
+
+	// Then
+	assert.NotEmpty(t, points)
+	assert.Len(t, points, 10+len(stats.Cpu.Usage.PerCpu))
+
+	assertContainsPointWithValue(t, points, serCpuUsageTotal, stats.Cpu.Usage.Total)
+	assertContainsPointWithValue(t, points, serCpuUsageSystem, stats.Cpu.Usage.System)
+	assertContainsPointWithValue(t, points, serCpuUsageUser, stats.Cpu.Usage.User)
+	assertContainsPointWithValue(t, points, serMemoryUsage, stats.Memory.Usage)
+	assertContainsPointWithValue(t, points, serLoadAverage, stats.Cpu.LoadAverage)
+	assertContainsPointWithValue(t, points, serMemoryWorkingSet, stats.Memory.WorkingSet)
+	assertContainsPointWithValue(t, points, serRxBytes, stats.Network.RxBytes)
+	assertContainsPointWithValue(t, points, serRxErrors, stats.Network.RxErrors)
+	assertContainsPointWithValue(t, points, serTxBytes, stats.Network.TxBytes)
+	assertContainsPointWithValue(t, points, serTxBytes, stats.Network.TxErrors)
+
+	for _, cpu_usage := range stats.Cpu.Usage.PerCpu {
+		assertContainsPointWithValue(t, points, serCpuUsagePerCpu, cpu_usage)
+	}
+}
+
+func assertContainsPointWithValue(t *testing.T, points []*influxdb.Point, name string, value interface{}) bool {
+	found := false
+	for _, point := range points {
+		if point.Measurement == name && point.Fields[fieldValue] == toSignedIfUnsigned(value) {
+			found = true
+			break
+		}
+	}
+	return assert.True(t, found, "no point found with name='%v' and value=%v", name, value)
+}
+
+func createTestStorage() (*influxdbStorage, error) {
+	machineName := "testMachine"
+	database := "cadvisor_test"
+	username := "root"
+	password := "root"
+	influxdbHost := "localhost:8086"
+
+	storage, err := New(machineName,
+		database,
+		username,
+		password,
+		influxdbHost,
+		false, 2*time.Minute)
+
+	return storage, err
+}
+
+func createTestStats() (*info.ContainerReference, *info.ContainerStats) {
+	ref := &info.ContainerReference{
+		Name: "testContainername",
+		Aliases: []string{"testContainerAlias1", "testContainerAlias2"},
+	}
+
+	cpuUsage := info.CpuUsage{
+		Total: uint64(rand.Intn(10000)),
+		PerCpu: []uint64{uint64(rand.Intn(1000)), uint64(rand.Intn(1000)), uint64(rand.Intn(1000))},
+		User: uint64(rand.Intn(10000)),
+		System: uint64(rand.Intn(10000)),
+	}
+
+	stats := &info.ContainerStats{
+		Timestamp: time.Now(),
+		Cpu: info.CpuStats{
+			Usage: cpuUsage,
+			LoadAverage: int32(rand.Intn(1000)),
+		},
+	}
+	return ref, stats
 }