From e4c439ce259f5686cd2d6f97382ae8ea75c0994d Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 14:05:06 -0700 Subject: [PATCH] passed all existing unit tests --- storage/influxdb/influxdb.go | 121 +++++++++++++++++++++++------- storage/influxdb/influxdb_test.go | 2 +- 2 files changed, 96 insertions(+), 27 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 0d8d1dda..43d956e2 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -38,6 +38,10 @@ func (self *influxdbStorage) containerStatsToValues( stats *info.ContainerStats, ) (columns []string, values []interface{}) { + // Timestamp + columns = append(columns, "timestamp") + values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) + // Machine name columns = append(columns, "machine") values = append(values, self.machineName) @@ -95,7 +99,7 @@ func (self *influxdbStorage) containerStatsToValues( // Optional: sample duration. Unit: Nanosecond. columns = append(columns, "sample_duration") - values = append(values, sample.Duration.Nanoseconds()) + values = append(values, sample.Duration.String()) // Optional: Instant cpu usage columns = append(columns, "cpu_instant_usage") @@ -110,46 +114,84 @@ func (self *influxdbStorage) containerStatsToValues( return columns, values } -func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) *info.ContainerStats { +func convertToUint64(v interface{}) (uint64, error) { + if v == nil { + return 0, nil + } + switch x := v.(type) { + case uint64: + return x, nil + case int: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int32: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case float64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case uint32: + return uint64(x), nil + } + return 0, fmt.Errorf("Unknown type") +} + +func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { stats := &info.ContainerStats{ Cpu: &info.CpuStats{}, Memory: &info.MemoryStats{}, } perCoreUsage := make(map[int]uint64, 32) + var err error for i, col := range columns { v := values[i] switch col { + case "timestamp": + if str, ok := v.(string); ok { + stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } case "machine": if v.(string) != self.machineName { - return nil + return nil, fmt.Errorf("different machine") } // Cumulative Cpu Usage case "cpu_cumulative_usage": - stats.Cpu.Usage.Total = v.(uint64) + stats.Cpu.Usage.Total, err = convertToUint64(v) // Cumulative Cpu Usage in kernel mode case "cpu_cumulative_usage_kernel": - stats.Cpu.Usage.System = v.(uint64) + stats.Cpu.Usage.System, err = convertToUint64(v) // Cumulative Cpu Usage in user mode case "cpu_cumulative_usage_user": - stats.Cpu.Usage.User = v.(uint64) + stats.Cpu.Usage.User, err = convertToUint64(v) // Memory Usage case "memory_usage": - stats.Memory.Usage = v.(uint64) + stats.Memory.Usage, err = convertToUint64(v) // Working set size case "memory_working_set": - stats.Memory.WorkingSet = v.(uint64) + stats.Memory.WorkingSet, err = convertToUint64(v) // container page fault case "memory_container_pgfault": - stats.Memory.ContainerData.Pgfault = v.(uint64) + stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) // container major page fault case "memory_container_pgmajfault": - stats.Memory.ContainerData.Pgmajfault = v.(uint64) + stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) // hierarchical page fault case "memory_hierarchical_pgfault": - stats.Memory.HierarchicalData.Pgfault = v.(uint64) + stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) // hierarchical major page fault case "memory_hierarchical_pgmajfault": - stats.Memory.HierarchicalData.Pgmajfault = v.(uint64) + stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) default: if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") { continue @@ -159,35 +201,47 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i if err != nil { continue } - perCoreUsage[idx] = v.(uint64) + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage)) for idx, usage := range perCoreUsage { stats.Cpu.Usage.PerCpu[idx] = usage } - return stats + return stats, nil } -func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) *info.ContainerStatsSample { +func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { sample := &info.ContainerStatsSample{} perCoreUsage := make(map[int]uint64, 32) + var err error for i, col := range columns { v := values[i] switch col { + case "timestamp": + if str, ok := v.(string); ok { + sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } case "machine": if v.(string) != self.machineName { - return nil + return nil, fmt.Errorf("different machine") } // Memory Usage case "memory_usage": - sample.Memory.Usage = v.(uint64) + sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. case "sample_duration": - sample.Duration = time.Duration(v.(int64)) + if v == nil { + // this record does not have sample_duration, so it's the first stats. + return nil, nil + } + sample.Duration, err = time.ParseDuration(v.(string)) // Instant cpu usage case "cpu_instant_usage": - sample.Cpu.Usage = v.(uint64) + sample.Cpu.Usage, err = convertToUint64(v) default: if !strings.HasPrefix(col, "per_core_instant_usage_core_") { @@ -198,7 +252,10 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] if err != nil { continue } - perCoreUsage[idx] = v.(uint64) + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage)) @@ -206,9 +263,9 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] sample.Cpu.PerCpuUsage[idx] = usage } if sample.Duration.Nanoseconds() == 0 { - return nil + return nil, nil } - return sample + return sample, nil } func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { @@ -230,7 +287,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) } @@ -241,7 +298,13 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] statsList := make([]*info.ContainerStats, 0, len(series)) for _, s := range series { for _, values := range s.Points { - stats := self.valuesToContainerStats(s.Columns, values) + stats, err := self.valuesToContainerStats(s.Columns, values) + if err != nil { + return nil, err + } + if stats == nil { + continue + } statsList = append(statsList, stats) } } @@ -251,7 +314,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) } @@ -262,7 +325,13 @@ func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*i sampleList := make([]*info.ContainerStatsSample, 0, len(series)) for _, s := range series { for _, values := range s.Points { - sample := self.valuesToContainerSample(s.Columns, values) + sample, err := self.valuesToContainerSample(s.Columns, values) + if err != nil { + return nil, err + } + if sample == nil { + continue + } sampleList = append(sampleList, sample) } } diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 46e87c09..ead4cecb 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -26,7 +26,7 @@ import ( func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { machineName := "mymachine" - tablename := "cadivsorTable" + tablename := "t" database := "cadvisor" username := "root" password := "root"