Merge pull request #185 from vishh/influxdb_optimze
Buffer stats in influxdb driver before writing to the Db.
This commit is contained in:
commit
8e35f8f2e6
1
storage/cache/memcache.go
vendored
1
storage/cache/memcache.go
vendored
@ -62,6 +62,7 @@ func (self *cachedStorageDriver) Close() error {
|
||||
return self.backend.Close()
|
||||
}
|
||||
|
||||
// TODO(vishh): Cache all samples for a given duration and do not cap the maximum number of samples. This is useful if we happen to change the housekeeping duration.
|
||||
func MemoryCache(maxNumSamplesInCache, maxNumStatsInCache int, driver storage.StorageDriver) storage.StorageDriver {
|
||||
return &cachedStorageDriver{
|
||||
maxNumStatsInCache: maxNumStatsInCache,
|
||||
|
40
storage/cache/memcache_test.go
vendored
40
storage/cache/memcache_test.go
vendored
@ -17,21 +17,47 @@ package cache
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/cadvisor/info"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/storage/memory"
|
||||
"github.com/google/cadvisor/storage/test"
|
||||
)
|
||||
|
||||
type cacheTestStorageDriver struct {
|
||||
base storage.StorageDriver
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
||||
return test.DefaultStatsEq(a, b)
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
return self.base.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
||||
return self.base.RecentStats(containerName, numStats)
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
||||
return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
||||
return self.base.Samples(containerName, numSamples)
|
||||
}
|
||||
|
||||
func (self *cacheTestStorageDriver) Close() error {
|
||||
return self.base.Close()
|
||||
}
|
||||
|
||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||
maxSize := 200
|
||||
|
||||
var driver storage.StorageDriver
|
||||
var testDriver test.TestStorageDriver
|
||||
testDriver.StatsEq = test.DefaultStatsEq
|
||||
for N := 10; N < maxSize; N += 10 {
|
||||
testDriver := &cacheTestStorageDriver{}
|
||||
backend := memory.New(N*2, N*2)
|
||||
driver = MemoryCache(N, N, backend)
|
||||
testDriver.Driver = driver
|
||||
testDriver.base = MemoryCache(N, N, backend)
|
||||
f(testDriver, t)
|
||||
}
|
||||
|
||||
@ -55,9 +81,9 @@ func TestPercentilesWithoutSample(t *testing.T) {
|
||||
|
||||
func TestPercentiles(t *testing.T) {
|
||||
N := 100
|
||||
testDriver := &cacheTestStorageDriver{}
|
||||
backend := memory.New(N*2, N*2)
|
||||
driver := MemoryCache(N, N, backend)
|
||||
testDriver := test.TestStorageDriver{Driver: driver, StatsEq: test.DefaultStatsEq}
|
||||
testDriver.base = MemoryCache(N, N, backend)
|
||||
test.StorageDriverTestPercentiles(testDriver, t)
|
||||
}
|
||||
|
||||
|
@ -17,19 +17,24 @@ package influxdb
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/cadvisor/info"
|
||||
"github.com/google/cadvisor/storage"
|
||||
influxdb "github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
type influxdbStorage struct {
|
||||
client *influxdb.Client
|
||||
prevStats *info.ContainerStats
|
||||
machineName string
|
||||
tableName string
|
||||
windowLen time.Duration
|
||||
client *influxdb.Client
|
||||
prevStats *info.ContainerStats
|
||||
machineName string
|
||||
tableName string
|
||||
windowLen time.Duration
|
||||
bufferDuration time.Duration
|
||||
lastWrite time.Time
|
||||
series []*influxdb.Series
|
||||
lock sync.Mutex
|
||||
readyToFlush func() bool
|
||||
}
|
||||
|
||||
const (
|
||||
@ -242,21 +247,39 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values []
|
||||
return sample, nil
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
|
||||
self.readyToFlush = readyToFlush
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) defaultReadyToFlush() bool {
|
||||
return time.Since(self.lastWrite) >= self.bufferDuration
|
||||
}
|
||||
|
||||
func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
series := &influxdb.Series{
|
||||
Name: self.tableName,
|
||||
// There's only one point for each stats
|
||||
Points: make([][]interface{}, 1),
|
||||
}
|
||||
if stats == nil || stats.Cpu == nil || stats.Memory == nil {
|
||||
return nil
|
||||
}
|
||||
series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats)
|
||||
self.prevStats = stats.Copy(self.prevStats)
|
||||
err := self.client.WriteSeries([]*influxdb.Series{series})
|
||||
if err != nil {
|
||||
return err
|
||||
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
|
||||
var seriesToFlush []*influxdb.Series
|
||||
func() {
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
series := self.newSeries(self.containerStatsToValues(ref, stats))
|
||||
self.series = append(self.series, series)
|
||||
self.prevStats = stats.Copy(self.prevStats)
|
||||
if self.readyToFlush() {
|
||||
seriesToFlush = self.series
|
||||
self.series = make([]*influxdb.Series, 0)
|
||||
self.lastWrite = time.Now()
|
||||
}
|
||||
}()
|
||||
if len(seriesToFlush) > 0 {
|
||||
err := self.client.WriteSeries(seriesToFlush)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write stats to influxDb - %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -405,6 +428,18 @@ func (self *influxdbStorage) Percentiles(
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Returns a new influxdb series.
|
||||
func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
|
||||
out := &influxdb.Series{
|
||||
Name: self.tableName,
|
||||
Columns: columns,
|
||||
// There's only one point for each stats
|
||||
Points: make([][]interface{}, 1),
|
||||
}
|
||||
out.Points[0] = points
|
||||
return out
|
||||
}
|
||||
|
||||
// machineName: A unique identifier to identify the host that current cAdvisor
|
||||
// instance is running on.
|
||||
// influxdbHost: The host which runs influxdb.
|
||||
@ -416,8 +451,9 @@ func New(machineName,
|
||||
password,
|
||||
influxdbHost string,
|
||||
isSecure bool,
|
||||
bufferDuration time.Duration,
|
||||
percentilesDuration time.Duration,
|
||||
) (storage.StorageDriver, error) {
|
||||
) (*influxdbStorage, error) {
|
||||
config := &influxdb.ClientConfig{
|
||||
Host: influxdbHost,
|
||||
Username: username,
|
||||
@ -436,10 +472,14 @@ func New(machineName,
|
||||
}
|
||||
|
||||
ret := &influxdbStorage{
|
||||
client: client,
|
||||
windowLen: percentilesDuration,
|
||||
machineName: machineName,
|
||||
tableName: tablename,
|
||||
client: client,
|
||||
windowLen: percentilesDuration,
|
||||
machineName: machineName,
|
||||
tableName: tablename,
|
||||
bufferDuration: bufferDuration,
|
||||
lastWrite: time.Now(),
|
||||
series: make([]*influxdb.Series, 0),
|
||||
}
|
||||
ret.readyToFlush = ret.defaultReadyToFlush
|
||||
return ret, nil
|
||||
}
|
||||
|
@ -21,11 +21,49 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/cadvisor/info"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/storage/test"
|
||||
influxdb "github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
func StatsEq(a, b *info.ContainerStats) bool {
|
||||
// The duration in seconds for which stats will be buffered in the influxdb driver.
|
||||
const kCacheDuration = 1
|
||||
|
||||
type influxDbTestStorageDriver struct {
|
||||
count int
|
||||
buffer int
|
||||
base storage.StorageDriver
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) readyToFlush() bool {
|
||||
if self.count >= self.buffer {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
self.count++
|
||||
return self.base.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
||||
return self.base.RecentStats(containerName, numStats)
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
||||
return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
||||
return self.base.Samples(containerName, numSamples)
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) Close() error {
|
||||
return self.base.Close()
|
||||
}
|
||||
|
||||
func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
||||
if !test.TimeEq(a.Timestamp, b.Timestamp, 10*time.Millisecond) {
|
||||
return false
|
||||
}
|
||||
@ -48,7 +86,7 @@ func StatsEq(a, b *info.ContainerStats) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bufferCount int) {
|
||||
machineName := "machineA"
|
||||
tablename := "t"
|
||||
database := "cadvisor"
|
||||
@ -95,12 +133,15 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||
password,
|
||||
hostname,
|
||||
false,
|
||||
time.Duration(bufferCount),
|
||||
percentilesDuration)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testDriver := &influxDbTestStorageDriver{buffer: bufferCount}
|
||||
driver.OverrideReadyToFlush(testDriver.readyToFlush)
|
||||
testDriver.base = driver
|
||||
|
||||
testDriver := test.TestStorageDriver{Driver: driver, StatsEq: StatsEq}
|
||||
// generate another container's data on same machine.
|
||||
test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t)
|
||||
|
||||
@ -112,62 +153,66 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||
password,
|
||||
hostname,
|
||||
false,
|
||||
time.Duration(bufferCount),
|
||||
percentilesDuration)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer driverForAnotherMachine.Close()
|
||||
testDriverOtherMachine := test.TestStorageDriver{Driver: driverForAnotherMachine, StatsEq: StatsEq}
|
||||
testDriverOtherMachine := &influxDbTestStorageDriver{buffer: bufferCount}
|
||||
driverForAnotherMachine.OverrideReadyToFlush(testDriverOtherMachine.readyToFlush)
|
||||
testDriverOtherMachine.base = driverForAnotherMachine
|
||||
|
||||
test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, testDriverOtherMachine, t)
|
||||
f(testDriver, t)
|
||||
}
|
||||
|
||||
func TestSampleCpuUsage(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestSampleCpuUsage, t)
|
||||
runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestRetrievePartialRecentStats(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t)
|
||||
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
|
||||
}
|
||||
|
||||
func TestSamplesWithoutSample(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t)
|
||||
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestRetrieveAllRecentStats(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t)
|
||||
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
|
||||
}
|
||||
|
||||
func TestNoRecentStats(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestNoRecentStats, t)
|
||||
runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestNoSamples(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestNoSamples, t)
|
||||
runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestPercentiles(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestPercentiles, t)
|
||||
runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestMaxMemoryUsage(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t)
|
||||
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestPercentilesWithoutSample(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t)
|
||||
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestPercentilesWithoutStats(t *testing.T) {
|
||||
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t)
|
||||
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestRetrieveZeroStats(t *testing.T) {
|
||||
t.SkipNow()
|
||||
runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t)
|
||||
runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration)
|
||||
}
|
||||
|
||||
func TestRetrieveZeroSamples(t *testing.T) {
|
||||
t.SkipNow()
|
||||
runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t)
|
||||
runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration)
|
||||
}
|
||||
|
@ -17,22 +17,47 @@ package memory
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/cadvisor/info"
|
||||
"github.com/google/cadvisor/storage"
|
||||
"github.com/google/cadvisor/storage/test"
|
||||
)
|
||||
|
||||
type memoryTestStorageDriver struct {
|
||||
base storage.StorageDriver
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
||||
return test.DefaultStatsEq(a, b)
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||
return self.base.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
||||
return self.base.RecentStats(containerName, numStats)
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
||||
return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
||||
return self.base.Samples(containerName, numSamples)
|
||||
}
|
||||
|
||||
func (self *memoryTestStorageDriver) Close() error {
|
||||
return self.base.Close()
|
||||
}
|
||||
|
||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||
maxSize := 200
|
||||
|
||||
var driver storage.StorageDriver
|
||||
var testDriver test.TestStorageDriver
|
||||
testDriver.StatsEq = test.DefaultStatsEq
|
||||
for N := 10; N < maxSize; N += 10 {
|
||||
driver = New(N, N)
|
||||
testDriver.Driver = driver
|
||||
testDriver := &memoryTestStorageDriver{}
|
||||
testDriver.base = New(N, N)
|
||||
f(testDriver, t)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMaxMemoryUsage(t *testing.T) {
|
||||
@ -53,8 +78,8 @@ func TestPercentilesWithoutSample(t *testing.T) {
|
||||
|
||||
func TestPercentiles(t *testing.T) {
|
||||
N := 100
|
||||
driver := New(N, N)
|
||||
testDriver := test.TestStorageDriver{Driver: driver, StatsEq: test.DefaultStatsEq}
|
||||
testDriver := &memoryTestStorageDriver{}
|
||||
testDriver.base = New(N, N)
|
||||
test.StorageDriverTestPercentiles(testDriver, t)
|
||||
}
|
||||
|
||||
|
@ -24,9 +24,9 @@ import (
|
||||
"github.com/google/cadvisor/storage"
|
||||
)
|
||||
|
||||
type TestStorageDriver struct {
|
||||
Driver storage.StorageDriver
|
||||
StatsEq func(a *info.ContainerStats, b *info.ContainerStats) bool
|
||||
type TestStorageDriver interface {
|
||||
StatsEq(a *info.ContainerStats, b *info.ContainerStats) bool
|
||||
storage.StorageDriver
|
||||
}
|
||||
|
||||
func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStats {
|
||||
@ -183,7 +183,7 @@ func StorageDriverFillRandomStatsFunc(
|
||||
trace := buildTrace(cpuTrace, memTrace, samplePeriod)
|
||||
|
||||
for _, stats := range trace {
|
||||
err := driver.Driver.AddStats(ref, stats)
|
||||
err := driver.AddStats(ref, stats)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add stats: %v", err)
|
||||
}
|
||||
@ -191,7 +191,7 @@ func StorageDriverFillRandomStatsFunc(
|
||||
}
|
||||
|
||||
func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
cpuTrace := make([]uint64, 0, N)
|
||||
memTrace := make([]uint64, 0, N)
|
||||
@ -211,7 +211,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
|
||||
trace := buildTrace(cpuTrace, memTrace, samplePeriod)
|
||||
|
||||
for _, stats := range trace {
|
||||
err := driver.Driver.AddStats(ref, stats)
|
||||
err := driver.AddStats(ref, stats)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add stats: %v", err)
|
||||
}
|
||||
@ -222,7 +222,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
|
||||
stats.Cpu.Usage.User = 0
|
||||
}
|
||||
|
||||
samples, err := driver.Driver.Samples(ref.Name, N)
|
||||
samples, err := driver.Samples(ref.Name, N)
|
||||
if err != nil {
|
||||
t.Errorf("unable to sample stats: %v", err)
|
||||
}
|
||||
@ -231,13 +231,13 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
|
||||
}
|
||||
samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t)
|
||||
|
||||
samples, err = driver.Driver.Samples(ref.Name, -1)
|
||||
samples, err = driver.Samples(ref.Name, -1)
|
||||
if err != nil {
|
||||
t.Errorf("unable to sample stats: %v", err)
|
||||
}
|
||||
samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t)
|
||||
|
||||
samples, err = driver.Driver.Samples(ref.Name, N-5)
|
||||
samples, err = driver.Samples(ref.Name, N-5)
|
||||
if err != nil {
|
||||
t.Errorf("unable to sample stats: %v", err)
|
||||
}
|
||||
@ -245,7 +245,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
|
||||
}
|
||||
|
||||
func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
memTrace := make([]uint64, N)
|
||||
cpuTrace := make([]uint64, N)
|
||||
@ -261,7 +261,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) {
|
||||
trace := buildTrace(cpuTrace, memTrace, 1*time.Second)
|
||||
|
||||
for _, stats := range trace {
|
||||
err := driver.Driver.AddStats(ref, stats)
|
||||
err := driver.AddStats(ref, stats)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add stats: %v", err)
|
||||
}
|
||||
@ -273,7 +273,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) {
|
||||
stats.Memory.Usage = 0
|
||||
}
|
||||
|
||||
percentiles, err := driver.Driver.Percentiles(ref.Name, []int{50}, []int{50})
|
||||
percentiles, err := driver.Percentiles(ref.Name, []int{50}, []int{50})
|
||||
if err != nil {
|
||||
t.Errorf("unable to call Percentiles(): %v", err)
|
||||
}
|
||||
@ -284,7 +284,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) {
|
||||
}
|
||||
|
||||
func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
trace := buildTrace(
|
||||
[]uint64{10},
|
||||
[]uint64{10},
|
||||
@ -292,8 +292,8 @@ func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.
|
||||
ref := info.ContainerReference{
|
||||
Name: "container",
|
||||
}
|
||||
driver.Driver.AddStats(ref, trace[0])
|
||||
samples, err := driver.Driver.Samples(ref.Name, -1)
|
||||
driver.AddStats(ref, trace[0])
|
||||
samples, err := driver.Samples(ref.Name, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -303,7 +303,7 @@ func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.
|
||||
}
|
||||
|
||||
func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
trace := buildTrace(
|
||||
[]uint64{10},
|
||||
[]uint64{10},
|
||||
@ -311,8 +311,8 @@ func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *test
|
||||
ref := info.ContainerReference{
|
||||
Name: "container",
|
||||
}
|
||||
driver.Driver.AddStats(ref, trace[0])
|
||||
percentiles, err := driver.Driver.Percentiles(
|
||||
driver.AddStats(ref, trace[0])
|
||||
percentiles, err := driver.Percentiles(
|
||||
ref.Name,
|
||||
[]int{50},
|
||||
[]int{50},
|
||||
@ -326,7 +326,7 @@ func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *test
|
||||
}
|
||||
|
||||
func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
cpuTrace := make([]uint64, N)
|
||||
memTrace := make([]uint64, N)
|
||||
@ -341,14 +341,14 @@ func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) {
|
||||
Name: "container",
|
||||
}
|
||||
for _, stats := range trace {
|
||||
driver.Driver.AddStats(ref, stats)
|
||||
driver.AddStats(ref, stats)
|
||||
}
|
||||
percentages := []int{
|
||||
80,
|
||||
90,
|
||||
50,
|
||||
}
|
||||
percentiles, err := driver.Driver.Percentiles(ref.Name, percentages, percentages)
|
||||
percentiles, err := driver.Percentiles(ref.Name, percentages, percentages)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -372,7 +372,7 @@ func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) {
|
||||
}
|
||||
|
||||
func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
memTrace := make([]uint64, N)
|
||||
cpuTrace := make([]uint64, N)
|
||||
@ -388,10 +388,10 @@ func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *te
|
||||
trace := buildTrace(cpuTrace, memTrace, 1*time.Second)
|
||||
|
||||
for _, stats := range trace {
|
||||
driver.Driver.AddStats(ref, stats)
|
||||
driver.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
recentStats, err := driver.Driver.RecentStats(ref.Name, 10)
|
||||
recentStats, err := driver.RecentStats(ref.Name, 10)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -415,7 +415,7 @@ func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *te
|
||||
}
|
||||
|
||||
func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
memTrace := make([]uint64, N)
|
||||
cpuTrace := make([]uint64, N)
|
||||
@ -431,10 +431,10 @@ func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testin
|
||||
trace := buildTrace(cpuTrace, memTrace, 1*time.Second)
|
||||
|
||||
for _, stats := range trace {
|
||||
driver.Driver.AddStats(ref, stats)
|
||||
driver.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
recentStats, err := driver.Driver.RecentStats(ref.Name, -1)
|
||||
recentStats, err := driver.RecentStats(ref.Name, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -457,27 +457,27 @@ func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testin
|
||||
}
|
||||
|
||||
func StorageDriverTestNoRecentStats(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
nonExistContainer := "somerandomecontainer"
|
||||
stats, _ := driver.Driver.RecentStats(nonExistContainer, -1)
|
||||
stats, _ := driver.RecentStats(nonExistContainer, -1)
|
||||
if len(stats) > 0 {
|
||||
t.Errorf("RecentStats() returns %v stats on non exist container", len(stats))
|
||||
}
|
||||
}
|
||||
|
||||
func StorageDriverTestNoSamples(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
nonExistContainer := "somerandomecontainer"
|
||||
samples, _ := driver.Driver.Samples(nonExistContainer, -1)
|
||||
samples, _ := driver.Samples(nonExistContainer, -1)
|
||||
if len(samples) > 0 {
|
||||
t.Errorf("Samples() returns %v samples on non exist container", len(samples))
|
||||
}
|
||||
}
|
||||
|
||||
func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
nonExistContainer := "somerandomecontainer"
|
||||
percentiles, _ := driver.Driver.Percentiles(nonExistContainer, []int{50, 80}, []int{50, 80})
|
||||
percentiles, _ := driver.Percentiles(nonExistContainer, []int{50, 80}, []int{50, 80})
|
||||
if percentiles == nil {
|
||||
return
|
||||
}
|
||||
@ -497,7 +497,7 @@ func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testi
|
||||
}
|
||||
|
||||
func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
memTrace := make([]uint64, N)
|
||||
cpuTrace := make([]uint64, N)
|
||||
@ -513,10 +513,10 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi
|
||||
trace := buildTrace(cpuTrace, memTrace, 1*time.Second)
|
||||
|
||||
for _, stats := range trace {
|
||||
driver.Driver.AddStats(ref, stats)
|
||||
driver.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
recentStats, err := driver.Driver.RecentStats(ref.Name, 0)
|
||||
recentStats, err := driver.RecentStats(ref.Name, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -526,7 +526,7 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi
|
||||
}
|
||||
|
||||
func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T) {
|
||||
defer driver.Driver.Close()
|
||||
defer driver.Close()
|
||||
N := 100
|
||||
memTrace := make([]uint64, N)
|
||||
cpuTrace := make([]uint64, N)
|
||||
@ -542,10 +542,10 @@ func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T
|
||||
trace := buildTrace(cpuTrace, memTrace, 1*time.Second)
|
||||
|
||||
for _, stats := range trace {
|
||||
driver.Driver.AddStats(ref, stats)
|
||||
driver.AddStats(ref, stats)
|
||||
}
|
||||
|
||||
samples, err := driver.Driver.Samples(ref.Name, 0)
|
||||
samples, err := driver.Samples(ref.Name, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -27,12 +27,12 @@ import (
|
||||
)
|
||||
|
||||
var argSampleSize = flag.Int("samples", 1024, "number of samples we want to keep")
|
||||
var argHistoryDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep")
|
||||
var argDbUsername = flag.String("storage_driver_user", "root", "database username")
|
||||
var argDbPassword = flag.String("storage_driver_password", "root", "database password")
|
||||
var argDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port")
|
||||
var argDbName = flag.String("storage_driver_db", "cadvisor", "database name")
|
||||
var argDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database")
|
||||
var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60, "Writes in the storage driver will be bufferd for this duration (in seconds), and committed to the non memory backends as a single transaction")
|
||||
|
||||
func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
||||
var storageDriver storage.StorageDriver
|
||||
@ -42,7 +42,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
||||
// empty string by default is the in memory store
|
||||
fallthrough
|
||||
case "memory":
|
||||
storageDriver = memory.New(*argSampleSize, *argHistoryDuration)
|
||||
storageDriver = memory.New(*argSampleSize, int(*argDbBufferDuration))
|
||||
return storageDriver, nil
|
||||
case "influxdb":
|
||||
var hostname string
|
||||
@ -59,10 +59,11 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
||||
*argDbPassword,
|
||||
*argDbHost,
|
||||
*argDbIsSecure,
|
||||
*argDbBufferDuration*time.Second,
|
||||
// TODO(monnand): One hour? Or user-defined?
|
||||
1*time.Hour,
|
||||
)
|
||||
storageDriver = cache.MemoryCache(*argHistoryDuration, *argHistoryDuration, storageDriver)
|
||||
storageDriver = cache.MemoryCache(int(*argDbBufferDuration), int(*argDbBufferDuration), storageDriver)
|
||||
default:
|
||||
err = fmt.Errorf("Unknown database driver: %v", *argDbDriver)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user