From ada6e3d35420714863a2c4de16f7f9ecea46b703 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Matthias=20N=C3=BC=C3=9Fler?=
Date: Wed, 8 Jul 2015 09:46:54 +0200
Subject: [PATCH] Upgrade InfluxDB storage to InfluxDB 0.9

- Fix #743
- Rewrite InfluxDB storage for new InfluxDB API data structures.
- Store each measurement separately instead of storing all measurements
  in a single big "table" with many columns/fields.
- Use tags to add metadata to points, such as the container name.
  Tags are a new feature in InfluxDB 0.9.
---
 storage/influxdb/influxdb.go      | 353 ++++++++++++++++++++----------
 storage/influxdb/influxdb_test.go | 228 ++++++++++++++-----
 2 files changed, 411 insertions(+), 170 deletions(-)

diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go
index 58d97c62..4be4db46 100644
--- a/storage/influxdb/influxdb.go
+++ b/storage/influxdb/influxdb.go
@@ -16,12 +16,14 @@ package influxdb
 
 import (
 	"fmt"
+	"net/url"
 	"os"
 	"sync"
 	"time"
 
 	info "github.com/google/cadvisor/info/v1"
 	"github.com/google/cadvisor/storage"
+	"github.com/google/cadvisor/version"
 
 	influxdb "github.com/influxdb/influxdb/client"
 )
@@ -31,39 +33,44 @@ func init() {
 }
 
 type influxdbStorage struct {
-	client         *influxdb.Client
-	machineName    string
-	tableName      string
-	bufferDuration time.Duration
-	lastWrite      time.Time
-	series         []*influxdb.Series
-	lock           sync.Mutex
-	readyToFlush   func() bool
+	client          *influxdb.Client
+	machineName     string
+	database        string
+	retentionPolicy string
+	bufferDuration  time.Duration
+	lastWrite       time.Time
+	points          []*influxdb.Point
+	lock            sync.Mutex
+	readyToFlush    func() bool
 }
 
+// Series names
 const (
-	colTimestamp          string = "time"
-	colMachineName        string = "machine"
-	colContainerName      string = "container_name"
-	colCpuCumulativeUsage string = "cpu_cumulative_usage"
+	// Cumulative CPU usage
+	serCpuUsageTotal  string = "cpu_usage_total"
+	serCpuUsageSystem string = "cpu_usage_system"
+	serCpuUsageUser   string = "cpu_usage_user"
+	serCpuUsagePerCpu string = "cpu_usage_per_cpu"
+	// Smoothed average of number of runnable threads x 1000.
+	serLoadAverage string = "load_average"
 	// Memory Usage
-	colMemoryUsage string = "memory_usage"
+	serMemoryUsage string = "memory_usage"
 	// Working set size
-	colMemoryWorkingSet string = "memory_working_set"
+	serMemoryWorkingSet string = "memory_working_set"
 	// Cumulative count of bytes received.
-	colRxBytes string = "rx_bytes"
+	serRxBytes string = "rx_bytes"
 	// Cumulative count of receive errors encountered.
-	colRxErrors string = "rx_errors"
+	serRxErrors string = "rx_errors"
 	// Cumulative count of bytes transmitted.
-	colTxBytes string = "tx_bytes"
+	serTxBytes string = "tx_bytes"
 	// Cumulative count of transmit errors encountered.
-	colTxErrors string = "tx_errors"
+	serTxErrors string = "tx_errors"
 	// Filesystem device.
-	colFsDevice = "fs_device"
+	serFsDevice string = "fs_device"
 	// Filesystem limit.
-	colFsLimit = "fs_limit"
+	serFsLimit string = "fs_limit"
 	// Filesystem usage.
-	colFsUsage = "fs_usage"
+	serFsUsage string = "fs_usage"
 )
 
 func new() (storage.StorageDriver, error) {
@@ -83,84 +90,122 @@ func new() (storage.StorageDriver, error) {
 	)
 }
 
-func (self *influxdbStorage) getSeriesDefaultValues(
+// Field names
+const (
+	fieldValue    string = "value"
+	fieldType     string = "type"
+	fieldInstance string = "instance"
+)
+
+// Tag names
+const (
+	tagMachineName   string = "machine"
+	tagContainerName string = "container_name"
+)
+
+func (self *influxdbStorage) containerFilesystemStatsToPoints(
 	ref info.ContainerReference,
-	stats *info.ContainerStats,
-	columns *[]string,
-	values *[]interface{}) {
-	// Timestamp
-	*columns = append(*columns, colTimestamp)
-	*values = append(*values, stats.Timestamp.UnixNano()/1E3)
-
-	// Machine name
-	*columns = append(*columns, colMachineName)
-	*values = append(*values, self.machineName)
-
-	// Container name
-	*columns = append(*columns, colContainerName)
-	if len(ref.Aliases) > 0 {
-		*values = append(*values, ref.Aliases[0])
-	} else {
-		*values = append(*values, ref.Name)
-	}
-}
-
-// In order to maintain a fixed column format, we add a new series for each filesystem partition.
-func (self *influxdbStorage) containerFilesystemStatsToSeries(
-	ref info.ContainerReference,
-	stats *info.ContainerStats) (series []*influxdb.Series) {
+	stats *info.ContainerStats) (points []*influxdb.Point) {
 	if len(stats.Filesystem) == 0 {
-		return series
+		return points
 	}
 	for _, fsStat := range stats.Filesystem {
-		columns := make([]string, 0)
-		values := make([]interface{}, 0)
-		self.getSeriesDefaultValues(ref, stats, &columns, &values)
+		tagsFsUsage := map[string]string{
+			fieldInstance: fsStat.Device,
+			fieldType:     "usage",
+		}
+		fieldsFsUsage := map[string]interface{}{
+			fieldValue: int64(fsStat.Usage),
+		}
+		pointFsUsage := &influxdb.Point{
+			Measurement: serFsUsage,
+			Tags:        tagsFsUsage,
+			Fields:      fieldsFsUsage,
+		}
 
-		columns = append(columns, colFsDevice)
-		values = append(values, fsStat.Device)
+		tagsFsLimit := map[string]string{
+			fieldInstance: fsStat.Device,
+			fieldType:     "limit",
+		}
+		fieldsFsLimit := map[string]interface{}{
+			fieldValue: int64(fsStat.Limit),
+		}
+		pointFsLimit := &influxdb.Point{
+			Measurement: serFsLimit,
+			Tags:        tagsFsLimit,
+			Fields:      fieldsFsLimit,
+		}
 
-		columns = append(columns, colFsLimit)
-		values = append(values, fsStat.Limit)
-
-		columns = append(columns, colFsUsage)
-		values = append(values, fsStat.Usage)
-		series = append(series, self.newSeries(columns, values))
+		points = append(points, pointFsUsage, pointFsLimit)
 	}
-	return series
+
+	self.tagPoints(ref, stats, points)
+
+	return points
 }
 
-func (self *influxdbStorage) containerStatsToValues(
+// Set tags and timestamp for all points of the batch.
+// Points should inherit the tags that are set for BatchPoints, but that does not seem to work.
+func (self *influxdbStorage) tagPoints(ref info.ContainerReference, stats *info.ContainerStats, points []*influxdb.Point) {
+	// Use container alias if possible
+	var containerName string
+	if len(ref.Aliases) > 0 {
+		containerName = ref.Aliases[0]
+	} else {
+		containerName = ref.Name
+	}
+
+	commonTags := map[string]string{
+		tagMachineName:   self.machineName,
+		tagContainerName: containerName,
+	}
+	for i := 0; i < len(points); i++ {
+		// merge with existing tags if any
+		addTagsToPoint(points[i], commonTags)
+		points[i].Time = stats.Timestamp
+	}
+}
+
+func (self *influxdbStorage) containerStatsToPoints(
 	ref info.ContainerReference,
 	stats *info.ContainerStats,
-) (columns []string, values []interface{}) {
-	self.getSeriesDefaultValues(ref, stats, &columns, &values)
-	// Cumulative Cpu Usage
-	columns = append(columns, colCpuCumulativeUsage)
-	values = append(values, stats.Cpu.Usage.Total)
+) (points []*influxdb.Point) {
+	// CPU usage: Total usage in nanoseconds
+	points = append(points, makePoint(serCpuUsageTotal, stats.Cpu.Usage.Total))
+
+	// CPU usage: Time spent in system space (in nanoseconds)
+	points = append(points, makePoint(serCpuUsageSystem, stats.Cpu.Usage.System))
+
+	// CPU usage: Time spent in user space (in nanoseconds)
+	points = append(points, makePoint(serCpuUsageUser, stats.Cpu.Usage.User))
+
+	// CPU usage per CPU
+	for i := 0; i < len(stats.Cpu.Usage.PerCpu); i++ {
+		point := makePoint(serCpuUsagePerCpu, stats.Cpu.Usage.PerCpu[i])
+		tags := map[string]string{"instance": fmt.Sprintf("%v", i)}
+		addTagsToPoint(point, tags)
+
+		points = append(points, point)
+	}
+
+	// Load Average
+	points = append(points, makePoint(serLoadAverage, stats.Cpu.LoadAverage))
 
 	// Memory Usage
-	columns = append(columns, colMemoryUsage)
-	values = append(values, stats.Memory.Usage)
+	points = append(points, makePoint(serMemoryUsage, stats.Memory.Usage))
 
-	// Working set size
-	columns = append(columns, colMemoryWorkingSet)
-	values = append(values, stats.Memory.WorkingSet)
+	// Working Set Size
+	points = append(points, makePoint(serMemoryWorkingSet, stats.Memory.WorkingSet))
 
-	// Network stats.
-	columns = append(columns, colRxBytes)
-	values = append(values, stats.Network.RxBytes)
+	// Network Stats
+	points = append(points, makePoint(serRxBytes, stats.Network.RxBytes))
+	points = append(points, makePoint(serRxErrors, stats.Network.RxErrors))
+	points = append(points, makePoint(serTxBytes, stats.Network.TxBytes))
+	points = append(points, makePoint(serTxErrors, stats.Network.TxErrors))
 
-	columns = append(columns, colRxErrors)
-	values = append(values, stats.Network.RxErrors)
+	self.tagPoints(ref, stats, points)
 
-	columns = append(columns, colTxBytes)
-	values = append(values, stats.Network.TxBytes)
-
-	columns = append(columns, colTxErrors)
-	values = append(values, stats.Network.TxErrors)
-
-	return columns, values
+	return points
 }
 
 func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
@@ -175,27 +220,38 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
 	if stats == nil {
 		return nil
 	}
-	var seriesToFlush []*influxdb.Series
+	var pointsToFlush []*influxdb.Point
 	func() {
 		// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
 		self.lock.Lock()
 		defer self.lock.Unlock()
-		self.series = append(self.series, self.newSeries(self.containerStatsToValues(ref, stats)))
-		self.series = append(self.series, self.containerFilesystemStatsToSeries(ref, stats)...)
+		self.points = append(self.points, self.containerStatsToPoints(ref, stats)...)
+		self.points = append(self.points, self.containerFilesystemStatsToPoints(ref, stats)...)
 		if self.readyToFlush() {
-			seriesToFlush = self.series
-			self.series = make([]*influxdb.Series, 0)
+			pointsToFlush = self.points
+			self.points = make([]*influxdb.Point, 0)
 			self.lastWrite = time.Now()
 		}
 	}()
 
-	if len(seriesToFlush) > 0 {
-		err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond)
-		if err != nil {
+	if len(pointsToFlush) > 0 {
+		points := make([]influxdb.Point, len(pointsToFlush))
+		for i, p := range pointsToFlush {
+			points[i] = *p
+		}
+
+		batchTags := map[string]string{tagMachineName: self.machineName}
+		bp := influxdb.BatchPoints{
+			Points:   points,
+			Database: self.database,
+			Tags:     batchTags,
+			Time:     stats.Timestamp,
+		}
+		response, err := self.client.Write(bp)
+		if err != nil || checkResponseForErrors(response) != nil {
 			return fmt.Errorf("failed to write stats to influxDb - %s", err)
 		}
 	}
-
 	return nil
 }
 
@@ -204,21 +260,9 @@ func (self *influxdbStorage) Close() error {
 	return nil
 }
 
-// Returns a new influxdb series.
-func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
-	out := &influxdb.Series{
-		Name:    self.tableName,
-		Columns: columns,
-		// There's only one point for each stats
-		Points: make([][]interface{}, 1),
-	}
-	out.Points[0] = points
-	return out
-}
-
 // machineName: A unique identifier to identify the host that current cAdvisor
 // instance is running on.
-// influxdbHost: The host which runs influxdb.
+// influxdbHost: The host which runs influxdb (host:port)
 func newStorage(
 	machineName,
 	tablename,
@@ -229,28 +273,107 @@ func newStorage(
 	database,
 	username,
 	password,
 	influxdbHost string,
 	isSecure bool,
 	bufferDuration time.Duration,
 ) (*influxdbStorage, error) {
-	config := &influxdb.ClientConfig{
-		Host:     influxdbHost,
-		Username: username,
-		Password: password,
-		Database: database,
-		IsSecure: isSecure,
+	url := &url.URL{
+		Scheme: "http",
+		Host:   influxdbHost,
 	}
-	client, err := influxdb.NewClient(config)
+	if isSecure {
+		url.Scheme = "https"
+	}
+
+	config := &influxdb.Config{
+		URL:       *url,
+		Username:  username,
+		Password:  password,
+		UserAgent: fmt.Sprintf("%v/%v", "cAdvisor", version.Info["version"]),
+	}
+	client, err := influxdb.NewClient(*config)
 	if err != nil {
 		return nil, err
 	}
-	// TODO(monnand): With go 1.3, we cannot compress data now.
-	client.DisableCompression()
 	ret := &influxdbStorage{
 		client:         client,
 		machineName:    machineName,
-		tableName:      tablename,
+		database:       database,
 		bufferDuration: bufferDuration,
 		lastWrite:      time.Now(),
-		series:         make([]*influxdb.Series, 0),
+		points:         make([]*influxdb.Point, 0),
 	}
 	ret.readyToFlush = ret.defaultReadyToFlush
 	return ret, nil
 }
+
+// Creates a measurement point with a single value field
+func makePoint(name string, value interface{}) *influxdb.Point {
+	fields := map[string]interface{}{
+		fieldValue: toSignedIfUnsigned(value),
+	}
+
+	return &influxdb.Point{
+		Measurement: name,
+		Fields:      fields,
+	}
+}
+
+// Adds additional tags to the existing tags of a point
+func addTagsToPoint(point *influxdb.Point, tags map[string]string) {
+	if point.Tags == nil {
+		point.Tags = tags
+	} else {
+		for k, v := range tags {
+			point.Tags[k] = v
+		}
+	}
+}
+
+// Checks response for possible errors
+func checkResponseForErrors(response *influxdb.Response) error {
+	const msg = "failed to write stats to influxDb - %s"
+
+	if response != nil && response.Err != nil {
+		return fmt.Errorf(msg, response.Err)
+	}
+	if response != nil && response.Results != nil {
+		for _, result := range response.Results {
+			if result.Err != nil {
+				return fmt.Errorf(msg, result.Err)
+			}
+			if result.Series != nil {
+				for _, row := range result.Series {
+					if row.Err != nil {
+						return fmt.Errorf(msg, row.Err)
+					}
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// Some stats have type unsigned integer, but the InfluxDB client accepts only signed integers.
+func toSignedIfUnsigned(value interface{}) interface{} {
+	switch value.(type) {
+	case uint64:
+		if v, ok := value.(uint64); ok {
+			return int64(v)
+		}
+	case uint32:
+		if v, ok := value.(uint32); ok {
+			return int32(v)
+		}
+	case uint16:
+		if v, ok := value.(uint16); ok {
+			return int16(v)
+		}
+	case uint8:
+		if v, ok := value.(uint8); ok {
+			return int8(v)
		}
+	case uint:
+		if v, ok := value.(uint); ok {
+			return int(v)
+		}
+	}
+	return value
+}
diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go
index 20057cba..39433ae6 100644
--- a/storage/influxdb/influxdb_test.go
+++ b/storage/influxdb/influxdb_test.go
@@ -19,6 +19,8 @@ package influxdb
 
 import (
 	"fmt"
+	"math/rand"
+	"net/url"
 	"reflect"
 	"testing"
 	"time"
@@ -28,6 +30,8 @@ import (
 	"github.com/google/cadvisor/storage/test"
 
 	influxdb "github.com/influxdb/influxdb/client"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 // The duration in seconds for which stats will be buffered in the influxdb driver.
@@ -40,10 +44,7 @@ type influxDbTestStorageDriver struct {
 }
 
 func (self *influxDbTestStorageDriver) readyToFlush() bool {
-	if self.count >= self.buffer {
-		return true
-	}
-	return false
+	return self.count >= self.buffer
 }
 
 func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
@@ -51,18 +52,6 @@ func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, sta
 	return self.base.AddStats(ref, stats)
 }
 
-func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
-	return nil, nil
-}
-
-func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
-	return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
-}
-
-func (self *influxDbTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
-	return self.base.Samples(containerName, numSamples)
-}
-
 func (self *influxDbTestStorageDriver) Close() error {
 	return self.base.Close()
 }
@@ -75,6 +64,28 @@ func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
 	if a.Cpu.Usage.Total != b.Cpu.Usage.Total {
 		return false
 	}
+	if a.Cpu.Usage.System != b.Cpu.Usage.System {
+		return false
+	}
+	if a.Cpu.Usage.User != b.Cpu.Usage.User {
+		return false
+	}
+
+	// TODO simpler way to check if arrays are equal?
+	if a.Cpu.Usage.PerCpu == nil && b.Cpu.Usage.PerCpu != nil {
+		return false
+	}
+	if a.Cpu.Usage.PerCpu != nil && b.Cpu.Usage.PerCpu == nil {
+		return false
+	}
+	if len(a.Cpu.Usage.PerCpu) != len(b.Cpu.Usage.PerCpu) {
+		return false
+	}
+	for i, usage := range a.Cpu.Usage.PerCpu {
+		if usage != b.Cpu.Usage.PerCpu[i] {
+			return false
+		}
+	}
 
 	if a.Memory.Usage != b.Memory.Usage {
 		return false
@@ -96,73 +107,56 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
 	machineName := "machineA"
-	tablename := "t"
-	database := "cadvisor"
+	database := "cadvisor_test"
 	username := "root"
 	password := "root"
 	hostname := "localhost:8086"
-	percentilesDuration := 10 * time.Minute
-	rootConfig := &influxdb.ClientConfig{
-		Host:     hostname,
+	//percentilesDuration := 10 * time.Minute
+
+	config := influxdb.Config{
+		URL:      url.URL{Scheme: "http", Host: hostname},
 		Username: username,
 		Password: password,
-		IsSecure: false,
-	}
-	rootClient, err := influxdb.NewClient(rootConfig)
-	if err != nil {
-		t.Fatal(err)
-	}
-	// create the data base first.
-	rootClient.CreateDatabase(database)
-	config := &influxdb.ClientConfig{
-		Host:     hostname,
-		Username: username,
-		Password: password,
-		Database: database,
-		IsSecure: false,
 	}
 	client, err := influxdb.NewClient(config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.DisableCompression()
-	deleteAll := fmt.Sprintf("drop series %v", tablename)
-	_, err = client.Query(deleteAll)
-	if err != nil {
+
+	// Re-create the database first.
+	if err := prepareDatabase(client, database); err != nil {
 		t.Fatal(err)
 	}
-	// delete all data by the end of the call
-	defer client.Query(deleteAll)
+
+	// Delete all data by the end of the call.
+	//defer client.Query(influxdb.Query{Command: fmt.Sprintf("drop database \"%v\"", database)})
 
 	driver, err := New(machineName,
-		tablename,
 		database,
 		username,
 		password,
 		hostname,
 		false,
-		time.Duration(bufferCount),
-		percentilesDuration)
+		time.Duration(bufferCount))
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer driver.Close()
 
 	testDriver := &influxDbTestStorageDriver{buffer: bufferCount}
 	driver.OverrideReadyToFlush(testDriver.readyToFlush)
 	testDriver.base = driver
 
-	// generate another container's data on same machine.
+	// Generate another container's data on same machine.
 	test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t)
 
-	// generate another container's data on another machine.
+	// Generate another container's data on another machine.
 	driverForAnotherMachine, err := New("machineB",
-		tablename,
 		database,
 		username,
 		password,
 		hostname,
 		false,
-		time.Duration(bufferCount),
-		percentilesDuration)
+		time.Duration(bufferCount))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -175,14 +169,138 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
 	f(testDriver, t)
 }
 
-func TestRetrievePartialRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
+func prepareDatabase(client *influxdb.Client, database string) error {
+	dropDbQuery := influxdb.Query{
+		Command: fmt.Sprintf("drop database \"%v\"", database),
+	}
+	createDbQuery := influxdb.Query{
+		Command: fmt.Sprintf("create database \"%v\"", database),
+	}
+	// A default retention policy must always be present.
+	// Depending on the InfluxDB configuration it may be created automatically with the database or not.
+	// TODO create ret. policy only if not present
+	createPolicyQuery := influxdb.Query{
+		Command: fmt.Sprintf("create retention policy \"default\" on \"%v\" duration 1h replication 1 default", database),
+	}
+	_, err := client.Query(dropDbQuery)
+	if err != nil {
+		return err
+	}
+	_, err = client.Query(createDbQuery)
+	if err != nil {
+		return err
+	}
+	_, err = client.Query(createPolicyQuery)
+	return err
 }
 
-func TestRetrieveAllRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
+func TestContainerFileSystemStatsToPoints(t *testing.T) {
+	assert := assert.New(t)
+
+	machineName := "testMachine"
+	database := "cadvisor_test"
+	username := "root"
+	password := "root"
+	influxdbHost := "localhost:8086"
+
+	storage, err := New(machineName,
+		database,
+		username,
+		password,
+		influxdbHost,
+		false, 2*time.Minute)
+	assert.Nil(err)
+
+	ref := info.ContainerReference{
+		Name: "containerName",
+	}
+	stats := &info.ContainerStats{}
+	points := storage.containerFilesystemStatsToPoints(ref, stats)
+
+	// stats.Filesystem is not populated here, so no points are expected.
+	assert.Nil(points)
 }
 
-func TestNoRecentStats(t *testing.T) {
-	runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
+func TestContainerStatsToPoints(t *testing.T) {
+	// Given
+	storage, err := createTestStorage()
+	require.Nil(t, err)
+	require.NotNil(t, storage)
+
+	ref, stats := createTestStats()
+	require.Nil(t, err)
+	require.NotNil(t, stats)
+
+	// When
+	points := storage.containerStatsToPoints(*ref, stats)
+
+	// Then
+	assert.NotEmpty(t, points)
+	assert.Len(t, points, 10+len(stats.Cpu.Usage.PerCpu))
+
+	assertContainsPointWithValue(t, points, serCpuUsageTotal, stats.Cpu.Usage.Total)
+	assertContainsPointWithValue(t, points, serCpuUsageSystem, stats.Cpu.Usage.System)
+	assertContainsPointWithValue(t, points, serCpuUsageUser, stats.Cpu.Usage.User)
+	assertContainsPointWithValue(t, points, serMemoryUsage, stats.Memory.Usage)
+	assertContainsPointWithValue(t, points, serLoadAverage, stats.Cpu.LoadAverage)
+	assertContainsPointWithValue(t, points, serMemoryWorkingSet, stats.Memory.WorkingSet)
+	assertContainsPointWithValue(t, points, serRxBytes, stats.Network.RxBytes)
+	assertContainsPointWithValue(t, points, serRxErrors, stats.Network.RxErrors)
+	assertContainsPointWithValue(t, points, serTxBytes, stats.Network.TxBytes)
+	assertContainsPointWithValue(t, points, serTxErrors, stats.Network.TxErrors)
+
+	for _, cpuUsage := range stats.Cpu.Usage.PerCpu {
+		assertContainsPointWithValue(t, points, serCpuUsagePerCpu, cpuUsage)
+	}
+}
+
+func assertContainsPointWithValue(t *testing.T, points []*influxdb.Point, name string, value interface{}) bool {
+	found := false
+	for _, point := range points {
+		if point.Measurement == name && point.Fields[fieldValue] == toSignedIfUnsigned(value) {
+			found = true
+			break
+		}
+	}
+	return assert.True(t, found, "no point found with name='%v' and value=%v", name, value)
+}
+
+func createTestStorage() (*influxdbStorage, error) {
+	machineName := "testMachine"
+	database := "cadvisor_test"
+	username := "root"
+	password := "root"
+	influxdbHost := "localhost:8086"
+
+	storage, err := New(machineName,
+		database,
+		username,
+		password,
+		influxdbHost,
+		false, 2*time.Minute)
+
+	return storage, err
+}
+
+func createTestStats() (*info.ContainerReference, *info.ContainerStats) {
+	ref := &info.ContainerReference{
+		Name:    "testContainername",
+		Aliases: []string{"testContainerAlias1", "testContainerAlias2"},
+	}
+
+	cpuUsage := info.CpuUsage{
+		Total:  uint64(rand.Intn(10000)),
+		PerCpu: []uint64{uint64(rand.Intn(1000)), uint64(rand.Intn(1000)), uint64(rand.Intn(1000))},
+		User:   uint64(rand.Intn(10000)),
+		System: uint64(rand.Intn(10000)),
+	}
+
+	stats := &info.ContainerStats{
+		Timestamp: time.Now(),
+		Cpu: info.CpuStats{
+			Usage:       cpuUsage,
+			LoadAverage: int32(rand.Intn(1000)),
+		},
+	}
+	return ref, stats
 }