diff --git a/.travis.yml b/.travis.yml index 8542bd19..0f85034d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,17 @@ language: go go: - - 1.2 + - 1.3 before_script: - go get github.com/stretchr/testify/mock - go get github.com/kr/pretty + - wget http://s3.amazonaws.com/influxdb/influxdb_latest_amd64.deb + - sudo dpkg -i influxdb_latest_amd64.deb + - sudo service influxdb start script: - go test -v -race github.com/google/cadvisor/container - - go test -v github.com/google/cadvisor/info - - go test -v github.com/google/cadvisor/client - - go test -v github.com/google/cadvisor/sampling - - go test -v github.com/google/cadvisor/storage/memory + - go test -v -race github.com/google/cadvisor/info + - go test -v -race github.com/google/cadvisor/client + - go test -v -race github.com/google/cadvisor/sampling + - go test -v -race github.com/google/cadvisor/storage/memory + - go test -v -race github.com/google/cadvisor/storage/influxdb - go build github.com/google/cadvisor diff --git a/info/container.go b/info/container.go index 3e2447d9..6e987916 100644 --- a/info/container.go +++ b/info/container.go @@ -165,6 +165,42 @@ type ContainerStats struct { Memory *MemoryStats `json:"memory,omitempty"` } +// Makes a deep copy of the ContainerStats and returns a pointer to the new +// copy. Copy() will allocate a new ContainerStats object if dst is nil. +func (self *ContainerStats) Copy(dst *ContainerStats) *ContainerStats { + if dst == nil { + dst = new(ContainerStats) + } + dst.Timestamp = self.Timestamp + if self.Cpu != nil { + if dst.Cpu == nil { + dst.Cpu = new(CpuStats) + } + // To make a deep copy of a slice, we need to copy every value + // in the slice. To make less memory allocation, we would like + // to reuse the slice in dst if possible. + percpu := dst.Cpu.Usage.PerCpu + if len(percpu) != len(self.Cpu.Usage.PerCpu) { + percpu = make([]uint64, len(self.Cpu.Usage.PerCpu)) + } + dst.Cpu.Usage = self.Cpu.Usage + dst.Cpu.Load = self.Cpu.Load + copy(percpu, self.Cpu.Usage.PerCpu) + dst.Cpu.Usage.PerCpu = percpu + } else { + dst.Cpu = nil + } + if self.Memory != nil { + if dst.Memory == nil { + dst.Memory = new(MemoryStats) + } + *dst.Memory = *self.Memory + } else { + dst.Memory = nil + } + return dst +} + type ContainerStatsSample struct { // Timetamp of the end of the sample period Timestamp time.Time `json:"timestamp"` diff --git a/info/container_test.go b/info/container_test.go index 071ed914..4b3c84a2 100644 --- a/info/container_test.go +++ b/info/container_test.go @@ -15,6 +15,7 @@ package info import ( + "reflect" "testing" "time" ) @@ -277,3 +278,21 @@ func TestAddSampleHotUnpluggingCpu(t *testing.T) { t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage) } } + +func TestContainerStatsCopy(t *testing.T) { + stats := createStats(100, 101, time.Now()) + shadowStats := stats.Copy(nil) + if !reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() returned different object") + } + stats.Cpu.Usage.PerCpu[0] = shadowStats.Cpu.Usage.PerCpu[0] + 1 + stats.Cpu.Load = shadowStats.Cpu.Load + 1 + stats.Memory.Usage = shadowStats.Memory.Usage + 1 + if reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() did not deeply copy the object") + } + stats = shadowStats.Copy(stats) + if !reflect.DeepEqual(stats, shadowStats) { + t.Errorf("Copy() returned different object") + } +} diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go new file mode 100644 index 00000000..1fcb170f --- /dev/null +++ b/storage/influxdb/influxdb.go @@ -0,0 +1,491 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/google/cadvisor/info" + "github.com/google/cadvisor/storage" + "github.com/influxdb/influxdb-go" +) + +type influxdbStorage struct { + client *influxdb.Client + prevStats *info.ContainerStats + machineName string + tableName string + windowLen time.Duration +} + +const ( + colTimestamp string = "timestamp" + colMachineName string = "machine" + colContainerName string = "container_name" + colCpuCumulativeUsage string = "cpu_cumulative_usage" + // Cumulative Cpu Usage in system mode + colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system" + // Cumulative Cpu Usage in user mode + colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user" + // Memory Usage + colMemoryUsage string = "memory_usage" + // Working set size + colMemoryWorkingSet string = "memory_working_set" + // container page fault + colMemoryContainerPgfault string = "memory_container_pgfault" + // container major page fault + colMemoryContainerPgmajfault string = "memory_container_pgmajfault" + // hierarchical page fault + colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" + // hierarchical major page fault + colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" + // Cumulative per core usage + colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_" + // Optional: sample duration. Unit: Nanosecond. + colSampleDuration string = "sample_duration" + // Optional: Instant cpu usage + colCpuInstantUsage string = "cpu_instant_usage" + // Optional: Instant per core usage + colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_" +) + +func (self *influxdbStorage) containerStatsToValues( + ref info.ContainerReference, + stats *info.ContainerStats, +) (columns []string, values []interface{}) { + + // Timestamp + columns = append(columns, colTimestamp) + values = append(values, stats.Timestamp.Format(time.RFC3339Nano)) + + // Machine name + columns = append(columns, colMachineName) + values = append(values, self.machineName) + + // Container name + columns = append(columns, colContainerName) + values = append(values, ref.Name) + + // Cumulative Cpu Usage + columns = append(columns, colCpuCumulativeUsage) + values = append(values, stats.Cpu.Usage.Total) + + // Cumulative Cpu Usage in system mode + columns = append(columns, colCpuCumulativeUsageSystem) + values = append(values, stats.Cpu.Usage.System) + + // Cumulative Cpu Usage in user mode + columns = append(columns, colCpuCumulativeUsageUser) + values = append(values, stats.Cpu.Usage.User) + + // Memory Usage + columns = append(columns, colMemoryUsage) + values = append(values, stats.Memory.Usage) + + // Working set size + columns = append(columns, colMemoryWorkingSet) + values = append(values, stats.Memory.WorkingSet) + + // container page fault + columns = append(columns, colMemoryContainerPgfault) + values = append(values, stats.Memory.ContainerData.Pgfault) + + // container major page fault + columns = append(columns, colMemoryContainerPgmajfault) + values = append(values, stats.Memory.ContainerData.Pgmajfault) + + // hierarchical page fault + columns = append(columns, colMemoryHierarchicalPgfault) + values = append(values, stats.Memory.HierarchicalData.Pgfault) + + // hierarchical major page fault + columns = append(columns, colMemoryHierarchicalPgmajfault) + values = append(values, stats.Memory.HierarchicalData.Pgmajfault) + + // per cpu cumulative usage + for i, u := range stats.Cpu.Usage.PerCpu { + columns = append(columns, fmt.Sprintf("%v%v", colPerCoreCumulativeUsagePrefix, i)) + values = append(values, u) + } + + sample, err := info.NewSample(self.prevStats, stats) + if err != nil || sample == nil { + return columns, values + } + + // Optional: sample duration. Unit: Nanosecond. + columns = append(columns, colSampleDuration) + values = append(values, sample.Duration.String()) + + // Optional: Instant cpu usage + columns = append(columns, colCpuInstantUsage) + values = append(values, sample.Cpu.Usage) + + // Optional: Instant per core usage + for i, u := range sample.Cpu.PerCpuUsage { + columns = append(columns, fmt.Sprintf("%v%v", colPerCoreInstantUsagePrefix, i)) + values = append(values, u) + } + + return columns, values +} + +func convertToUint64(v interface{}) (uint64, error) { + if v == nil { + return 0, nil + } + switch x := v.(type) { + case uint64: + return x, nil + case int: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int32: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case int64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case float64: + if x < 0 { + return 0, fmt.Errorf("negative value: %v", x) + } + return uint64(x), nil + case uint32: + return uint64(x), nil + } + return 0, fmt.Errorf("Unknown type") +} + +func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) { + stats := &info.ContainerStats{ + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + } + perCoreUsage := make(map[int]uint64, 32) + var err error + for i, col := range columns { + v := values[i] + switch { + case col == colTimestamp: + if str, ok := v.(string); ok { + stats.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } + case col == colMachineName: + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("different machine") + } + } else { + return nil, fmt.Errorf("machine name field is not a string: %v", v) + } + // Cumulative Cpu Usage + case col == colCpuCumulativeUsage: + stats.Cpu.Usage.Total, err = convertToUint64(v) + // Cumulative Cpu used by the system + case col == colCpuCumulativeUsageSystem: + stats.Cpu.Usage.System, err = convertToUint64(v) + // Cumulative Cpu Usage in user mode + case col == colCpuCumulativeUsageUser: + stats.Cpu.Usage.User, err = convertToUint64(v) + // Memory Usage + case col == colMemoryUsage: + stats.Memory.Usage, err = convertToUint64(v) + // Working set size + case col == colMemoryWorkingSet: + stats.Memory.WorkingSet, err = convertToUint64(v) + // container page fault + case col == colMemoryContainerPgfault: + stats.Memory.ContainerData.Pgfault, err = convertToUint64(v) + // container major page fault + case col == colMemoryContainerPgmajfault: + stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v) + // hierarchical page fault + case col == colMemoryHierarchicalPgfault: + stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v) + // hierarchical major page fault + case col == colMemoryHierarchicalPgmajfault: + stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v) + case strings.HasPrefix(col, colPerCoreCumulativeUsagePrefix): + idxStr := col[len(colPerCoreCumulativeUsagePrefix):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) + } + } + stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + stats.Cpu.Usage.PerCpu[idx] = usage + } + return stats, nil +} + +func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { + sample := &info.ContainerStatsSample{} + perCoreUsage := make(map[int]uint64, 32) + var err error + for i, col := range columns { + v := values[i] + switch { + case col == colTimestamp: + if str, ok := v.(string); ok { + sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) + } + case col == colMachineName: + if m, ok := v.(string); ok { + if m != self.machineName { + return nil, fmt.Errorf("different machine") + } + } else { + return nil, fmt.Errorf("machine name field is not a string: %v", v) + } + // Memory Usage + case col == colMemoryUsage: + sample.Memory.Usage, err = convertToUint64(v) + // sample duration. Unit: Nanosecond. + case col == colSampleDuration: + if v == nil { + // this record does not have sample_duration, so it's the first stats. + return nil, nil + } + sample.Duration, err = time.ParseDuration(v.(string)) + // Instant cpu usage + case col == colCpuInstantUsage: + sample.Cpu.Usage, err = convertToUint64(v) + case strings.HasPrefix(col, colPerCoreInstantUsagePrefix): + idxStr := col[len(colPerCoreInstantUsagePrefix):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx], err = convertToUint64(v) + } + if err != nil { + return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) + } + } + sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + sample.Cpu.PerCpuUsage[idx] = usage + } + if sample.Duration.Nanoseconds() == 0 { + return nil, nil + } + return sample, nil +} + +func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + series := &influxdb.Series{ + Name: self.tableName, + // There's only one point for each stats + Points: make([][]interface{}, 1), + } + if stats == nil || stats.Cpu == nil || stats.Memory == nil { + return nil + } + series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) + + self.prevStats = stats.Copy(self.prevStats) + err := self.client.WriteSeries([]*influxdb.Series{series}) + if err != nil { + return err + } + return nil +} + +func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + // TODO(dengnan): select only columns that we need + // TODO(dengnan): escape names + query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) + if numStats > 0 { + query = fmt.Sprintf("%v limit %v", query, numStats) + } + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + statsList := make([]*info.ContainerStats, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + stats, err := self.valuesToContainerStats(s.Columns, values) + if err != nil { + return nil, err + } + if stats == nil { + continue + } + statsList = append(statsList, stats) + } + } + return statsList, nil +} + +func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + // TODO(dengnan): select only columns that we need + // TODO(dengnan): escape names + query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName) + if numSamples > 0 { + query = fmt.Sprintf("%v limit %v", query, numSamples) + } + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + sampleList := make([]*info.ContainerStatsSample, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + sample, err := self.valuesToContainerSample(s.Columns, values) + if err != nil { + return nil, err + } + if sample == nil { + continue + } + sampleList = append(sampleList, sample) + } + } + return sampleList, nil +} + +func (self *influxdbStorage) Close() error { + self.client = nil + return nil +} + +func (self *influxdbStorage) Percentiles( + containerName string, + cpuUsagePercentiles []int, + memUsagePercentiles []int, +) (*info.ContainerStatsPercentiles, error) { + selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1) + + selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage)) + for _, p := range cpuUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p)) + } + for _, p := range memUsagePercentiles { + selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p)) + } + + query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v", + strings.Join(selectedCol, ","), + self.tableName, + colContainerName, + containerName, + colMachineName, + self.machineName, + fmt.Sprintf("%vs", self.windowLen.Seconds()), + ) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + if len(series) != 1 { + return nil, nil + } + if len(series[0].Points) == 0 { + return nil, nil + } + + point := series[0].Points[0] + + ret := new(info.ContainerStatsPercentiles) + ret.MaxMemoryUsage, err = convertToUint64(point[1]) + if err != nil { + return nil, fmt.Errorf("invalid max memory usage: %v", err) + } + retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)] + for i, p := range cpuUsagePercentiles { + v, err := convertToUint64(retrievedCpuPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid cpu usage: %v", err) + } + ret.CpuUsagePercentiles = append( + ret.CpuUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):] + for i, p := range memUsagePercentiles { + v, err := convertToUint64(retrievedMemoryPercentiles[i]) + if err != nil { + return nil, fmt.Errorf("invalid memory usage: %v", err) + } + ret.MemoryUsagePercentiles = append( + ret.MemoryUsagePercentiles, + info.Percentile{ + Percentage: p, + Value: v, + }, + ) + } + return ret, nil +} + +// machineName: A unique identifier to identify the host that current cAdvisor +// instance is running on. +// influxdbHost: The host which runs influxdb. +// percentilesDuration: Time window which will be considered when calls Percentiles() +func New(machineName, + tablename, + database, + username, + password, + influxdbHost string, + isSecure bool, + percentilesDuration time.Duration, +) (storage.StorageDriver, error) { + config := &influxdb.ClientConfig{ + Host: influxdbHost, + Username: username, + Password: password, + Database: database, + IsSecure: isSecure, + } + client, err := influxdb.NewClient(config) + if err != nil { + return nil, err + } + // TODO(monnand): With go 1.3, we cannot compress data now. + client.DisableCompression() + if percentilesDuration.Seconds() < 1.0 { + percentilesDuration = 5 * time.Minute + } + + ret := &influxdbStorage{ + client: client, + windowLen: percentilesDuration, + machineName: machineName, + tableName: tablename, + } + return ret, nil +} diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go new file mode 100644 index 00000000..c5770d35 --- /dev/null +++ b/storage/influxdb/influxdb_test.go @@ -0,0 +1,136 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "testing" + "time" + + "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/storage/test" + "github.com/influxdb/influxdb-go" +) + +func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { + machineName := "machineA" + tablename := "t" + database := "cadvisor" + username := "root" + password := "root" + hostname := "localhost:8086" + percentilesDuration := 10 * time.Minute + rootConfig := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + IsSecure: false, + } + rootClient, err := influxdb.NewClient(rootConfig) + if err != nil { + t.Fatal(err) + } + // create the data base first. + rootClient.CreateDatabase(database) + config := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + Database: database, + IsSecure: false, + } + client, err := influxdb.NewClient(config) + if err != nil { + t.Fatal(err) + } + client.DisableCompression() + deleteAll := fmt.Sprintf("drop series %v", tablename) + _, err = client.Query(deleteAll) + if err != nil { + t.Fatal(err) + } + // delete all data by the end of the call + defer client.Query(deleteAll) + + driver, err := New(machineName, + tablename, + database, + username, + password, + hostname, + false, + percentilesDuration) + if err != nil { + t.Fatal(err) + } + // generate another container's data on same machine. + test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, driver, t) + + // generate another container's data on another machine. + driverForAnotherMachine, err := New("machineB", + tablename, + database, + username, + password, + hostname, + false, + percentilesDuration) + if err != nil { + t.Fatal(err) + } + defer driverForAnotherMachine.Close() + test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, driverForAnotherMachine, t) + f(driver, t) +} + +func TestSampleCpuUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestSampleCpuUsage, t) +} + +func TestRetrievePartialRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) +} + +func TestSamplesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) +} + +func TestRetrieveAllRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t) +} + +func TestNoRecentStats(t *testing.T) { + runStorageTest(test.StorageDriverTestNoRecentStats, t) +} + +func TestNoSamples(t *testing.T) { + runStorageTest(test.StorageDriverTestNoSamples, t) +} + +func TestPercentiles(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentiles, t) +} + +func TestMaxMemoryUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +} + +func TestPercentilesWithoutSample(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) +} + +func TestPercentilesWithoutStats(t *testing.T) { + runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) +} diff --git a/storage/memory/memory.go b/storage/memory/memory.go index 63c6d552..d30a3df6 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -41,27 +41,7 @@ func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) { self.prevStats = nil return } - if self.prevStats == nil { - self.prevStats = &info.ContainerStats{ - Cpu: &info.CpuStats{}, - Memory: &info.MemoryStats{}, - } - } - // make a deep copy. - self.prevStats.Timestamp = stats.Timestamp - // copy the slice first. - percpuSlice := self.prevStats.Cpu.Usage.PerCpu - *self.prevStats.Cpu = *stats.Cpu - // If the old slice is enough to hold the new data, then don't allocate - // a new slice. - if len(percpuSlice) != len(stats.Cpu.Usage.PerCpu) { - percpuSlice = make([]uint64, len(stats.Cpu.Usage.PerCpu)) - } - for i, perCpu := range stats.Cpu.Usage.PerCpu { - percpuSlice[i] = perCpu - } - self.prevStats.Cpu.Usage.PerCpu = percpuSlice - *self.prevStats.Memory = *stats.Memory + self.prevStats = stats.Copy(self.prevStats) } func (self *containerStorage) AddStats(stats *info.ContainerStats) error { diff --git a/storage/storage.go b/storage/storage.go index 51f0b9c0..8b264a93 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -31,7 +31,7 @@ type StorageDriver interface { // Returns samples of the container stats. If numSamples < 0, then // the number of returned samples is implementation defined. Otherwise, the driver // should return at most numSamples samples. - Samples(containername string, numSamples int) ([]*info.ContainerStatsSample, error) + Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) // Close will clear the state of the storage driver. The elements // stored in the underlying storage may or may not be deleted depending diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 9c4ec0ae..99912032 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -53,6 +53,67 @@ func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStat return ret } +func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { + // t1 should not be later than t2 + if t1.After(t2) { + t1, t2 = t2, t1 + } + diff := t2.Sub(t1) + if diff <= tolerance { + return true + } + return false +} + +func durationEq(a, b time.Duration, tolerance time.Duration) bool { + if a > b { + a, b = b, a + } + diff := a - b + if diff <= tolerance { + return true + } + return false +} + +const ( + // 10ms, i.e. 0.01s + timePrecision time.Duration = 10 * time.Millisecond +) + +// This function is useful because we do not require precise time +// representation. +func statsEq(a, b *info.ContainerStats) bool { + if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { + return false + } + if !reflect.DeepEqual(a.Cpu, b.Cpu) { + return false + } + if !reflect.DeepEqual(a.Memory, b.Memory) { + return false + } + return true +} + +// This function is useful because we do not require precise time +// representation. +func sampleEq(a, b *info.ContainerStatsSample) bool { + if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { + return false + } + if !durationEq(a.Duration, b.Duration, timePrecision) { + return false + } + if !reflect.DeepEqual(a.Cpu, b.Cpu) { + return false + } + if !reflect.DeepEqual(a.Memory, b.Memory) { + return false + } + return true +} + func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []uint64, samplePeriod time.Duration, t *testing.T) { for _, sample := range samples { if sample.Duration != samplePeriod { @@ -83,6 +144,39 @@ func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []u } } +// This function will generate random stats and write +// them into the storage. The function will not close the driver +func StorageDriverFillRandomStatsFunc( + containerName string, + N int, + driver storage.StorageDriver, + t *testing.T, +) { + cpuTrace := make([]uint64, 0, N) + memTrace := make([]uint64, 0, N) + + // We need N+1 observations to get N samples + for i := 0; i < N+1; i++ { + cpuTrace = append(cpuTrace, uint64(rand.Intn(1000))) + memTrace = append(memTrace, uint64(rand.Intn(1000))) + } + + samplePeriod := 1 * time.Second + + ref := info.ContainerReference{ + Name: containerName, + } + + trace := buildTrace(cpuTrace, memTrace, samplePeriod) + + for _, stats := range trace { + err := driver.AddStats(ref, stats) + if err != nil { + t.Fatalf("unable to add stats: %v", err) + } + } +} + func StorageDriverTestSampleCpuUsage(driver storage.StorageDriver, t *testing.T) { defer driver.Close() N := 100 @@ -119,6 +213,9 @@ func StorageDriverTestSampleCpuUsage(driver storage.StorageDriver, t *testing.T) if err != nil { t.Errorf("unable to sample stats: %v", err) } + if len(samples) == 0 { + t.Fatal("should at least store one sample") + } samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) samples, err = driver.Samples(ref.Name, -1) @@ -285,6 +382,9 @@ func StorageDriverTestRetrievePartialRecentStats(driver storage.StorageDriver, t if err != nil { t.Fatal(err) } + if len(recentStats) == 0 { + t.Fatal("should at least store one stats") + } if len(recentStats) > 10 { t.Fatalf("returned %v stats, not 10.", len(recentStats)) @@ -295,7 +395,7 @@ func StorageDriverTestRetrievePartialRecentStats(driver storage.StorageDriver, t for _, r := range recentStats { found := false for _, s := range actualRecentStats { - if reflect.DeepEqual(s, r) { + if statsEq(s, r) { found = true } } @@ -329,6 +429,9 @@ func StorageDriverTestRetrieveAllRecentStats(driver storage.StorageDriver, t *te if err != nil { t.Fatal(err) } + if len(recentStats) == 0 { + t.Fatal("should at least store one stats") + } if len(recentStats) > N { t.Fatalf("returned %v stats, not 100.", len(recentStats)) } @@ -338,7 +441,7 @@ func StorageDriverTestRetrieveAllRecentStats(driver storage.StorageDriver, t *te for _, r := range recentStats { found := false for _, s := range actualRecentStats { - if reflect.DeepEqual(s, r) { + if statsEq(s, r) { found = true } }