Removing sampling and percentiles from interface.

Also removing all implementations.
This commit is contained in:
Victor Marmol 2014-09-13 15:29:49 -07:00
parent 4397c6ed32
commit 42add2409a
23 changed files with 20 additions and 1809 deletions

View File

@ -15,9 +15,7 @@
package info
import (
"fmt"
"reflect"
"sort"
"time"
)
@ -79,11 +77,6 @@ type ContainerInfo struct {
// Historical statistics gathered from the container.
Stats []*ContainerStats `json:"stats,omitempty"`
// Randomly sampled container states.
Samples []*ContainerStatsSample `json:"samples,omitempty"`
StatsPercentiles *ContainerStatsPercentiles `json:"stats_summary,omitempty"`
}
// ContainerInfo may be (un)marshaled by json or other en/decoder. In that
@ -112,9 +105,6 @@ func (self *ContainerInfo) Eq(b *ContainerInfo) bool {
if !reflect.DeepEqual(self.Spec, b.Spec) {
return false
}
if !reflect.DeepEqual(self.StatsPercentiles, b.StatsPercentiles) {
return false
}
for i, expectedStats := range b.Stats {
selfStats := self.Stats[i]
@ -123,12 +113,6 @@ func (self *ContainerInfo) Eq(b *ContainerInfo) bool {
}
}
for i, expectedSample := range b.Samples {
selfSample := self.Samples[i]
if !expectedSample.Eq(selfSample) {
return false
}
}
return true
}
@ -278,24 +262,6 @@ func (self *ContainerStats) Copy(dst *ContainerStats) *ContainerStats {
return dst
}
// ContainerStatsSample describes a container's CPU and memory usage over one
// sample period.
type ContainerStatsSample struct {
// Timestamp of the end of the sample period
Timestamp time.Time `json:"timestamp"`
// Duration of the sample period
Duration time.Duration `json:"duration"`
Cpu struct {
// number of nanoseconds of CPU time used by the container
Usage uint64 `json:"usage"`
// Per-core usage of the container. (unit: nanoseconds)
PerCpuUsage []uint64 `json:"per_cpu_usage,omitempty"`
} `json:"cpu"`
Memory struct {
// Units: Bytes.
Usage uint64 `json:"usage"`
} `json:"memory"`
}
func timeEq(t1, t2 time.Time, tolerance time.Duration) bool {
// t1 should not be later than t2
if t1.After(t2) {
@ -339,35 +305,6 @@ func (a *ContainerStats) Eq(b *ContainerStats) bool {
return true
}
// Eq reports whether a and b describe the same sample. Precise time
// representation is not required, so the timestamps and durations are compared
// through timeEq and durationEq with timePrecision as the tolerance; the Cpu
// and Memory sections must be deeply equal.
func (a *ContainerStatsSample) Eq(b *ContainerStatsSample) bool {
	timesMatch := timeEq(a.Timestamp, b.Timestamp, timePrecision) &&
		durationEq(a.Duration, b.Duration, timePrecision)
	if !timesMatch {
		return false
	}
	return reflect.DeepEqual(a.Cpu, b.Cpu) && reflect.DeepEqual(a.Memory, b.Memory)
}
// Percentile pairs a percentage with the value observed at that percentile.
type Percentile struct {
Percentage int `json:"percentage"`
Value uint64 `json:"value"`
}
// ContainerStatsPercentiles summarizes a container's usage as a maximum memory
// usage figure plus lists of memory and CPU usage percentiles.
// NOTE(review): NewPercentiles fills the two percentile lists but does not set
// MaxMemoryUsage; presumably another caller populates it — confirm.
type ContainerStatsPercentiles struct {
MaxMemoryUsage uint64 `json:"max_memory_usage,omitempty"`
MemoryUsagePercentiles []Percentile `json:"memory_usage_percentiles,omitempty"`
CpuUsagePercentiles []Percentile `json:"cpu_usage_percentiles,omitempty"`
}
// Saturate CPU usage to 0.
func calculateCpuUsage(prev, cur uint64) uint64 {
if prev > cur {
@ -375,104 +312,3 @@ func calculateCpuUsage(prev, cur uint64) uint64 {
}
return cur - prev
}
// NewSample builds the usage sample for the interval between two observations.
// Two stats are needed because the CPU usage in ContainerStats is cumulative:
// the sample's CPU figures are the difference between current and prev, while
// its memory usage is simply the current memory usage.
//
// prev must be an earlier observation than current. An error is returned when
// either argument is nil, when either lacks its Cpu or Memory section, or when
// current is not strictly later than prev.
//
// This method is not thread/goroutine safe.
func NewSample(prev, current *ContainerStats) (*ContainerStatsSample, error) {
	switch {
	case prev == nil || current == nil:
		return nil, fmt.Errorf("empty stats")
	case prev.Cpu == nil || prev.Memory == nil || current.Cpu == nil || current.Memory == nil:
		// An incomplete observation cannot produce a sample.
		return nil, fmt.Errorf("incomplete stats")
	case !current.Timestamp.After(prev.Timestamp):
		return nil, fmt.Errorf("wrong stats order")
	}

	var perCore []uint64
	if n := len(current.Cpu.Usage.PerCpu); n > 0 {
		perCore = make([]uint64, n)
		earlier := prev.Cpu.Usage.PerCpu
		for core := range perCore {
			// A core that prev does not report is treated as starting from zero.
			var before uint64
			if core < len(earlier) {
				before = earlier[core]
			}
			perCore[core] = calculateCpuUsage(before, current.Cpu.Usage.PerCpu[core])
		}
	}

	result := &ContainerStatsSample{
		Timestamp: current.Timestamp,
		Duration:  current.Timestamp.Sub(prev.Timestamp),
	}
	// CPU usage within the interval is the difference of the cumulative totals.
	result.Cpu.Usage = calculateCpuUsage(prev.Cpu.Usage.Total, current.Cpu.Usage.Total)
	result.Cpu.PerCpuUsage = perCore
	result.Memory.Usage = current.Memory.Usage
	return result, nil
}
// uint64Slice provides the Len, Less and Swap methods needed to sort a
// []uint64 in ascending order.
type uint64Slice []uint64

// Len returns the number of values in the slice.
func (s uint64Slice) Len() int { return len(s) }

// Less reports whether the value at index i is smaller than the one at index j.
func (s uint64Slice) Less(i, j int) bool { return s[j] > s[i] }

// Swap exchanges the values at indexes i and j.
func (s uint64Slice) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}
// Percentiles returns one Percentile per entry of requestedPercentiles, in the
// order requested. The value for percentage p is the element at index
// len*p/100 - 1 of the sorted data, with the index clamped to the valid range
// so that percentages at or below zero yield the smallest value and
// percentages above 100 yield the largest one instead of indexing out of
// range.
//
// The receiver is sorted in place, so the caller's backing array is reordered.
// An empty receiver yields nil.
func (s uint64Slice) Percentiles(requestedPercentiles ...int) []Percentile {
	if len(s) == 0 {
		return nil
	}
	sort.Sort(s)
	last := len(s) - 1
	ret := make([]Percentile, 0, len(requestedPercentiles))
	for _, p := range requestedPercentiles {
		idx := len(s)*p/100 - 1
		switch {
		case idx < 0:
			idx = 0
		case idx > last:
			idx = last
		}
		ret = append(ret, Percentile{Percentage: p, Value: s[idx]})
	}
	return ret
}
// NewPercentiles computes the requested CPU and memory usage percentiles over
// samples. Nil entries in samples are skipped. It returns nil when samples is
// empty; MaxMemoryUsage of the result is left at its zero value.
func NewPercentiles(samples []*ContainerStatsSample, cpuPercentages, memoryPercentages []int) *ContainerStatsPercentiles {
	if len(samples) == 0 {
		return nil
	}
	cpu := make(uint64Slice, 0, len(samples))
	mem := make(uint64Slice, 0, len(samples))
	for _, s := range samples {
		if s != nil {
			cpu = append(cpu, s.Cpu.Usage)
			mem = append(mem, s.Memory.Usage)
		}
	}
	return &ContainerStatsPercentiles{
		CpuUsagePercentiles:    cpu.Percentiles(cpuPercentages...),
		MemoryUsagePercentiles: mem.Percentiles(memoryPercentages...),
	}
}

View File

@ -68,65 +68,6 @@ func TestStatsEndTime(t *testing.T) {
}
}
// TestPercentiles checks that, over the values 1 through 100, every requested
// percentile has a value equal to its own percentage.
func TestPercentiles(t *testing.T) {
	total := 100
	values := make([]uint64, 0, total)
	for v := 1; v <= total; v++ {
		values = append(values, uint64(v))
	}
	requested := []int{80, 90, 50}
	for _, got := range uint64Slice(values).Percentiles(requested...) {
		if uint64(got.Percentage) != got.Value {
			t.Errorf("%v percentile data should be %v, but got %v", got.Percentage, got.Percentage, got.Value)
		}
	}
}
// TestPercentilesSmallDataSet checks that a single-element data set reports
// that one element for every requested percentile.
func TestPercentilesSmallDataSet(t *testing.T) {
	const only uint64 = 11
	requested := []int{80, 90, 50}
	results := uint64Slice([]uint64{only}).Percentiles(requested...)
	for i := range results {
		if got := results[i]; got.Value != only {
			t.Errorf("%v percentile data should be %v, but got %v", got.Percentage, only, got.Value)
		}
	}
}
// TestNewSampleNilStats checks that NewSample reports an error when either of
// its two arguments is nil.
func TestNewSampleNilStats(t *testing.T) {
	stats := &ContainerStats{Cpu: &CpuStats{}, Memory: &MemoryStats{}}
	stats.Cpu.Usage.PerCpu = []uint64{10}
	stats.Cpu.Usage.Total = 10
	stats.Cpu.Usage.System = 2
	stats.Cpu.Usage.User = 8
	stats.Memory.Usage = 200

	// First the nil observation comes first, then it comes second.
	for _, pair := range [][2]*ContainerStats{{nil, stats}, {stats, nil}} {
		if sample, err := NewSample(pair[0], pair[1]); err == nil {
			t.Errorf("generated an unexpected sample: %+v", sample)
		}
	}
}
func createStats(cpuUsage, memUsage uint64, timestamp time.Time) *ContainerStats {
stats := &ContainerStats{
Cpu: &CpuStats{},
@ -141,147 +82,6 @@ func createStats(cpuUsage, memUsage uint64, timestamp time.Time) *ContainerStats
return stats
}
// TestAddSample checks the basic case: the sample's memory usage is the
// current memory usage and its CPU usage is the difference between the two
// observations.
func TestAddSample(t *testing.T) {
	var (
		cpuBefore uint64 = 10
		cpuAfter  uint64 = 15
		memUsage  uint64 = 200
	)
	start := time.Now()
	prev := createStats(cpuBefore, memUsage, start)
	current := createStats(cpuAfter, memUsage, start.Add(time.Second))

	sample, err := NewSample(prev, current)
	if err != nil {
		t.Errorf("should be able to generate a sample. but received error: %v", err)
	}
	if sample == nil {
		t.Fatalf("nil sample and nil error. unexpected result!")
	}

	if got := sample.Memory.Usage; got != memUsage {
		t.Errorf("wrong memory usage: %v. should be %v", got, memUsage)
	}
	if got, want := sample.Cpu.Usage, cpuAfter-cpuBefore; got != want {
		t.Errorf("wrong CPU usage: %v. should be %v", got, want)
	}
}
// TestAddSampleIncompleteStats checks that NewSample rejects an observation
// that lacks its Memory or its Cpu section, whichever position it is passed in.
func TestAddSampleIncompleteStats(t *testing.T) {
	var (
		cpuBefore uint64 = 10
		cpuAfter  uint64 = 15
		memUsage  uint64 = 200
	)
	start := time.Now()
	prev := createStats(cpuBefore, memUsage, start)
	current := createStats(cpuAfter, memUsage, start.Add(time.Second))

	incomplete := []*ContainerStats{
		{Cpu: prev.Cpu, Memory: nil},
		{Cpu: nil, Memory: prev.Memory},
	}
	for _, partial := range incomplete {
		if sample, err := NewSample(partial, current); err == nil {
			t.Errorf("generated an unexpected sample: %+v", sample)
		}
		if sample, err := NewSample(prev, partial); err == nil {
			t.Errorf("generated an unexpected sample: %+v", sample)
		}
	}
}
// TestAddSampleWrongOrder checks that NewSample reports an error when the
// later observation is passed as prev.
func TestAddSampleWrongOrder(t *testing.T) {
	var (
		cpuBefore uint64 = 10
		cpuAfter  uint64 = 15
		memUsage  uint64 = 200
	)
	start := time.Now()
	earlier := createStats(cpuBefore, memUsage, start)
	later := createStats(cpuAfter, memUsage, start.Add(time.Second))

	if sample, err := NewSample(later, earlier); err == nil {
		t.Errorf("generated an unexpected sample: %v", sample)
	}
}
// TestAddSampleNegativeCpuUsage checks that when the cumulative CPU usage goes
// backwards between two observations, the sampled usage saturates to 0 rather
// than wrapping around.
func TestAddSampleNegativeCpuUsage(t *testing.T) {
	cpuPrevUsage := uint64(15)
	cpuCurrentUsage := uint64(10)
	memCurrentUsage := uint64(200)
	prevTime := time.Now()

	prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime)
	current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second))

	sample, err := NewSample(prev, current)
	if err != nil {
		// Stop here: the checks below dereference sample, which is nil on error.
		t.Fatalf("expected to sample without error %+v", err)
	}

	if sample.Cpu.Usage != 0 || sample.Cpu.PerCpuUsage[0] != 0 {
		t.Errorf("expected usage to saturate to 0: %+v", sample)
	}
}
// TestAddSampleHotPluggingCpu checks the case where the current observation
// reports one more core than the previous one: the sample must cover both
// cores, and the new core's usage is its full current value.
func TestAddSampleHotPluggingCpu(t *testing.T) {
	cpuPrevUsage := uint64(10)
	cpuCurrentUsage := uint64(15)
	memCurrentUsage := uint64(200)
	prevTime := time.Now()

	prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime)
	current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second))
	current.Cpu.Usage.PerCpu = append(current.Cpu.Usage.PerCpu, 10)

	sample, err := NewSample(prev, current)
	if err != nil {
		// Stop here: the checks below dereference sample, which is nil on error.
		t.Fatalf("should be able to generate a sample. but received error: %v", err)
	}
	if len(sample.Cpu.PerCpuUsage) != 2 {
		t.Fatalf("Should have 2 cores.")
	}
	if sample.Cpu.PerCpuUsage[0] != cpuCurrentUsage-cpuPrevUsage {
		t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage)
	}
	if sample.Cpu.PerCpuUsage[1] != 10 {
		t.Errorf("Second cpu usage is %v. should be 10", sample.Cpu.PerCpuUsage[1])
	}
}
// TestAddSampleHotUnpluggingCpu checks the case where the previous observation
// reports one more core than the current one: the sample must only cover the
// cores present in the current observation.
func TestAddSampleHotUnpluggingCpu(t *testing.T) {
	cpuPrevUsage := uint64(10)
	cpuCurrentUsage := uint64(15)
	memCurrentUsage := uint64(200)
	prevTime := time.Now()

	prev := createStats(cpuPrevUsage, memCurrentUsage, prevTime)
	current := createStats(cpuCurrentUsage, memCurrentUsage, prevTime.Add(1*time.Second))
	prev.Cpu.Usage.PerCpu = append(prev.Cpu.Usage.PerCpu, 10)

	sample, err := NewSample(prev, current)
	if err != nil {
		// Stop here: the checks below dereference sample, which is nil on error.
		t.Fatalf("should be able to generate a sample. but received error: %v", err)
	}
	if len(sample.Cpu.PerCpuUsage) != 1 {
		t.Fatalf("Should have 1 cores.")
	}
	if sample.Cpu.PerCpuUsage[0] != cpuCurrentUsage-cpuPrevUsage {
		t.Errorf("First cpu usage is %v. should be %v", sample.Cpu.PerCpuUsage[0], cpuCurrentUsage-cpuPrevUsage)
	}
}
func TestContainerStatsCopy(t *testing.T) {
stats := createStats(100, 101, time.Now())
shadowStats := stats.Copy(nil)

View File

@ -65,57 +65,14 @@ func GenerateRandomContainerSpec(numCores int) *info.ContainerSpec {
func GenerateRandomContainerInfo(containerName string, numCores int, query *info.ContainerInfoRequest, duration time.Duration) *info.ContainerInfo {
stats := GenerateRandomStats(query.NumStats, numCores, duration)
samples, _ := NewSamplesFromStats(stats...)
if len(samples) > query.NumSamples {
samples = samples[:query.NumSamples]
}
cpuPercentiles := make([]info.Percentile, 0, len(query.CpuUsagePercentiles))
// TODO(monnand): This will generate percentiles where 50%tile data may
// be larger than 90%tile data.
for _, p := range query.CpuUsagePercentiles {
percentile := info.Percentile{p, uint64(rand.Int63n(1000))}
cpuPercentiles = append(cpuPercentiles, percentile)
}
memPercentiles := make([]info.Percentile, 0, len(query.MemoryUsagePercentiles))
for _, p := range query.MemoryUsagePercentiles {
percentile := info.Percentile{p, uint64(rand.Int63n(1000))}
memPercentiles = append(memPercentiles, percentile)
}
percentiles := &info.ContainerStatsPercentiles{
MaxMemoryUsage: uint64(rand.Int63n(4096)),
MemoryUsagePercentiles: memPercentiles,
CpuUsagePercentiles: cpuPercentiles,
}
spec := GenerateRandomContainerSpec(numCores)
ret := &info.ContainerInfo{
ContainerReference: info.ContainerReference{
Name: containerName,
},
Spec: spec,
StatsPercentiles: percentiles,
Samples: samples,
Stats: stats,
Spec: spec,
Stats: stats,
}
return ret
}
// NewSamplesFromStats turns a sequence of stats into samples by pairing every
// element with its predecessor, producing len(stats)-1 samples. Fewer than two
// stats yield (nil, nil). The first pair that info.NewSample rejects aborts the
// conversion and its error is returned.
func NewSamplesFromStats(stats ...*info.ContainerStats) ([]*info.ContainerStatsSample, error) {
	if len(stats) < 2 {
		return nil, nil
	}
	samples := make([]*info.ContainerStatsSample, 0, len(stats)-1)
	for i := 1; i < len(stats); i++ {
		prev, cur := stats[i-1], stats[i]
		sample, err := info.NewSample(prev, cur)
		if err != nil {
			return nil, fmt.Errorf("Unable to generate sample from %+v and %+v: %v",
				prev, cur, err)
		}
		samples = append(samples, sample)
	}
	return samples, nil
}

View File

@ -147,23 +147,7 @@ func (self *manager) containerDataToContainerInfo(cont *containerData, query *in
return nil, err
}
var percentiles *info.ContainerStatsPercentiles
var samples []*info.ContainerStatsSample
var stats []*info.ContainerStats
percentiles, err = self.storageDriver.Percentiles(
cinfo.Name,
query.CpuUsagePercentiles,
query.MemoryUsagePercentiles,
)
if err != nil {
return nil, err
}
samples, err = self.storageDriver.Samples(cinfo.Name, query.NumSamples)
if err != nil {
return nil, err
}
stats, err = self.storageDriver.RecentStats(cinfo.Name, query.NumStats)
stats, err := self.storageDriver.RecentStats(cinfo.Name, query.NumStats)
if err != nil {
return nil, err
}
@ -174,11 +158,9 @@ func (self *manager) containerDataToContainerInfo(cont *containerData, query *in
Name: cinfo.Name,
Aliases: cinfo.Aliases,
},
Subcontainers: cinfo.Subcontainers,
Spec: cinfo.Spec,
StatsPercentiles: percentiles,
Samples: samples,
Stats: stats,
Subcontainers: cinfo.Subcontainers,
Spec: cinfo.Spec,
Stats: stats,
}
// Set default value to an actual value

View File

@ -88,28 +88,7 @@ func expectManagerWithContainers(containers []string, query *info.ContainerInfoR
func(h *container.MockContainerHandler) {
cinfo := infosMap[h.Name]
stats := cinfo.Stats
samples := cinfo.Samples
percentiles := cinfo.StatsPercentiles
spec := cinfo.Spec
driver.On(
"Percentiles",
h.Name,
query.CpuUsagePercentiles,
query.MemoryUsagePercentiles,
).Return(
percentiles,
nil,
)
driver.On(
"Samples",
h.Name,
query.NumSamples,
).Return(
samples,
nil,
)
driver.On(
"RecentStats",
h.Name,
@ -142,10 +121,8 @@ func TestGetContainerInfo(t *testing.T) {
}
query := &info.ContainerInfoRequest{
NumStats: 256,
NumSamples: 128,
CpuUsagePercentiles: []int{10, 50, 90},
MemoryUsagePercentiles: []int{10, 80, 90},
NumStats: 256,
NumSamples: 128,
}
m, infosMap, handlerMap := expectManagerWithContainers(containers, query, t)
@ -178,10 +155,8 @@ func TestSubcontainersInfo(t *testing.T) {
}
query := &info.ContainerInfoRequest{
NumStats: 64,
NumSamples: 64,
CpuUsagePercentiles: []int{10, 50, 90},
MemoryUsagePercentiles: []int{10, 80, 90},
NumStats: 64,
NumSamples: 64,
}
m, _, _ := expectManagerWithContainers(containers, query, t)

View File

@ -1,51 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
// autoFilterSampler decorates a Sampler so that a fixed filter is applied to
// it before every new observation is added.
type autoFilterSampler struct {
	// filter is handed to the decorated sampler's Filter before every Update.
	filter  func(d interface{}) bool
	sampler Sampler
}

// Len returns the length reported by the decorated sampler.
func (a *autoFilterSampler) Len() int {
	n := a.sampler.Len()
	return n
}

// Reset resets the decorated sampler.
func (a *autoFilterSampler) Reset() {
	a.sampler.Reset()
}

// Map forwards f to the decorated sampler's Map.
func (a *autoFilterSampler) Map(f func(d interface{})) {
	a.sampler.Map(f)
}

// Filter forwards filter to the decorated sampler's Filter.
func (a *autoFilterSampler) Filter(filter func(d interface{}) bool) {
	a.sampler.Filter(filter)
}

// Update first applies the stored filter to the decorated sampler and then
// hands d to the decorated sampler's Update.
func (a *autoFilterSampler) Update(d interface{}) {
	inner := a.sampler
	inner.Filter(a.filter)
	inner.Update(d)
}

// NewAutoFilterSampler wraps sampler in a decorator: whenever Update() is
// called, the decorator first calls the decorated sampler's Filter with filter
// and only then adds the new observation.
func NewAutoFilterSampler(sampler Sampler, filter func(d interface{}) bool) Sampler {
	decorated := new(autoFilterSampler)
	decorated.sampler = sampler
	decorated.filter = filter
	return decorated
}

View File

@ -1,60 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import "time"
// autoResetSampler decorates a Sampler so that it is reset before an update
// whenever shouldReset returns true for the incoming observation.
type autoResetSampler struct {
	shouldReset func(d interface{}) bool
	sampler     Sampler
}

// Len returns the length reported by the decorated sampler.
func (a *autoResetSampler) Len() int {
	n := a.sampler.Len()
	return n
}

// Reset resets the decorated sampler.
func (a *autoResetSampler) Reset() {
	a.sampler.Reset()
}

// Map forwards f to the decorated sampler's Map.
func (a *autoResetSampler) Map(f func(d interface{})) {
	a.sampler.Map(f)
}

// Filter forwards filter to the decorated sampler's Filter.
func (a *autoResetSampler) Filter(filter func(d interface{}) bool) {
	a.sampler.Filter(filter)
}

// Update resets the decorated sampler when shouldReset(d) is true, then hands
// d to the decorated sampler's Update.
func (a *autoResetSampler) Update(d interface{}) {
	inner := a.sampler
	if a.shouldReset(d) {
		inner.Reset()
	}
	inner.Update(d)
}

// NewPeriodicallyResetSampler wraps sampler so that an Update arriving more
// than period after the previous reset (or after construction) resets the
// sampler before the observation is added. The time of the last reset is kept
// in a variable captured by the reset predicate and is not guarded by a lock.
func NewPeriodicallyResetSampler(period time.Duration, sampler Sampler) Sampler {
	lastReset := time.Now()
	return &autoResetSampler{
		sampler: sampler,
		shouldReset: func(d interface{}) bool {
			if time.Since(lastReset) <= period {
				return false
			}
			lastReset = time.Now()
			return true
		},
	}
}

View File

@ -1,171 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"log"
"math/rand"
"sync"
"github.com/kr/pretty"
)
// empty is a zero-size value type for maps that are used as sets.
type empty struct{}

// randInt64Except returns a random number in [start, end) that is not a key of
// except. Candidates are redrawn until one is not excluded, so the call does
// not return if every value in the range is excluded.
func randInt64Except(start, end int64, except map[int64]empty) int64 {
	span := end - start
	for {
		candidate := start + rand.Int63n(span)
		if _, excluded := except[candidate]; !excluded {
			return candidate
		}
	}
}
// chainSampler samples sampleSize observations out of the latest windowSize
// observations.
//
// Basic idea:
// Every observation has a sequence number as its id. Up front, sampleSize
// distinct random ids are drawn from [1, windowSize]; observations arriving
// with one of those ids are selected. Each selected observation in turn picks
// the id of a future observation which replaces it once it falls out of the
// window.
type chainSampler struct {
	sampleSize int
	windowSize int64

	// Every observation will have a sequence number starting from 1.
	// The sequence number must increase by one for each observation.
	numObservations int64

	// All samples stored as id -> value.
	samples map[int64]interface{}

	// The set of id of future observations.
	futureSamples map[int64]empty

	// The chain of samples: old observation id -> future observation id.
	// When the old observation expires, the future observation will be
	// stored as a sample.
	sampleChain map[int64]int64

	// Replacements are selected observations kept as id -> value until the
	// sample they are chained to expires.
	replacements map[int64]interface{}

	lock sync.RWMutex
}

// initFutureSamples draws the initial set of sampleSize distinct ids from
// [1, windowSize].
func (s *chainSampler) initFutureSamples() {
	for i := 0; i < s.sampleSize; i++ {
		n := randInt64Except(1, s.windowSize+1, s.futureSamples)
		s.futureSamples[n] = empty{}
	}
}

// arrive processes the observation with sequence number seqNum. Observations
// whose id was not selected beforehand are ignored.
func (s *chainSampler) arrive(seqNum int64, obv interface{}) {
	if _, selected := s.futureSamples[seqNum]; !selected {
		// If this observation is not selected, ignore it.
		return
	}
	delete(s.futureSamples, seqNum)

	if len(s.samples) < s.sampleSize {
		s.samples[seqNum] = obv
	}
	s.replacements[seqNum] = obv

	// Select a future observation which will replace current observation
	// when it expires.
	futureSeqNum := randInt64Except(seqNum+1, seqNum+s.windowSize+1, s.futureSamples)
	s.futureSamples[futureSeqNum] = empty{}
	s.sampleChain[seqNum] = futureSeqNum
}

// expireAndReplace drops the sample that has just left the window, if any,
// and promotes the observation chained to it.
func (s *chainSampler) expireAndReplace() {
	expSeqNum := s.numObservations - s.windowSize
	if _, ok := s.samples[expSeqNum]; !ok {
		// No sample expires
		return
	}
	delete(s.samples, expSeqNum)

	// There must be a replacement, otherwise panic.
	replacementSeqNum := s.sampleChain[expSeqNum]
	// The chain link of an expired sample is looked up only here, so drop
	// it to keep sampleChain from growing with the stream.
	delete(s.sampleChain, expSeqNum)

	replacement, ok := s.replacements[replacementSeqNum]
	if !ok {
		log.Printf("cannot find %v. which is the replacement of %v\n", replacementSeqNum, expSeqNum)
		pretty.Printf("chain: %# v\n", s)
		panic("Should never occur!")
	}
	// This observation must have arrived before.
	s.samples[replacementSeqNum] = replacement
	// Once promoted, the stored replacement is not read again.
	delete(s.replacements, replacementSeqNum)
}

// Update feeds the next observation to the sampler, assigning it the next
// sequence number.
func (s *chainSampler) Update(obv interface{}) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.numObservations++
	s.arrive(s.numObservations, obv)
	s.expireAndReplace()
}

// Len returns the number of observations currently held as samples.
func (s *chainSampler) Len() int {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return len(s.samples)
}

// Reset discards all state and draws a fresh set of initial ids.
func (s *chainSampler) Reset() {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.numObservations = 0
	s.samples = make(map[int64]interface{}, s.sampleSize)
	s.futureSamples = make(map[int64]empty, s.sampleSize*2)
	s.sampleChain = make(map[int64]int64, s.sampleSize*2)
	s.replacements = make(map[int64]interface{}, s.sampleSize*2)
	s.initFutureSamples()
}

// Map calls f on every current sample while holding the read lock. Iteration
// order is that of a Go map and therefore unspecified.
func (s *chainSampler) Map(f func(d interface{})) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, obv := range s.samples {
		f(obv)
	}
}

// Filter is NOT SUPPORTED by the chain sampler; calling it has no effect.
func (s *chainSampler) Filter(filter func(d interface{}) bool) {
}
// NewChainSampler returns a chain sampler that keeps sampleSize samples out of
// the most recent windowSize observations. The algorithm is described in
// Brian Babcock, Mayur Datar and Rajeev Motwani,
// Sampling From a Moving Window Over Streaming Data
func NewChainSampler(sampleSize, windowSize int) Sampler {
	chainCap := sampleSize * 2
	s := new(chainSampler)
	s.sampleSize = sampleSize
	s.windowSize = int64(windowSize)
	s.samples = make(map[int64]interface{}, sampleSize)
	s.futureSamples = make(map[int64]empty, chainCap)
	s.sampleChain = make(map[int64]int64, chainCap)
	s.replacements = make(map[int64]interface{}, chainCap)
	s.initFutureSamples()
	return s
}

View File

@ -1,43 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import "testing"
// TestChainSampler repeatedly streams observations through a chain sampler and
// checks the resulting histogram: the spread of sampling frequencies must stay
// within a bound, no more ids than the window size may be sampled, and ids
// older than the final window must not be sampled.
func TestChainSampler(t *testing.T) {
	var (
		numSamples      = 10
		windowSize      = 10 * numSamples
		numObservations = 10 * windowSize
		numSampleRounds = 10 * numObservations
	)
	sampler := NewChainSampler(numSamples, windowSize)
	hist := make(map[int]int, numSamples)
	for round := 0; round < numSampleRounds; round++ {
		sampleStream(hist, numObservations, sampler)
	}

	if ratio := histStddev(hist) / histMean(hist); ratio > 1.05 {
		// XXX(dengnan): better sampler?
		t.Errorf("std dev: %v; mean: %v. Either we have a really bad PRNG, or a bad implementation", histStddev(hist), histMean(hist))
	}
	if sampled := len(hist); sampled > windowSize {
		t.Errorf("sampled %v data. larger than window size %v", sampled, windowSize)
	}
	oldest := numObservations - windowSize
	for seqNum, freq := range hist {
		if freq > 0 && seqNum < oldest {
			t.Errorf("observation with seqnum %v is sampled %v times", seqNum, freq)
		}
	}
}

View File

@ -1,17 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package sampling provides several sampling algorithms.
// These algorithms are used to sample containers' stats information.
package sampling

View File

@ -1,143 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"container/heap"
"math"
"math/rand"
"sync"
)
// esSampleItem is one reservoir entry: the sampled datum together with the
// random key Update drew for it.
type esSampleItem struct {
	data interface{}
	key  float64
}

// esSampleHeap is a min-heap of sample items ordered by key, usable with
// container/heap.
type esSampleHeap []esSampleItem

// Len returns the number of items in the heap.
func (h esSampleHeap) Len() int {
	return len(h)
}

// Less orders items by ascending key, which makes the heap a min-heap.
func (h esSampleHeap) Less(i, j int) bool {
	return h[i].key < h[j].key
}

// Swap exchanges the items at indexes i and j.
func (h esSampleHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x, which must be an esSampleItem. Call it through heap.Push.
func (h *esSampleHeap) Push(x interface{}) {
	*h = append(*h, x.(esSampleItem))
}

// Pop removes and returns the last item. Call it through heap.Pop.
func (h *esSampleHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	item := old[last]
	*h = old[:last]
	return item
}

// esSampler is a weighted reservoir sampler that keeps at most maxSize items,
// retaining those with the largest keys.
type esSampler struct {
	weight  func(interface{}) float64
	samples *esSampleHeap
	maxSize int
	lock    sync.RWMutex
}

// Update offers d to the reservoir. A key u^(1/weight(d)) is drawn with u
// uniform in [0, 1). While the reservoir is not full, d is always added;
// afterwards d replaces the item with the smallest key if its own key is
// larger. A sampler whose maxSize is not positive never stores anything.
func (s *esSampler) Update(d interface{}) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.maxSize <= 0 {
		// Nothing can be kept, and there is no minimum item to compare with.
		return
	}

	u := rand.Float64()
	item := esSampleItem{
		data: d,
		key:  math.Pow(u, 1.0/s.weight(d)),
	}

	if s.samples.Len() < s.maxSize {
		heap.Push(s.samples, item)
		return
	}

	// The root of the min-heap holds the smallest key; replace it only when
	// the new key is larger.
	if item.key > (*s.samples)[0].key {
		heap.Pop(s.samples)
		heap.Push(s.samples, item)
	}
}

// Len returns the number of items currently in the reservoir.
func (s *esSampler) Len() int {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return len(*s.samples)
}

// Reset empties the reservoir.
func (s *esSampler) Reset() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.samples = &esSampleHeap{}
	heap.Init(s.samples)
}

// Map calls f on the datum of every item in the reservoir while holding the
// read lock.
func (s *esSampler) Map(f func(interface{})) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, item := range *s.samples {
		f(item.data)
	}
}

// Filter removes every item for which filter returns true.
//
// The survivors are compacted in place and the heap is rebuilt afterwards.
// Removing items one by one through heap.Remove with indexes collected up
// front is not possible, because each removal moves other items around and
// invalidates the remaining indexes.
func (s *esSampler) Filter(filter func(d interface{}) bool) {
	s.lock.Lock()
	defer s.lock.Unlock()

	items := *s.samples
	kept := items[:0]
	for _, item := range items {
		if !filter(item.data) {
			kept = append(kept, item)
		}
	}
	// Clear the vacated tail so removed data can be garbage collected.
	for i := len(kept); i < len(items); i++ {
		items[i] = esSampleItem{}
	}
	*s.samples = kept
	heap.Init(s.samples)
}
// NewESSampler returns a sampler holding at most size items, where weight
// supplies the weight of each observation. It implements the ES sampling
// algorithm described in
//
// Pavlos S. Efraimidis and Paul G. Spirakis. Weighted random sampling with a
// reservoir. Information Processing Letters, 97(5):181-185, 2006.
//
// http://dl.acm.org/citation.cfm?id=1138834
func NewESSampler(size int, weight func(interface{}) float64) Sampler {
	reservoir := new(esSampleHeap)
	heap.Init(reservoir)
	sampler := &esSampler{weight: weight, maxSize: size}
	sampler.samples = reservoir
	return sampler
}

View File

@ -1,81 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"container/heap"
"math/rand"
"testing"
"github.com/kr/pretty"
)
// This should be a min heap
func TestESSampleHeap(t *testing.T) {
h := &esSampleHeap{}
heap.Init(h)
min := 5.0
N := 10
for i := 0; i < N; i++ {
key := rand.Float64()
if key < min {
min = key
}
heap.Push(h, esSampleItem{nil, key})
}
l := *h
if l[0].key != min {
t.Errorf("not a min heap")
pretty.Printf("min=%v\nheap=%# v\n", min, l)
}
}
// TestESSampler streams observations whose weight grows with their value and
// checks that, apart from a small tolerated fraction, an item is not selected
// noticeably more often than the item two weights above it.
func TestESSampler(t *testing.T) {
	var (
		reservoirSize   = 10
		numObvs         = 10 * reservoirSize
		numSampleRounds = 100 * numObvs
	)
	weight := func(d interface{}) float64 {
		return float64(d.(int) + 1)
	}
	sampler := NewESSampler(reservoirSize, weight)
	hist := make(map[int]int, numObvs)
	for round := 0; round < numSampleRounds; round++ {
		sampleStream(hist, numObvs, sampler)
	}

	const (
		diff      = 2
		threshold = 1.05
	)
	misordered := make([]int, 0, numObvs)
	for i := 0; i < numObvs-diff; i++ {
		// Item with smaller weight should have lower probability to be selected.
		lighter, heavier := hist[i], hist[i+diff]
		if lighter > heavier && float64(lighter) > float64(heavier)*threshold {
			misordered = append(misordered, i)
		}
	}

	if float64(len(misordered)) <= float64(numObvs)*0.05 {
		return
	}
	for _, i := range misordered {
		t.Errorf("item with weight %v is selected %v times; while item with weight %v is selected %v times", i, hist[i], i+diff, hist[i+diff])
	}
}

View File

@ -1,99 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"math/rand"
"sync"
)
// reservoirSampler implements the reservoir sampling algorithm.
// http://en.wikipedia.org/wiki/Reservoir_sampling
type reservoirSampler struct {
	maxSize      int
	samples      []interface{}
	numInstances int64
	lock         sync.RWMutex
}

// Len returns the number of items currently in the reservoir.
func (s *reservoirSampler) Len() int {
	s.lock.RLock()
	defer s.lock.RUnlock()

	return len(s.samples)
}

// Reset empties the reservoir and restarts the observation count.
func (s *reservoirSampler) Reset() {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.samples = make([]interface{}, 0, s.maxSize)
	s.numInstances = 0
}

// Update samples according to http://en.wikipedia.org/wiki/Reservoir_sampling
func (s *reservoirSampler) Update(d interface{}) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.numInstances++
	if len(s.samples) < s.maxSize {
		s.samples = append(s.samples, d)
		return
	}
	// Randomly generates a number between [0, numInstances).
	// Use this random number, j, as an index. If j is not smaller than
	// the reservoir size, we will ignore the current new data.
	// Otherwise replace the jth element in reservoir with the new data.
	j := rand.Int63n(s.numInstances)
	if j < int64(len(s.samples)) {
		s.samples[int(j)] = d
	}
}

// Map calls f on every item in the reservoir while holding the read lock.
func (s *reservoirSampler) Map(f func(d interface{})) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, d := range s.samples {
		f(d)
	}
}

// Filter removes every item for which filter returns true and lowers the
// observation count by the number of removed items.
//
// Once an element is removed, the probability of sampling an observation will
// be increased. Removing all elements in the sampler has the same effect as
// calling Reset(). However, it will not guarantee the uniform probability of
// all unfiltered samples.
func (s *reservoirSampler) Filter(filter func(d interface{}) bool) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Compact the survivors in place. Removing by indexes collected up front
	// does not work: every removal shrinks the slice and moves an element, so
	// the remaining indexes would point at the wrong element or past the end.
	kept := s.samples[:0]
	for _, d := range s.samples {
		if !filter(d) {
			kept = append(kept, d)
		}
	}
	removed := len(s.samples) - len(kept)
	// Clear the vacated tail so removed data can be garbage collected.
	for i := len(kept); i < len(s.samples); i++ {
		s.samples[i] = nil
	}
	s.samples = kept
	s.numInstances -= int64(removed)
}
// NewReservoirSampler returns a Sampler backed by a reservoirSampler that
// holds at most reservoirSize observations.
func NewReservoirSampler(reservoirSize int) Sampler {
	sampler := new(reservoirSampler)
	sampler.maxSize = reservoirSize
	return sampler
}

View File

@ -1,70 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"math"
"testing"
)
// sampleStream resets s, feeds it the integers 0..n-1 in increasing order and
// then increments hist[v] once for every value v the sampler still holds.
func sampleStream(hist map[int]int, n int, s Sampler) {
	s.Reset()
	for v := 0; v < n; v++ {
		s.Update(v)
	}
	tally := func(d interface{}) {
		// A missing key reads as zero, so the counter needs no explicit
		// initialisation before being incremented.
		hist[d.(int)]++
	}
	s.Map(tally)
}
// histMean returns the arithmetic mean of the counts stored in hist, i.e. the
// sum of all values divided by the number of keys.
func histMean(hist map[int]int) float64 {
	sum := 0
	for key := range hist {
		sum += hist[key]
	}
	buckets := len(hist)
	return float64(sum) / float64(buckets)
}
// histStddev returns the population standard deviation of the counts stored
// in hist: the square root of the mean squared distance from histMean(hist).
func histStddev(hist map[int]int) float64 {
	mean := histMean(hist)
	sumSquares := 0.0
	for _, count := range hist {
		deviation := float64(count) - mean
		squared := deviation * deviation
		sumSquares += squared
	}
	variance := sumSquares / float64(len(hist))
	return math.Sqrt(variance)
}
// TestReservoirSampler repeatedly streams the same sequence of integers
// through a reservoir sampler and checks that every integer is retained about
// equally often: the ratio of standard deviation to mean of the retention
// counts must not exceed 0.05.
//
// XXX(dengnan): This test may take more than 10 seconds.
func TestReservoirSampler(t *testing.T) {
	const (
		reservoirSize   = 10
		numSamples      = 10 * reservoirSize
		numSampleRounds = 100 * numSamples
	)

	sampler := NewReservoirSampler(reservoirSize)
	hist := make(map[int]int, numSamples)
	for round := 0; round < numSampleRounds; round++ {
		sampleStream(hist, numSamples, sampler)
	}

	stddev, mean := histStddev(hist), histMean(hist)
	if stddev/mean > 0.05 {
		t.Errorf("std dev: %v; mean: %v. Either we have a really bad PRNG, or a bad implementation", stddev, mean)
	}
}

View File

@ -1,42 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
crand "crypto/rand"
"encoding/binary"
"math/rand"
)
// init seeds the global math/rand generator with a 64-bit value read from
// crypto/rand.
func init() {
	// NOTE(dengnan): Even if we picked a good random seed,
	// the random number from math/rand is still not cryptographically secure!
	seed := new(int64)
	// Best effort: a failed read is deliberately ignored and whatever seed
	// holds at that point (presumably still zero; TODO confirm) is used.
	_ = binary.Read(crand.Reader, binary.LittleEndian, seed)
	rand.Seed(*seed)
}
// Sampler holds a collection of sampled observations. Observations are
// opaque to the sampler and are passed around as interface{} values.
type Sampler interface {
// Update offers a new observation d to the sampler; the implementation
// decides whether d is kept.
Update(d interface{})
// Len returns the number of observations currently held.
Len() int
// Reset discards all held observations.
Reset()
// Map calls f once for each observation currently held.
Map(f func(interface{}))
// Filter() should update in place. Removing elements may or may not
// affect the statistical behavior of the sampler, i.e. the probability
// that an observation will be sampled after removing some elements is
// implementation defined.
// NOTE(review): the reservoir implementation removes the elements for
// which filter returns true; confirm other implementations agree.
Filter(filter func(interface{}) bool)
}

View File

@ -27,9 +27,7 @@ import (
type bigqueryStorage struct {
client *client.Client
prevStats *info.ContainerStats
machineName string
windowLen time.Duration
}
const (
@ -57,10 +55,6 @@ const (
colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault"
// Hierarchical major page fault
colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault"
// Optional: sample duration. Unit: nanoseconds.
colSampleDuration string = "sample_duration"
// Optional: Instant cpu usage.
colCpuInstantUsage string = "cpu_instant_usage"
// Cumulative count of bytes received.
colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered.
@ -138,16 +132,6 @@ func (self *bigqueryStorage) GetSchema() *bigquery.TableSchema {
Name: colMemoryHierarchicalPgmajfault,
}
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeInteger,
Name: colSampleDuration,
}
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeInteger,
Name: colCpuInstantUsage,
}
i++
fields[i] = &bigquery.TableFieldSchema{
Type: typeInteger,
Name: colRxBytes,
@ -227,17 +211,8 @@ func (self *bigqueryStorage) containerStatsToValues(
row[colTxErrors] = stats.Network.TxErrors
}
sample, err := info.NewSample(self.prevStats, stats)
if err != nil || sample == nil {
return
}
// TODO(jnagal): Handle per-cpu stats.
// Optional: sample duration. Unit: Nanosecond.
row[colSampleDuration] = sample.Duration
// Optional: Instant cpu usage
row[colCpuInstantUsage] = sample.Cpu.Usage
return
}
@ -342,55 +317,12 @@ func (self *bigqueryStorage) valuesToContainerStats(columns []string, values []i
return stats, nil
}
// valuesToContainerSample builds a ContainerStatsSample from one result row.
// columns holds the column names and values the matching cell values, in the
// same order.
//
// It returns (nil, nil) when the row carries no usable sample, i.e. the
// sample duration cell is nil or the resulting duration is zero. It returns
// an error when the row belongs to a different machine, when there are fewer
// values than columns, or when a cell cannot be converted to the expected
// type.
func (self *bigqueryStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) {
	if len(values) < len(columns) {
		return nil, fmt.Errorf("row has %d values for %d columns", len(values), len(columns))
	}
	sample := &info.ContainerStatsSample{}
	for i, col := range columns {
		v := values[i]
		var err error
		switch col {
		case colTimestamp:
			// A timestamp of an unexpected type is silently skipped.
			if t, ok := v.(time.Time); ok {
				sample.Timestamp = t
			}
		case colMachineName:
			m, ok := v.(string)
			if !ok {
				return nil, fmt.Errorf("machine name field is not a string: %v", v)
			}
			if m != self.machineName {
				return nil, fmt.Errorf("different machine")
			}
		// Memory Usage
		case colMemoryUsage:
			sample.Memory.Usage, err = convertToUint64(v)
		// sample duration. Unit: Nanosecond.
		case colSampleDuration:
			if v == nil {
				// this record does not have sample_duration, so it's the first stats.
				return nil, nil
			}
			// Checked assertion: an unexpected cell type is reported as an
			// error like every other column instead of panicking.
			d, ok := v.(int64)
			if !ok {
				return nil, fmt.Errorf("column %v has invalid value %v: not an int64", col, v)
			}
			sample.Duration = time.Duration(d)
		// Instant cpu usage
		case colCpuInstantUsage:
			sample.Cpu.Usage, err = convertToUint64(v)
		}
		if err != nil {
			return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
		}
	}
	if sample.Duration.Nanoseconds() == 0 {
		return nil, nil
	}
	return sample, nil
}
func (self *bigqueryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil || stats.Cpu == nil || stats.Memory == nil {
return nil
}
row := self.containerStatsToValues(ref, stats)
self.prevStats = stats.Copy(self.prevStats)
err := self.client.InsertRow(row)
if err != nil {
@ -435,33 +367,19 @@ func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]
return statsList, nil
}
// Samples is not implemented for the BigQuery driver: it ignores its
// arguments and always returns a nil slice and an error.
func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
	var none []*info.ContainerStatsSample
	return none, fmt.Errorf("will be removed")
}
// Close closes the underlying BigQuery client and drops the reference to it.
// Calling Close again afterwards is a no-op. It always returns nil.
func (self *bigqueryStorage) Close() error {
	// The client is set to nil by the first Close; do not call into it again.
	if self.client == nil {
		return nil
	}
	self.client.Close()
	self.client = nil
	return nil
}
// Percentiles is not implemented for the BigQuery driver: it ignores its
// arguments and always returns nil and an error.
func (self *bigqueryStorage) Percentiles(
	containerName string,
	cpuUsagePercentiles []int,
	memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) {
	var none *info.ContainerStatsPercentiles
	return none, fmt.Errorf("will be removed")
}
// Create a new bigquery storage driver.
// machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on.
// tableName: BigQuery table used for storing stats.
// percentilesDuration: Time window which will be considered when calls Percentiles()
func New(machineName,
datasetId,
tableName string,
percentilesDuration time.Duration,
) (storage.StorageDriver, error) {
bqClient, err := client.NewClient()
if err != nil {
@ -471,13 +389,9 @@ func New(machineName,
if err != nil {
return nil, err
}
if percentilesDuration.Seconds() < 1.0 {
percentilesDuration = 5 * time.Minute
}
ret := &bigqueryStorage{
client: bqClient,
windowLen: percentilesDuration,
machineName: machineName,
}
schema := ret.GetSchema()

View File

@ -27,7 +27,6 @@ type influxdbStorage struct {
client *influxdb.Client
machineName string
tableName string
windowLen time.Duration
bufferDuration time.Duration
lastWrite time.Time
series []*influxdb.Series
@ -256,23 +255,11 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
return statsList, nil
}
// Samples is not implemented for the InfluxDB driver: it ignores its
// arguments and always returns a nil slice and an error.
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
	var none []*info.ContainerStatsSample
	return none, fmt.Errorf("will be removed")
}
// Close drops the reference to the InfluxDB client. It always returns nil.
func (self *influxdbStorage) Close() error {
self.client = nil
return nil
}
// Percentiles is not implemented for the InfluxDB driver: it ignores its
// arguments and always returns nil and an error.
func (self *influxdbStorage) Percentiles(
	containerName string,
	cpuUsagePercentiles []int,
	memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) {
	var none *info.ContainerStatsPercentiles
	return none, fmt.Errorf("will be removed")
}
// Returns a new influxdb series.
func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *influxdb.Series {
out := &influxdb.Series{
@ -288,7 +275,6 @@ func (self *influxdbStorage) newSeries(columns []string, points []interface{}) *
// machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on.
// influxdbHost: The host which runs influxdb.
// percentilesDuration: Time window which will be considered when calls Percentiles()
func New(machineName,
tablename,
database,
@ -297,7 +283,6 @@ func New(machineName,
influxdbHost string,
isSecure bool,
bufferDuration time.Duration,
percentilesDuration time.Duration,
) (*influxdbStorage, error) {
config := &influxdb.ClientConfig{
Host: influxdbHost,
@ -312,13 +297,9 @@ func New(machineName,
}
// TODO(monnand): With go 1.3, we cannot compress data now.
client.DisableCompression()
if percentilesDuration.Seconds() < 1.0 {
percentilesDuration = 5 * time.Minute
}
ret := &influxdbStorage{
client: client,
windowLen: percentilesDuration,
machineName: machineName,
tableName: tablename,
bufferDuration: bufferDuration,

View File

@ -20,52 +20,26 @@ import (
"sync"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/sampling"
"github.com/google/cadvisor/storage"
)
// containerStorage is used to store per-container information
type containerStorage struct {
// Reference to the container this storage belongs to.
ref info.ContainerReference
// Copy of the most recently added stats, or nil when those stats were
// incomplete; used as the baseline for computing the next sample.
prevStats *info.ContainerStats
// Sampler receiving the samples derived from consecutive stats.
sampler sampling.Sampler
// Recently added stats, newest at the front.
recentStats *list.List
// Number of entries at which the oldest entry of recentStats is dropped
// before a new one is added.
maxNumStats int
// Largest memory usage seen in any added stats.
maxMemUsage uint64
// Taken by the methods of this type around access to the fields above.
lock sync.RWMutex
}
// updatePrevStats remembers a copy of stats as the baseline for the next
// sample. Stats that are nil or lack CPU or memory information are treated as
// incomplete and clear the baseline instead.
func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) {
	complete := stats != nil && stats.Cpu != nil && stats.Memory != nil
	if complete {
		self.prevStats = stats.Copy(self.prevStats)
		return
	}
	// discard incomplete stats
	self.prevStats = nil
}
// AddStats records stats for the container.
//
// When a previous complete stats entry exists, a sample is derived from the
// pair and handed to the sampler. The running maximum memory usage is
// updated, stats is pushed to the front of the recent-stats list (dropping
// the oldest entry once maxNumStats entries are held), and stats becomes the
// baseline for the next sample.
//
// It returns an error when stats is nil or when no sample can be derived from
// the previous and current stats; in both cases nothing is stored.
func (self *containerStorage) AddStats(stats *info.ContainerStats) error {
	// A nil stats would otherwise be dereferenced below.
	if stats == nil {
		return fmt.Errorf("wrong stats: nil stats")
	}
	self.lock.Lock()
	defer self.lock.Unlock()
	if self.prevStats != nil {
		sample, err := info.NewSample(self.prevStats, stats)
		if err != nil {
			return fmt.Errorf("wrong stats: %v", err)
		}
		if sample != nil {
			self.sampler.Update(sample)
		}
	}
	if stats.Memory != nil && self.maxMemUsage < stats.Memory.Usage {
		self.maxMemUsage = stats.Memory.Usage
	}
	// Add the stat to storage.
	if self.recentStats.Len() >= self.maxNumStats {
		// Back() is nil for an empty list (possible when maxNumStats <= 0),
		// and list.Remove must not be called with a nil element.
		if oldest := self.recentStats.Back(); oldest != nil {
			self.recentStats.Remove(oldest)
		}
	}
	self.recentStats.PushFront(stats)
	self.updatePrevStats(stats)
	return nil
}
@ -98,50 +72,10 @@ func (self *containerStorage) RecentStats(numStats int) ([]*info.ContainerStats,
return ret, nil
}
// Samples returns up to numSamples of the samples held by the sampler. A
// negative numSamples, or one larger than the number of held samples, means
// "all of them". An error is returned when the sampler holds a value that is
// not a *info.ContainerStatsSample.
func (self *containerStorage) Samples(numSamples int) ([]*info.ContainerStatsSample, error) {
	self.lock.RLock()
	defer self.lock.RUnlock()

	if self.sampler.Len() < numSamples || numSamples < 0 {
		numSamples = self.sampler.Len()
	}

	var firstErr error
	collected := make([]*info.ContainerStatsSample, 0, numSamples)
	collect := func(d interface{}) {
		// Stop collecting once enough samples were gathered or a bad
		// element was seen; Map itself cannot be interrupted.
		if firstErr != nil || len(collected) >= numSamples {
			return
		}
		sample, isSample := d.(*info.ContainerStatsSample)
		if !isSample {
			firstErr = fmt.Errorf("An element in the sample is not a ContainerStatsSample")
			return
		}
		collected = append(collected, sample)
	}
	self.sampler.Map(collect)

	if firstErr != nil {
		return nil, firstErr
	}
	return collected, nil
}
// Percentiles computes the requested CPU and memory usage percentiles over
// all samples currently held, and attaches the maximum memory usage seen so
// far. It returns (nil, nil) when there are no samples.
func (self *containerStorage) Percentiles(cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) {
	samples, err := self.Samples(-1)
	if err != nil {
		return nil, err
	}
	if len(samples) == 0 {
		return nil, nil
	}
	ret := info.NewPercentiles(samples, cpuPercentiles, memPercentiles)
	// maxMemUsage is written by AddStats under the write lock, so it must be
	// read under the lock as well. Samples has already released its read
	// lock, so taking it here does not nest read locks.
	self.lock.RLock()
	ret.MaxMemoryUsage = self.maxMemUsage
	self.lock.RUnlock()
	return ret, nil
}
func newContainerStore(ref info.ContainerReference, maxNumSamples, maxNumStats int) *containerStorage {
s := sampling.NewReservoirSampler(maxNumSamples)
func newContainerStore(ref info.ContainerReference, maxNumStats int) *containerStorage {
return &containerStorage{
ref: ref,
recentStats: list.New(),
sampler: s,
maxNumStats: maxNumStats,
}
}
@ -149,7 +83,6 @@ func newContainerStore(ref info.ContainerReference, maxNumSamples, maxNumStats i
// InMemoryStorage keeps stats and samples for containers in memory, one
// containerStorage per container name.
type InMemoryStorage struct {
// Guards containerStorageMap.
lock sync.RWMutex
// Per-container storage, keyed by container name.
containerStorageMap map[string]*containerStorage
// Sampler capacity passed to each newly created containerStorage.
maxNumSamples int
// Recent-stats capacity passed to each newly created containerStorage.
maxNumStats int
// Storage driver supplied at construction time.
// NOTE(review): presumably stats are forwarded to it; its use is not
// visible here.
backend storage.StorageDriver
}
@ -162,7 +95,7 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C
self.lock.Lock()
defer self.lock.Unlock()
if cstore, ok = self.containerStorageMap[ref.Name]; !ok {
cstore = newContainerStore(ref, self.maxNumSamples, self.maxNumStats)
cstore = newContainerStore(ref, self.maxNumStats)
self.containerStorageMap[ref.Name] = cstore
}
}()
@ -176,26 +109,6 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C
return cstore.AddStats(stats)
}
// Samples returns up to numSamples samples for the container called name, or
// an error when no data is stored for that container.
func (self *InMemoryStorage) Samples(name string, numSamples int) ([]*info.ContainerStatsSample, error) {
	// Hold the map lock only for the lookup; the per-container store does
	// its own locking.
	self.lock.RLock()
	cstore, found := self.containerStorageMap[name]
	self.lock.RUnlock()

	if !found {
		return nil, fmt.Errorf("unable to find data for container %v", name)
	}
	return cstore.Samples(numSamples)
}
func (self *InMemoryStorage) RecentStats(name string, numStats int) ([]*info.ContainerStats, error) {
var cstore *containerStorage
var ok bool
@ -214,24 +127,6 @@ func (self *InMemoryStorage) RecentStats(name string, numStats int) ([]*info.Con
return cstore.RecentStats(numStats)
}
// Percentiles returns the requested CPU and memory usage percentiles for the
// container called name, or an error when no data is stored for that
// container.
func (self *InMemoryStorage) Percentiles(name string, cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) {
	// Hold the map lock only for the lookup; the per-container store does
	// its own locking.
	self.lock.RLock()
	cstore, found := self.containerStorageMap[name]
	self.lock.RUnlock()

	if !found {
		return nil, fmt.Errorf("unable to find data for container %v", name)
	}
	return cstore.Percentiles(cpuPercentiles, memPercentiles)
}
func (self *InMemoryStorage) Close() error {
self.lock.Lock()
self.containerStorageMap = make(map[string]*containerStorage, 32)
@ -240,13 +135,11 @@ func (self *InMemoryStorage) Close() error {
}
func New(
maxNumSamples,
maxNumStats int,
backend storage.StorageDriver,
) *InMemoryStorage {
ret := &InMemoryStorage{
containerStorageMap: make(map[string]*containerStorage, 32),
maxNumSamples: maxNumSamples,
maxNumStats: maxNumStats,
backend: backend,
}

View File

@ -35,34 +35,11 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
for N := 10; N < maxSize; N += 10 {
testDriver := &memoryTestStorageDriver{}
testDriver.StorageDriver = New(N, N, nil)
testDriver.StorageDriver = New(N, nil)
f(testDriver, t)
}
}
// TestMaxMemoryUsage runs the shared StorageDriverTestMaxMemoryUsage check
// against the in-memory driver via runStorageTest.
func TestMaxMemoryUsage(t *testing.T) {
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t)
}
// TestSampleCpuUsage runs the shared StorageDriverTestSampleCpuUsage check
// against the in-memory driver via runStorageTest.
func TestSampleCpuUsage(t *testing.T) {
runStorageTest(test.StorageDriverTestSampleCpuUsage, t)
}
// TestSamplesWithoutSample runs the shared
// StorageDriverTestSamplesWithoutSample check against the in-memory driver
// via runStorageTest.
func TestSamplesWithoutSample(t *testing.T) {
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t)
}
// TestPercentilesWithoutSample runs the shared
// StorageDriverTestPercentilesWithoutSample check against the in-memory
// driver via runStorageTest.
func TestPercentilesWithoutSample(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t)
}
// TestPercentiles runs the shared StorageDriverTestPercentiles check against
// an in-memory driver created with a capacity of 100 for both samples and
// stats.
func TestPercentiles(t *testing.T) {
	const capacity = 100
	testDriver := new(memoryTestStorageDriver)
	testDriver.StorageDriver = New(capacity, capacity, nil)
	test.StorageDriverTestPercentiles(testDriver, t)
}
// TestRetrievePartialRecentStats runs the shared
// StorageDriverTestRetrievePartialRecentStats check against the in-memory
// driver via runStorageTest.
func TestRetrievePartialRecentStats(t *testing.T) {
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t)
}
@ -75,18 +52,6 @@ func TestNoRecentStats(t *testing.T) {
runStorageTest(test.StorageDriverTestNoRecentStats, t)
}
// TestNoSamples runs the shared StorageDriverTestNoSamples check against the
// in-memory driver via runStorageTest.
func TestNoSamples(t *testing.T) {
runStorageTest(test.StorageDriverTestNoSamples, t)
}
// TestPercentilesWithoutStats runs the shared
// StorageDriverTestPercentilesWithoutStats check against the in-memory driver
// via runStorageTest.
func TestPercentilesWithoutStats(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t)
}
// TestRetrieveZeroStats runs the shared
// StorageDriverTestRetrieveZeroRecentStats check against the in-memory driver
// via runStorageTest.
func TestRetrieveZeroStats(t *testing.T) {
runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t)
}
// TestRetrieveZeroSamples runs the shared
// StorageDriverTestRetrieveZeroSamples check against the in-memory driver via
// runStorageTest.
func TestRetrieveZeroSamples(t *testing.T) {
runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t)
}

View File

@ -26,15 +26,6 @@ type StorageDriver interface {
// recent stats should be the last.
RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error)
// Read the specified percentiles of CPU and memory usage of the container.
// The implementation decides which time range to look at.
Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error)
// Returns samples of the container stats. If numSamples < 0, then
// the number of returned samples is implementation defined. Otherwise, the driver
// should return at most numSamples samples.
Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error)
// Close will clear the state of the storage driver. The elements
// stored in the underlying storage may or may not be deleted depending
// on the implementation of the storage driver.

View File

@ -34,20 +34,6 @@ func (self *MockStorageDriver) RecentStats(containerName string, numStats int) (
return args.Get(0).([]*info.ContainerStats), args.Error(1)
}
// Percentiles records the call on the mock and returns the values configured
// for it: the first as *info.ContainerStatsPercentiles, the second as the
// error.
func (self *MockStorageDriver) Percentiles(
	containerName string,
	cpuUsagePercentiles []int,
	memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) {
	results := self.Called(containerName, cpuUsagePercentiles, memUsagePercentiles)
	percentiles := results.Get(0).(*info.ContainerStatsPercentiles)
	return percentiles, results.Error(1)
}
// Samples records the call on the mock and returns the values configured for
// it: the first as []*info.ContainerStatsSample, the second as the error.
func (self *MockStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
	results := self.Called(containerName, numSamples)
	samples := results.Get(0).([]*info.ContainerStatsSample)
	return samples, results.Error(1)
}
func (self *MockStorageDriver) Close() error {
if self.MockCloseMethod {
args := self.Called()

View File

@ -109,54 +109,6 @@ func DefaultStatsEq(a, b *info.ContainerStats) bool {
return true
}
// sampleEq reports whether two samples are equal. Timestamps and durations
// are compared with a tolerance of timePrecision, because a precise time
// representation is not required; CPU and memory data must match exactly.
func sampleEq(a, b *info.ContainerStatsSample) bool {
	return TimeEq(a.Timestamp, b.Timestamp, timePrecision) &&
		durationEq(a.Duration, b.Duration, timePrecision) &&
		reflect.DeepEqual(a.Cpu, b.Cpu) &&
		reflect.DeepEqual(a.Memory, b.Memory)
}
// samplesInTrace checks every sample against the traces it was derived from:
// its duration must equal samplePeriod, its CPU usage must appear somewhere in
// cpuTrace and its memory usage somewhere in memTrace. Each violation is
// reported through t.Errorf.
func samplesInTrace(samples []*info.ContainerStatsSample, cpuTrace, memTrace []uint64, samplePeriod time.Duration, t *testing.T) {
	// contains reports whether usage occurs in trace.
	contains := func(trace []uint64, usage uint64) bool {
		for i := range trace {
			if trace[i] == usage {
				return true
			}
		}
		return false
	}

	for _, s := range samples {
		if s.Duration != samplePeriod {
			t.Errorf("sample duration is %v, not %v", s.Duration, samplePeriod)
		}
		if cpu := s.Cpu.Usage; !contains(cpuTrace, cpu) {
			t.Errorf("unable to find cpu usage %v", cpu)
		}
		if mem := s.Memory.Usage; !contains(memTrace, mem) {
			t.Errorf("unable to find mem usage %v", mem)
		}
	}
}
// This function will generate random stats and write
// them into the storage. The function will not close the driver
func StorageDriverFillRandomStatsFunc(
@ -190,187 +142,6 @@ func StorageDriverFillRandomStatsFunc(
}
}
// StorageDriverTestSampleCpuUsage feeds the driver a random trace and checks
// that every sample it returns (for a full, an unbounded and a reduced sample
// request) is consistent with that trace. The stats objects are modified
// after being added to verify that the driver does not alias them.
func StorageDriverTestSampleCpuUsage(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()

	const N = 100
	const samplePeriod = 1 * time.Second

	cpuTrace := make([]uint64, 0, N)
	memTrace := make([]uint64, 0, N)
	// We need N+1 observations to get N samples
	for len(cpuTrace) < N+1 {
		cpuTrace = append(cpuTrace, uint64(rand.Intn(1000)))
		memTrace = append(memTrace, uint64(rand.Intn(1000)))
	}

	ref := info.ContainerReference{Name: "container"}

	for _, stats := range buildTrace(cpuTrace, memTrace, samplePeriod) {
		if err := driver.AddStats(ref, stats); err != nil {
			t.Fatalf("unable to add stats: %v", err)
		}
		// set the trace to something else. The stats stored in the
		// storage should not be affected.
		stats.Cpu.Usage.Total = 0
		stats.Cpu.Usage.System = 0
		stats.Cpu.Usage.User = 0
	}

	for round, requested := range []int{N, -1, N - 5} {
		samples, err := driver.Samples(ref.Name, requested)
		if err != nil {
			t.Errorf("unable to sample stats: %v", err)
		}
		// Only the first request insists on a non-empty result.
		if round == 0 && len(samples) == 0 {
			t.Fatal("should at least store one sample")
		}
		samplesInTrace(samples, cpuTrace, memTrace, samplePeriod, t)
	}
}
// StorageDriverTestMaxMemoryUsage adds a trace whose memory usage grows from
// 1 to N and checks that the driver reports N as the maximum memory usage.
// The stats objects are zeroed after being added to verify that the driver
// does not alias them.
func StorageDriverTestMaxMemoryUsage(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()
	N := 100
	memTrace := make([]uint64, N)
	cpuTrace := make([]uint64, N)
	for i := 0; i < N; i++ {
		memTrace[i] = uint64(i + 1)
		cpuTrace[i] = uint64(1)
	}

	ref := info.ContainerReference{
		Name: "container",
	}

	trace := buildTrace(cpuTrace, memTrace, 1*time.Second)

	for _, stats := range trace {
		err := driver.AddStats(ref, stats)
		if err != nil {
			t.Fatalf("unable to add stats: %v", err)
		}
		// set the trace to something else. The stats stored in the
		// storage should not be affected.
		stats.Cpu.Usage.Total = 0
		stats.Cpu.Usage.System = 0
		stats.Cpu.Usage.User = 0
		stats.Memory.Usage = 0
	}

	percentiles, err := driver.Percentiles(ref.Name, []int{50}, []int{50})
	// Stop here on failure: percentiles is dereferenced below, so carrying
	// on with a nil result would panic instead of reporting the problem.
	if err != nil {
		t.Fatalf("unable to call Percentiles(): %v", err)
	}
	if percentiles == nil {
		t.Fatalf("Percentiles() returned no percentiles")
	}
	maxUsage := uint64(N)
	if percentiles.MaxMemoryUsage != maxUsage {
		t.Fatalf("Max memory usage should be %v; received %v", maxUsage, percentiles.MaxMemoryUsage)
	}
}
// StorageDriverTestSamplesWithoutSample adds a single stats entry, which is
// not enough to derive a sample, and checks that the driver returns no
// samples and no error.
func StorageDriverTestSamplesWithoutSample(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()

	ref := info.ContainerReference{Name: "container"}
	single := buildTrace([]uint64{10}, []uint64{10}, 1*time.Second)
	driver.AddStats(ref, single[0])

	samples, err := driver.Samples(ref.Name, -1)
	switch {
	case err != nil:
		t.Fatal(err)
	case len(samples) != 0:
		t.Errorf("There should be no sample")
	}
}
// StorageDriverTestPercentilesWithoutSample adds a single stats entry, which
// is not enough to derive a sample, and checks that the driver returns nil
// percentiles and no error.
func StorageDriverTestPercentilesWithoutSample(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()

	ref := info.ContainerReference{Name: "container"}
	single := buildTrace([]uint64{10}, []uint64{10}, 1*time.Second)
	driver.AddStats(ref, single[0])

	median := []int{50}
	percentiles, err := driver.Percentiles(ref.Name, median, []int{50})
	switch {
	case err != nil:
		t.Fatal(err)
	case percentiles != nil:
		t.Errorf("There should be no percentiles")
	}
}
// StorageDriverTestPercentiles adds a trace with CPU and memory usage growing
// from 1 to N and checks that the percentiles returned by the driver are
// monotonic: a lower percentage never has a larger value than a higher one.
func StorageDriverTestPercentiles(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()
	N := 100
	cpuTrace := make([]uint64, N)
	memTrace := make([]uint64, N)
	for i := 1; i < N+1; i++ {
		cpuTrace[i-1] = uint64(i)
		memTrace[i-1] = uint64(i)
	}

	trace := buildTrace(cpuTrace, memTrace, 1*time.Second)

	ref := info.ContainerReference{
		Name: "container",
	}
	for _, stats := range trace {
		// A failed insert would make the percentile checks below
		// meaningless, so stop right away.
		if err := driver.AddStats(ref, stats); err != nil {
			t.Fatalf("unable to add stats: %v", err)
		}
	}
	percentages := []int{
		80,
		90,
		50,
	}
	percentiles, err := driver.Percentiles(ref.Name, percentages, percentages)
	if err != nil {
		t.Fatal(err)
	}
	// A driver may return nil percentiles together with a nil error; do not
	// dereference that.
	if percentiles == nil {
		t.Fatalf("Percentiles() returned no percentiles")
	}
	for _, x := range percentiles.CpuUsagePercentiles {
		for _, y := range percentiles.CpuUsagePercentiles {
			// lower percentage, smaller value
			if x.Percentage < y.Percentage && x.Value > y.Value {
				t.Errorf("%v percent is %v; while %v percent is %v",
					x.Percentage, x.Value, y.Percentage, y.Value)
			}
		}
	}
	for _, x := range percentiles.MemoryUsagePercentiles {
		for _, y := range percentiles.MemoryUsagePercentiles {
			if x.Percentage < y.Percentage && x.Value > y.Value {
				t.Errorf("%v percent is %v; while %v percent is %v",
					x.Percentage, x.Value, y.Percentage, y.Value)
			}
		}
	}
}
func StorageDriverTestRetrievePartialRecentStats(driver TestStorageDriver, t *testing.T) {
defer driver.Close()
N := 100
@ -465,37 +236,6 @@ func StorageDriverTestNoRecentStats(driver TestStorageDriver, t *testing.T) {
}
}
// StorageDriverTestNoSamples checks that asking for samples of a container
// the driver has never seen yields no samples. The returned error is ignored:
// a driver may or may not report one.
func StorageDriverTestNoSamples(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()

	const unknownContainer = "somerandomecontainer"
	samples, _ := driver.Samples(unknownContainer, -1)
	if count := len(samples); count > 0 {
		t.Errorf("Samples() returns %v samples on non exist container", count)
	}
}
// StorageDriverTestPercentilesWithoutStats checks that asking for percentiles
// of a container the driver has never seen yields either nil or all-zero
// percentiles. The returned error is ignored.
func StorageDriverTestPercentilesWithoutStats(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()

	const unknownContainer = "somerandomecontainer"
	percentiles, _ := driver.Percentiles(unknownContainer, []int{50, 80}, []int{50, 80})
	if percentiles == nil {
		// Nothing reported at all is an acceptable answer.
		return
	}

	if percentiles.MaxMemoryUsage != 0 {
		t.Errorf("Percentiles() reports max memory usage > 0 when there's no stats.")
	}
	for _, cpu := range percentiles.CpuUsagePercentiles {
		if cpu.Value == 0 {
			continue
		}
		t.Errorf("Percentiles() reports cpu usage is %v when there's no stats.", cpu.Value)
	}
	for _, mem := range percentiles.MemoryUsagePercentiles {
		if mem.Value == 0 {
			continue
		}
		t.Errorf("Percentiles() reports memory usage is %v when there's no stats.", mem.Value)
	}
}
func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testing.T) {
defer driver.Close()
N := 100
@ -524,32 +264,3 @@ func StorageDriverTestRetrieveZeroRecentStats(driver TestStorageDriver, t *testi
t.Errorf("RecentStats() returns %v stats when requests for 0 stats", len(recentStats))
}
}
// StorageDriverTestRetrieveZeroSamples adds a trace of N stats and checks
// that requesting zero samples returns no samples and no error.
func StorageDriverTestRetrieveZeroSamples(driver TestStorageDriver, t *testing.T) {
	defer driver.Close()
	N := 100
	memTrace := make([]uint64, N)
	cpuTrace := make([]uint64, N)
	for i := 0; i < N; i++ {
		memTrace[i] = uint64(i + 1)
		cpuTrace[i] = uint64(1)
	}

	ref := info.ContainerReference{
		Name: "container",
	}

	trace := buildTrace(cpuTrace, memTrace, 1*time.Second)

	for _, stats := range trace {
		driver.AddStats(ref, stats)
	}

	samples, err := driver.Samples(ref.Name, 0)
	if err != nil {
		t.Fatal(err)
	}
	if len(samples) > 0 {
		// Name the API actually under test; the previous message was
		// copied from the RecentStats test.
		t.Errorf("Samples() returns %v samples when requests for 0 samples", len(samples))
	}
}

View File

@ -43,10 +43,10 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) {
var backendStorage storage.StorageDriver
var err error
// TODO(vmarmol): We shouldn't need the housekeeping interval here and it shouldn't be public.
samplesToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval)
if samplesToCache < statsRequestedByUI {
statsToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval)
if statsToCache < statsRequestedByUI {
// The UI requests the most recent 60 stats by default.
samplesToCache = statsRequestedByUI
statsToCache = statsRequestedByUI
}
switch driverName {
case "":
@ -67,8 +67,6 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) {
*argDbHost,
*argDbIsSecure,
*argDbBufferDuration,
// TODO(monnand): One hour? Or user-defined?
1*time.Hour,
)
case "bigquery":
var hostname string
@ -80,7 +78,6 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) {
hostname,
"cadvisor",
*argDbName,
1*time.Hour,
)
default:
@ -89,7 +86,7 @@ func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) {
if err != nil {
return nil, err
}
glog.Infof("Caching %d recent stats in memory; using \"%v\" storage driver\n", samplesToCache, driverName)
storageDriver = memory.New(samplesToCache, samplesToCache, backendStorage)
glog.Infof("Caching %d recent stats in memory; using \"%v\" storage driver\n", statsToCache, driverName)
storageDriver = memory.New(statsToCache, backendStorage)
return storageDriver, nil
}