From 60a1b6a900e53e4bbba4df08f8910989dd056883 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 13:27:26 -0400 Subject: [PATCH 1/9] supports backend storage in in-memory storage --- storage/memory/memory.go | 15 ++++++++++++++- storage/memory/memory_test.go | 26 +++----------------------- storagedriver.go | 22 +++++++--------------- 3 files changed, 24 insertions(+), 39 deletions(-) diff --git a/storage/memory/memory.go b/storage/memory/memory.go index d30a3df6..a8577bdb 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -151,6 +151,7 @@ type InMemoryStorage struct { containerStorageMap map[string]*containerStorage maxNumSamples int maxNumStats int + backend storage.StorageDriver } func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { @@ -165,6 +166,13 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C self.containerStorageMap[ref.Name] = cstore } }() + + if self.backend != nil { + // TODO(monnand): To deal with long delay write operations, we + // may want to start a pool of goroutines to do write + // operations. + self.backend.AddStats(ref, stats) + } return cstore.AddStats(stats) } @@ -231,11 +239,16 @@ func (self *InMemoryStorage) Close() error { return nil } -func New(maxNumSamples, maxNumStats int) storage.StorageDriver { +func New( + maxNumSamples, + maxNumStats int, + backend storage.StorageDriver, +) *InMemoryStorage { ret := &InMemoryStorage{ containerStorageMap: make(map[string]*containerStorage, 32), maxNumSamples: maxNumSamples, maxNumStats: maxNumStats, + backend: backend, } return ret } diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index e29759ab..52f09417 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -23,39 +23,19 @@ import ( ) type memoryTestStorageDriver struct { - base storage.StorageDriver + storage.StorageDriver } func (self *memoryTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { return test.DefaultStatsEq(a, b) } -func (self *memoryTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { - return self.base.AddStats(ref, stats) -} - -func (self *memoryTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - return self.base.RecentStats(containerName, numStats) -} - -func (self *memoryTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { - return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) -} - -func (self *memoryTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - return self.base.Samples(containerName, numSamples) -} - -func (self *memoryTestStorageDriver) Close() error { - return self.base.Close() -} - func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { maxSize := 200 for N := 10; N < maxSize; N += 10 { testDriver := &memoryTestStorageDriver{} - testDriver.base = New(N, N) + testDriver.StorageDriver = New(N, N, nil) f(testDriver, t) } } @@ -79,7 +59,7 @@ func TestPercentilesWithoutSample(t *testing.T) { func TestPercentiles(t *testing.T) { N := 100 testDriver := &memoryTestStorageDriver{} - testDriver.base = New(N, N) + testDriver.StorageDriver = New(N, N, nil) test.StorageDriverTestPercentiles(testDriver, t) } diff --git a/storagedriver.go b/storagedriver.go index ab6e09e2..9b1fbd83 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -24,7 +24,6 @@ import ( "github.com/google/cadvisor/manager" "github.com/google/cadvisor/storage" "github.com/google/cadvisor/storage/bigquery" - "github.com/google/cadvisor/storage/cache" "github.com/google/cadvisor/storage/influxdb" "github.com/google/cadvisor/storage/memory" ) @@ -39,8 +38,9 @@ var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*tim const statsRequestedByUI = 60 -func NewStorageDriver(driverName string) (storage.StorageDriver, error) { - var storageDriver storage.StorageDriver +func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { + var storageDriver *memory.InMemoryStorage + var backendStorage storage.StorageDriver var err error // TODO(vmarmol): We shouldn't need the housekeeping interval here and it shouldn't be public. samplesToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval) @@ -49,12 +49,6 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { samplesToCache = statsRequestedByUI } switch driverName { - case "": - // empty string by default is the in memory store - fallthrough - case "memory": - storageDriver = memory.New(*argSampleSize, int(*argDbBufferDuration)) - return storageDriver, nil case "influxdb": var hostname string hostname, err = os.Hostname() @@ -62,7 +56,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { return nil, err } - storageDriver, err = influxdb.New( + backendStorage, err = influxdb.New( hostname, "stats", *argDbName, @@ -74,22 +68,18 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { // TODO(monnand): One hour? Or user-defined? 1*time.Hour, ) - glog.V(2).Infof("Caching %d recent stats in memory\n", samplesToCache) - storageDriver = cache.MemoryCache(samplesToCache, samplesToCache, storageDriver) case "bigquery": var hostname string hostname, err = os.Hostname() if err != nil { return nil, err } - storageDriver, err = bigquery.New( + backendStorage, err = bigquery.New( hostname, "cadvisor", *argDbName, 1*time.Hour, ) - glog.V(2).Infof("Caching %d recent stats in memory\n", samplesToCache) - storageDriver = cache.MemoryCache(samplesToCache, samplesToCache, storageDriver) default: err = fmt.Errorf("Unknown database driver: %v", *argDbDriver) @@ -97,5 +87,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) { if err != nil { return nil, err } + glog.V(2).Infof("Caching %d recent stats in memory; using %v storage driver\n", samplesToCache, driverName) + storageDriver = memory.New(samplesToCache, samplesToCache, backendStorage) return storageDriver, nil } From 9d6235f4d09bfa539bdd0a4380e93424c9a699c6 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 15:33:25 -0400 Subject: [PATCH 2/9] remove Samples() Percentiles() from influxdb driver --- storage/influxdb/influxdb.go | 172 +++--------------------------- storage/influxdb/influxdb_test.go | 44 +------- 2 files changed, 19 insertions(+), 197 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 25f46188..c1934c77 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -16,7 +16,6 @@ package influxdb import ( "fmt" - "strings" "sync" "time" @@ -26,7 +25,6 @@ import ( type influxdbStorage struct { client *influxdb.Client - prevStats *info.ContainerStats machineName string tableName string windowLen time.Duration @@ -39,6 +37,7 @@ type influxdbStorage struct { const ( colTimestamp string = "time" + colTimestampStr string = "timestamp_str" colMachineName string = "machine" colContainerName string = "container_name" colCpuCumulativeUsage string = "cpu_cumulative_usage" @@ -46,10 +45,6 @@ const ( colMemoryUsage string = "memory_usage" // Working set size colMemoryWorkingSet string = "memory_working_set" - // Optional: sample duration. Unit: Nanosecond. - colSampleDuration string = "sample_duration" - // Optional: Instant cpu usage - colCpuInstantUsage string = "cpu_instant_usage" // Cumulative count of bytes received. colRxBytes string = "rx_bytes" // Cumulative count of receive errors encountered. @@ -65,6 +60,9 @@ func (self *influxdbStorage) containerStatsToValues( stats *info.ContainerStats, ) (columns []string, values []interface{}) { + columns = append(columns, colTimestampStr) + values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) + // Timestamp columns = append(columns, colTimestamp) values = append(values, stats.Timestamp.Unix()) @@ -108,20 +106,7 @@ func (self *influxdbStorage) containerStatsToValues( values = append(values, stats.Network.TxErrors) } - sample, err := info.NewSample(self.prevStats, stats) - if err != nil || sample == nil { - return columns, values - } // DO NOT ADD ANY STATS BELOW THAT ARE NOT PART OF SAMPLING - - // Optional: sample duration. Unit: Nanosecond. - columns = append(columns, colSampleDuration) - values = append(values, sample.Duration.String()) - - // Optional: Instant cpu usage - columns = append(columns, colCpuInstantUsage) - values = append(values, sample.Cpu.Usage) - return columns, values } @@ -169,8 +154,15 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i v := values[i] switch { case col == colTimestamp: + if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { + // now := time.Now() + // fmt.Printf("time now: %vns; %vs; infludb time: %v\n", now.UnixNano(), now.Unix(), int64(f64sec)) + stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6) + } + case col == colTimestampStr: if str, ok := v.(string); ok { stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) + fmt.Printf("timestamp: %v; str: %v\n", stats.Timestamp, str) } case col == colMachineName: if m, ok := v.(string); ok { @@ -205,48 +197,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i return stats, nil } -func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { - sample := &info.ContainerStatsSample{} - var err error - for i, col := range columns { - v := values[i] - switch { - case col == colTimestamp: - if str, ok := v.(string); ok { - sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) - } - case col == colMachineName: - if m, ok := v.(string); ok { - if m != self.machineName { - return nil, fmt.Errorf("different machine") - } - } else { - return nil, fmt.Errorf("machine name field is not a string: %v", v) - } - // Memory Usage - case col == colMemoryUsage: - sample.Memory.Usage, err = convertToUint64(v) - // sample duration. Unit: Nanosecond. - case col == colSampleDuration: - if v == nil { - // this record does not have sample_duration, so it's the first stats. - return nil, nil - } - sample.Duration, err = time.ParseDuration(v.(string)) - // Instant cpu usage - case col == colCpuInstantUsage: - sample.Cpu.Usage, err = convertToUint64(v) - } - if err != nil { - return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) - } - } - if sample.Duration.Nanoseconds() == 0 { - return nil, nil - } - return sample, nil -} - func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) { self.readyToFlush = readyToFlush } @@ -266,7 +216,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C defer self.lock.Unlock() series := self.newSeries(self.containerStatsToValues(ref, stats)) self.series = append(self.series, series) - self.prevStats = stats.Copy(self.prevStats) if self.readyToFlush() { seriesToFlush = self.series self.series = make([]*influxdb.Series, 0) @@ -303,6 +252,8 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] // so we need to go through from the last one to the first one. for i := len(series) - 1; i >= 0; i-- { s := series[i] + fmt.Printf("query=%v; len(s.Points) %+v\n", query, len(s.Points)) + for j := len(s.Points) - 1; j >= 0; j-- { values := s.Points[j] stats, err := self.valuesToContainerStats(s.Columns, values) @@ -319,35 +270,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] } func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - if numSamples == 0 { - return nil, nil - } - // TODO(dengnan): select only columns that we need - // TODO(dengnan): escape names - query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) - if numSamples > 0 { - query = fmt.Sprintf("%v limit %v", query, numSamples) - } - series, err := self.client.Query(query) - if err != nil { - return nil, err - } - sampleList := make([]*info.ContainerStatsSample, 0, len(series)) - for i := len(series) - 1; i >= 0; i-- { - s := series[i] - for j := len(s.Points) - 1; j >= 0; j-- { - values := s.Points[j] - sample, err := self.valuesToContainerSample(s.Columns, values) - if err != nil { - return nil, err - } - if sample == nil { - continue - } - sampleList = append(sampleList, sample) - } - } - return sampleList, nil + panic("should not implement") } func (self *influxdbStorage) Close() error { @@ -360,72 +283,7 @@ func (self *influxdbStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) - - selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) - for _, p := range cpuUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) - } - for _, p := range memUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) - } - - query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v", - strings.Join(selectedCol, ","), - self.tableName, - colContainerName, - containerName, - colMachineName, - self.machineName, - fmt.Sprintf("%vs", self.windowLen.Seconds()), - ) - series, err := self.client.Query(query) - if err != nil { - return nil, err - } - if len(series) != 1 { - return nil, nil - } - if len(series[0].Points) == 0 { - return nil, nil - } - - point := series[0].Points[0] - - ret := new(info.ContainerStatsPercentiles) - ret.MaxMemoryUsage, err = convertToUint64(point[1]) - if err != nil { - return nil, fmt.Errorf("invalid max memory usage: %v", err) - } - retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)] - for i, p := range cpuUsagePercentiles { - v, err := convertToUint64(retrievedCpuPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid cpu usage: %v", err) - } - ret.CpuUsagePercentiles = append( - ret.CpuUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):] - for i, p := range memUsagePercentiles { - v, err := convertToUint64(retrievedMemoryPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid memory usage: %v", err) - } - ret.MemoryUsagePercentiles = append( - ret.MemoryUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - return ret, nil + panic("should not implement") } // Returns a new influxdb series. diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 9f34d1c0..a2711d3f 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -1,4 +1,3 @@ -//+build ignore // Copyright 2014 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build influxdb_test +// To run unit test: go test -tags influxdb_test + package influxdb import ( @@ -125,7 +127,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu t.Fatal(err) } // delete all data by the end of the call - defer client.Query(deleteAll) + // defer client.Query(deleteAll) driver, err := New(machineName, tablename, @@ -168,18 +170,10 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu f(testDriver, t) } -func TestSampleCpuUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration) -} - func TestRetrievePartialRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20) } -func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration) -} - func TestRetrieveAllRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10) } @@ -187,33 +181,3 @@ func TestRetrieveAllRecentStats(t *testing.T) { func TestNoRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration) } - -func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration) -} - -func TestPercentiles(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration) -} - -func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration) -} - -func TestPercentilesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration) -} - -func TestPercentilesWithoutStats(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration) -} - -func TestRetrieveZeroStats(t *testing.T) { - t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration) -} - -func TestRetrieveZeroSamples(t *testing.T) { - t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration) -} From 260625f421a35252770e3b3cb4607e8b181590bb Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 15:35:31 -0400 Subject: [PATCH 3/9] remove Samples() Percentiles() from bigquery storage driver --- storage/bigquery/bigquery.go | 90 +-------------------------- storage/cache/memcache.go | 74 ---------------------- storage/cache/memcache_test.go | 108 --------------------------------- 3 files changed, 2 insertions(+), 270 deletions(-) delete mode 100644 storage/cache/memcache.go delete mode 100644 storage/cache/memcache_test.go diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 76ed8966..bafd411e 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -17,7 +17,6 @@ package bigquery import ( "fmt" "strconv" - "strings" "time" bigquery "code.google.com/p/google-api-go-client/bigquery/v2" @@ -437,25 +436,7 @@ func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([] } func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - if numSamples == 0 { - return nil, nil - } - header, rows, err := self.getRecentRows(containerName, numSamples) - if err != nil { - return nil, err - } - sampleList := make([]*info.ContainerStatsSample, 0, len(rows)) - for _, row := range rows { - sample, err := self.valuesToContainerSample(header, row) - if err != nil { - return nil, err - } - if sample == nil { - continue - } - sampleList = append(sampleList, sample) - } - return sampleList, nil + panic("will be removed") } func (self *bigqueryStorage) Close() error { @@ -469,74 +450,7 @@ func (self *bigqueryStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) - - selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) - for _, p := range cpuUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) - } - for _, p := range memUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) - } - - tableName, err := self.client.GetTableName() - if err != nil { - return nil, err - } - query := fmt.Sprintf("SELECT %v FROM %v WHERE %v='%v' AND %v='%v' AND timestamp > DATE_ADD(CURRENT_TIMESTAMP(), -%v, 'SECOND')", - strings.Join(selectedCol, ","), - tableName, - colContainerName, - containerName, - colMachineName, - self.machineName, - self.windowLen.Seconds(), - ) - _, rows, err := self.client.Query(query) - if err != nil { - return nil, err - } - - if len(rows) != 1 { - return nil, nil - } - - point := rows[0] - - ret := new(info.ContainerStatsPercentiles) - ret.MaxMemoryUsage, err = convertToUint64(point[0]) - if err != nil { - return nil, fmt.Errorf("invalid max memory usage: %v", err) - } - retrievedCpuPercentiles := point[1 : 1+len(cpuUsagePercentiles)] - for i, p := range cpuUsagePercentiles { - v, err := convertToUint64(retrievedCpuPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid cpu usage: %v", err) - } - ret.CpuUsagePercentiles = append( - ret.CpuUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - retrievedMemoryPercentiles := point[1+len(cpuUsagePercentiles):] - for i, p := range memUsagePercentiles { - v, err := convertToUint64(retrievedMemoryPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid memory usage: %v", err) - } - ret.MemoryUsagePercentiles = append( - ret.MemoryUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - return ret, nil + panic("will be removed") } // Create a new bigquery storage driver. diff --git a/storage/cache/memcache.go b/storage/cache/memcache.go deleted file mode 100644 index a0fed663..00000000 --- a/storage/cache/memcache.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "github.com/google/cadvisor/info" - "github.com/google/cadvisor/storage" - "github.com/google/cadvisor/storage/memory" -) - -type cachedStorageDriver struct { - maxNumStatsInCache int - maxNumSamplesInCache int - cache storage.StorageDriver - backend storage.StorageDriver -} - -func (self *cachedStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { - err := self.cache.AddStats(ref, stats) - if err != nil { - return err - } - err = self.backend.AddStats(ref, stats) - if err != nil { - return err - } - return nil -} - -func (self *cachedStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - if numStats <= self.maxNumStatsInCache { - return self.cache.RecentStats(containerName, numStats) - } - return self.backend.RecentStats(containerName, numStats) -} - -// TODO(vishh): Calculate percentiles from cached stats instead of reaching the DB. This will make the UI truly independent of the backend storage. -func (self *cachedStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { - return self.backend.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) -} - -func (self *cachedStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - if numSamples <= self.maxNumSamplesInCache { - return self.cache.Samples(containerName, numSamples) - } - return self.backend.Samples(containerName, numSamples) -} - -func (self *cachedStorageDriver) Close() error { - self.cache.Close() - return self.backend.Close() -} - -// TODO(vishh): Cache all samples for a given duration and do not cap the maximum number of samples. This is useful if we happen to change the housekeeping duration. -func MemoryCache(maxNumSamplesInCache, maxNumStatsInCache int, driver storage.StorageDriver) storage.StorageDriver { - return &cachedStorageDriver{ - maxNumStatsInCache: maxNumStatsInCache, - maxNumSamplesInCache: maxNumSamplesInCache, - cache: memory.New(maxNumSamplesInCache, maxNumStatsInCache), - backend: driver, - } -} diff --git a/storage/cache/memcache_test.go b/storage/cache/memcache_test.go deleted file mode 100644 index aeb3eb21..00000000 --- a/storage/cache/memcache_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "testing" - - "github.com/google/cadvisor/info" - "github.com/google/cadvisor/storage" - "github.com/google/cadvisor/storage/memory" - "github.com/google/cadvisor/storage/test" -) - -type cacheTestStorageDriver struct { - base storage.StorageDriver -} - -func (self *cacheTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool { - return test.DefaultStatsEq(a, b) -} - -func (self *cacheTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { - return self.base.AddStats(ref, stats) -} - -func (self *cacheTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { - return self.base.RecentStats(containerName, numStats) -} - -func (self *cacheTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) { - return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles) -} - -func (self *cacheTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - return self.base.Samples(containerName, numSamples) -} - -func (self *cacheTestStorageDriver) Close() error { - return self.base.Close() -} - -func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { - maxSize := 200 - - for N := 10; N < maxSize; N += 10 { - testDriver := &cacheTestStorageDriver{} - backend := memory.New(N*2, N*2) - testDriver.base = MemoryCache(N, N, backend) - f(testDriver, t) - } - -} - -func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) -} - -func TestSampleCpuUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestSampleCpuUsage, t) -} - -func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) -} - -func TestPercentilesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) -} - -func TestPercentiles(t *testing.T) { - N := 100 - testDriver := &cacheTestStorageDriver{} - backend := memory.New(N*2, N*2) - testDriver.base = MemoryCache(N, N, backend) - test.StorageDriverTestPercentiles(testDriver, t) -} - -func TestRetrievePartialRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) -} - -func TestRetrieveAllRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) -} - -func TestNoRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverTestNoRecentStats, t) -} - -func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestNoSamples, t) -} - -func TestPercentilesWithoutStats(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) -} From 5bc9425c548d6a1b1b648eefa530e2e703474649 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 17:37:55 -0400 Subject: [PATCH 4/9] panic->error --- storage/bigquery/bigquery.go | 4 ++-- storage/influxdb/influxdb.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index bafd411e..168df090 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -436,7 +436,7 @@ func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([] } func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - panic("will be removed") + return nil, fmt.Errorf("will be removed") } func (self *bigqueryStorage) Close() error { @@ -450,7 +450,7 @@ func (self *bigqueryStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - panic("will be removed") + return nil, fmt.Errorf("will be removed") } // Create a new bigquery storage driver. diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index c1934c77..fdf1283b 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -270,7 +270,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] } func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - panic("should not implement") + return nil, fmt.Errorf("will be removed") } func (self *influxdbStorage) Close() error { @@ -283,7 +283,7 @@ func (self *influxdbStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - panic("should not implement") + return nil, fmt.Errorf("will be removed") } // Returns a new influxdb series. From b9f68da4f1fa966960005896359e22caf061e6a7 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 17:38:34 -0400 Subject: [PATCH 5/9] remove debug code --- storage/influxdb/influxdb.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index fdf1283b..3b09a7a8 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -106,7 +106,6 @@ func (self *influxdbStorage) containerStatsToValues( values = append(values, stats.Network.TxErrors) } - // DO NOT ADD ANY STATS BELOW THAT ARE NOT PART OF SAMPLING return columns, values } @@ -155,14 +154,11 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i switch { case col == colTimestamp: if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { - // now := time.Now() - // fmt.Printf("time now: %vns; %vs; infludb time: %v\n", now.UnixNano(), now.Unix(), int64(f64sec)) stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6) } case col == colTimestampStr: if str, ok := v.(string); ok { stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) - fmt.Printf("timestamp: %v; str: %v\n", stats.Timestamp, str) } case col == colMachineName: if m, ok := v.(string); ok { @@ -252,7 +248,6 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] // so we need to go through from the last one to the first one. for i := len(series) - 1; i >= 0; i-- { s := series[i] - fmt.Printf("query=%v; len(s.Points) %+v\n", query, len(s.Points)) for j := len(s.Points) - 1; j >= 0; j-- { values := s.Points[j] From f3f098f3d23fc50536aa7e52f476f387f25799f2 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 17:39:34 -0400 Subject: [PATCH 6/9] delete data from influxdb after unit test --- storage/influxdb/influxdb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index a2711d3f..9f93a921 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -127,7 +127,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu t.Fatal(err) } // delete all data by the end of the call - // defer client.Query(deleteAll) + defer client.Query(deleteAll) driver, err := New(machineName, tablename, From f32457747db26732c5983422ea1762788af1ba66 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 17:42:58 -0400 Subject: [PATCH 7/9] glog.Infof() --- storagedriver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storagedriver.go b/storagedriver.go index 9b1fbd83..0009e96a 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -87,7 +87,7 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { if err != nil { return nil, err } - glog.V(2).Infof("Caching %d recent stats in memory; using %v storage driver\n", samplesToCache, driverName) + glog.Infof("Caching %d recent stats in memory; using %v storage driver\n", samplesToCache, driverName) storageDriver = memory.New(samplesToCache, samplesToCache, backendStorage) return storageDriver, nil } From 719b9ead2091d4737d940f154b06f4c2213f0eb7 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Fri, 5 Sep 2014 18:14:02 -0400 Subject: [PATCH 8/9] remove string rep of time --- storage/influxdb/influxdb.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 3b09a7a8..82e1bb71 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -37,7 +37,6 @@ type influxdbStorage struct { const ( colTimestamp string = "time" - colTimestampStr string = "timestamp_str" colMachineName string = "machine" colContainerName string = "container_name" colCpuCumulativeUsage string = "cpu_cumulative_usage" @@ -60,12 +59,9 @@ func (self *influxdbStorage) containerStatsToValues( stats *info.ContainerStats, ) (columns []string, values []interface{}) { - columns = append(columns, colTimestampStr) - values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) - // Timestamp columns = append(columns, colTimestamp) - values = append(values, stats.Timestamp.Unix()) + values = append(values, stats.Timestamp.UnixNano()/1E3) // Machine name columns = append(columns, colMachineName) @@ -156,10 +152,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6) } - case col == colTimestampStr: - if str, ok := v.(string); ok { - stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) - } case col == colMachineName: if m, ok := v.(string); ok { if m != self.machineName { @@ -219,7 +211,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C } }() if len(seriesToFlush) > 0 { - err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Second) + err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond) if err != nil { return fmt.Errorf("failed to write stats to influxDb - %s", err) } From 74abe4c152e74d172ccf51094798361100864d67 Mon Sep 17 00:00:00 2001 From: Nan Monnand Deng Date: Sat, 6 Sep 2014 15:03:12 -0400 Subject: [PATCH 9/9] parenthesis. --- storage/influxdb/influxdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 82e1bb71..f7731896 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -150,7 +150,7 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i switch { case col == colTimestamp: if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { - stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6) + stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6) } case col == colMachineName: if m, ok := v.(string); ok {