From 2608c68fe42da84cb17a43bb25f15fd82e9a394e Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Tue, 1 Jul 2014 15:28:58 -0700 Subject: [PATCH 01/17] deep copy of container stats --- info/container.go | 36 ++++++++++++++++++++++++++++++++++++ info/container_test.go | 19 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/info/container.go b/info/container.go index 3e2447d9..c66346bc 100644 --- a/info/container.go +++ b/info/container.go @@ -165,6 +165,42 @@ type ContainerStats struct { Memory *MemoryStats `json:"memory,omitempty"` } +// Makes a deep copy of the ContainerStats and returns the pointer to the new +// copy. Copy() will allocate a new ContainerStats object if dst is nil. +func (self *ContainerStats) Copy(dst *ContainerStats) *ContainerStats { + if dst == nil { + dst = new(ContainerStats) + } + dst.Timestamp = self.Timestamp + if self.Cpu != nil { + if dst.Cpu == nil { + dst.Cpu = new(CpuStats) + } + // To make a deep copy of a slice, we need to copy every value + // in the slice. To make less memory allocation, we would like + // to reuse the slice in dst if possible. + percpu := dst.Cpu.Usage.PerCpu + if len(percpu) != len(self.Cpu.Usage.PerCpu) { + percpu = make([]uint64, len(self.Cpu.Usage.PerCpu)) + } + dst.Cpu.Usage = self.Cpu.Usage + dst.Cpu.Load = self.Cpu.Load + copy(percpu, self.Cpu.Usage.PerCpu) + dst.Cpu.Usage.PerCpu = percpu + } else { + dst.Cpu = nil + } + if self.Memory != nil { + if dst.Memory == nil { + dst.Memory = new(MemoryStats) + } + *dst.Memory = *self.Memory + } else { + dst.Memory = nil + } + return dst +} + type ContainerStatsSample struct { // Timetamp of the end of the sample period Timestamp time.Time `json:"timestamp"` diff --git a/info/container_test.go b/info/container_test.go index 071ed914..120a05c4 100644 --- a/info/container_test.go +++ b/info/container_test.go @@ -15,6 +15,7 @@ package info import ( + "reflect" "testing" "time" ) @@ -277,3 +278,21 @@ func TestAddSampleHotUnpluggingCpu(t *testing.T) { t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage) } } + +func TestContainerStatsCopy(t *testing.T) { + stats := createStats(100, 101, time.Now()) + shadowStats := stats.Copy(nil) + if !reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() returned different object") + } + stats.Cpu.Usage.PerCpu[0] = shadowStats.Cpu.Usage.PerCpu[0] + 1 + stats.Cpu.Load = shadowStats.Cpu.Load + 1 + stats.Memory.Usage = shadowStats.Memory.Usage + 1 + if reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() did not deeply copied the object") + } + stats = shadowStats.Copy(stats) + if !reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() returned different object") + } +} From b94c9936d076e5aa0a122bb06a3e9127f09192e5 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Tue, 1 Jul 2014 19:21:43 -0700 Subject: [PATCH 02/17] influxdb --- storage/influxdb/influxdb.go | 313 ++++++++++++++++++++++++++++++ storage/influxdb/influxdb_test.go | 83 ++++++++ storage/memory/memory.go | 22 +-- storage/storage.go | 2 +- 4 files changed, 398 insertions(+), 22 deletions(-) create mode 100644 storage/influxdb/influxdb.go create mode 100644 storage/influxdb/influxdb_test.go diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go new file mode 100644 index 00000000..b6685688 --- /dev/null +++ b/storage/influxdb/influxdb.go @@ -0,0 +1,313 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/google/cadvisor/info" + "github.com/google/cadvisor/storage" + "github.com/influxdb/influxdb-go" + "github.com/kr/pretty" +) + +type influxdbStorage struct { + client *influxdb.Client + prevStats *info.ContainerStats + machineName string + tableName string + windowLen time.Duration +} + +func (self *influxdbStorage) containerStatsToValues( + ref info.ContainerReference, + stats *info.ContainerStats, +) (columns []string, values []interface{}) { + + // Machine name + columns = append(columns, "machine") + values = append(values, self.machineName) + + // Container path + columns = append(columns, "container_path") + values = append(values, ref.Name) + + // Cumulative Cpu Usage + columns = append(columns, "cpu_cumulative_usage") + values = append(values, stats.Cpu.Usage.Total) + + // Cumulative Cpu Usage in kernel mode + columns = append(columns, "cpu_cumulative_usage_kernel") + values = append(values, stats.Cpu.Usage.System) + + // Cumulative Cpu Usage in user mode + columns = append(columns, "cpu_cumulative_usage_user") + values = append(values, stats.Cpu.Usage.User) + + // Memory Usage + columns = append(columns, "memory_usage") + values = append(values, stats.Memory.Usage) + + // Working set size + columns = append(columns, "memory_working_set") + values = append(values, stats.Memory.WorkingSet) + + // container page fault + columns = append(columns, "memory_container_pgfault") + values = append(values, stats.Memory.ContainerData.Pgfault) + + // container major page fault + columns = append(columns, "memory_container_pgmajfault") + values = append(values, stats.Memory.ContainerData.Pgmajfault) + + // hierarchical page fault + columns = append(columns, "memory_hierarchical_pgfault") + values = append(values, stats.Memory.HierarchicalData.Pgfault) + + // hierarchical major page fault + columns = append(columns, "memory_hierarchical_pgmajfault") + values = append(values, stats.Memory.HierarchicalData.Pgmajfault) + + // per cpu cumulative usage + for i, u := range stats.Cpu.Usage.PerCpu { + columns = append(columns, fmt.Sprintf("per_core_cumulative_usage_core_%v", i)) + values = append(values, u) + } + + sample, err := info.NewSample(self.prevStats, stats) + if err != nil || sample == nil { + return columns, values + } + + // Optional: sample duration. Unit: Nanosecond. + columns = append(columns, "sample_duration") + values = append(values, sample.Duration.Nanoseconds()) + + // Optional: Instant cpu usage + columns = append(columns, "cpu_instant_usage") + values = append(values, sample.Cpu.Usage) + + // Optional: Instant per core usage + for i, u := range sample.Cpu.PerCpuUsage { + columns = append(columns, fmt.Sprintf("per_core_instant_usage_core_%v", i)) + values = append(values, u) + } + + return columns, values +} + +func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) *info.ContainerStats { + stats := &info.ContainerStats{ + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + } + perCoreUsage := make(map[int]uint64, 32) + for i, col := range columns { + v := values[i] + switch col { + case "machine": + if v.(string) != self.machineName { + return nil + } + // Cumulative Cpu Usage + case "cpu_cumulative_usage": + stats.Cpu.Usage.Total = v.(uint64) + // Cumulative Cpu Usage in kernel mode + case "cpu_cumulative_usage_kernel": + stats.Cpu.Usage.System = v.(uint64) + // Cumulative Cpu Usage in user mode + case "cpu_cumulative_usage_user": + stats.Cpu.Usage.User = v.(uint64) + // Memory Usage + case "memory_usage": + stats.Memory.Usage = v.(uint64) + // Working set size + case "memory_working_set": + stats.Memory.WorkingSet = v.(uint64) + // container page fault + case "memory_container_pgfault": + stats.Memory.ContainerData.Pgfault = v.(uint64) + // container major page fault + case "memory_container_pgmajfault": + stats.Memory.ContainerData.Pgmajfault = v.(uint64) + // hierarchical page fault + case "memory_hierarchical_pgfault": + stats.Memory.HierarchicalData.Pgfault = v.(uint64) + // hierarchical major page fault + case "memory_hierarchical_pgmajfault": + stats.Memory.HierarchicalData.Pgmajfault = v.(uint64) + default: + if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") { + continue + } + idxStr := col[len("per_core_cumulative_usage_core_"):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx] = v.(uint64) + } + } + stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + stats.Cpu.Usage.PerCpu[idx] = usage + } + return stats +} + +func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) *info.ContainerStatsSample { + sample := &info.ContainerStatsSample{} + perCoreUsage := make(map[int]uint64, 32) + for i, col := range columns { + v := values[i] + switch col { + case "machine": + if v.(string) != self.machineName { + return nil + } + // Memory Usage + case "memory_usage": + sample.Memory.Usage = v.(uint64) + // sample duration. Unit: Nanosecond. + case "sample_duration": + sample.Duration = time.Duration(v.(int64)) + // Instant cpu usage + case "cpu_instant_usage": + sample.Cpu.Usage = v.(uint64) + + default: + if !strings.HasPrefix(col, "per_core_instant_usage_core_") { + continue + } + idxStr := col[len("per_core_instant_usage_core_"):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx] = v.(uint64) + } + } + sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + sample.Cpu.PerCpuUsage[idx] = usage + } + if sample.Duration.Nanoseconds() == 0 { + return nil + } + return sample +} + +func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + series := &influxdb.Series{ + Name: self.tableName, + // There's only one point for each stats + Points: make([][]interface{}, 1), + } + series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) + + self.prevStats = stats.Copy(self.prevStats) + pretty.Printf("% #v", series) + err := self.client.WriteSeries([]*influxdb.Series{series}) + if err != nil { + return err + } + return nil +} + +func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + // TODO(dengnan): select only columns that we need + // TODO(dengnan): escape containerName + query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numStats, containerName) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + statsList := make([]*info.ContainerStats, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + stats := self.valuesToContainerStats(s.Columns, values) + statsList = append(statsList, stats) + } + } + return statsList, nil +} + +func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numSamples, containerName) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + sampleList := make([]*info.ContainerStatsSample, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + sample := self.valuesToContainerSample(s.Columns, values) + sampleList = append(sampleList, sample) + } + } + return sampleList, nil +} + +func (self *influxdbStorage) Close() error { + self.client = nil + return nil +} + +func (self *influxdbStorage) Percentiles( + containerName string, + cpuUsagePercentiles []int, + memUsagePercentiles []int, +) (*info.ContainerStatsPercentiles, error) { + // TODO(dengnan): Implement it + return nil, nil +} + +// machineName: A unique identifier to identify the host that current cAdvisor +// instance is running on. +// hostname: The host which runs influxdb. +// percentilesDuration: Time window which will be considered when calls Percentiles() +func New(machineName, + tablename, + database, + username, + password, + hostname string, + percentilesDuration time.Duration, +) (storage.StorageDriver, error) { + config := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + Database: database, + // IsSecure: true, + } + client, err := influxdb.NewClient(config) + if err != nil { + return nil, err + } + if percentilesDuration.Seconds() < 1.0 { + percentilesDuration = 5 * time.Minute + } + + ret := &influxdbStorage{ + client: client, + windowLen: percentilesDuration, + machineName: machineName, + } + return ret, nil +} diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go new file mode 100644 index 00000000..1fe15b6d --- /dev/null +++ b/storage/influxdb/influxdb_test.go @@ -0,0 +1,83 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "testing" + "time" + + "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/storage/test" + "github.com/influxdb/influxdb-go" +) + +func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { + machineName := "mymachine" + tablename := "cadivsorTable" + database := "cadvisor" + username := "root" + password := "root" + hostname := "localhost:8086" + percentilesDuration := 10 * time.Minute + config := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + Database: database, + // IsSecure: true, + } + client, err := influxdb.NewClient(config) + if err != nil { + t.Fatal(err) + } + deleteAll := fmt.Sprintf("drop series %v", tablename) + _, err = client.Query(deleteAll) + if err != nil { + t.Fatal(err) + } + // delete all data by the end of the call + defer client.Query(deleteAll) + + /* + series := &influxdb.Series{ + Name: tablename, + Columns: []string{"col1"}, + Points: [][]interface{}{ + []interface{}{1}, + }, + } + err = client.WriteSeries([]*influxdb.Series{series}) + if err != nil { + t.Fatal(err) + } + */ + + driver, err := New(machineName, + tablename, + database, + username, + password, + hostname, + percentilesDuration) + if err != nil { + t.Fatal(err) + } + f(driver, t) +} + +func TestMaxMemoryUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +} diff --git a/storage/memory/memory.go b/storage/memory/memory.go index 63c6d552..d30a3df6 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -41,27 +41,7 @@ func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) { self.prevStats = nil return } - if self.prevStats == nil { - self.prevStats = &info.ContainerStats{ - Cpu: &info.CpuStats{}, - Memory: &info.MemoryStats{}, - } - } - // make a deep copy. - self.prevStats.Timestamp = stats.Timestamp - // copy the slice first. - percpuSlice := self.prevStats.Cpu.Usage.PerCpu - *self.prevStats.Cpu = *stats.Cpu - // If the old slice is enough to hold the new data, then don't allocate - // a new slice. - if len(percpuSlice) != len(stats.Cpu.Usage.PerCpu) { - percpuSlice = make([]uint64, len(stats.Cpu.Usage.PerCpu)) - } - for i, perCpu := range stats.Cpu.Usage.PerCpu { - percpuSlice[i] = perCpu - } - self.prevStats.Cpu.Usage.PerCpu = percpuSlice - *self.prevStats.Memory = *stats.Memory + self.prevStats = stats.Copy(self.prevStats) } func (self *containerStorage) AddStats(stats *info.ContainerStats) error { diff --git a/storage/storage.go b/storage/storage.go index 51f0b9c0..8b264a93 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -31,7 +31,7 @@ type StorageDriver interface { // Returns samples of the container stats. If numSamples < 0, then // the number of returned samples is implementation defined. Otherwise, the driver // should return at most numSamples samples. - Samples(containername string, numSamples int) ([]*info.ContainerStatsSample, error) + Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) // Close will clear the state of the storage driver. The elements // stored in the underlying storage may or may not be deleted depending From 7bd8cc2c0b7aa07c3e7040509d18f09526367967 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 11:15:48 -0700 Subject: [PATCH 03/17] Passed first unit test. Waiting for a fix for influxdb/influxdb-go#12 --- storage/influxdb/influxdb.go | 20 +++++++++++++++----- storage/influxdb/influxdb_test.go | 24 ++++++------------------ 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index b6685688..0d8d1dda 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -23,7 +23,6 @@ import ( "github.com/google/cadvisor/info" "github.com/google/cadvisor/storage" "github.com/influxdb/influxdb-go" - "github.com/kr/pretty" ) type influxdbStorage struct { @@ -221,7 +220,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) self.prevStats = stats.Copy(self.prevStats) - pretty.Printf("% #v", series) err := self.client.WriteSeries([]*influxdb.Series{series}) if err != nil { return err @@ -232,7 +230,10 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numStats, containerName) + query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + if numStats > 0 { + query = fmt.Sprintf("%v limit %v", query, numStats) + } series, err := self.client.Query(query) if err != nil { return nil, err @@ -248,7 +249,12 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] } func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numSamples, containerName) + // TODO(dengnan): select only columns that we need + // TODO(dengnan): escape containerName + query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + if numSamples > 0 { + query = fmt.Sprintf("%v limit %v", query, numSamples) + } series, err := self.client.Query(query) if err != nil { return nil, err @@ -287,6 +293,7 @@ func New(machineName, username, password, hostname string, + isSecure bool, percentilesDuration time.Duration, ) (storage.StorageDriver, error) { config := &influxdb.ClientConfig{ @@ -294,12 +301,14 @@ func New(machineName, Username: username, Password: password, Database: database, - // IsSecure: true, + IsSecure: isSecure, } client, err := influxdb.NewClient(config) if err != nil { return nil, err } + // TODO(monnand): With go 1.3, we cannot compress data now. + client.DisableCompression() if percentilesDuration.Seconds() < 1.0 { percentilesDuration = 5 * time.Minute } @@ -308,6 +317,7 @@ func New(machineName, client: client, windowLen: percentilesDuration, machineName: machineName, + tableName: tablename, } return ret, nil } diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 1fe15b6d..8fc584ed 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -37,33 +37,20 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { Username: username, Password: password, Database: database, - // IsSecure: true, + IsSecure: false, } client, err := influxdb.NewClient(config) if err != nil { t.Fatal(err) } + client.DisableCompression() deleteAll := fmt.Sprintf("drop series %v", tablename) _, err = client.Query(deleteAll) if err != nil { t.Fatal(err) } // delete all data by the end of the call - defer client.Query(deleteAll) - - /* - series := &influxdb.Series{ - Name: tablename, - Columns: []string{"col1"}, - Points: [][]interface{}{ - []interface{}{1}, - }, - } - err = client.WriteSeries([]*influxdb.Series{series}) - if err != nil { - t.Fatal(err) - } - */ + // defer client.Query(deleteAll) driver, err := New(machineName, tablename, @@ -71,6 +58,7 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { username, password, hostname, + false, percentilesDuration) if err != nil { t.Fatal(err) @@ -78,6 +66,6 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { f(driver, t) } -func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +func TestSampleCpuUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestSampleCpuUsage, t) } From 4d9f9d7aaf508e05f91d2574dcc6177773bffb00 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 11:19:38 -0700 Subject: [PATCH 04/17] more unit tests. passed --- storage/influxdb/influxdb_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 8fc584ed..46e87c09 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -69,3 +69,23 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { func TestSampleCpuUsage(t *testing.T) { runStorageTest(test.StorageDriverTestSampleCpuUsage, t) } + +func TestRetrievePartialRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) +} + +func TestSamplesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) +} + +func TestRetrieveAllRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) +} + +func TestNoRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestNoRecentStats, t) +} + +func TestNoSamples(t *testing.T) { + runStorageTest(test.StorageDriverTestNoSamples, t) +} From e4c439ce259f5686cd2d6f97382ae8ea75c0994d Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 14:05:06 -0700 Subject: [PATCH 05/17] passed all existing unit tests --- storage/influxdb/influxdb.go | 121 +++++++++++++++++++++++------- storage/influxdb/influxdb_test.go | 2 +- 2 files changed, 96 insertions(+), 27 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 0d8d1dda..43d956e2 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -38,6 +38,10 @@ func (self *influxdbStorage) containerStatsToValues( stats *info.ContainerStats, ) (columns []string, values []interface{}) { + // Timestamp + columns = append(columns, "timestamp") + values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) + // Machine name columns = append(columns, "machine") values = append(values, self.machineName) @@ -95,7 +99,7 @@ func (self *influxdbStorage) containerStatsToValues( // Optional: sample duration. Unit: Nanosecond. columns = append(columns, "sample_duration") - values = append(values, sample.Duration.Nanoseconds()) + values = append(values, sample.Duration.String()) // Optional: Instant cpu usage columns = append(columns, "cpu_instant_usage") @@ -110,46 +114,84 @@ func (self *influxdbStorage) containerStatsToValues( return columns, values } -func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) *info.ContainerStats { +func convertToUint64(v interface{}) (uint64, error) { + if v == nil { + return 0, nil + } + switch x := v.(type) { + case uint64: + return x, nil + case int: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int32: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case float64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case uint32: + return uint64(x), nil + } + return 0, fmt.Errorf("Unknown type") +} + +func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { stats := &info.ContainerStats{ Cpu: &info.CpuStats{}, Memory: &info.MemoryStats{}, } perCoreUsage := make(map[int]uint64, 32) + var err error for i, col := range columns { v := values[i] switch col { + case "timestamp": + if str, ok := v.(string); ok { + stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } case "machine": if v.(string) != self.machineName { - return nil + return nil, fmt.Errorf("different machine") } // Cumulative Cpu Usage case "cpu_cumulative_usage": - stats.Cpu.Usage.Total = v.(uint64) + stats.Cpu.Usage.Total, err = convertToUint64(v) // Cumulative Cpu Usage in kernel mode case "cpu_cumulative_usage_kernel": - stats.Cpu.Usage.System = v.(uint64) + stats.Cpu.Usage.System, err = convertToUint64(v) // Cumulative Cpu Usage in user mode case "cpu_cumulative_usage_user": - stats.Cpu.Usage.User = v.(uint64) + stats.Cpu.Usage.User, err = convertToUint64(v) // Memory Usage case "memory_usage": - stats.Memory.Usage = v.(uint64) + stats.Memory.Usage, err = convertToUint64(v) // Working set size case "memory_working_set": - stats.Memory.WorkingSet = v.(uint64) + stats.Memory.WorkingSet, err = convertToUint64(v) // container page fault case "memory_container_pgfault": - stats.Memory.ContainerData.Pgfault = v.(uint64) + stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) // container major page fault case "memory_container_pgmajfault": - stats.Memory.ContainerData.Pgmajfault = v.(uint64) + stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) // hierarchical page fault case "memory_hierarchical_pgfault": - stats.Memory.HierarchicalData.Pgfault = v.(uint64) + stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) // hierarchical major page fault case "memory_hierarchical_pgmajfault": - stats.Memory.HierarchicalData.Pgmajfault = v.(uint64) + stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) default: if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") { continue @@ -159,35 +201,47 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i if err != nil { continue } - perCoreUsage[idx] = v.(uint64) + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage)) for idx, usage := range perCoreUsage { stats.Cpu.Usage.PerCpu[idx] = usage } - return stats + return stats, nil } -func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) *info.ContainerStatsSample { +func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { sample := &info.ContainerStatsSample{} perCoreUsage := make(map[int]uint64, 32) + var err error for i, col := range columns { v := values[i] switch col { + case "timestamp": + if str, ok := v.(string); ok { + sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } case "machine": if v.(string) != self.machineName { - return nil + return nil, fmt.Errorf("different machine") } // Memory Usage case "memory_usage": - sample.Memory.Usage = v.(uint64) + sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. case "sample_duration": - sample.Duration = time.Duration(v.(int64)) + if v == nil { + // this record does not have sample_duration, so it's the first stats. + return nil, nil + } + sample.Duration, err = time.ParseDuration(v.(string)) // Instant cpu usage case "cpu_instant_usage": - sample.Cpu.Usage = v.(uint64) + sample.Cpu.Usage, err = convertToUint64(v) default: if !strings.HasPrefix(col, "per_core_instant_usage_core_") { @@ -198,7 +252,10 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] if err != nil { continue } - perCoreUsage[idx] = v.(uint64) + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) } } sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage)) @@ -206,9 +263,9 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] sample.Cpu.PerCpuUsage[idx] = usage } if sample.Duration.Nanoseconds() == 0 { - return nil + return nil, nil } - return sample + return sample, nil } func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { @@ -230,7 +287,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) } @@ -241,7 +298,13 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] statsList := make([]*info.ContainerStats, 0, len(series)) for _, s := range series { for _, values := range s.Points { - stats := self.valuesToContainerStats(s.Columns, values) + stats, err := self.valuesToContainerStats(s.Columns, values) + if err != nil { + return nil, err + } + if stats == nil { + continue + } statsList = append(statsList, stats) } } @@ -251,7 +314,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path=\"%v\"", self.tableName, containerName) + query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) } @@ -262,7 +325,13 @@ func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*i sampleList := make([]*info.ContainerStatsSample, 0, len(series)) for _, s := range series { for _, values := range s.Points { - sample := self.valuesToContainerSample(s.Columns, values) + sample, err := self.valuesToContainerSample(s.Columns, values) + if err != nil { + return nil, err + } + if sample == nil { + continue + } sampleList = append(sampleList, sample) } } diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 46e87c09..ead4cecb 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -26,7 +26,7 @@ import ( func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { machineName := "mymachine" - tablename := "cadivsorTable" + tablename := "t" database := "cadvisor" username := "root" password := "root" From 795b4512882d7083e629b4e918faa399de45d0f9 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 14:24:45 -0700 Subject: [PATCH 06/17] tested on multi-machine multi-container scenario --- storage/influxdb/influxdb_test.go | 14 ++++++++++++- storage/test/storagetests.go | 34 +++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index ead4cecb..803e1842 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -16,6 +16,7 @@ package influxdb import ( "fmt" + "math/rand" "testing" "time" @@ -25,7 +26,8 @@ import ( ) func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { - machineName := "mymachine" + // randomly generate a machine name to mimic multi-machine senario. + machineName := fmt.Sprintf("machine-%v-%v", time.Now(), rand.Int63()) tablename := "t" database := "cadvisor" username := "root" @@ -63,29 +65,39 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { if err != nil { t.Fatal(err) } + defer driver.Close() + // generate another container's data on same machine. + test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100)(driver, t) f(driver, t) } func TestSampleCpuUsage(t *testing.T) { + // we generates more than one container's data. + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestSampleCpuUsage, t) } func TestRetrievePartialRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) } func TestSamplesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) } func TestRetrieveAllRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) } func TestNoRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestNoRecentStats, t) } func TestNoSamples(t *testing.T) { + runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestNoSamples, t) } diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 9c4ec0ae..cf4226a1 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -83,6 +83,40 @@ func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []u } } +// This function returns a function that will generate random stats and write +// them into the storage. The returned function will not close the driver of +// the call, which could be served as a building block to do other works +func StorageDriverFillRandomStatsFunc( + containerName string, + N int, +) func(driver storage.StorageDriver, t *testing.T) { + return func(driver storage.StorageDriver, t *testing.T) { + cpuTrace := make([]uint64, 0, N) + memTrace := make([]uint64, 0, N) + + // We need N+1 observations to get N samples + for i := 0; i < N+1; i++ { + cpuTrace = append(cpuTrace, uint64(rand.Intn(1000))) + memTrace = append(memTrace, uint64(rand.Intn(1000))) + } + + samplePeriod := 1 * time.Second + + ref := info.ContainerReference{ + Name: containerName, + } + + trace := buildTrace(cpuTrace, memTrace, samplePeriod) + + for _, stats := range trace { + err := driver.AddStats(ref, stats) + if err != nil { + t.Fatalf("unable to add stats: %v", err) + } + } + } +} + func StorageDriverTestSampleCpuUsage(driver storage.StorageDriver, t *testing.T) { defer driver.Close() N := 100 From 92c34ae490c9d4d779b578f62799b336d181f4a8 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 14:57:23 -0700 Subject: [PATCH 07/17] check if the stats is valid --- storage/influxdb/influxdb.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 43d956e2..b9c75ab0 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -54,8 +54,8 @@ func (self *influxdbStorage) containerStatsToValues( columns = append(columns, "cpu_cumulative_usage") values = append(values, stats.Cpu.Usage.Total) - // Cumulative Cpu Usage in kernel mode - columns = append(columns, "cpu_cumulative_usage_kernel") + // Cumulative Cpu Usage in system mode + columns = append(columns, "cpu_cumulative_usage_system") values = append(values, stats.Cpu.Usage.System) // Cumulative Cpu Usage in user mode @@ -168,8 +168,8 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i // Cumulative Cpu Usage case "cpu_cumulative_usage": stats.Cpu.Usage.Total, err = convertToUint64(v) - // Cumulative Cpu Usage in kernel mode - case "cpu_cumulative_usage_kernel": + // Cumulative Cpu used by the system + case "cpu_cumulative_usage_system": stats.Cpu.Usage.System, err = convertToUint64(v) // Cumulative Cpu Usage in user mode case "cpu_cumulative_usage_user": @@ -274,6 +274,9 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C // There's only one point for each stats Points: make([][]interface{}, 1), } + if stats == nil || stats.Cpu == nil || stats.Memory == nil { + return nil + } series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) self.prevStats = stats.Copy(self.prevStats) From 562a150592144ae60b1086c41a69c561ecd67a01 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 15:12:37 -0700 Subject: [PATCH 08/17] more on test --- storage/influxdb/influxdb_test.go | 25 ++++++++++++++++--------- storage/test/storagetests.go | 9 +++++++++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 803e1842..4d79de5e 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -16,7 +16,6 @@ package influxdb import ( "fmt" - "math/rand" "testing" "time" @@ -27,7 +26,7 @@ import ( func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { // randomly generate a machine name to mimic multi-machine senario. - machineName := fmt.Sprintf("machine-%v-%v", time.Now(), rand.Int63()) + machineName := "machine-A" tablename := "t" database := "cadvisor" username := "root" @@ -65,35 +64,43 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { if err != nil { t.Fatal(err) } - defer driver.Close() // generate another container's data on same machine. test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100)(driver, t) + + // generate another container's data on another machine. + driverForAnotherMachine, err := New("machineB", + tablename, + database, + username, + password, + hostname, + false, + percentilesDuration) + if err != nil { + t.Fatal(err) + } + defer driverForAnotherMachine.Close() + test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100)(driverForAnotherMachine, t) f(driver, t) } func TestSampleCpuUsage(t *testing.T) { - // we generates more than one container's data. - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestSampleCpuUsage, t) } func TestRetrievePartialRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) } func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) } func TestRetrieveAllRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) } func TestNoRecentStats(t *testing.T) { - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestNoRecentStats, t) } diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index cf4226a1..9755d0ab 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -153,6 +153,9 @@ func StorageDriverTestSampleCpuUsage(driver storage.StorageDriver, t *testing.T) if err != nil { t.Errorf("unable to sample stats: %v", err) } + if len(samples) == 0 { + t.Fatal("should at least store one sample") + } samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) samples, err = driver.Samples(ref.Name, -1) @@ -319,6 +322,9 @@ func StorageDriverTestRetrievePartialRecentStats(driver storage.StorageDriver, t if err != nil { t.Fatal(err) } + if len(recentStats) == 0 { + t.Fatal("should at least store one stats") + } if len(recentStats) > 10 { t.Fatalf("returned %v stats, not 10.", len(recentStats)) @@ -363,6 +369,9 @@ func StorageDriverTestRetrieveAllRecentStats(driver storage.StorageDriver, t *te if err != nil { t.Fatal(err) } + if len(recentStats) == 0 { + t.Fatal("should at least store one stats") + } if len(recentStats) > N { t.Fatalf("returned %v stats, not 100.", len(recentStats)) } From 41dcf6d42df86777ea22ff374e76eb5f85f505fb Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 15:20:21 -0700 Subject: [PATCH 09/17] no curry --- storage/influxdb/influxdb_test.go | 8 +++--- storage/test/storagetests.go | 43 +++++++++++++++---------------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 4d79de5e..e5f5287b 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -25,8 +25,7 @@ import ( ) func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { - // randomly generate a machine name to mimic multi-machine senario. - machineName := "machine-A" + machineName := "machineA" tablename := "t" database := "cadvisor" username := "root" @@ -65,7 +64,7 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { t.Fatal(err) } // generate another container's data on same machine. - test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100)(driver, t) + test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, driver, t) // generate another container's data on another machine. driverForAnotherMachine, err := New("machineB", @@ -80,7 +79,7 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { t.Fatal(err) } defer driverForAnotherMachine.Close() - test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100)(driverForAnotherMachine, t) + test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, driverForAnotherMachine, t) f(driver, t) } @@ -105,6 +104,5 @@ func TestNoRecentStats(t *testing.T) { } func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverFillRandomStatsFunc("otherContainer", 100), t) runStorageTest(test.StorageDriverTestNoSamples, t) } diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 9755d0ab..d953f2b9 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -83,36 +83,35 @@ func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []u } } -// This function returns a function that will generate random stats and write -// them into the storage. The returned function will not close the driver of -// the call, which could be served as a building block to do other works +// This function will generate random stats and write +// them into the storage. The function will not close the driver func StorageDriverFillRandomStatsFunc( containerName string, N int, -) func(driver storage.StorageDriver, t *testing.T) { - return func(driver storage.StorageDriver, t *testing.T) { - cpuTrace := make([]uint64, 0, N) - memTrace := make([]uint64, 0, N) + driver storage.StorageDriver, + t *testing.T, +) { + cpuTrace := make([]uint64, 0, N) + memTrace := make([]uint64, 0, N) - // We need N+1 observations to get N samples - for i := 0; i < N+1; i++ { - cpuTrace = append(cpuTrace, uint64(rand.Intn(1000))) - memTrace = append(memTrace, uint64(rand.Intn(1000))) - } + // We need N+1 observations to get N samples + for i := 0; i < N+1; i++ { + cpuTrace = append(cpuTrace, uint64(rand.Intn(1000))) + memTrace = append(memTrace, uint64(rand.Intn(1000))) + } - samplePeriod := 1 * time.Second + samplePeriod := 1 * time.Second - ref := info.ContainerReference{ - Name: containerName, - } + ref := info.ContainerReference{ + Name: containerName, + } - trace := buildTrace(cpuTrace, memTrace, samplePeriod) + trace := buildTrace(cpuTrace, memTrace, samplePeriod) - for _, stats := range trace { - err := driver.AddStats(ref, stats) - if err != nil { - t.Fatalf("unable to add stats: %v", err) - } + for _, stats := range trace { + err := driver.AddStats(ref, stats) + if err != nil { + t.Fatalf("unable to add stats: %v", err) } } } From fceb2707dd8a988b55afd77a7a319f74611c0a9d Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 16:06:27 -0700 Subject: [PATCH 10/17] percentiles --- info/container_test.go | 2 +- storage/influxdb/influxdb.go | 130 +++++++++++++++++++++++++----- storage/influxdb/influxdb_test.go | 16 ++++ 3 files changed, 127 insertions(+), 21 deletions(-) diff --git a/info/container_test.go b/info/container_test.go index 120a05c4..4b3c84a2 100644 --- a/info/container_test.go +++ b/info/container_test.go @@ -289,7 +289,7 @@ func TestContainerStatsCopy(t *testing.T) { stats.Cpu.Load = shadowStats.Cpu.Load + 1 stats.Memory.Usage = shadowStats.Memory.Usage + 1 if reflect.DeepEqual(stats, shadowStats) { - t.Errorf("Copy() did not deeply copied the object") + t.Errorf("Copy() did not deeply copy the object") } stats = shadowStats.Copy(stats) if !reflect.DeepEqual(stats, shadowStats) { diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index b9c75ab0..eb2cad31 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -33,6 +33,37 @@ type influxdbStorage struct { windowLen time.Duration } +const ( + colTimestamp string = "timestamp" + colMachineName string = "machine" + colContainerName string = "container_name" + colCpuCumulativeUsage string = "cpu_cumulative_usage" + // Cumulative Cpu Usage in system mode + colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system" + // Cumulative Cpu Usage in user mode + colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user" + // Memory Usage + colMemoryUsage string = "memory_usage" + // Working set size + colMemoryWorkingSet string = "memory_working_set" + // container page fault + colMemoryContainerPgfault string = "memory_container_pgfault" + // container major page fault + colMemoryContainerPgmajfault string = "memory_container_pgmajfault" + // hierarchical page fault + colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" + // hierarchical major page fault + colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" + // Cumulative per core usage + colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_" + // Optional: sample duration. Unit: Nanosecond. + colSampleDuration string = "sample_duration" + // Optional: Instant cpu usage + colCpuInstantUsage string = "cpu_instant_usage" + // Optional: Instant per core usage + colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_" +) + func (self *influxdbStorage) containerStatsToValues( ref info.ContainerReference, stats *info.ContainerStats, @@ -46,8 +77,8 @@ func (self *influxdbStorage) containerStatsToValues( columns = append(columns, "machine") values = append(values, self.machineName) - // Container path - columns = append(columns, "container_path") + // Container name + columns = append(columns, "container_name") values = append(values, ref.Name) // Cumulative Cpu Usage @@ -220,33 +251,29 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] var err error for i, col := range columns { v := values[i] - switch col { - case "timestamp": + switch { + case col == "timestamp": if str, ok := v.(string); ok { sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) } - case "machine": + case col == "machine": if v.(string) != self.machineName { return nil, fmt.Errorf("different machine") } // Memory Usage - case "memory_usage": + case col == "memory_usage": sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. - case "sample_duration": + case col == "sample_duration": if v == nil { // this record does not have sample_duration, so it's the first stats. return nil, nil } sample.Duration, err = time.ParseDuration(v.(string)) - // Instant cpu usage - case "cpu_instant_usage": + // Instant cpu usage + case col == "cpu_instant_usage": sample.Cpu.Usage, err = convertToUint64(v) - - default: - if !strings.HasPrefix(col, "per_core_instant_usage_core_") { - continue - } + case strings.HasPrefix(col, "per_core_instant_usage_core_"): idxStr := col[len("per_core_instant_usage_core_"):] idx, err := strconv.Atoi(idxStr) if err != nil { @@ -290,7 +317,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) } @@ -317,7 +344,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) } @@ -352,24 +379,87 @@ func (self *influxdbStorage) Percentiles( memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { // TODO(dengnan): Implement it - return nil, nil + selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) + + selectedCol = append(selectedCol, "max(memory_usage)") + for _, p := range cpuUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(cpu_instant_usage, %v)", p)) + } + for _, p := range memUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(memory_usage, %v)", p)) + } + + query := fmt.Sprintf("select %v from %v where container_name='%v' and machine='%v' and time > now() - %v", + strings.Join(selectedCol, ","), + self.tableName, + containerName, + self.machineName, + fmt.Sprintf("%vs", self.windowLen.Seconds()), + ) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + if len(series) != 1 { + return nil, nil + } + if len(series[0].Points) == 0 { + return nil, nil + } + + point := series[0].Points[0] + + ret := new(info.ContainerStatsPercentiles) + ret.MaxMemoryUsage, err = convertToUint64(point[1]) + if err != nil { + return nil, fmt.Errorf("invalid max memory usage: %v", err) + } + retrievedCpuPercentiles := point[2:] + for i, p := range cpuUsagePercentiles { + v, err := convertToUint64(retrievedCpuPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid cpu usage: %v", err) + } + ret.CpuUsagePercentiles = append( + ret.CpuUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):] + for i, p := range memUsagePercentiles { + v, err := convertToUint64(retrievedMemoryPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid memory usage: %v", err) + } + ret.MemoryUsagePercentiles = append( + ret.MemoryUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + return ret, nil } // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. -// hostname: The host which runs influxdb. +// influxdbHost: The host which runs influxdb. // percentilesDuration: Time window which will be considered when calls Percentiles() func New(machineName, tablename, database, username, password, - hostname string, + influxdbHost string, isSecure bool, percentilesDuration time.Duration, ) (storage.StorageDriver, error) { config := &influxdb.ClientConfig{ - Host: hostname, + Host: influxdbHost, Username: username, Password: password, Database: database, diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index e5f5287b..515cb732 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -106,3 +106,19 @@ func TestNoRecentStats(t *testing.T) { func TestNoSamples(t *testing.T) { runStorageTest(test.StorageDriverTestNoSamples, t) } + +func TestPercentiles(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentiles, t) +} + +func TestMaxMemoryUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +} + +func TestPercentilesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) +} + +func TestPercentilesWithoutStats(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) +} From 84fefcdab94334382e355df970e27edbc121d2c7 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 16:16:51 -0700 Subject: [PATCH 11/17] use const --- storage/influxdb/influxdb.go | 91 ++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 46 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index eb2cad31..f665fd1c 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -70,56 +70,56 @@ func (self *influxdbStorage) containerStatsToValues( ) (columns []string, values []interface{}) { // Timestamp - columns = append(columns, "timestamp") + columns = append(columns, colTimestamp) values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) // Machine name - columns = append(columns, "machine") + columns = append(columns, colMachineName) values = append(values, self.machineName) // Container name - columns = append(columns, "container_name") + columns = append(columns, colContainerName) values = append(values, ref.Name) // Cumulative Cpu Usage - columns = append(columns, "cpu_cumulative_usage") + columns = append(columns, colCpuCumulativeUsage) values = append(values, stats.Cpu.Usage.Total) // Cumulative Cpu Usage in system mode - columns = append(columns, "cpu_cumulative_usage_system") + columns = append(columns, colCpuCumulativeUsageSystem) values = append(values, stats.Cpu.Usage.System) // Cumulative Cpu Usage in user mode - columns = append(columns, "cpu_cumulative_usage_user") + columns = append(columns, colCpuCumulativeUsageUser) values = append(values, stats.Cpu.Usage.User) // Memory Usage - columns = append(columns, "memory_usage") + columns = append(columns, colMemoryUsage) values = append(values, stats.Memory.Usage) // Working set size - columns = append(columns, "memory_working_set") + columns = append(columns, colMemoryWorkingSet) values = append(values, stats.Memory.WorkingSet) // container page fault - columns = append(columns, "memory_container_pgfault") + columns = append(columns, colMemoryContainerPgfault) values = append(values, stats.Memory.ContainerData.Pgfault) // container major page fault - columns = append(columns, "memory_container_pgmajfault") + columns = append(columns, colMemoryContainerPgmajfault) values = append(values, stats.Memory.ContainerData.Pgmajfault) // hierarchical page fault - columns = append(columns, "memory_hierarchical_pgfault") + columns = append(columns, colMemoryHierarchicalPgfault) values = append(values, stats.Memory.HierarchicalData.Pgfault) // hierarchical major page fault - columns = append(columns, "memory_hierarchical_pgmajfault") + columns = append(columns, colMemoryHierarchicalPgmajfault) values = append(values, stats.Memory.HierarchicalData.Pgmajfault) // per cpu cumulative usage for i, u := range stats.Cpu.Usage.PerCpu { - columns = append(columns, fmt.Sprintf("per_core_cumulative_usage_core_%v", i)) + columns = append(columns, fmt.Sprintf("%v%v", colPerCoreCumulativeUsagePrefix, i)) values = append(values, u) } @@ -129,16 +129,16 @@ func (self *influxdbStorage) containerStatsToValues( } // Optional: sample duration. Unit: Nanosecond. - columns = append(columns, "sample_duration") + columns = append(columns, colSampleDuration) values = append(values, sample.Duration.String()) // Optional: Instant cpu usage - columns = append(columns, "cpu_instant_usage") + columns = append(columns, colCpuInstantUsage) values = append(values, sample.Cpu.Usage) // Optional: Instant per core usage for i, u := range sample.Cpu.PerCpuUsage { - columns = append(columns, fmt.Sprintf("per_core_instant_usage_core_%v", i)) + columns = append(columns, fmt.Sprintf("%v%v", colPerCoreInstantUsagePrefix, i)) values = append(values, u) } @@ -187,47 +187,44 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i var err error for i, col := range columns { v := values[i] - switch col { - case "timestamp": + switch { + case col == colTimestamp: if str, ok := v.(string); ok { stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) } - case "machine": + case col == colMachineName: if v.(string) != self.machineName { return nil, fmt.Errorf("different machine") } // Cumulative Cpu Usage - case "cpu_cumulative_usage": + case col == colCpuCumulativeUsage: stats.Cpu.Usage.Total, err = convertToUint64(v) // Cumulative Cpu used by the system - case "cpu_cumulative_usage_system": + case col == colCpuCumulativeUsageSystem: stats.Cpu.Usage.System, err = convertToUint64(v) // Cumulative Cpu Usage in user mode - case "cpu_cumulative_usage_user": + case col == colCpuCumulativeUsageUser: stats.Cpu.Usage.User, err = convertToUint64(v) // Memory Usage - case "memory_usage": + case col == colMemoryUsage: stats.Memory.Usage, err = convertToUint64(v) // Working set size - case "memory_working_set": + case col == colMemoryWorkingSet: stats.Memory.WorkingSet, err = convertToUint64(v) // container page fault - case "memory_container_pgfault": + case col == colMemoryContainerPgfault: stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) // container major page fault - case "memory_container_pgmajfault": + case col == colMemoryContainerPgmajfault: stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) // hierarchical page fault - case "memory_hierarchical_pgfault": + case col == colMemoryHierarchicalPgfault: stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) // hierarchical major page fault - case "memory_hierarchical_pgmajfault": + case col == colMemoryHierarchicalPgmajfault: stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) - default: - if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") { - continue - } - idxStr := col[len("per_core_cumulative_usage_core_"):] + case strings.HasPrefix(col, colPerCoreCumulativeUsagePrefix): + idxStr := col[len(colPerCoreCumulativeUsagePrefix):] idx, err := strconv.Atoi(idxStr) if err != nil { continue @@ -252,29 +249,29 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] for i, col := range columns { v := values[i] switch { - case col == "timestamp": + case col == colTimestamp: if str, ok := v.(string); ok { sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) } - case col == "machine": + case col == colMachineName: if v.(string) != self.machineName { return nil, fmt.Errorf("different machine") } // Memory Usage - case col == "memory_usage": + case col == colMemoryUsage: sample.Memory.Usage, err = convertToUint64(v) // sample duration. Unit: Nanosecond. - case col == "sample_duration": + case col == colSampleDuration: if v == nil { // this record does not have sample_duration, so it's the first stats. return nil, nil } sample.Duration, err = time.ParseDuration(v.(string)) // Instant cpu usage - case col == "cpu_instant_usage": + case col == colCpuInstantUsage: sample.Cpu.Usage, err = convertToUint64(v) - case strings.HasPrefix(col, "per_core_instant_usage_core_"): - idxStr := col[len("per_core_instant_usage_core_"):] + case strings.HasPrefix(col, colPerCoreInstantUsagePrefix): + idxStr := col[len(colPerCoreInstantUsagePrefix):] idx, err := strconv.Atoi(idxStr) if err != nil { continue @@ -317,7 +314,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) } @@ -344,7 +341,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need // TODO(dengnan): escape containerName - query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName) + query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) } @@ -381,18 +378,20 @@ func (self *influxdbStorage) Percentiles( // TODO(dengnan): Implement it selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) - selectedCol = append(selectedCol, "max(memory_usage)") + selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) for _, p := range cpuUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(cpu_instant_usage, %v)", p)) + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) } for _, p := range memUsagePercentiles { - selectedCol = append(selectedCol, fmt.Sprintf("percentile(memory_usage, %v)", p)) + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) } - query := fmt.Sprintf("select %v from %v where container_name='%v' and machine='%v' and time > now() - %v", + query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v", strings.Join(selectedCol, ","), self.tableName, + colContainerName, containerName, + colMachineName, self.machineName, fmt.Sprintf("%vs", self.windowLen.Seconds()), ) From 4115f733435c1f6d4a10cf71d0d058d6bc7858c6 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 16:20:05 -0700 Subject: [PATCH 12/17] type check --- storage/influxdb/influxdb.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index f665fd1c..1d42ef0d 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -193,8 +193,12 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) } case col == colMachineName: - if v.(string) != self.machineName { - return nil, fmt.Errorf("different machine") + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("different machine") + } + } else { + return nil, fmt.Errorf("machine name field is not a string: %v", v) } // Cumulative Cpu Usage case col == colCpuCumulativeUsage: @@ -254,8 +258,12 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values [] sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) } case col == colMachineName: - if v.(string) != self.machineName { - return nil, fmt.Errorf("different machine") + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("different machine") + } + } else { + return nil, fmt.Errorf("machine name field is not a string: %v", v) } // Memory Usage case col == colMemoryUsage: From 05b997d6eced7cb387a6d04bb9102f5450262593 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 16:23:31 -0700 Subject: [PATCH 13/17] comment --- storage/influxdb/influxdb.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index 1d42ef0d..cb25a20e 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -321,7 +321,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { // TODO(dengnan): select only columns that we need - // TODO(dengnan): escape containerName + // TODO(dengnan): escape names query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) if numStats > 0 { query = fmt.Sprintf("%v limit %v", query, numStats) @@ -348,7 +348,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { // TODO(dengnan): select only columns that we need - // TODO(dengnan): escape containerName + // TODO(dengnan): escape names query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) if numSamples > 0 { query = fmt.Sprintf("%v limit %v", query, numSamples) @@ -383,7 +383,6 @@ func (self *influxdbStorage) Percentiles( cpuUsagePercentiles []int, memUsagePercentiles []int, ) (*info.ContainerStatsPercentiles, error) { - // TODO(dengnan): Implement it selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) From e2c6798c92aae719bc2bc9b79dd8658452afbd9e Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Wed, 2 Jul 2014 16:31:19 -0700 Subject: [PATCH 14/17] grammar --- info/container.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/info/container.go b/info/container.go index c66346bc..6e987916 100644 --- a/info/container.go +++ b/info/container.go @@ -165,7 +165,7 @@ type ContainerStats struct { Memory *MemoryStats `json:"memory,omitempty"` } -// Makes a deep copy of the ContainerStats and returns the pointer to the new +// Makes a deep copy of the ContainerStats and returns a pointer to the new // copy. Copy() will allocate a new ContainerStats object if dst is nil. func (self *ContainerStats) Copy(dst *ContainerStats) *ContainerStats { if dst == nil { From 02d6549f3e902b3b02e9c7b796fe29717d1d3e31 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Thu, 3 Jul 2014 20:21:47 +0000 Subject: [PATCH 15/17] travis: support influxdb --- .travis.yml | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8542bd19..0f85034d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,17 @@ language: go go: - - 1.2 + - 1.3 before_script: - go get github.com/stretchr/testify/mock - go get github.com/kr/pretty + - wget http://s3.amazonaws.com/influxdb/influxdb_latest_amd64.deb + - sudo dpkg -i influxdb_latest_amd64.deb + - sudo service influxdb start script: - go test -v -race github.com/google/cadvisor/container - - go test -v github.com/google/cadvisor/info - - go test -v github.com/google/cadvisor/client - - go test -v github.com/google/cadvisor/sampling - - go test -v github.com/google/cadvisor/storage/memory + - go test -v -race github.com/google/cadvisor/info + - go test -v -race github.com/google/cadvisor/client + - go test -v -race github.com/google/cadvisor/sampling + - go test -v -race github.com/google/cadvisor/storage/memory + - go test -v -race github.com/google/cadvisor/storage/influxdb - go build github.com/google/cadvisor From a1cb5bf4efb0610f7e476cd204dad888a4f631da Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Thu, 3 Jul 2014 20:55:56 +0000 Subject: [PATCH 16/17] allow the storage to store some inaccurate time --- storage/influxdb/influxdb_test.go | 12 ++++++ storage/test/storagetests.go | 65 ++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index 515cb732..fb8a8b42 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -32,6 +32,18 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { password := "root" hostname := "localhost:8086" percentilesDuration := 10 * time.Minute + rootConfig := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + IsSecure: false, + } + rootClient, err := influxdb.NewClient(rootConfig) + if err != nil { + t.Fatal(err) + } + // create the data base first. + rootClient.CreateDatabase(database) config := &influxdb.ClientConfig{ Host: hostname, Username: username, diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index d953f2b9..c18ceede 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -53,6 +53,67 @@ func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStat return ret } +func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { + // t1 should not be later than t2 + if t1.After(t2) { + t1, t2 = t2, t1 + } + diff := t2.Sub(t1) + if diff < tolerance { + return true + } + return false +} + +func durationEq(a, b time.Duration, tolerance time.Duration) bool { + if a > b { + a, b = b, a + } + diff := a - b + if diff < tolerance { + return true + } + return false +} + +const ( + // 10ms, i.e. 0.01s + timePrecision time.Duration = 10000000 +) + +// This function is useful because we do not require precise time +// representation. +func statsEq(a, b *info.ContainerStats) bool { + if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { + return false + } + if !reflect.DeepEqual(a.Cpu, b.Cpu) { + return false + } + if !reflect.DeepEqual(a.Memory, b.Memory) { + return false + } + return true +} + +// This function is useful because we do not require precise time +// representation. +func sampleEq(a, b *info.ContainerStatsSample) bool { + if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { + return false + } + if !durationEq(a.Duration, b.Duration, timePrecision) { + return false + } + if !reflect.DeepEqual(a.Cpu, b.Cpu) { + return false + } + if !reflect.DeepEqual(a.Memory, b.Memory) { + return false + } + return true +} + func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []uint64, samplePeriod time.Duration, t *testing.T) { for _, sample := range samples { if sample.Duration != samplePeriod { @@ -334,7 +395,7 @@ func StorageDriverTestRetrievePartialRecentStats(driver storage.StorageDriver, t for _, r := range recentStats { found := false for _, s := range actualRecentStats { - if reflect.DeepEqual(s, r) { + if statsEq(s, r) { found = true } } @@ -380,7 +441,7 @@ func StorageDriverTestRetrieveAllRecentStats(driver storage.StorageDriver, t *te for _, r := range recentStats { found := false for _, s := range actualRecentStats { - if reflect.DeepEqual(s, r) { + if statsEq(s, r) { found = true } } From a96e4e12e244f0c2ca7ff6fe743a1b038b65134b Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 7 Jul 2014 16:10:09 -0700 Subject: [PATCH 17/17] style --- storage/influxdb/influxdb.go | 2 +- storage/influxdb/influxdb_test.go | 2 +- storage/test/storagetests.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index cb25a20e..1fcb170f 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -420,7 +420,7 @@ func (self *influxdbStorage) Percentiles( if err != nil { return nil, fmt.Errorf("invalid max memory usage: %v", err) } - retrievedCpuPercentiles := point[2:] + retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)] for i, p := range cpuUsagePercentiles { v, err := convertToUint64(retrievedCpuPercentiles[i]) if err != nil { diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go index fb8a8b42..c5770d35 100644 --- a/storage/influxdb/influxdb_test.go +++ b/storage/influxdb/influxdb_test.go @@ -62,7 +62,7 @@ func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { t.Fatal(err) } // delete all data by the end of the call - // defer client.Query(deleteAll) + defer client.Query(deleteAll) driver, err := New(machineName, tablename, diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index c18ceede..99912032 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -59,7 +59,7 @@ func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { t1, t2 = t2, t1 } diff := t2.Sub(t1) - if diff < tolerance { + if diff <= tolerance { return true } return false @@ -70,7 +70,7 @@ func durationEq(a, b time.Duration, tolerance time.Duration) bool { a, b = b, a } diff := a - b - if diff < tolerance { + if diff <= tolerance { return true } return false @@ -78,7 +78,7 @@ func durationEq(a, b time.Duration, tolerance time.Duration) bool { const ( // 10ms, i.e. 0.01s - timePrecision time.Duration = 10000000 + timePrecision time.Duration = 10 * time.Millisecond ) // This function is useful because we do not require precise time