diff --git a/info/container_test.go b/info/container_test.go index 120a05c4..4b3c84a2 100644 --- a/info/container_test.go +++ b/info/container_test.go @@ -289,7 +289,7 @@ func TestContainerStatsCopy(t *testing.T) { stats.Cpu.Load = shadowStats.Cpu.Load + 1 stats.Memory.Usage = shadowStats.Memory.Usage + 1 if reflect.DeepEqual(stats, shadowStats) { - t.Errorf("Copy() did not deeply copied the object") + t.Errorf("Copy() did not deeply copy the object") } stats = shadowStats.Copy(stats) if !reflect.DeepEqual(stats, shadowStats) { diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index b9c75ab0..eb2cad31 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -33,6 +33,37 @@ type influxdbStorage struct { windowLen time.Duration } +const ( + colTimestamp string = "timestamp" + colMachineName string = "machine" + colContainerName string = "container_name" + colCpuCumulativeUsage string = "cpu_cumulative_usage" + // Cumulative Cpu Usage in system mode + colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system" + // Cumulative Cpu Usage in user mode + colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user" + // Memory Usage + colMemoryUsage string = "memory_usage" + // Working set size + colMemoryWorkingSet string = "memory_working_set" + // container page fault + colMemoryContainerPgfault string = "memory_container_pgfault" + // container major page fault + colMemoryContainerPgmajfault string = "memory_container_pgmajfault" + // hierarchical page fault + colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" + // hierarchical major page fault + colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" + // Cumulative per core usage + colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_" + // Optional: sample duration. Unit: Nanosecond. + colSampleDuration string = "sample_duration" + // Optional: Instant cpu usage + colCpuInstantUsage string = "cpu_instant_usage" + // Optional: Instant per core usage + colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_" +) + func (self *influxdbStorage) containerStatsToValues( ref info.ContainerReference, stats *info.ContainerStats, @@ -46,8 +77,8 @@ func (self *influxdbStorage) containerStatsToValues( columns = append(columns, "machine") values = append(values, self.machineName) - // Container path - columns = append(columns, "container_path") + // Container name + columns = append(columns, "container_name") values = append(values, ref.Name) // Cumulative Cpu Usage @@ -220,33 +251,29 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] var err error for i, col := range columns { v := values[i] - switch col { - case "timestamp": + switch { + case col == "timestamp": if str, ok := v.(string); ok { sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) } - case "machine": + case col == "machine": if v.(string) != self.machineName { return nil, fmt.Errorf("different machine") } // Memory Usage - case "memory_usage": + case col == "memory_usage": sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. - case "sample_duration": + case col == "sample_duration": if v == nil { // this record does not have sample_duration, so it's the first stats. return nil, nil } sample.Duration, err = time.ParseDuration(v.(string)) - // Instant cpu usage - case "cpu_instant_usage": + // Instant cpu usage + case col == "cpu_instant_usage": sample.Cpu.Usage, err = convertToUint64(v) - - default: - if !strings.HasPrefix(col, "per_core_instant_usage_core_") { - continue - } + case strings.HasPrefix(col, "per_core_instant_usage_core_"): idxStr := col[len("per_core_instant_usage_core_"):] idx, err := strconv.Atoi(idxStr) if err != nil { @@ -290,7 +317,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) } @@ -317,7 +344,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) } @@ -352,24 +379,87 @@ func (self *influxdbStorage) Percentiles( memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { // TODO(dengnan): Implement it - return nil, nil + selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) + + selectedCol = append(selectedCol, "max(memory_usage)") + for _, p := range cpuUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(cpu_instant_usage, %v)", p)) + } + for _, p := range memUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(memory_usage, %v)", p)) + } + + query := fmt.Sprintf("select %v from %v where container_name='%v' and machine='%v' and time > now() - %v", + strings.Join(selectedCol, ","), + self.tableName, + containerName, + self.machineName, + fmt.Sprintf("%vs", self.windowLen.Seconds()), + ) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + if len(series) != 1 { + return nil, nil + } + if len(series[0].Points) == 0 { + return nil, nil + } + + point := series[0].Points[0] + + ret := new(info.ContainerStatsPercentiles) + ret.MaxMemoryUsage, err = convertToUint64(point[1]) + if err != nil { + return nil, fmt.Errorf("invalid max memory usage: %v", err) + } + retrievedCpuPercentiles := point[2:] + for i, p := range cpuUsagePercentiles { + v, err := convertToUint64(retrievedCpuPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid cpu usage: %v", err) + } + ret.CpuUsagePercentiles = append( + ret.CpuUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):] + for i, p := range memUsagePercentiles { + v, err := convertToUint64(retrievedMemoryPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid memory usage: %v", err) + } + ret.MemoryUsagePercentiles = append( + ret.MemoryUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + return ret, nil } // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. -// hostname: The host which runs influxdb. +// influxdbHost: The host which runs influxdb. // percentilesDuration: Time window which will be considered when calls Percentiles() func New(machineName, tablename, database, username, password, - hostname string, + influxdbHost string, isSecure bool, percentilesDuration time.Duration, ) (storage.StorageDriver, error) { config := &influxdb.ClientConfig{ - Host: hostname, + Host: influxdbHost, Username: username, Password: password, Database: database, diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index e5f5287b..515cb732 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -106,3 +106,19 @@ func TestNoRecentStats(t *testing.T) { func TestNoSamples(t *testing.T) { runStorageTest(test.StorageDriverTestNoSamples, t) } + +func TestPercentiles(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentiles, t) +} + +func TestMaxMemoryUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +} + +func TestPercentilesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) +} + +func TestPercentilesWithoutStats(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) +}