diff --git a/docs/storage/influxdb.md b/docs/storage/influxdb.md index f0f20f88..3409e5cc 100644 --- a/docs/storage/influxdb.md +++ b/docs/storage/influxdb.md @@ -21,6 +21,8 @@ Specify what InfluxDB instance to push data to: -storage_driver_password # Use secure connection with database. False by default -storage_driver_secure + # retention policy. Default is '' which corresponds to the default retention policy of the influxdb database +-storage_driver_influxdb_retention_policy ``` # Examples diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index f728a723..f1fcf0dd 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -15,6 +15,7 @@ package influxdb import ( + "flag" "fmt" "net/url" "os" @@ -32,6 +33,8 @@ func init() { storage.RegisterStorageDriver("influxdb", new) } +var argDbRetentionPolicy = flag.String("storage_driver_influxdb_retention_policy", "", "retention policy") + type influxdbStorage struct { client *influxdb.Client machineName string @@ -82,6 +85,7 @@ func new() (storage.StorageDriver, error) { hostname, *storage.ArgDbTable, *storage.ArgDbName, + *argDbRetentionPolicy, *storage.ArgDbUsername, *storage.ArgDbPassword, *storage.ArgDbHost, @@ -243,10 +247,11 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C batchTags := map[string]string{tagMachineName: self.machineName} bp := influxdb.BatchPoints{ - Points: points, - Database: self.database, - Tags: batchTags, - Time: stats.Timestamp, + Points: points, + Database: self.database, + RetentionPolicy: self.retentionPolicy, + Tags: batchTags, + Time: stats.Timestamp, } response, err := self.client.Write(bp) if err != nil || checkResponseForErrors(response) != nil { @@ -268,6 +273,7 @@ func newStorage( machineName, tablename, database, + retentionPolicy, username, password, influxdbHost string, @@ -294,12 +300,13 @@ func newStorage( } ret := &influxdbStorage{ - client: client, - machineName: machineName, - database: database, - bufferDuration: bufferDuration, - lastWrite: time.Now(), - points: make([]*influxdb.Point, 0), + client: client, + machineName: machineName, + database: database, + retentionPolicy: retentionPolicy, + bufferDuration: bufferDuration, + lastWrite: time.Now(), + points: make([]*influxdb.Point, 0), } ret.readyToFlush = ret.defaultReadyToFlush return ret, nil diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 0b8000c8..bf00e3a4 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -90,6 +90,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu username := "root" password := "root" hostname := "localhost:8086" + retentionPolicy := "cadvisor_test_rp" // percentilesDuration := 10 * time.Minute config := influxdb.Config{ @@ -103,7 +104,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu } // Re-create the database first. - if err := prepareDatabase(client, database); err != nil { + if err := prepareDatabase(client, database, retentionPolicy); err != nil { t.Fatal(err) } @@ -113,6 +114,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu driver, err := newStorage(machineName, table, database, + retentionPolicy, username, password, hostname, @@ -133,6 +135,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu driverForAnotherMachine, err := newStorage("machineB", table, database, + retentionPolicy, username, password, hostname, @@ -150,7 +153,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu f(testDriver, t) } -func prepareDatabase(client *influxdb.Client, database string) error { +func prepareDatabase(client *influxdb.Client, database string, retentionPolicy string) error { dropDbQuery := influxdb.Query{ Command: fmt.Sprintf("drop database \"%v\"", database), } @@ -161,7 +164,7 @@ func prepareDatabase(client *influxdb.Client, database string) error { // Depending on the InfluxDB configuration it may be created automatically with the database or not. // TODO create ret. policy only if not present createPolicyQuery := influxdb.Query{ - Command: fmt.Sprintf("create retention policy \"default\" on \"%v\" duration 1h replication 1 default", database), + Command: fmt.Sprintf("create retention policy \"%v\" on \"%v\" duration 1h replication 1 default", retentionPolicy, database), } _, err := client.Query(dropDbQuery) if err != nil { @@ -181,6 +184,7 @@ func TestContainerFileSystemStatsToPoints(t *testing.T) { machineName := "testMachine" table := "cadvisor_table" database := "cadvisor_test" + retentionPolicy := "cadvisor_test_rp" username := "root" password := "root" influxdbHost := "localhost:8086" @@ -188,6 +192,7 @@ func TestContainerFileSystemStatsToPoints(t *testing.T) { storage, err := newStorage(machineName, table, database, + retentionPolicy, username, password, influxdbHost, @@ -252,6 +257,7 @@ func createTestStorage() (*influxdbStorage, error) { machineName := "testMachine" table := "cadvisor_table" database := "cadvisor_test" + retentionPolicy := "cadvisor_test_rp" username := "root" password := "root" influxdbHost := "localhost:8086" @@ -259,6 +265,7 @@ func createTestStorage() (*influxdbStorage, error) { storage, err := newStorage(machineName, table, database, + retentionPolicy, username, password, influxdbHost,