diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 25f46188..c1934c77 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -16,7 +16,6 @@ package influxdb import ( "fmt" - "strings" "sync" "time" @@ -26,7 +25,6 @@ import ( type influxdbStorage struct { client *influxdb.Client - prevStats *info.ContainerStats machineName string tableName string windowLen time.Duration @@ -39,6 +37,7 @@ type influxdbStorage struct { const ( colTimestamp string = "time" + colTimestampStr string = "timestamp_str" colMachineName string = "machine" colContainerName string = "container_name" colCpuCumulativeUsage string = "cpu_cumulative_usage" @@ -46,10 +45,6 @@ const ( colMemoryUsage string = "memory_usage" // Working set size colMemoryWorkingSet string = "memory_working_set" - // Optional: sample duration. Unit: Nanosecond. - colSampleDuration string = "sample_duration" - // Optional: Instant cpu usage - colCpuInstantUsage string = "cpu_instant_usage" // Cumulative count of bytes received. colRxBytes string = "rx_bytes" // Cumulative count of receive errors encountered. @@ -65,6 +60,9 @@ func (self *influxdbStorage) containerStatsToValues( stats *info.ContainerStats, ) (columns []string, values []interface{}) { + columns = append(columns, colTimestampStr) + values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) + // Timestamp columns = append(columns, colTimestamp) values = append(values, stats.Timestamp.Unix()) @@ -108,20 +106,7 @@ func (self *influxdbStorage) containerStatsToValues( values = append(values, stats.Network.TxErrors) } - sample, err := info.NewSample(self.prevStats, stats) - if err != nil || sample == nil { - return columns, values - } // DO NOT ADD ANY STATS BELOW THAT ARE NOT PART OF SAMPLING - - // Optional: sample duration. Unit: Nanosecond. - columns = append(columns, colSampleDuration) - values = append(values, sample.Duration.String()) - - // Optional: Instant cpu usage - columns = append(columns, colCpuInstantUsage) - values = append(values, sample.Cpu.Usage) - return columns, values } @@ -169,8 +154,15 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i v := values[i] switch { case col == colTimestamp: + if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() { + // now := time.Now() + // fmt.Printf("time now: %vns; %vs; infludb time: %v\n", now.UnixNano(), now.Unix(), int64(f64sec)) + stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6) + } + case col == colTimestampStr: if str, ok := v.(string); ok { stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) + fmt.Printf("timestamp: %v; str: %v\n", stats.Timestamp, str) } case col == colMachineName: if m, ok := v.(string); ok { @@ -205,48 +197,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i return stats, nil } -func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { - sample := &info.ContainerStatsSample{} - var err error - for i, col := range columns { - v := values[i] - switch { - case col == colTimestamp: - if str, ok := v.(string); ok { - sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) - } - case col == colMachineName: - if m, ok := v.(string); ok { - if m != self.machineName { - return nil, fmt.Errorf("different machine") - } - } else { - return nil, fmt.Errorf("machine name field is not a string: %v", v) - } - // Memory Usage - case col == colMemoryUsage: - sample.Memory.Usage, err = convertToUint64(v) - // sample duration. Unit: Nanosecond. - case col == colSampleDuration: - if v == nil { - // this record does not have sample_duration, so it's the first stats. - return nil, nil - } - sample.Duration, err = time.ParseDuration(v.(string)) - // Instant cpu usage - case col == colCpuInstantUsage: - sample.Cpu.Usage, err = convertToUint64(v) - } - if err != nil { - return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) - } - } - if sample.Duration.Nanoseconds() == 0 { - return nil, nil - } - return sample, nil -} - func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) { self.readyToFlush = readyToFlush } @@ -266,7 +216,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C defer self.lock.Unlock() series := self.newSeries(self.containerStatsToValues(ref, stats)) self.series = append(self.series, series) - self.prevStats = stats.Copy(self.prevStats) if self.readyToFlush() { seriesToFlush = self.series self.series = make([]*influxdb.Series, 0) @@ -303,6 +252,8 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] // so we need to go through from the last one to the first one. for i := len(series) - 1; i >= 0; i-- { s := series[i] + fmt.Printf("query=%v; len(s.Points) %+v\n", query, len(s.Points)) + for j := len(s.Points) - 1; j >= 0; j-- { values := s.Points[j] stats, err := self.valuesToContainerStats(s.Columns, values) @@ -319,35 +270,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] } func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - if numSamples == 0 { - return nil, nil - } - // TODO(dengnan): select only columns that we need - // TODO(dengnan): escape names - query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) - if numSamples > 0 { - query = fmt.Sprintf("%v limit %v", query, numSamples) - } - series, err := self.client.Query(query) - if err != nil { - return nil, err - } - sampleList := make([]*info.ContainerStatsSample, 0, len(series)) - for i := len(series) - 1; i >= 0; i-- { - s := series[i] - for j := len(s.Points) - 1; j >= 0; j-- { - values := s.Points[j] - sample, err := self.valuesToContainerSample(s.Columns, values) - if err != nil { - return nil, err - } - if sample == nil { - continue - } - sampleList = append(sampleList, sample) - } - } - return sampleList, nil + panic("should not implement") } func (self *influxdbStorage) Close() error { @@ -360,72 +283,7 @@ func (self *influxdbStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) - - selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) - for _, p := range cpuUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) - } - for _, p := range memUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) - } - - query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v", - strings.Join(selectedCol, ","), - self.tableName, - colContainerName, - containerName, - colMachineName, - self.machineName, - fmt.Sprintf("%vs", self.windowLen.Seconds()), - ) - series, err := self.client.Query(query) - if err != nil { - return nil, err - } - if len(series) != 1 { - return nil, nil - } - if len(series[0].Points) == 0 { - return nil, nil - } - - point := series[0].Points[0] - - ret := new(info.ContainerStatsPercentiles) - ret.MaxMemoryUsage, err = convertToUint64(point[1]) - if err != nil { - return nil, fmt.Errorf("invalid max memory usage: %v", err) - } - retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)] - for i, p := range cpuUsagePercentiles { - v, err := convertToUint64(retrievedCpuPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid cpu usage: %v", err) - } - ret.CpuUsagePercentiles = append( - ret.CpuUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):] - for i, p := range memUsagePercentiles { - v, err := convertToUint64(retrievedMemoryPercentiles[i]) - if err != nil { - return nil, fmt.Errorf("invalid memory usage: %v", err) - } - ret.MemoryUsagePercentiles = append( - ret.MemoryUsagePercentiles, - info.Percentile{ - Percentage: p, - Value: v, - }, - ) - } - return ret, nil + panic("should not implement") } // Returns a new influxdb series. diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 9f34d1c0..a2711d3f 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -1,4 +1,3 @@ -//+build ignore // Copyright 2014 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build influxdb_test +// To run unit test: go test -tags influxdb_test + package influxdb import ( @@ -125,7 +127,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu t.Fatal(err) } // delete all data by the end of the call - defer client.Query(deleteAll) + // defer client.Query(deleteAll) driver, err := New(machineName, tablename, @@ -168,18 +170,10 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu f(testDriver, t) } -func TestSampleCpuUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration) -} - func TestRetrievePartialRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20) } -func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration) -} - func TestRetrieveAllRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10) } @@ -187,33 +181,3 @@ func TestRetrieveAllRecentStats(t *testing.T) { func TestNoRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration) } - -func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration) -} - -func TestPercentiles(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration) -} - -func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration) -} - -func TestPercentilesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration) -} - -func TestPercentilesWithoutStats(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration) -} - -func TestRetrieveZeroStats(t *testing.T) { - t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration) -} - -func TestRetrieveZeroSamples(t *testing.T) { - t.SkipNow() - runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration) -}