From 9b5b221a57f643ce7a640090794c1f5ddccc77d0 Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Tue, 19 Aug 2014 00:40:00 +0000 Subject: [PATCH 1/2] Buffer stats in influxdb driver before writing to the Db. The buffering duration is configurable via a flag and is set to 60 seconds by default. The storage driver test code has been refactored a bit to help test the new buffering feature and will be future proof. Docker-DCO-1.1-Signed-off-by: Vishnu Kannan (github: vishh) --- storage/cache/memcache.go | 1 + storage/cache/memcache_test.go | 40 +++++++++++++--- storage/influxdb/influxdb.go | 76 ++++++++++++++++++++++-------- storage/influxdb/influxdb_test.go | 77 +++++++++++++++++++++++------- storage/memory/memory_test.go | 41 ++++++++++++---- storage/test/storagetests.go | 78 +++++++++++++++---------------- storagedriver.go | 7 +-- 7 files changed, 227 insertions(+), 93 deletions(-) diff --git a/storage/cache/memcache.go b/storage/cache/memcache.go index e323f866..e3848984 100644 --- a/storage/cache/memcache.go +++ b/storage/cache/memcache.go @@ -62,6 +62,7 @@ func (self *cachedStorageDriver) Close() error { return self.backend.Close() } +// TODO(vishh): Cache all samples for a given duration and do not cap the maximum number of samples. This is useful if we happen to change the housekeeping duration. func MemoryCache(maxNumSamplesInCache, maxNumStatsInCache int, driver storage.StorageDriver) storage.StorageDriver { return &cachedStorageDriver{ maxNumStatsInCache: maxNumStatsInCache, diff --git a/storage/cache/memcache_test.go b/storage/cache/memcache_test.go index 4ff154a4..aeb3eb21 100644 --- a/storage/cache/memcache_test.go +++ b/storage/cache/memcache_test.go @@ -17,21 +17,47 @@ package cache import ( "testing" + "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/memory" "github.com/google/cadvisor/storage/test" ) +type cacheTestStorageDriver struct { + base storage.StorageDriver +} + +func (self *cacheTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { + return test.DefaultStatsEq(a, b) +} + +func (self *cacheTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + return self.base.AddStats(ref, stats) +} + +func (self *cacheTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + return self.base.RecentStats(containerName, numStats) +} + +func (self *cacheTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { + return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) +} + +func (self *cacheTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + return self.base.Samples(containerName, numSamples) +} + +func (self *cacheTestStorageDriver) Close() error { + return self.base.Close() +} + func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { maxSize := 200 - var driver storage.StorageDriver - var testDriver test.TestStorageDriver - testDriver.StatsEq = test.DefaultStatsEq for N := 10; N < maxSize; N += 10 { + testDriver := &cacheTestStorageDriver{} backend := memory.New(N*2, N*2) - driver = MemoryCache(N, N, backend) - testDriver.Driver = driver + testDriver.base = MemoryCache(N, N, backend) f(testDriver, t) } @@ -55,9 +81,9 @@ func TestPercentilesWithoutSample(t *testing.T) { func TestPercentiles(t *testing.T) { N := 100 + testDriver := &cacheTestStorageDriver{} backend := memory.New(N*2, N*2) - driver := MemoryCache(N, N, backend) - testDriver := test.TestStorageDriver{Driver: driver, StatsEq: test.DefaultStatsEq} + testDriver.base = MemoryCache(N, N, backend) test.StorageDriverTestPercentiles(testDriver, t) } diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index d1905039..6585e00f 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -17,19 +17,24 @@ package influxdb import ( "fmt" "strings" + "sync" "time" "github.com/google/cadvisor/info" - "github.com/google/cadvisor/storage" influxdb "github.com/influxdb/influxdb/client" ) type influxdbStorage struct { - client *influxdb.Client - prevStats *info.ContainerStats - machineName string - tableName string - windowLen time.Duration + client *influxdb.Client + prevStats *info.ContainerStats + machineName string + tableName string + windowLen time.Duration + bufferDuration time.Duration + lastWrite time.Time + series []*influxdb.Series + lock sync.Mutex + readyToFlush func() bool } const ( @@ -242,20 +247,34 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] return sample, nil } +func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) { + self.readyToFlush = readyToFlush +} + +func (self *influxdbStorage) defaultReadyToFlush() bool { + return time.Since(self.lastWrite) >= self.bufferDuration +} + func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { - series := &influxdb.Series{ - Name: self.tableName, - // There's only one point for each stats - Points: make([][]interface{}, 1), - } if stats == nil || stats.Cpu == nil || stats.Memory == nil { return nil } - series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) + // AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write. + self.lock.Lock() + series := self.newSeries(self.containerStatsToValues(ref, stats)) + self.series = append(self.series, series) self.prevStats = stats.Copy(self.prevStats) - err := self.client.WriteSeries([]*influxdb.Series{series}) - if err != nil { - return err + if self.readyToFlush() { + series := self.series + self.series = make([]*influxdb.Series, 0) + self.lastWrite = time.Now() + self.lock.Unlock() + err := self.client.WriteSeries(series) + if err != nil { + return fmt.Errorf("failed to write stats to influxDb - %s", err) + } + } else { + self.lock.Unlock() } return nil } @@ -405,6 +424,18 @@ func (self *influxdbStorage) Percentiles( return ret, nil } +// Returns a new influxdb series. +func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series { + out := &influxdb.Series{ + Name: self.tableName, + Columns: columns, + // There's only one point for each stats + Points: make([][]interface{}, 1), + } + out.Points[0] = points + return out +} + // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // influxdbHost: The host which runs influxdb. @@ -416,8 +447,9 @@ func New(machineName, password, influxdbHost string, isSecure bool, + bufferDuration time.Duration, percentilesDuration time.Duration, -) (storage.StorageDriver, error) { +) (*influxdbStorage, error) { config := &influxdb.ClientConfig{ Host: influxdbHost, Username: username, @@ -436,10 +468,14 @@ func New(machineName, } ret := &influxdbStorage{ - client: client, - windowLen: percentilesDuration, - machineName: machineName, - tableName: tablename, + client: client, + windowLen: percentilesDuration, + machineName: machineName, + tableName: tablename, + bufferDuration: bufferDuration, + lastWrite: time.Now(), + series: make([]*influxdb.Series, 0), } + ret.readyToFlush = ret.defaultReadyToFlush return ret, nil } diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index c68e23fa..ba9ec1b6 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -21,11 +21,49 @@ import ( "time" "github.com/google/cadvisor/info" + "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/test" influxdb "github.com/influxdb/influxdb/client" ) -func StatsEq(a, b *info.ContainerStats) bool { +// The duration in seconds for which stats will be buffered in the influxdb driver. +const kCacheDuration = 1 + +type influxDbTestStorageDriver struct { + count int + buffer int + base storage.StorageDriver +} + +func (self *influxDbTestStorageDriver) readyToFlush() bool { + if self.count >= self.buffer { + return true + } + return false +} + +func (self *influxDbTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + self.count++ + return self.base.AddStats(ref, stats) +} + +func (self *influxDbTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + return self.base.RecentStats(containerName, numStats) +} + +func (self *influxDbTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { + return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) +} + +func (self *influxDbTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + return self.base.Samples(containerName, numSamples) +} + +func (self *influxDbTestStorageDriver) Close() error { + return self.base.Close() +} + +func (self *influxDbTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { if !test.TimeEq(a.Timestamp, b.Timestamp, 10*time.Millisecond) { return false } @@ -48,7 +86,7 @@ func StatsEq(a, b *info.ContainerStats) bool { return true } -func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { +func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bufferCount int) { machineName := "machineA" tablename := "t" database := "cadvisor" @@ -95,12 +133,15 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { password, hostname, false, + time.Duration(bufferCount), percentilesDuration) if err != nil { t.Fatal(err) } + testDriver := &influxDbTestStorageDriver{buffer: bufferCount} + driver.OverrideReadyToFlush(testDriver.readyToFlush) + testDriver.base = driver - testDriver := test.TestStorageDriver{Driver: driver, StatsEq: StatsEq} // generate another container's data on same machine. test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t) @@ -112,62 +153,66 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { password, hostname, false, + time.Duration(bufferCount), percentilesDuration) if err != nil { t.Fatal(err) } defer driverForAnotherMachine.Close() - testDriverOtherMachine := test.TestStorageDriver{Driver: driverForAnotherMachine, StatsEq: StatsEq} + testDriverOtherMachine := &influxDbTestStorageDriver{buffer: bufferCount} + driverForAnotherMachine.OverrideReadyToFlush(testDriverOtherMachine.readyToFlush) + testDriverOtherMachine.base = driverForAnotherMachine + test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, testDriverOtherMachine, t) f(testDriver, t) } func TestSampleCpuUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestSampleCpuUsage, t) + runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration) } func TestRetrievePartialRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) + runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20) } func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) + runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration) } func TestRetrieveAllRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) + runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10) } func TestNoRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestNoRecentStats, t) + runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration) } func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestNoSamples, t) + runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration) } func TestPercentiles(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentiles, t) + runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration) } func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration) } func TestPercentilesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) + runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration) } func TestPercentilesWithoutStats(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) + runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration) } func TestRetrieveZeroStats(t *testing.T) { t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t) + runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration) } func TestRetrieveZeroSamples(t *testing.T) { t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t) + runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration) } diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index 7d44957d..e29759ab 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -17,22 +17,47 @@ package memory import ( "testing" + "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/test" ) +type memoryTestStorageDriver struct { + base storage.StorageDriver +} + +func (self *memoryTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { + return test.DefaultStatsEq(a, b) +} + +func (self *memoryTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + return self.base.AddStats(ref, stats) +} + +func (self *memoryTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + return self.base.RecentStats(containerName, numStats) +} + +func (self *memoryTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { + return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) +} + +func (self *memoryTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + return self.base.Samples(containerName, numSamples) +} + +func (self *memoryTestStorageDriver) Close() error { + return self.base.Close() +} + func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { maxSize := 200 - var driver storage.StorageDriver - var testDriver test.TestStorageDriver - testDriver.StatsEq = test.DefaultStatsEq for N := 10; N < maxSize; N += 10 { - driver = New(N, N) - testDriver.Driver = driver + testDriver := &memoryTestStorageDriver{} + testDriver.base = New(N, N) f(testDriver, t) } - } func TestMaxMemoryUsage(t *testing.T) { @@ -53,8 +78,8 @@ func TestPercentilesWithoutSample(t *testing.T) { func TestPercentiles(t *testing.T) { N := 100 - driver := New(N, N) - testDriver := test.TestStorageDriver{Driver: driver, StatsEq: test.DefaultStatsEq} + testDriver := &memoryTestStorageDriver{} + testDriver.base = New(N, N) test.StorageDriverTestPercentiles(testDriver, t) } diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 0136e4d9..6f6c761f 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -24,9 +24,9 @@ import ( "github.com/google/cadvisor/storage" ) -type TestStorageDriver struct { - Driver storage.StorageDriver - StatsEq func(a *info.ContainerStats, b *info.ContainerStats) bool +type TestStorageDriver interface { + StatsEq(a *info.ContainerStats, b *info.ContainerStats) bool + storage.StorageDriver } func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStats { @@ -183,7 +183,7 @@ func StorageDriverFillRandomStatsFunc( trace := buildTrace(cpuTrace, memTrace, samplePeriod) for _, stats := range trace { - err := driver.Driver.AddStats(ref, stats) + err := driver.AddStats(ref, stats) if err != nil { t.Fatalf("unable to add stats: %v", err) } @@ -191,7 +191,7 @@ func StorageDriverFillRandomStatsFunc( } func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 cpuTrace := make([]uint64, 0, N) memTrace := make([]uint64, 0, N) @@ -211,7 +211,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { trace := buildTrace(cpuTrace, memTrace, samplePeriod) for _, stats := range trace { - err := driver.Driver.AddStats(ref, stats) + err := driver.AddStats(ref, stats) if err != nil { t.Fatalf("unable to add stats: %v", err) } @@ -222,7 +222,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { stats.Cpu.Usage.User = 0 } - samples, err := driver.Driver.Samples(ref.Name, N) + samples, err := driver.Samples(ref.Name, N) if err != nil { t.Errorf("unable to sample stats: %v", err) } @@ -231,13 +231,13 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { } samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) - samples, err = driver.Driver.Samples(ref.Name, -1) + samples, err = driver.Samples(ref.Name, -1) if err != nil { t.Errorf("unable to sample stats: %v", err) } samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) - samples, err = driver.Driver.Samples(ref.Name, N-5) + samples, err = driver.Samples(ref.Name, N-5) if err != nil { t.Errorf("unable to sample stats: %v", err) } @@ -245,7 +245,7 @@ func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { } func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 memTrace := make([]uint64, N) cpuTrace := make([]uint64, N) @@ -261,7 +261,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) { trace := buildTrace(cpuTrace, memTrace, 1*time.Second) for _, stats := range trace { - err := driver.Driver.AddStats(ref, stats) + err := driver.AddStats(ref, stats) if err != nil { t.Fatalf("unable to add stats: %v", err) } @@ -273,7 +273,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) { stats.Memory.Usage = 0 } - percentiles, err := driver.Driver.Percentiles(ref.Name, []int{50}, []int{50}) + percentiles, err := driver.Percentiles(ref.Name, []int{50}, []int{50}) if err != nil { t.Errorf("unable to call Percentiles(): %v", err) } @@ -284,7 +284,7 @@ func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) { } func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() trace := buildTrace( []uint64{10}, []uint64{10}, @@ -292,8 +292,8 @@ func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing. ref := info.ContainerReference{ Name: "container", } - driver.Driver.AddStats(ref, trace[0]) - samples, err := driver.Driver.Samples(ref.Name, -1) + driver.AddStats(ref, trace[0]) + samples, err := driver.Samples(ref.Name, -1) if err != nil { t.Fatal(err) } @@ -303,7 +303,7 @@ func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing. } func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() trace := buildTrace( []uint64{10}, []uint64{10}, @@ -311,8 +311,8 @@ func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *test ref := info.ContainerReference{ Name: "container", } - driver.Driver.AddStats(ref, trace[0]) - percentiles, err := driver.Driver.Percentiles( + driver.AddStats(ref, trace[0]) + percentiles, err := driver.Percentiles( ref.Name, []int{50}, []int{50}, @@ -326,7 +326,7 @@ func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *test } func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 cpuTrace := make([]uint64, N) memTrace := make([]uint64, N) @@ -341,14 +341,14 @@ func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) { Name: "container", } for _, stats := range trace { - driver.Driver.AddStats(ref, stats) + driver.AddStats(ref, stats) } percentages := []int{ 80, 90, 50, } - percentiles, err := driver.Driver.Percentiles(ref.Name, percentages, percentages) + percentiles, err := driver.Percentiles(ref.Name, percentages, percentages) if err != nil { t.Fatal(err) } @@ -372,7 +372,7 @@ func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) { } func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 memTrace := make([]uint64, N) cpuTrace := make([]uint64, N) @@ -388,10 +388,10 @@ func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *te trace := buildTrace(cpuTrace, memTrace, 1*time.Second) for _, stats := range trace { - driver.Driver.AddStats(ref, stats) + driver.AddStats(ref, stats) } - recentStats, err := driver.Driver.RecentStats(ref.Name, 10) + recentStats, err := driver.RecentStats(ref.Name, 10) if err != nil { t.Fatal(err) } @@ -415,7 +415,7 @@ func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *te } func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 memTrace := make([]uint64, N) cpuTrace := make([]uint64, N) @@ -431,10 +431,10 @@ func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testin trace := buildTrace(cpuTrace, memTrace, 1*time.Second) for _, stats := range trace { - driver.Driver.AddStats(ref, stats) + driver.AddStats(ref, stats) } - recentStats, err := driver.Driver.RecentStats(ref.Name, -1) + recentStats, err := driver.RecentStats(ref.Name, -1) if err != nil { t.Fatal(err) } @@ -457,27 +457,27 @@ func StorageDriverTestRetrieveAllRecentStats(driver TestStorageDriver, t *testin } func StorageDriverTestNoRecentStats(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() nonExistContainer := "somerandomecontainer" - stats, _ := driver.Driver.RecentStats(nonExistContainer, -1) + stats, _ := driver.RecentStats(nonExistContainer, -1) if len(stats) > 0 { t.Errorf("RecentStats() returns %v stats on non exist container", len(stats)) } } func StorageDriverTestNoSamples(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() nonExistContainer := "somerandomecontainer" - samples, _ := driver.Driver.Samples(nonExistContainer, -1) + samples, _ := driver.Samples(nonExistContainer, -1) if len(samples) > 0 { t.Errorf("Samples() returns %v samples on non exist container", len(samples)) } } func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() nonExistContainer := "somerandomecontainer" - percentiles, _ := driver.Driver.Percentiles(nonExistContainer, []int{50, 80}, []int{50, 80}) + percentiles, _ := driver.Percentiles(nonExistContainer, []int{50, 80}, []int{50, 80}) if percentiles == nil { return } @@ -497,7 +497,7 @@ func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testi } func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 memTrace := make([]uint64, N) cpuTrace := make([]uint64, N) @@ -513,10 +513,10 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi trace := buildTrace(cpuTrace, memTrace, 1*time.Second) for _, stats := range trace { - driver.Driver.AddStats(ref, stats) + driver.AddStats(ref, stats) } - recentStats, err := driver.Driver.RecentStats(ref.Name, 0) + recentStats, err := driver.RecentStats(ref.Name, 0) if err != nil { t.Fatal(err) } @@ -526,7 +526,7 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi } func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T) { - defer driver.Driver.Close() + defer driver.Close() N := 100 memTrace := make([]uint64, N) cpuTrace := make([]uint64, N) @@ -542,10 +542,10 @@ func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T trace := buildTrace(cpuTrace, memTrace, 1*time.Second) for _, stats := range trace { - driver.Driver.AddStats(ref, stats) + driver.AddStats(ref, stats) } - samples, err := driver.Driver.Samples(ref.Name, 0) + samples, err := driver.Samples(ref.Name, 0) if err != nil { t.Fatal(err) } diff --git a/storagedriver.go b/storagedriver.go index 655ff547..d9dd9411 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -27,12 +27,12 @@ import ( ) var argSampleSize = flag.Int("samples", 1024, "number of samples we want to keep") -var argHistoryDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep") var argDbUsername = flag.String("storage_driver_user", "root", "database username") var argDbPassword = flag.String("storage_driver_password", "root", "database password") var argDbHost = flag.String("storage_driver_host", "localhost:8086", "database host:port") var argDbName = flag.String("storage_driver_db", "cadvisor", "database name") var argDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database") +var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60, "Writes in the storage driver will be bufferd for this duration (in seconds), and committed to the non memory backends as a single transaction") func NewStorageDriver(driverName string) (storage.StorageDriver, error) { var storageDriver storage.StorageDriver @@ -42,7 +42,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { // empty string by default is the in memory store fallthrough case "memory": - storageDriver = memory.New(*argSampleSize, *argHistoryDuration) + storageDriver = memory.New(*argSampleSize, int(*argDbBufferDuration)) return storageDriver, nil case "influxdb": var hostname string @@ -59,10 +59,11 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { *argDbPassword, *argDbHost, *argDbIsSecure, + *argDbBufferDuration*time.Second, // TODO(monnand): One hour? Or user-defined? 1*time.Hour, ) - storageDriver = cache.MemoryCache(*argHistoryDuration, *argHistoryDuration, storageDriver) + storageDriver = cache.MemoryCache(int(*argDbBufferDuration), int(*argDbBufferDuration), storageDriver) default: err = fmt.Errorf("Unknown database driver: %v", *argDbDriver) } From 45c9fb80e1d50bb3ee676b4623a7d1bee69caef3 Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Wed, 20 Aug 2014 04:31:13 +0000 Subject: [PATCH 2/2] Simplify lock handling in influxdb driver. Docker-DCO-1.1-Signed-off-by: Vishnu Kannan (github: vishh) --- storage/influxdb/influxdb.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 6585e00f..a1e9fe3f 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -260,22 +260,26 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C return nil } // AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write. - self.lock.Lock() - series := self.newSeries(self.containerStatsToValues(ref, stats)) - self.series = append(self.series, series) - self.prevStats = stats.Copy(self.prevStats) - if self.readyToFlush() { - series := self.series - self.series = make([]*influxdb.Series, 0) - self.lastWrite = time.Now() - self.lock.Unlock() - err := self.client.WriteSeries(series) + var seriesToFlush []*influxdb.Series + func() { + self.lock.Lock() + defer self.lock.Unlock() + series := self.newSeries(self.containerStatsToValues(ref, stats)) + self.series = append(self.series, series) + self.prevStats = stats.Copy(self.prevStats) + if self.readyToFlush() { + seriesToFlush = self.series + self.series = make([]*influxdb.Series, 0) + self.lastWrite = time.Now() + } + }() + if len(seriesToFlush) > 0 { + err := self.client.WriteSeries(seriesToFlush) if err != nil { return fmt.Errorf("failed to write stats to influxDb - %s", err) } - } else { - self.lock.Unlock() } + return nil }