remove Samples() Percentiles() from influxdb driver

This commit is contained in:
Nan Monnand Deng 2014-09-05 15:33:25 -04:00
parent 60a1b6a900
commit 9d6235f4d0
2 changed files with 19 additions and 197 deletions

View File

@ -16,7 +16,6 @@ package influxdb
import (
"fmt"
"strings"
"sync"
"time"
@ -26,7 +25,6 @@ import (
type influxdbStorage struct {
client *influxdb.Client
prevStats *info.ContainerStats
machineName string
tableName string
windowLen time.Duration
@ -39,6 +37,7 @@ type influxdbStorage struct {
const (
colTimestamp string = "time"
colTimestampStr string = "timestamp_str"
colMachineName string = "machine"
colContainerName string = "container_name"
colCpuCumulativeUsage string = "cpu_cumulative_usage"
@ -46,10 +45,6 @@ const (
colMemoryUsage string = "memory_usage"
// Working set size
colMemoryWorkingSet string = "memory_working_set"
// Optional: sample duration. Unit: Nanosecond.
colSampleDuration string = "sample_duration"
// Optional: Instant cpu usage
colCpuInstantUsage string = "cpu_instant_usage"
// Cumulative count of bytes received.
colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered.
@ -65,6 +60,9 @@ func (self *influxdbStorage) containerStatsToValues(
stats *info.ContainerStats,
) (columns []string, values []interface{}) {
columns = append(columns, colTimestampStr)
values = append(values, stats.Timestamp.Format(time.RFC3339Nano))
// Timestamp
columns = append(columns, colTimestamp)
values = append(values, stats.Timestamp.Unix())
@ -108,20 +106,7 @@ func (self *influxdbStorage) containerStatsToValues(
values = append(values, stats.Network.TxErrors)
}
sample, err := info.NewSample(self.prevStats, stats)
if err != nil || sample == nil {
return columns, values
}
// DO NOT ADD ANY STATS BELOW THAT ARE NOT PART OF SAMPLING
// Optional: sample duration. Unit: Nanosecond.
columns = append(columns, colSampleDuration)
values = append(values, sample.Duration.String())
// Optional: Instant cpu usage
columns = append(columns, colCpuInstantUsage)
values = append(values, sample.Cpu.Usage)
return columns, values
}
@ -169,8 +154,15 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
v := values[i]
switch {
case col == colTimestamp:
if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() {
// now := time.Now()
// fmt.Printf("time now: %vns; %vs; infludb time: %v\n", now.UnixNano(), now.Unix(), int64(f64sec))
stats.Timestamp = time.Unix(int64(f64sec)/1E3, int64(f64sec)%1E3*1E6)
}
case col == colTimestampStr:
if str, ok := v.(string); ok {
stats.Timestamp, err = time.Parse(time.RFC3339Nano, str)
fmt.Printf("timestamp: %v; str: %v\n", stats.Timestamp, str)
}
case col == colMachineName:
if m, ok := v.(string); ok {
@ -205,48 +197,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
return stats, nil
}
func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) {
sample := &info.ContainerStatsSample{}
var err error
for i, col := range columns {
v := values[i]
switch {
case col == colTimestamp:
if str, ok := v.(string); ok {
sample.Timestamp, err = time.Parse(time.RFC3339Nano, str)
}
case col == colMachineName:
if m, ok := v.(string); ok {
if m != self.machineName {
return nil, fmt.Errorf("different machine")
}
} else {
return nil, fmt.Errorf("machine name field is not a string: %v", v)
}
// Memory Usage
case col == colMemoryUsage:
sample.Memory.Usage, err = convertToUint64(v)
// sample duration. Unit: Nanosecond.
case col == colSampleDuration:
if v == nil {
// this record does not have sample_duration, so it's the first stats.
return nil, nil
}
sample.Duration, err = time.ParseDuration(v.(string))
// Instant cpu usage
case col == colCpuInstantUsage:
sample.Cpu.Usage, err = convertToUint64(v)
}
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
}
}
if sample.Duration.Nanoseconds() == 0 {
return nil, nil
}
return sample, nil
}
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
self.readyToFlush = readyToFlush
}
@ -266,7 +216,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
defer self.lock.Unlock()
series := self.newSeries(self.containerStatsToValues(ref, stats))
self.series = append(self.series, series)
self.prevStats = stats.Copy(self.prevStats)
if self.readyToFlush() {
seriesToFlush = self.series
self.series = make([]*influxdb.Series, 0)
@ -303,6 +252,8 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
// so we need to go through from the last one to the first one.
for i := len(series) - 1; i >= 0; i-- {
s := series[i]
fmt.Printf("query=%v; len(s.Points) %+v\n", query, len(s.Points))
for j := len(s.Points) - 1; j >= 0; j-- {
values := s.Points[j]
stats, err := self.valuesToContainerStats(s.Columns, values)
@ -319,35 +270,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
}
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
if numSamples == 0 {
return nil, nil
}
// TODO(dengnan): select only columns that we need
// TODO(dengnan): escape names
query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName)
if numSamples > 0 {
query = fmt.Sprintf("%v limit %v", query, numSamples)
}
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
sampleList := make([]*info.ContainerStatsSample, 0, len(series))
for i := len(series) - 1; i >= 0; i-- {
s := series[i]
for j := len(s.Points) - 1; j >= 0; j-- {
values := s.Points[j]
sample, err := self.valuesToContainerSample(s.Columns, values)
if err != nil {
return nil, err
}
if sample == nil {
continue
}
sampleList = append(sampleList, sample)
}
}
return sampleList, nil
panic("should not implement")
}
func (self *influxdbStorage) Close() error {
@ -360,72 +283,7 @@ func (self *influxdbStorage) Percentiles(
cpuUsagePercentiles []int,
memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) {
selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1)
selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage))
for _, p := range cpuUsagePercentiles {
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p))
}
for _, p := range memUsagePercentiles {
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p))
}
query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v",
strings.Join(selectedCol, ","),
self.tableName,
colContainerName,
containerName,
colMachineName,
self.machineName,
fmt.Sprintf("%vs", self.windowLen.Seconds()),
)
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
if len(series) != 1 {
return nil, nil
}
if len(series[0].Points) == 0 {
return nil, nil
}
point := series[0].Points[0]
ret := new(info.ContainerStatsPercentiles)
ret.MaxMemoryUsage, err = convertToUint64(point[1])
if err != nil {
return nil, fmt.Errorf("invalid max memory usage: %v", err)
}
retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)]
for i, p := range cpuUsagePercentiles {
v, err := convertToUint64(retrievedCpuPercentiles[i])
if err != nil {
return nil, fmt.Errorf("invalid cpu usage: %v", err)
}
ret.CpuUsagePercentiles = append(
ret.CpuUsagePercentiles,
info.Percentile{
Percentage: p,
Value: v,
},
)
}
retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):]
for i, p := range memUsagePercentiles {
v, err := convertToUint64(retrievedMemoryPercentiles[i])
if err != nil {
return nil, fmt.Errorf("invalid memory usage: %v", err)
}
ret.MemoryUsagePercentiles = append(
ret.MemoryUsagePercentiles,
info.Percentile{
Percentage: p,
Value: v,
},
)
}
return ret, nil
panic("should not implement")
}
// Returns a new influxdb series.

View File

@ -1,4 +1,3 @@
//+build ignore
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -13,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build influxdb_test
// To run unit test: go test -tags influxdb_test
package influxdb
import (
@ -125,7 +127,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
t.Fatal(err)
}
// delete all data by the end of the call
defer client.Query(deleteAll)
// defer client.Query(deleteAll)
driver, err := New(machineName,
tablename,
@ -168,18 +170,10 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
f(testDriver, t)
}
func TestSampleCpuUsage(t *testing.T) {
runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration)
}
func TestRetrievePartialRecentStats(t *testing.T) {
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
}
func TestSamplesWithoutSample(t *testing.T) {
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration)
}
func TestRetrieveAllRecentStats(t *testing.T) {
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
}
@ -187,33 +181,3 @@ func TestRetrieveAllRecentStats(t *testing.T) {
func TestNoRecentStats(t *testing.T) {
runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
}
func TestNoSamples(t *testing.T) {
runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration)
}
func TestPercentiles(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration)
}
func TestMaxMemoryUsage(t *testing.T) {
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration)
}
func TestPercentilesWithoutSample(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration)
}
func TestPercentilesWithoutStats(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration)
}
func TestRetrieveZeroStats(t *testing.T) {
t.SkipNow()
runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration)
}
func TestRetrieveZeroSamples(t *testing.T) {
t.SkipNow()
runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration)
}