From 42add2409a88e24739dd0d73e84a1c71912446c5 Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Sat, 13 Sep 2014 15:29:49 -0700 Subject: [PATCH] Removing sampling and percentiles from interface. Also removing all implementations. --- info/container.go | 164 ------------------- info/container_test.go | 200 ----------------------- info/test/datagen.go | 47 +----- manager/manager.go | 26 +-- manager/manager_test.go | 33 +--- sampling/autofilter.go | 51 ------ sampling/autoreset.go | 60 ------- sampling/chainsample.go | 171 -------------------- sampling/chainsample_test.go | 43 ----- sampling/doc.go | 17 -- sampling/es.go | 143 ----------------- sampling/es_test.go | 81 ---------- sampling/reservoir.go | 99 ------------ sampling/reservoir_test.go | 70 -------- sampling/sampler.go | 42 ----- storage/bigquery/bigquery.go | 86 ---------- storage/influxdb/influxdb.go | 19 --- storage/memory/memory.go | 115 +------------- storage/memory/memory_test.go | 37 +---- storage/storage.go | 9 -- storage/test/mock.go | 14 -- storage/test/storagetests.go | 289 ---------------------------------- storagedriver.go | 13 +- 23 files changed, 20 insertions(+), 1809 deletions(-) delete mode 100644 sampling/autofilter.go delete mode 100644 sampling/autoreset.go delete mode 100644 sampling/chainsample.go delete mode 100644 sampling/chainsample_test.go delete mode 100644 sampling/doc.go delete mode 100644 sampling/es.go delete mode 100644 sampling/es_test.go delete mode 100644 sampling/reservoir.go delete mode 100644 sampling/reservoir_test.go delete mode 100644 sampling/sampler.go diff --git a/info/container.go b/info/container.go index f4a06413..eb407159 100644 --- a/info/container.go +++ b/info/container.go @@ -15,9 +15,7 @@ package info import ( - "fmt" "reflect" - "sort" "time" ) @@ -79,11 +77,6 @@ type ContainerInfo struct { // Historical statistics gathered from the container. Stats []*ContainerStats `json:"stats,omitempty"` - - // Randomly sampled container states. - Samples []*ContainerStatsSample `json:"samples,omitempty"` - - StatsPercentiles *ContainerStatsPercentiles `json:"stats_summary,omitempty"` } // ContainerInfo may be (un)marshaled by json or other en/decoder. In that @@ -112,9 +105,6 @@ func (self *ContainerInfo) Eq(b *ContainerInfo) bool { if !reflect.DeepEqual(self.Spec, b.Spec) { return false } - if !reflect.DeepEqual(self.StatsPercentiles, b.StatsPercentiles) { - return false - } for i, expectedStats := range b.Stats { selfStats := self.Stats[i] @@ -123,12 +113,6 @@ func (self *ContainerInfo) Eq(b *ContainerInfo) bool { } } - for i, expectedSample := range b.Samples { - selfSample := self.Samples[i] - if !expectedSample.Eq(selfSample) { - return false - } - } return true } @@ -278,24 +262,6 @@ func (self *ContainerStats) Copy(dst *ContainerStats) *ContainerStats { return dst } -type ContainerStatsSample struct { - // Timetamp of the end of the sample period - Timestamp time.Time `json:"timestamp"` - // Duration of the sample period - Duration time.Duration `json:"duration"` - Cpu struct { - // number of nanoseconds of CPU time used by the container - Usage uint64 `json:"usage"` - - // Per-core usage of the container. (unit: nanoseconds) - PerCpuUsage []uint64 `json:"per_cpu_usage,omitempty"` - } `json:"cpu"` - Memory struct { - // Units: Bytes. - Usage uint64 `json:"usage"` - } `json:"memory"` -} - func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { // t1 should not be later than t2 if t1.After(t2) { @@ -339,35 +305,6 @@ func (a *ContainerStats) Eq(b *ContainerStats) bool { return true } -// This function is useful because we do not require precise time -// representation. -func (a *ContainerStatsSample) Eq(b *ContainerStatsSample) bool { - if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { - return false - } - if !durationEq(a.Duration, b.Duration, timePrecision) { - return false - } - if !reflect.DeepEqual(a.Cpu, b.Cpu) { - return false - } - if !reflect.DeepEqual(a.Memory, b.Memory) { - return false - } - return true -} - -type Percentile struct { - Percentage int `json:"percentage"` - Value uint64 `json:"value"` -} - -type ContainerStatsPercentiles struct { - MaxMemoryUsage uint64 `json:"max_memory_usage,omitempty"` - MemoryUsagePercentiles []Percentile `json:"memory_usage_percentiles,omitempty"` - CpuUsagePercentiles []Percentile `json:"cpu_usage_percentiles,omitempty"` -} - // Saturate CPU usage to 0. func calculateCpuUsage(prev, cur uint64) uint64 { if prev > cur { @@ -375,104 +312,3 @@ func calculateCpuUsage(prev, cur uint64) uint64 { } return cur - prev } - -// Each sample needs two stats because the cpu usage in ContainerStats is -// cumulative. -// prev should be an earlier observation than current. -// This method is not thread/goroutine safe. -func NewSample(prev, current *ContainerStats) (*ContainerStatsSample, error) { - if prev == nil || current == nil { - return nil, fmt.Errorf("empty stats") - } - // Ignore this sample if it is incomplete - if prev.Cpu == nil || prev.Memory == nil || current.Cpu == nil || current.Memory == nil { - return nil, fmt.Errorf("incomplete stats") - } - // prev must be an early observation - if !current.Timestamp.After(prev.Timestamp) { - return nil, fmt.Errorf("wrong stats order") - } - - var percpu []uint64 - - if len(current.Cpu.Usage.PerCpu) > 0 { - curNumCpus := len(current.Cpu.Usage.PerCpu) - percpu = make([]uint64, curNumCpus) - - for i, currUsage := range current.Cpu.Usage.PerCpu { - var prevUsage uint64 = 0 - if i < len(prev.Cpu.Usage.PerCpu) { - prevUsage = prev.Cpu.Usage.PerCpu[i] - } - - percpu[i] = calculateCpuUsage(prevUsage, currUsage) - } - } - sample := new(ContainerStatsSample) - // Calculate the diff to get the CPU usage within the time interval. - sample.Cpu.Usage = calculateCpuUsage(prev.Cpu.Usage.Total, current.Cpu.Usage.Total) - sample.Cpu.PerCpuUsage = percpu - // Memory usage is current memory usage - sample.Memory.Usage = current.Memory.Usage - sample.Timestamp = current.Timestamp - sample.Duration = current.Timestamp.Sub(prev.Timestamp) - - return sample, nil -} - -type uint64Slice []uint64 - -func (self uint64Slice) Len() int { - return len(self) -} - -func (self uint64Slice) Less(i, j int) bool { - return self[i] < self[j] -} - -func (self uint64Slice) Swap(i, j int) { - self[i], self[j] = self[j], self[i] -} - -func (self uint64Slice) Percentiles(requestedPercentiles ...int) []Percentile { - if len(self) == 0 { - return nil - } - ret := make([]Percentile, 0, len(requestedPercentiles)) - sort.Sort(self) - for _, p := range requestedPercentiles { - idx := (len(self) * p / 100) - 1 - if idx < 0 { - idx = 0 - } - ret = append( - ret, - Percentile{ - Percentage: p, - Value: self[idx], - }, - ) - } - return ret -} - -func NewPercentiles(samples []*ContainerStatsSample, cpuPercentages, memoryPercentages []int) *ContainerStatsPercentiles { - if len(samples) == 0 { - return nil - } - cpuUsages := make([]uint64, 0, len(samples)) - memUsages := make([]uint64, 0, len(samples)) - - for _, sample := range samples { - if sample == nil { - continue - } - cpuUsages = append(cpuUsages, sample.Cpu.Usage) - memUsages = append(memUsages, sample.Memory.Usage) - } - - ret := new(ContainerStatsPercentiles) - ret.CpuUsagePercentiles = uint64Slice(cpuUsages).Percentiles(cpuPercentages...) - ret.MemoryUsagePercentiles = uint64Slice(memUsages).Percentiles(memoryPercentages...) - return ret -} diff --git a/info/container_test.go b/info/container_test.go index 6cc109b3..bd730c19 100644 --- a/info/container_test.go +++ b/info/container_test.go @@ -68,65 +68,6 @@ func TestStatsEndTime(t *testing.T) { } } -func TestPercentiles(t *testing.T) { - N := 100 - data := make([]uint64, N) - - for i := 1; i < N+1; i++ { - data[i-1] = uint64(i) - } - percentages := []int{ - 80, - 90, - 50, - } - percentiles := uint64Slice(data).Percentiles(percentages...) - for _, s := range percentiles { - if s.Value != uint64(s.Percentage) { - t.Errorf("%v percentile data should be %v, but got %v", s.Percentage, s.Percentage, s.Value) - } - } -} - -func TestPercentilesSmallDataSet(t *testing.T) { - var value uint64 = 11 - data := []uint64{value} - - percentages := []int{ - 80, - 90, - 50, - } - percentiles := uint64Slice(data).Percentiles(percentages...) - for _, s := range percentiles { - if s.Value != value { - t.Errorf("%v percentile data should be %v, but got %v", s.Percentage, value, s.Value) - } - } -} - -func TestNewSampleNilStats(t *testing.T) { - stats := &ContainerStats{ - Cpu: &CpuStats{}, - Memory: &MemoryStats{}, - } - stats.Cpu.Usage.PerCpu = []uint64{uint64(10)} - stats.Cpu.Usage.Total = uint64(10) - stats.Cpu.Usage.System = uint64(2) - stats.Cpu.Usage.User = uint64(8) - stats.Memory.Usage = uint64(200) - - sample, err := NewSample(nil, stats) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } - - sample, err = NewSample(stats, nil) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } -} - func createStats(cpuUsage, memUsage uint64, timestamp time.Time) *ContainerStats { stats := &ContainerStats{ Cpu: &CpuStats{}, @@ -141,147 +82,6 @@ func createStats(cpuUsage, memUsage uint64, timestamp time.Time) *ContainerStats return stats } -func TestAddSample(t *testing.T) { - cpuPrevUsage := uint64(10) - cpuCurrentUsage := uint64(15) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - - sample, err := NewSample(prev, current) - if err != nil { - t.Errorf("should be able to generate a sample. but received error: %v", err) - } - if sample == nil { - t.Fatalf("nil sample and nil error. unexpected result!") - } - - if sample.Memory.Usage != memCurrentUsage { - t.Errorf("wrong memory usage: %v. should be %v", sample.Memory.Usage, memCurrentUsage) - } - - if sample.Cpu.Usage != cpuCurrentUsage-cpuPrevUsage { - t.Errorf("wrong CPU usage: %v. should be %v", sample.Cpu.Usage, cpuCurrentUsage-cpuPrevUsage) - } -} - -func TestAddSampleIncompleteStats(t *testing.T) { - cpuPrevUsage := uint64(10) - cpuCurrentUsage := uint64(15) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - stats := &ContainerStats{ - Cpu: prev.Cpu, - Memory: nil, - } - sample, err := NewSample(stats, current) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } - sample, err = NewSample(prev, stats) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } - - stats = &ContainerStats{ - Cpu: nil, - Memory: prev.Memory, - } - sample, err = NewSample(stats, current) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } - sample, err = NewSample(prev, stats) - if err == nil { - t.Errorf("generated an unexpected sample: %+v", sample) - } -} - -func TestAddSampleWrongOrder(t *testing.T) { - cpuPrevUsage := uint64(10) - cpuCurrentUsage := uint64(15) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - - sample, err := NewSample(current, prev) - if err == nil { - t.Errorf("generated an unexpected sample: %v", sample) - } -} - -func TestAddSampleNegativeCpuUsage(t *testing.T) { - cpuPrevUsage := uint64(15) - cpuCurrentUsage := uint64(10) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - - sample, err := NewSample(prev, current) - if err != nil { - t.Errorf("expected to sample without error %+v", err) - } - if sample.Cpu.Usage != 0 || sample.Cpu.PerCpuUsage[0] != 0 { - t.Errorf("expected usage to saturate to 0: %+v", sample) - } -} - -func TestAddSampleHotPluggingCpu(t *testing.T) { - cpuPrevUsage := uint64(10) - cpuCurrentUsage := uint64(15) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - current.Cpu.Usage.PerCpu = append(current.Cpu.Usage.PerCpu, 10) - - sample, err := NewSample(prev, current) - if err != nil { - t.Errorf("should be able to generate a sample. but received error: %v", err) - } - if len(sample.Cpu.PerCpuUsage) != 2 { - t.Fatalf("Should have 2 cores.") - } - if sample.Cpu.PerCpuUsage[0] != cpuCurrentUsage-cpuPrevUsage { - t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage) - } - if sample.Cpu.PerCpuUsage[1] != 10 { - t.Errorf("Second cpu usage is %v. should be 10", sample.Cpu.PerCpuUsage[1]) - } -} - -func TestAddSampleHotUnpluggingCpu(t *testing.T) { - cpuPrevUsage := uint64(10) - cpuCurrentUsage := uint64(15) - memCurrentUsage := uint64(200) - prevTime := time.Now() - - prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime) - current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second)) - prev.Cpu.Usage.PerCpu = append(prev.Cpu.Usage.PerCpu, 10) - - sample, err := NewSample(prev, current) - if err != nil { - t.Errorf("should be able to generate a sample. but received error: %v", err) - } - if len(sample.Cpu.PerCpuUsage) != 1 { - t.Fatalf("Should have 1 cores.") - } - if sample.Cpu.PerCpuUsage[0] != cpuCurrentUsage-cpuPrevUsage { - t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage) - } -} - func TestContainerStatsCopy(t *testing.T) { stats := createStats(100, 101, time.Now()) shadowStats := stats.Copy(nil) diff --git a/info/test/datagen.go b/info/test/datagen.go index d52a156a..5d43ae57 100644 --- a/info/test/datagen.go +++ b/info/test/datagen.go @@ -65,57 +65,14 @@ func GenerateRandomContainerSpec(numCores int) *info.ContainerSpec { func GenerateRandomContainerInfo(containerName string, numCores int, query *info.ContainerInfoRequest, duration time.Duration) *info.ContainerInfo { stats := GenerateRandomStats(query.NumStats, numCores, duration) - samples, _ := NewSamplesFromStats(stats...) - if len(samples) > query.NumSamples { - samples = samples[:query.NumSamples] - } - cpuPercentiles := make([]info.Percentile, 0, len(query.CpuUsagePercentiles)) - - // TODO(monnand): This will generate percentiles where 50%tile data may - // be larger than 90%tile data. - for _, p := range query.CpuUsagePercentiles { - percentile := info.Percentile{p, uint64(rand.Int63n(1000))} - cpuPercentiles = append(cpuPercentiles, percentile) - } - memPercentiles := make([]info.Percentile, 0, len(query.MemoryUsagePercentiles)) - for _, p := range query.MemoryUsagePercentiles { - percentile := info.Percentile{p, uint64(rand.Int63n(1000))} - memPercentiles = append(memPercentiles, percentile) - } - - percentiles := &info.ContainerStatsPercentiles{ - MaxMemoryUsage: uint64(rand.Int63n(4096)), - MemoryUsagePercentiles: memPercentiles, - CpuUsagePercentiles: cpuPercentiles, - } - spec := GenerateRandomContainerSpec(numCores) ret := &info.ContainerInfo{ ContainerReference: info.ContainerReference{ Name: containerName, }, - Spec: spec, - StatsPercentiles: percentiles, - Samples: samples, - Stats: stats, + Spec: spec, + Stats: stats, } return ret } - -func NewSamplesFromStats(stats ...*info.ContainerStats) ([]*info.ContainerStatsSample, error) { - if len(stats) < 2 { - return nil, nil - } - samples := make([]*info.ContainerStatsSample, 0, len(stats)-1) - for i, s := range stats[1:] { - prev := stats[i] - sample, err := info.NewSample(prev, s) - if err != nil { - return nil, fmt.Errorf("Unable to generate sample from %+v and %+v: %v", - prev, s, err) - } - samples = append(samples, sample) - } - return samples, nil -} diff --git a/manager/manager.go b/manager/manager.go index 70de1538..a057bc5e 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -147,23 +147,7 @@ func (self *manager) containerDataToContainerInfo(cont *containerData, query *in return nil, err } - var percentiles *info.ContainerStatsPercentiles - var samples []*info.ContainerStatsSample - var stats []*info.ContainerStats - percentiles, err = self.storageDriver.Percentiles( - cinfo.Name, - query.CpuUsagePercentiles, - query.MemoryUsagePercentiles, - ) - if err != nil { - return nil, err - } - samples, err = self.storageDriver.Samples(cinfo.Name, query.NumSamples) - if err != nil { - return nil, err - } - - stats, err = self.storageDriver.RecentStats(cinfo.Name, query.NumStats) + stats, err := self.storageDriver.RecentStats(cinfo.Name, query.NumStats) if err != nil { return nil, err } @@ -174,11 +158,9 @@ func (self *manager) containerDataToContainerInfo(cont *containerData, query *in Name: cinfo.Name, Aliases: cinfo.Aliases, }, - Subcontainers: cinfo.Subcontainers, - Spec: cinfo.Spec, - StatsPercentiles: percentiles, - Samples: samples, - Stats: stats, + Subcontainers: cinfo.Subcontainers, + Spec: cinfo.Spec, + Stats: stats, } // Set default value to an actual value diff --git a/manager/manager_test.go b/manager/manager_test.go index 857b62e0..cd104e5a 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -88,28 +88,7 @@ func expectManagerWithContainers(containers []string, query *info.ContainerInfoR func(h *container.MockContainerHandler) { cinfo := infosMap[h.Name] stats := cinfo.Stats - samples := cinfo.Samples - percentiles := cinfo.StatsPercentiles spec := cinfo.Spec - driver.On( - "Percentiles", - h.Name, - query.CpuUsagePercentiles, - query.MemoryUsagePercentiles, - ).Return( - percentiles, - nil, - ) - - driver.On( - "Samples", - h.Name, - query.NumSamples, - ).Return( - samples, - nil, - ) - driver.On( "RecentStats", h.Name, @@ -142,10 +121,8 @@ func TestGetContainerInfo(t *testing.T) { } query := &info.ContainerInfoRequest{ - NumStats: 256, - NumSamples: 128, - CpuUsagePercentiles: []int{10, 50, 90}, - MemoryUsagePercentiles: []int{10, 80, 90}, + NumStats: 256, + NumSamples: 128, } m, infosMap, handlerMap := expectManagerWithContainers(containers, query, t) @@ -178,10 +155,8 @@ func TestSubcontainersInfo(t *testing.T) { } query := &info.ContainerInfoRequest{ - NumStats: 64, - NumSamples: 64, - CpuUsagePercentiles: []int{10, 50, 90}, - MemoryUsagePercentiles: []int{10, 80, 90}, + NumStats: 64, + NumSamples: 64, } m, _, _ := expectManagerWithContainers(containers, query, t) diff --git a/sampling/autofilter.go b/sampling/autofilter.go deleted file mode 100644 index 7389de52..00000000 --- a/sampling/autofilter.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -type autoFilterSampler struct { - // filter will run to remove elements before adding every observation - filter func(d interface{}) bool - sampler Sampler -} - -func (self *autoFilterSampler) Len() int { - return self.sampler.Len() -} - -func (self *autoFilterSampler) Reset() { - self.sampler.Reset() -} - -func (self *autoFilterSampler) Map(f func(d interface{})) { - self.sampler.Map(f) -} - -func (self *autoFilterSampler) Filter(filter func(d interface{}) bool) { - self.sampler.Filter(filter) -} - -func (self *autoFilterSampler) Update(d interface{}) { - self.Filter(self.filter) - self.sampler.Update(d) -} - -// Add a decorator for sampler. Whenever an Update() is called, the sampler will -// call filter() first to remove elements in the decorated sampler. -func NewAutoFilterSampler(sampler Sampler, filter func(d interface{}) bool) Sampler { - return &autoFilterSampler{ - filter: filter, - sampler: sampler, - } -} diff --git a/sampling/autoreset.go b/sampling/autoreset.go deleted file mode 100644 index b466e5e6..00000000 --- a/sampling/autoreset.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import "time" - -type autoResetSampler struct { - shouldReset func(d interface{}) bool - sampler Sampler -} - -func (self *autoResetSampler) Len() int { - return self.sampler.Len() -} - -func (self *autoResetSampler) Reset() { - self.sampler.Reset() -} - -func (self *autoResetSampler) Map(f func(d interface{})) { - self.sampler.Map(f) -} - -func (self *autoResetSampler) Filter(filter func(d interface{}) bool) { - self.sampler.Filter(filter) -} - -func (self *autoResetSampler) Update(d interface{}) { - if self.shouldReset(d) { - self.sampler.Reset() - } - self.sampler.Update(d) -} - -func NewPeriodicallyResetSampler(period time.Duration, sampler Sampler) Sampler { - lastRest := time.Now() - shouldReset := func(d interface{}) bool { - if time.Now().Sub(lastRest) > period { - lastRest = time.Now() - return true - } - return false - } - return &autoResetSampler{ - shouldReset: shouldReset, - sampler: sampler, - } -} diff --git a/sampling/chainsample.go b/sampling/chainsample.go deleted file mode 100644 index a6d94288..00000000 --- a/sampling/chainsample.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "log" - "math/rand" - "sync" - - "github.com/kr/pretty" -) - -type empty struct{} - -// Randomly generate number [start,end) except @except. -func randInt64Except(start, end int64, except map[int64]empty) int64 { - n := end - start - ret := rand.Int63n(n) + start - for _, ok := except[ret]; ok; _, ok = except[ret] { - ret = rand.Int63n(n) + start - } - return ret -} - -// Basic idea: -// Every observation will have a sequence number as its id. -// Suppose we want to sample k observations within latest n observations -// At first, we generated k random numbers in [0,n). These random numbers -// will be used as ids of observations that will be sampled. -type chainSampler struct { - sampleSize int - windowSize int64 - - // Every observation will have a sequence number starting from 1. - // The sequence number must increase by one for each observation. - numObservations int64 - - // All samples stored as id -> value. - samples map[int64]interface{} - - // The set of id of future observations. - futureSamples map[int64]empty - - // The chain of samples: old observation id -> future observation id. - // When the old observation expires, the future observation will be - // stored as a sample. - sampleChain map[int64]int64 - - // Replacements are: observations whose previous sample is not expired - // id->value. - replacements map[int64]interface{} - lock sync.RWMutex -} - -func (self *chainSampler) initFutureSamples() { - for i := 0; i < self.sampleSize; i++ { - n := randInt64Except(1, self.windowSize+1, self.futureSamples) - self.futureSamples[n] = empty{} - } -} - -func (self *chainSampler) arrive(seqNum int64, obv interface{}) { - if _, ok := self.futureSamples[seqNum]; !ok { - // If this observation is not selected, ignore it. - return - } - - delete(self.futureSamples, seqNum) - - if len(self.samples) < self.sampleSize { - self.samples[seqNum] = obv - } - self.replacements[seqNum] = obv - - // Select a future observation which will replace current observation - // when it expires. - futureSeqNum := randInt64Except(seqNum+1, seqNum+self.windowSize+1, self.futureSamples) - self.futureSamples[futureSeqNum] = empty{} - self.sampleChain[seqNum] = futureSeqNum -} - -func (self *chainSampler) expireAndReplace() { - expSeqNum := self.numObservations - self.windowSize - if _, ok := self.samples[expSeqNum]; !ok { - // No sample expires - return - } - delete(self.samples, expSeqNum) - // There must be a replacement, otherwise panic. - replacementSeqNum := self.sampleChain[expSeqNum] - // The sequence number must increase by one for each observation. - replacement, ok := self.replacements[replacementSeqNum] - if !ok { - log.Printf("cannot find %v. which is the replacement of %v\n", replacementSeqNum, expSeqNum) - pretty.Printf("chain: %# v\n", self) - panic("Should never occur!") - } - // This observation must have arrived before. - self.samples[replacementSeqNum] = replacement -} - -func (self *chainSampler) Update(obv interface{}) { - self.lock.Lock() - defer self.lock.Unlock() - - self.numObservations++ - self.arrive(self.numObservations, obv) - self.expireAndReplace() -} - -func (self *chainSampler) Len() int { - self.lock.RLock() - defer self.lock.RUnlock() - return len(self.samples) -} - -func (self *chainSampler) Reset() { - self.lock.Lock() - defer self.lock.Unlock() - self.numObservations = 0 - self.samples = make(map[int64]interface{}, self.sampleSize) - self.futureSamples = make(map[int64]empty, self.sampleSize*2) - self.sampleChain = make(map[int64]int64, self.sampleSize*2) - self.replacements = make(map[int64]interface{}, self.sampleSize*2) - self.initFutureSamples() -} - -func (self *chainSampler) Map(f func(d interface{})) { - self.lock.RLock() - defer self.lock.RUnlock() - - for seqNum, obv := range self.samples { - if _, ok := obv.(int); !ok { - pretty.Printf("Seq %v. WAT: %# v\n", seqNum, obv) - } - f(obv) - } -} - -// NOT SUPPORTED -func (self *chainSampler) Filter(filter func(d interface{}) bool) { - return -} - -// Chain sampler described in -// Brian Babcok, Mayur Datar and Rajeev Motwani, -// Sampling From a Moving Window Over Streaming Data -func NewChainSampler(sampleSize, windowSize int) Sampler { - sampler := &chainSampler{ - sampleSize: sampleSize, - windowSize: int64(windowSize), - samples: make(map[int64]interface{}, sampleSize), - futureSamples: make(map[int64]empty, sampleSize*2), - sampleChain: make(map[int64]int64, sampleSize*2), - replacements: make(map[int64]interface{}, sampleSize*2), - } - sampler.initFutureSamples() - return sampler -} diff --git a/sampling/chainsample_test.go b/sampling/chainsample_test.go deleted file mode 100644 index 85d6e3e7..00000000 --- a/sampling/chainsample_test.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import "testing" - -func TestChainSampler(t *testing.T) { - numSamples := 10 - windowSize := 10 * numSamples - numObservations := 10 * windowSize - numSampleRounds := 10 * numObservations - - s := NewChainSampler(numSamples, windowSize) - hist := make(map[int]int, numSamples) - for i := 0; i < numSampleRounds; i++ { - sampleStream(hist, numObservations, s) - } - ratio := histStddev(hist) / histMean(hist) - if ratio > 1.05 { - // XXX(dengnan): better sampler? - t.Errorf("std dev: %v; mean: %v. Either we have a really bad PRNG, or a bad implementation", histStddev(hist), histMean(hist)) - } - if len(hist) > windowSize { - t.Errorf("sampled %v data. larger than window size %v", len(hist), windowSize) - } - for seqNum, freq := range hist { - if seqNum < numObservations-windowSize && freq > 0 { - t.Errorf("observation with seqnum %v is sampled %v times", seqNum, freq) - } - } -} diff --git a/sampling/doc.go b/sampling/doc.go deleted file mode 100644 index 8eeb6c47..00000000 --- a/sampling/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package sampling provides several sampling algorithms. -// These algorithms will be used to sample containers' stats information -package sampling diff --git a/sampling/es.go b/sampling/es.go deleted file mode 100644 index 64f40be1..00000000 --- a/sampling/es.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "container/heap" - "math" - "math/rand" - "sync" -) - -type esSampleItem struct { - data interface{} - key float64 -} - -type esSampleHeap []esSampleItem - -func (self esSampleHeap) Len() int { - return len(self) -} - -func (self esSampleHeap) Less(i, j int) bool { - return self[i].key < self[j].key -} - -func (self esSampleHeap) Swap(i, j int) { - self[i], self[j] = self[j], self[i] -} - -func (self *esSampleHeap) Push(x interface{}) { - item := x.(esSampleItem) - *self = append(*self, item) -} - -func (self *esSampleHeap) Pop() interface{} { - old := *self - item := old[len(old)-1] - *self = old[:len(old)-1] - return item -} - -type esSampler struct { - weight func(interface{}) float64 - samples *esSampleHeap - maxSize int - lock sync.RWMutex -} - -func (self *esSampler) Update(d interface{}) { - self.lock.Lock() - defer self.lock.Unlock() - - u := rand.Float64() - key := math.Pow(u, 1.0/self.weight(d)) - - if self.samples.Len() < self.maxSize { - heap.Push(self.samples, esSampleItem{ - data: d, - key: key, - }) - return - } - - s := *(self.samples) - min := s[0] - - // The key of the new item is larger than a key in existing item. - // Add this new item. - if key > min.key { - heap.Pop(self.samples) - heap.Push(self.samples, esSampleItem{ - data: d, - key: key, - }) - } -} - -func (self *esSampler) Len() int { - self.lock.RLock() - defer self.lock.RUnlock() - return len(*self.samples) -} - -func (self *esSampler) Reset() { - self.lock.Lock() - defer self.lock.Unlock() - self.samples = &esSampleHeap{} - heap.Init(self.samples) -} - -func (self *esSampler) Map(f func(interface{})) { - self.lock.RLock() - defer self.lock.RUnlock() - - for _, d := range *self.samples { - f(d.data) - } -} - -func (self *esSampler) Filter(filter func(d interface{}) bool) { - self.lock.Lock() - defer self.lock.Unlock() - - rmlist := make([]int, 0, len(*self.samples)) - for i, d := range *self.samples { - if filter(d.data) { - rmlist = append(rmlist, i) - } - } - - for _, i := range rmlist { - heap.Remove(self.samples, i) - } -} - -// ES sampling algorithm described in -// -// Pavlos S. Efraimidis and Paul G. Spirakis. Weighted random sampling with a -// reservoir. Information Processing Letters, 97(5):181 – 185, 2006. -// -// http://dl.acm.org/citation.cfm?id=1138834 -func NewESSampler(size int, weight func(interface{}) float64) Sampler { - s := &esSampleHeap{} - heap.Init(s) - return &esSampler{ - maxSize: size, - samples: s, - weight: weight, - } -} diff --git a/sampling/es_test.go b/sampling/es_test.go deleted file mode 100644 index d45b2762..00000000 --- a/sampling/es_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "container/heap" - "math/rand" - "testing" - - "github.com/kr/pretty" -) - -// This should be a min heap -func TestESSampleHeap(t *testing.T) { - h := &esSampleHeap{} - heap.Init(h) - min := 5.0 - N := 10 - - for i := 0; i < N; i++ { - key := rand.Float64() - if key < min { - min = key - } - heap.Push(h, esSampleItem{nil, key}) - } - l := *h - if l[0].key != min { - t.Errorf("not a min heap") - pretty.Printf("min=%v\nheap=%# v\n", min, l) - } -} - -func TestESSampler(t *testing.T) { - reservoirSize := 10 - numObvs := 10 * reservoirSize - numSampleRounds := 100 * numObvs - - weight := func(d interface{}) float64 { - n := d.(int) - return float64(n + 1) - } - s := NewESSampler(reservoirSize, weight) - hist := make(map[int]int, numObvs) - for i := 0; i < numSampleRounds; i++ { - sampleStream(hist, numObvs, s) - } - - diff := 2 - wrongOrderedItems := make([]int, 0, numObvs) - threshold := 1.05 - for i := 0; i < numObvs-diff; i++ { - // Item with smaller weight should have lower probability to be selected. - n1 := hist[i] - n2 := hist[i+diff] - if n1 > n2 { - if float64(n1) > float64(n2)*threshold { - wrongOrderedItems = append(wrongOrderedItems, i) - } - } - } - if float64(len(wrongOrderedItems)) > float64(numObvs)*0.05 { - for _, i := range wrongOrderedItems { - n1 := hist[i] - n2 := hist[i+diff] - t.Errorf("item with weight %v is selected %v times; while item with weight %v is selected %v times", i, n1, i+diff, n2) - } - } -} diff --git a/sampling/reservoir.go b/sampling/reservoir.go deleted file mode 100644 index a0276360..00000000 --- a/sampling/reservoir.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math/rand" - "sync" -) - -// Reservoir sampling algorithm. -// http://en.wikipedia.org/wiki/Reservoir_sampling -type reservoirSampler struct { - maxSize int - samples []interface{} - numInstances int64 - lock sync.RWMutex -} - -func (self *reservoirSampler) Len() int { - self.lock.RLock() - defer self.lock.RUnlock() - return len(self.samples) -} - -func (self *reservoirSampler) Reset() { - self.lock.Lock() - defer self.lock.Unlock() - self.samples = make([]interface{}, 0, self.maxSize) - self.numInstances = 0 -} - -// Update samples according to http://en.wikipedia.org/wiki/Reservoir_sampling -func (self *reservoirSampler) Update(d interface{}) { - self.lock.Lock() - defer self.lock.Unlock() - - self.numInstances++ - if len(self.samples) < self.maxSize { - self.samples = append(self.samples, d) - return - } - // Randomly generates a number between [0, numInstances). - // Use this random number, j, as an index. If j is larger than the - // reservoir size, we will ignore the current new data. - // Otherwise replace the jth element in reservoir with the new data. - j := rand.Int63n(self.numInstances) - if j < int64(len(self.samples)) { - self.samples[int(j)] = d - } -} - -func (self *reservoirSampler) Map(f func(d interface{})) { - self.lock.RLock() - defer self.lock.RUnlock() - - for _, d := range self.samples { - f(d) - } -} - -// Once an element is removed, the probability of sampling an observation will -// be increased. Removing all elements in the sampler has the same effect as -// calling Reset(). However, it will not guarantee the uniform probability of -// all unfiltered samples. -func (self *reservoirSampler) Filter(filter func(d interface{}) bool) { - self.lock.Lock() - defer self.lock.Unlock() - rmlist := make([]int, 0, len(self.samples)) - for i, d := range self.samples { - if filter(d) { - rmlist = append(rmlist, i) - } - } - - for _, i := range rmlist { - // slice trick: remove the ith element without preserving the order - self.samples[i] = self.samples[len(self.samples)-1] - self.samples = self.samples[:len(self.samples)-1] - } - self.numInstances -= int64(len(rmlist)) -} - -func NewReservoirSampler(reservoirSize int) Sampler { - return &reservoirSampler{ - maxSize: reservoirSize, - } -} diff --git a/sampling/reservoir_test.go b/sampling/reservoir_test.go deleted file mode 100644 index 1113b363..00000000 --- a/sampling/reservoir_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math" - "testing" -) - -func sampleStream(hist map[int]int, n int, s Sampler) { - s.Reset() - for i := 0; i < n; i++ { - s.Update(i) - } - s.Map(func(d interface{}) { - j := d.(int) - if _, ok := hist[j]; !ok { - hist[j] = 0 - } - hist[j]++ - }) -} - -func histMean(hist map[int]int) float64 { - total := 0 - for _, v := range hist { - total += v - } - return float64(total) / float64(len(hist)) -} - -func histStddev(hist map[int]int) float64 { - mean := histMean(hist) - var totalDiff float64 - for _, v := range hist { - diff := float64(v) - mean - sq := diff * diff - totalDiff += sq - } - return math.Sqrt(totalDiff / float64(len(hist))) -} - -// XXX(dengnan): This test may take more than 10 seconds. -func TestReservoirSampler(t *testing.T) { - reservoirSize := 10 - numSamples := 10 * reservoirSize - numSampleRounds := 100 * numSamples - - s := NewReservoirSampler(reservoirSize) - hist := make(map[int]int, numSamples) - for i := 0; i < numSampleRounds; i++ { - sampleStream(hist, numSamples, s) - } - ratio := histStddev(hist) / histMean(hist) - if ratio > 0.05 { - t.Errorf("std dev: %v; mean: %v. Either we have a really bad PRNG, or a bad implementation", histStddev(hist), histMean(hist)) - } -} diff --git a/sampling/sampler.go b/sampling/sampler.go deleted file mode 100644 index 549f6e9e..00000000 --- a/sampling/sampler.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2014 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - crand "crypto/rand" - "encoding/binary" - "math/rand" -) - -func init() { - // NOTE(dengnan): Even if we picked a good random seed, - // the random number from math/rand is still not cryptographically secure! - var seed int64 - binary.Read(crand.Reader, binary.LittleEndian, &seed) - rand.Seed(seed) -} - -type Sampler interface { - Update(d interface{}) - Len() int - Reset() - Map(f func(interface{})) - - // Filter() should update in place. Removing elements may or may not - // affect the statistical behavior of the sampler, i.e. the probability - // that an observation will be sampled after removing some elements is - // implementation defined. - Filter(filter func(interface{}) bool) -} diff --git a/storage/bigquery/bigquery.go b/storage/bigquery/bigquery.go index 168df090..4586e4a0 100644 --- a/storage/bigquery/bigquery.go +++ b/storage/bigquery/bigquery.go @@ -27,9 +27,7 @@ import ( type bigqueryStorage struct { client *client.Client - prevStats *info.ContainerStats machineName string - windowLen time.Duration } const ( @@ -57,10 +55,6 @@ const ( colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault" // Hierarchical major page fault colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault" - // Optional: sample duration. Unit: nanoseconds. - colSampleDuration string = "sample_duration" - // Optional: Instant cpu usage. - colCpuInstantUsage string = "cpu_instant_usage" // Cumulative count of bytes received. colRxBytes string = "rx_bytes" // Cumulative count of receive errors encountered. @@ -138,16 +132,6 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema { Name: colMemoryHierarchicalPgmajfault, } i++ - fields[i] = &bigquery.TableFieldSchema{ - Type: typeInteger, - Name: colSampleDuration, - } - i++ - fields[i] = &bigquery.TableFieldSchema{ - Type: typeInteger, - Name: colCpuInstantUsage, - } - i++ fields[i] = &bigquery.TableFieldSchema{ Type: typeInteger, Name: colRxBytes, @@ -227,17 +211,8 @@ func (self *bigqueryStorage) containerStatsToValues( row[colTxErrors] = stats.Network.TxErrors } - sample, err := info.NewSample(self.prevStats, stats) - if err != nil || sample == nil { - return - } // TODO(jnagal): Handle per-cpu stats. - // Optional: sample duration. Unit: Nanosecond. - row[colSampleDuration] = sample.Duration - // Optional: Instant cpu usage - row[colCpuInstantUsage] = sample.Cpu.Usage - return } @@ -342,55 +317,12 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i return stats, nil } -func (self *bigqueryStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { - sample := &info.ContainerStatsSample{} - var err error - for i, col := range columns { - v := values[i] - switch { - case col == colTimestamp: - if t, ok := v.(time.Time); ok { - sample.Timestamp = t - } - case col == colMachineName: - if m, ok := v.(string); ok { - if m != self.machineName { - return nil, fmt.Errorf("different machine") - } - } else { - return nil, fmt.Errorf("machine name field is not a string: %v", v) - } - // Memory Usage - case col == colMemoryUsage: - sample.Memory.Usage, err = convertToUint64(v) - // sample duration. Unit: Nanosecond. - case col == colSampleDuration: - if v == nil { - // this record does not have sample_duration, so it's the first stats. - return nil, nil - } - sample.Duration = time.Duration(v.(int64)) - // Instant cpu usage - case col == colCpuInstantUsage: - sample.Cpu.Usage, err = convertToUint64(v) - } - if err != nil { - return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) - } - } - if sample.Duration.Nanoseconds() == 0 { - return nil, nil - } - return sample, nil -} - func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { if stats == nil || stats.Cpu == nil || stats.Memory == nil { return nil } row := self.containerStatsToValues(ref, stats) - self.prevStats = stats.Copy(self.prevStats) err := self.client.InsertRow(row) if err != nil { @@ -435,33 +367,19 @@ func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([] return statsList, nil } -func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - return nil, fmt.Errorf("will be removed") -} - func (self *bigqueryStorage) Close() error { self.client.Close() self.client = nil return nil } -func (self *bigqueryStorage) Percentiles( - containerName string, - cpuUsagePercentiles []int, - memUsagePercentiles []int, -) (*info.ContainerStatsPercentiles, error) { - return nil, fmt.Errorf("will be removed") -} - // Create a new bigquery storage driver. // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // tableName: BigQuery table used for storing stats. -// percentilesDuration: Time window which will be considered when calls Percentiles() func New(machineName, datasetId, tableName string, - percentilesDuration time.Duration, ) (storage.StorageDriver, error) { bqClient, err := client.NewClient() if err != nil { @@ -471,13 +389,9 @@ func New(machineName, if err != nil { return nil, err } - if percentilesDuration.Seconds() < 1.0 { - percentilesDuration = 5 * time.Minute - } ret := &bigqueryStorage{ client: bqClient, - windowLen: percentilesDuration, machineName: machineName, } schema := ret.GetSchema() diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go index f7731896..ae7b3e6f 100644 --- a/storage/influxdb/influxdb.go +++ b/storage/influxdb/influxdb.go @@ -27,7 +27,6 @@ type influxdbStorage struct { client *influxdb.Client machineName string tableName string - windowLen time.Duration bufferDuration time.Duration lastWrite time.Time series []*influxdb.Series @@ -256,23 +255,11 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([] return statsList, nil } -func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - return nil, fmt.Errorf("will be removed") -} - func (self *influxdbStorage) Close() error { self.client = nil return nil } -func (self *influxdbStorage) Percentiles( - containerName string, - cpuUsagePercentiles []int, - memUsagePercentiles []int, -) (*info.ContainerStatsPercentiles, error) { - return nil, fmt.Errorf("will be removed") -} - // Returns a new influxdb series. func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series { out := &influxdb.Series{ @@ -288,7 +275,6 @@ func (self *influxdbStorage) newSeries(columns []string, points []interface{}) * // machineName: A unique identifier to identify the host that current cAdvisor // instance is running on. // influxdbHost: The host which runs influxdb. -// percentilesDuration: Time window which will be considered when calls Percentiles() func New(machineName, tablename, database, @@ -297,7 +283,6 @@ func New(machineName, influxdbHost string, isSecure bool, bufferDuration time.Duration, - percentilesDuration time.Duration, ) (*influxdbStorage, error) { config := &influxdb.ClientConfig{ Host: influxdbHost, @@ -312,13 +297,9 @@ func New(machineName, } // TODO(monnand): With go 1.3, we cannot compress data now. client.DisableCompression() - if percentilesDuration.Seconds() < 1.0 { - percentilesDuration = 5 * time.Minute - } ret := &influxdbStorage{ client: client, - windowLen: percentilesDuration, machineName: machineName, tableName: tablename, bufferDuration: bufferDuration, diff --git a/storage/memory/memory.go b/storage/memory/memory.go index a8577bdb..d9fa17bb 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -20,52 +20,26 @@ import ( "sync" "github.com/google/cadvisor/info" - "github.com/google/cadvisor/sampling" "github.com/google/cadvisor/storage" ) // containerStorage is used to store per-container information type containerStorage struct { ref info.ContainerReference - prevStats *info.ContainerStats - sampler sampling.Sampler recentStats *list.List maxNumStats int - maxMemUsage uint64 lock sync.RWMutex } -func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) { - if stats == nil || stats.Cpu == nil || stats.Memory == nil { - // discard incomplete stats - self.prevStats = nil - return - } - self.prevStats = stats.Copy(self.prevStats) -} - func (self *containerStorage) AddStats(stats *info.ContainerStats) error { self.lock.Lock() defer self.lock.Unlock() - if self.prevStats != nil { - sample, err := info.NewSample(self.prevStats, stats) - if err != nil { - return fmt.Errorf("wrong stats: %v", err) - } - if sample != nil { - self.sampler.Update(sample) - } - } - if stats.Memory != nil { - if self.maxMemUsage < stats.Memory.Usage { - self.maxMemUsage = stats.Memory.Usage - } - } + + // Add the stat to storage. if self.recentStats.Len() >= self.maxNumStats { self.recentStats.Remove(self.recentStats.Back()) } self.recentStats.PushFront(stats) - self.updatePrevStats(stats) return nil } @@ -98,50 +72,10 @@ func (self *containerStorage) RecentStats(numStats int) ([]*info.ContainerStats, return ret, nil } -func (self *containerStorage) Samples(numSamples int) ([]*info.ContainerStatsSample, error) { - self.lock.RLock() - defer self.lock.RUnlock() - if self.sampler.Len() < numSamples || numSamples < 0 { - numSamples = self.sampler.Len() - } - ret := make([]*info.ContainerStatsSample, 0, numSamples) - - var err error - self.sampler.Map(func(d interface{}) { - if len(ret) >= numSamples || err != nil { - return - } - sample, ok := d.(*info.ContainerStatsSample) - if !ok { - err = fmt.Errorf("An element in the sample is not a ContainerStatsSample") - } - ret = append(ret, sample) - }) - if err != nil { - return nil, err - } - return ret, nil -} - -func (self *containerStorage) Percentiles(cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) { - samples, err := self.Samples(-1) - if err != nil { - return nil, err - } - if len(samples) == 0 { - return nil, nil - } - ret := info.NewPercentiles(samples, cpuPercentiles, memPercentiles) - ret.MaxMemoryUsage = self.maxMemUsage - return ret, nil -} - -func newContainerStore(ref info.ContainerReference, maxNumSamples, maxNumStats int) *containerStorage { - s := sampling.NewReservoirSampler(maxNumSamples) +func newContainerStore(ref info.ContainerReference, maxNumStats int) *containerStorage { return &containerStorage{ ref: ref, recentStats: list.New(), - sampler: s, maxNumStats: maxNumStats, } } @@ -149,7 +83,6 @@ func newContainerStore(ref info.ContainerReference, maxNumSamples, maxNumStats i type InMemoryStorage struct { lock sync.RWMutex containerStorageMap map[string]*containerStorage - maxNumSamples int maxNumStats int backend storage.StorageDriver } @@ -162,7 +95,7 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C self.lock.Lock() defer self.lock.Unlock() if cstore, ok = self.containerStorageMap[ref.Name]; !ok { - cstore = newContainerStore(ref, self.maxNumSamples, self.maxNumStats) + cstore = newContainerStore(ref, self.maxNumStats) self.containerStorageMap[ref.Name] = cstore } }() @@ -176,26 +109,6 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C return cstore.AddStats(stats) } -func (self *InMemoryStorage) Samples(name string, numSamples int) ([]*info.ContainerStatsSample, error) { - var cstore *containerStorage - var ok bool - - err := func() error { - self.lock.RLock() - defer self.lock.RUnlock() - if cstore, ok = self.containerStorageMap[name]; !ok { - return fmt.Errorf("unable to find data for container %v", name) - } - return nil - }() - - if err != nil { - return nil, err - } - - return cstore.Samples(numSamples) -} - func (self *InMemoryStorage) RecentStats(name string, numStats int) ([]*info.ContainerStats, error) { var cstore *containerStorage var ok bool @@ -214,24 +127,6 @@ func (self *InMemoryStorage) RecentStats(name string, numStats int) ([]*info.Con return cstore.RecentStats(numStats) } -func (self *InMemoryStorage) Percentiles(name string, cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) { - var cstore *containerStorage - var ok bool - err := func() error { - self.lock.RLock() - defer self.lock.RUnlock() - if cstore, ok = self.containerStorageMap[name]; !ok { - return fmt.Errorf("unable to find data for container %v", name) - } - return nil - }() - if err != nil { - return nil, err - } - - return cstore.Percentiles(cpuPercentiles, memPercentiles) -} - func (self *InMemoryStorage) Close() error { self.lock.Lock() self.containerStorageMap = make(map[string]*containerStorage, 32) @@ -240,13 +135,11 @@ func (self *InMemoryStorage) Close() error { } func New( - maxNumSamples, maxNumStats int, backend storage.StorageDriver, ) *InMemoryStorage { ret := &InMemoryStorage{ containerStorageMap: make(map[string]*containerStorage, 32), - maxNumSamples: maxNumSamples, maxNumStats: maxNumStats, backend: backend, } diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index 52f09417..0516efef 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -35,34 +35,11 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { for N := 10; N < maxSize; N += 10 { testDriver := &memoryTestStorageDriver{} - testDriver.StorageDriver = New(N, N, nil) + testDriver.StorageDriver = New(N, nil) f(testDriver, t) } } -func TestMaxMemoryUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) -} - -func TestSampleCpuUsage(t *testing.T) { - runStorageTest(test.StorageDriverTestSampleCpuUsage, t) -} - -func TestSamplesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestSamplesWithoutSample, t) -} - -func TestPercentilesWithoutSample(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t) -} - -func TestPercentiles(t *testing.T) { - N := 100 - testDriver := &memoryTestStorageDriver{} - testDriver.StorageDriver = New(N, N, nil) - test.StorageDriverTestPercentiles(testDriver, t) -} - func TestRetrievePartialRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t) } @@ -75,18 +52,6 @@ func TestNoRecentStats(t *testing.T) { runStorageTest(test.StorageDriverTestNoRecentStats, t) } -func TestNoSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestNoSamples, t) -} - -func TestPercentilesWithoutStats(t *testing.T) { - runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t) -} - func TestRetrieveZeroStats(t *testing.T) { runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t) } - -func TestRetrieveZeroSamples(t *testing.T) { - runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t) -} diff --git a/storage/storage.go b/storage/storage.go index 84ca0b3b..58a51ce0 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -26,15 +26,6 @@ type StorageDriver interface { // recent stats should be the last. RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) - // Read the specified percentiles of CPU and memory usage of the container. - // The implementation decides which time range to look at. - Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) - - // Returns samples of the container stats. If numSamples < 0, then - // the number of returned samples is implementation defined. Otherwise, the driver - // should return at most numSamples samples. - Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) - // Close will clear the state of the storage driver. The elements // stored in the underlying storage may or may not be deleted depending // on the implementation of the storage driver. diff --git a/storage/test/mock.go b/storage/test/mock.go index 24fe2500..c0f923e6 100644 --- a/storage/test/mock.go +++ b/storage/test/mock.go @@ -34,20 +34,6 @@ func (self *MockStorageDriver) RecentStats(containerName string, numStats int) ( return args.Get(0).([]*info.ContainerStats), args.Error(1) } -func (self *MockStorageDriver) Percentiles( - containerName string, - cpuUsagePercentiles []int, - memUsagePercentiles []int, -) (*info.ContainerStatsPercentiles, error) { - args := self.Called(containerName, cpuUsagePercentiles, memUsagePercentiles) - return args.Get(0).(*info.ContainerStatsPercentiles), args.Error(1) -} - -func (self *MockStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { - args := self.Called(containerName, numSamples) - return args.Get(0).([]*info.ContainerStatsSample), args.Error(1) -} - func (self *MockStorageDriver) Close() error { if self.MockCloseMethod { args := self.Called() diff --git a/storage/test/storagetests.go b/storage/test/storagetests.go index 6f6c761f..21385332 100644 --- a/storage/test/storagetests.go +++ b/storage/test/storagetests.go @@ -109,54 +109,6 @@ func DefaultStatsEq(a, b *info.ContainerStats) bool { return true } -// This function is useful because we do not require precise time -// representation. -func sampleEq(a, b *info.ContainerStatsSample) bool { - if !TimeEq(a.Timestamp, b.Timestamp, timePrecision) { - return false - } - if !durationEq(a.Duration, b.Duration, timePrecision) { - return false - } - if !reflect.DeepEqual(a.Cpu, b.Cpu) { - return false - } - if !reflect.DeepEqual(a.Memory, b.Memory) { - return false - } - return true -} - -func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []uint64, samplePeriod time.Duration, t *testing.T) { - for _, sample := range samples { - if sample.Duration != samplePeriod { - t.Errorf("sample duration is %v, not %v", sample.Duration, samplePeriod) - } - cpuUsage := sample.Cpu.Usage - memUsage := sample.Memory.Usage - found := false - for _, u := range cpuTrace { - if u == cpuUsage { - found = true - break - } - } - if !found { - t.Errorf("unable to find cpu usage %v", cpuUsage) - } - found = false - for _, u := range memTrace { - if u == memUsage { - found = true - break - } - } - if !found { - t.Errorf("unable to find mem usage %v", memUsage) - } - } -} - // This function will generate random stats and write // them into the storage. The function will not close the driver func StorageDriverFillRandomStatsFunc( @@ -190,187 +142,6 @@ func StorageDriverFillRandomStatsFunc( } } -func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - N := 100 - cpuTrace := make([]uint64, 0, N) - memTrace := make([]uint64, 0, N) - - // We need N+1 observations to get N samples - for i := 0; i < N+1; i++ { - cpuTrace = append(cpuTrace, uint64(rand.Intn(1000))) - memTrace = append(memTrace, uint64(rand.Intn(1000))) - } - - samplePeriod := 1 * time.Second - - ref := info.ContainerReference{ - Name: "container", - } - - trace := buildTrace(cpuTrace, memTrace, samplePeriod) - - for _, stats := range trace { - err := driver.AddStats(ref, stats) - if err != nil { - t.Fatalf("unable to add stats: %v", err) - } - // set the trace to something else. The stats stored in the - // storage should not be affected. - stats.Cpu.Usage.Total = 0 - stats.Cpu.Usage.System = 0 - stats.Cpu.Usage.User = 0 - } - - samples, err := driver.Samples(ref.Name, N) - if err != nil { - t.Errorf("unable to sample stats: %v", err) - } - if len(samples) == 0 { - t.Fatal("should at least store one sample") - } - samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) - - samples, err = driver.Samples(ref.Name, -1) - if err != nil { - t.Errorf("unable to sample stats: %v", err) - } - samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) - - samples, err = driver.Samples(ref.Name, N-5) - if err != nil { - t.Errorf("unable to sample stats: %v", err) - } - samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t) -} - -func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - N := 100 - memTrace := make([]uint64, N) - cpuTrace := make([]uint64, N) - for i := 0; i < N; i++ { - memTrace[i] = uint64(i + 1) - cpuTrace[i] = uint64(1) - } - - ref := info.ContainerReference{ - Name: "container", - } - - trace := buildTrace(cpuTrace, memTrace, 1*time.Second) - - for _, stats := range trace { - err := driver.AddStats(ref, stats) - if err != nil { - t.Fatalf("unable to add stats: %v", err) - } - // set the trace to something else. The stats stored in the - // storage should not be affected. - stats.Cpu.Usage.Total = 0 - stats.Cpu.Usage.System = 0 - stats.Cpu.Usage.User = 0 - stats.Memory.Usage = 0 - } - - percentiles, err := driver.Percentiles(ref.Name, []int{50}, []int{50}) - if err != nil { - t.Errorf("unable to call Percentiles(): %v", err) - } - maxUsage := uint64(N) - if percentiles.MaxMemoryUsage != maxUsage { - t.Fatalf("Max memory usage should be %v; received %v", maxUsage, percentiles.MaxMemoryUsage) - } -} - -func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - trace := buildTrace( - []uint64{10}, - []uint64{10}, - 1*time.Second) - ref := info.ContainerReference{ - Name: "container", - } - driver.AddStats(ref, trace[0]) - samples, err := driver.Samples(ref.Name, -1) - if err != nil { - t.Fatal(err) - } - if len(samples) != 0 { - t.Errorf("There should be no sample") - } -} - -func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - trace := buildTrace( - []uint64{10}, - []uint64{10}, - 1*time.Second) - ref := info.ContainerReference{ - Name: "container", - } - driver.AddStats(ref, trace[0]) - percentiles, err := driver.Percentiles( - ref.Name, - []int{50}, - []int{50}, - ) - if err != nil { - t.Fatal(err) - } - if percentiles != nil { - t.Errorf("There should be no percentiles") - } -} - -func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - N := 100 - cpuTrace := make([]uint64, N) - memTrace := make([]uint64, N) - for i := 1; i < N+1; i++ { - cpuTrace[i-1] = uint64(i) - memTrace[i-1] = uint64(i) - } - - trace := buildTrace(cpuTrace, memTrace, 1*time.Second) - - ref := info.ContainerReference{ - Name: "container", - } - for _, stats := range trace { - driver.AddStats(ref, stats) - } - percentages := []int{ - 80, - 90, - 50, - } - percentiles, err := driver.Percentiles(ref.Name, percentages, percentages) - if err != nil { - t.Fatal(err) - } - for _, x := range percentiles.CpuUsagePercentiles { - for _, y := range percentiles.CpuUsagePercentiles { - // lower percentage, smaller value - if x.Percentage < y.Percentage && x.Value > y.Value { - t.Errorf("%v percent is %v; while %v percent is %v", - x.Percentage, x.Value, y.Percentage, y.Value) - } - } - } - for _, x := range percentiles.MemoryUsagePercentiles { - for _, y := range percentiles.MemoryUsagePercentiles { - if x.Percentage < y.Percentage && x.Value > y.Value { - t.Errorf("%v percent is %v; while %v percent is %v", - x.Percentage, x.Value, y.Percentage, y.Value) - } - } - } -} - func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *testing.T) { defer driver.Close() N := 100 @@ -465,37 +236,6 @@ func StorageDriverTestNoRecentStats(driver TestStorageDriver, t *testing.T) { } } -func StorageDriverTestNoSamples(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - nonExistContainer := "somerandomecontainer" - samples, _ := driver.Samples(nonExistContainer, -1) - if len(samples) > 0 { - t.Errorf("Samples() returns %v samples on non exist container", len(samples)) - } -} - -func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - nonExistContainer := "somerandomecontainer" - percentiles, _ := driver.Percentiles(nonExistContainer, []int{50, 80}, []int{50, 80}) - if percentiles == nil { - return - } - if percentiles.MaxMemoryUsage != 0 { - t.Errorf("Percentiles() reports max memory usage > 0 when there's no stats.") - } - for _, p := range percentiles.CpuUsagePercentiles { - if p.Value != 0 { - t.Errorf("Percentiles() reports cpu usage is %v when there's no stats.", p.Value) - } - } - for _, p := range percentiles.MemoryUsagePercentiles { - if p.Value != 0 { - t.Errorf("Percentiles() reports memory usage is %v when there's no stats.", p.Value) - } - } -} - func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testing.T) { defer driver.Close() N := 100 @@ -524,32 +264,3 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi t.Errorf("RecentStats() returns %v stats when requests for 0 stats", len(recentStats)) } } - -func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T) { - defer driver.Close() - N := 100 - memTrace := make([]uint64, N) - cpuTrace := make([]uint64, N) - for i := 0; i < N; i++ { - memTrace[i] = uint64(i + 1) - cpuTrace[i] = uint64(1) - } - - ref := info.ContainerReference{ - Name: "container", - } - - trace := buildTrace(cpuTrace, memTrace, 1*time.Second) - - for _, stats := range trace { - driver.AddStats(ref, stats) - } - - samples, err := driver.Samples(ref.Name, 0) - if err != nil { - t.Fatal(err) - } - if len(samples) > 0 { - t.Errorf("RecentStats() returns %v stats when requests for 0 stats", len(samples)) - } -} diff --git a/storagedriver.go b/storagedriver.go index ea0c94bb..40f7c320 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -43,10 +43,10 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { var backendStorage storage.StorageDriver var err error // TODO(vmarmol): We shouldn't need the housekeeping interval here and it shouldn't be public. - samplesToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval) - if samplesToCache < statsRequestedByUI { + statsToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval) + if statsToCache < statsRequestedByUI { // The UI requests the most recent 60 stats by default. - samplesToCache = statsRequestedByUI + statsToCache = statsRequestedByUI } switch driverName { case "": @@ -67,8 +67,6 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { *argDbHost, *argDbIsSecure, *argDbBufferDuration, - // TODO(monnand): One hour? Or user-defined? - 1*time.Hour, ) case "bigquery": var hostname string @@ -80,7 +78,6 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { hostname, "cadvisor", *argDbName, - 1*time.Hour, ) default: @@ -89,7 +86,7 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) { if err != nil { return nil, err } - glog.Infof("Caching %d recent stats in memory; using \"%v\" storage driver\n", samplesToCache, driverName) - storageDriver = memory.New(samplesToCache, samplesToCache, backendStorage) + glog.Infof("Caching %d recent stats in memory; using \"%v\" storage driver\n", statsToCache, driverName) + storageDriver = memory.New(statsToCache, backendStorage) return storageDriver, nil }