feat: handling retention policy in influxdb
docs: Adding storage_driver_influxdb_retention_policy flag to the influxdb documentation fix: using rentention policy variable in test
This commit is contained in:
parent
4e25a7951f
commit
ef0f4d1c61
@ -21,6 +21,8 @@ Specify what InfluxDB instance to push data to:
|
|||||||
-storage_driver_password
|
-storage_driver_password
|
||||||
# Use secure connection with database. False by default
|
# Use secure connection with database. False by default
|
||||||
-storage_driver_secure
|
-storage_driver_secure
|
||||||
|
# retention policy. Default is '' which corresponds to the default retention policy of the influxdb database
|
||||||
|
-storage_driver_influxdb_retention_policy
|
||||||
```
|
```
|
||||||
|
|
||||||
# Examples
|
# Examples
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package influxdb
|
package influxdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
@ -32,6 +33,8 @@ func init() {
|
|||||||
storage.RegisterStorageDriver("influxdb", new)
|
storage.RegisterStorageDriver("influxdb", new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var argDbRetentionPolicy = flag.String("storage_driver_influxdb_retention_policy", "", "retention policy")
|
||||||
|
|
||||||
type influxdbStorage struct {
|
type influxdbStorage struct {
|
||||||
client *influxdb.Client
|
client *influxdb.Client
|
||||||
machineName string
|
machineName string
|
||||||
@ -82,6 +85,7 @@ func new() (storage.StorageDriver, error) {
|
|||||||
hostname,
|
hostname,
|
||||||
*storage.ArgDbTable,
|
*storage.ArgDbTable,
|
||||||
*storage.ArgDbName,
|
*storage.ArgDbName,
|
||||||
|
*argDbRetentionPolicy,
|
||||||
*storage.ArgDbUsername,
|
*storage.ArgDbUsername,
|
||||||
*storage.ArgDbPassword,
|
*storage.ArgDbPassword,
|
||||||
*storage.ArgDbHost,
|
*storage.ArgDbHost,
|
||||||
@ -243,10 +247,11 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
|
|||||||
|
|
||||||
batchTags := map[string]string{tagMachineName: self.machineName}
|
batchTags := map[string]string{tagMachineName: self.machineName}
|
||||||
bp := influxdb.BatchPoints{
|
bp := influxdb.BatchPoints{
|
||||||
Points: points,
|
Points: points,
|
||||||
Database: self.database,
|
Database: self.database,
|
||||||
Tags: batchTags,
|
RetentionPolicy: self.retentionPolicy,
|
||||||
Time: stats.Timestamp,
|
Tags: batchTags,
|
||||||
|
Time: stats.Timestamp,
|
||||||
}
|
}
|
||||||
response, err := self.client.Write(bp)
|
response, err := self.client.Write(bp)
|
||||||
if err != nil || checkResponseForErrors(response) != nil {
|
if err != nil || checkResponseForErrors(response) != nil {
|
||||||
@ -268,6 +273,7 @@ func newStorage(
|
|||||||
machineName,
|
machineName,
|
||||||
tablename,
|
tablename,
|
||||||
database,
|
database,
|
||||||
|
retentionPolicy,
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
influxdbHost string,
|
influxdbHost string,
|
||||||
@ -294,12 +300,13 @@ func newStorage(
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret := &influxdbStorage{
|
ret := &influxdbStorage{
|
||||||
client: client,
|
client: client,
|
||||||
machineName: machineName,
|
machineName: machineName,
|
||||||
database: database,
|
database: database,
|
||||||
bufferDuration: bufferDuration,
|
retentionPolicy: retentionPolicy,
|
||||||
lastWrite: time.Now(),
|
bufferDuration: bufferDuration,
|
||||||
points: make([]*influxdb.Point, 0),
|
lastWrite: time.Now(),
|
||||||
|
points: make([]*influxdb.Point, 0),
|
||||||
}
|
}
|
||||||
ret.readyToFlush = ret.defaultReadyToFlush
|
ret.readyToFlush = ret.defaultReadyToFlush
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
@ -90,6 +90,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
username := "root"
|
username := "root"
|
||||||
password := "root"
|
password := "root"
|
||||||
hostname := "localhost:8086"
|
hostname := "localhost:8086"
|
||||||
|
retentionPolicy := "cadvisor_test_rp"
|
||||||
// percentilesDuration := 10 * time.Minute
|
// percentilesDuration := 10 * time.Minute
|
||||||
|
|
||||||
config := influxdb.Config{
|
config := influxdb.Config{
|
||||||
@ -103,7 +104,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Re-create the database first.
|
// Re-create the database first.
|
||||||
if err := prepareDatabase(client, database); err != nil {
|
if err := prepareDatabase(client, database, retentionPolicy); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,6 +114,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
driver, err := newStorage(machineName,
|
driver, err := newStorage(machineName,
|
||||||
table,
|
table,
|
||||||
database,
|
database,
|
||||||
|
retentionPolicy,
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
hostname,
|
hostname,
|
||||||
@ -133,6 +135,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
driverForAnotherMachine, err := newStorage("machineB",
|
driverForAnotherMachine, err := newStorage("machineB",
|
||||||
table,
|
table,
|
||||||
database,
|
database,
|
||||||
|
retentionPolicy,
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
hostname,
|
hostname,
|
||||||
@ -150,7 +153,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
f(testDriver, t)
|
f(testDriver, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareDatabase(client *influxdb.Client, database string) error {
|
func prepareDatabase(client *influxdb.Client, database string, retentionPolicy string) error {
|
||||||
dropDbQuery := influxdb.Query{
|
dropDbQuery := influxdb.Query{
|
||||||
Command: fmt.Sprintf("drop database \"%v\"", database),
|
Command: fmt.Sprintf("drop database \"%v\"", database),
|
||||||
}
|
}
|
||||||
@ -161,7 +164,7 @@ func prepareDatabase(client *influxdb.Client, database string) error {
|
|||||||
// Depending on the InfluxDB configuration it may be created automatically with the database or not.
|
// Depending on the InfluxDB configuration it may be created automatically with the database or not.
|
||||||
// TODO create ret. policy only if not present
|
// TODO create ret. policy only if not present
|
||||||
createPolicyQuery := influxdb.Query{
|
createPolicyQuery := influxdb.Query{
|
||||||
Command: fmt.Sprintf("create retention policy \"default\" on \"%v\" duration 1h replication 1 default", database),
|
Command: fmt.Sprintf("create retention policy \"%v\" on \"%v\" duration 1h replication 1 default", retentionPolicy, database),
|
||||||
}
|
}
|
||||||
_, err := client.Query(dropDbQuery)
|
_, err := client.Query(dropDbQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -181,6 +184,7 @@ func TestContainerFileSystemStatsToPoints(t *testing.T) {
|
|||||||
machineName := "testMachine"
|
machineName := "testMachine"
|
||||||
table := "cadvisor_table"
|
table := "cadvisor_table"
|
||||||
database := "cadvisor_test"
|
database := "cadvisor_test"
|
||||||
|
retentionPolicy := "cadvisor_test_rp"
|
||||||
username := "root"
|
username := "root"
|
||||||
password := "root"
|
password := "root"
|
||||||
influxdbHost := "localhost:8086"
|
influxdbHost := "localhost:8086"
|
||||||
@ -188,6 +192,7 @@ func TestContainerFileSystemStatsToPoints(t *testing.T) {
|
|||||||
storage, err := newStorage(machineName,
|
storage, err := newStorage(machineName,
|
||||||
table,
|
table,
|
||||||
database,
|
database,
|
||||||
|
retentionPolicy,
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
influxdbHost,
|
influxdbHost,
|
||||||
@ -252,6 +257,7 @@ func createTestStorage() (*influxdbStorage, error) {
|
|||||||
machineName := "testMachine"
|
machineName := "testMachine"
|
||||||
table := "cadvisor_table"
|
table := "cadvisor_table"
|
||||||
database := "cadvisor_test"
|
database := "cadvisor_test"
|
||||||
|
retentionPolicy := "cadvisor_test_rp"
|
||||||
username := "root"
|
username := "root"
|
||||||
password := "root"
|
password := "root"
|
||||||
influxdbHost := "localhost:8086"
|
influxdbHost := "localhost:8086"
|
||||||
@ -259,6 +265,7 @@ func createTestStorage() (*influxdbStorage, error) {
|
|||||||
storage, err := newStorage(machineName,
|
storage, err := newStorage(machineName,
|
||||||
table,
|
table,
|
||||||
database,
|
database,
|
||||||
|
retentionPolicy,
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
influxdbHost,
|
influxdbHost,
|
||||||
|
Loading…
Reference in New Issue
Block a user