From d4201f66e044221bdf121fadeebbe101a55aa3ee Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 16 Jun 2014 18:19:14 -0700 Subject: [PATCH] in memory storage --- storage/memory/memory.go | 205 ++++++++++++++++++++++++++++++++++ storage/memory/memory_test.go | 131 ++++++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 storage/memory/memory.go create mode 100644 storage/memory/memory_test.go diff --git a/storage/memory/memory.go b/storage/memory/memory.go new file mode 100644 index 00000000..b34b1bc3 --- /dev/null +++ b/storage/memory/memory.go @@ -0,0 +1,205 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "container/list" + "fmt" + "sync" + + "github.com/google/cadvisor/info" + "github.com/google/cadvisor/sampling" + "github.com/google/cadvisor/storage" +) + +// containerStorage is used to store per-container information +type containerStorage struct { + ref info.ContainerReference + prevStats *info.ContainerStats + sampler sampling.Sampler + recentStats *list.List + numRecentStats int + maxMemUsage uint64 + lock sync.RWMutex +} + +func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) { + if stats == nil || stats.Cpu == nil || stats.Memory == nil { + // discard incomplete stats + self.prevStats = nil + return + } + if self.prevStats == nil { + self.prevStats = &info.ContainerStats{ + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + } + } + // make a deep copy. + self.prevStats.Timestamp = stats.Timestamp + *self.prevStats.Cpu = *stats.Cpu + self.prevStats.Cpu.Usage.PerCpu = make([]uint64, len(stats.Cpu.Usage.PerCpu)) + for i, perCpu := range stats.Cpu.Usage.PerCpu { + self.prevStats.Cpu.Usage.PerCpu[i] = perCpu + } + *self.prevStats.Memory = *stats.Memory +} + +func (self *containerStorage) AddStats(stats *info.ContainerStats) error { + self.lock.Lock() + defer self.lock.Unlock() + if self.prevStats != nil { + sample, err := info.NewSample(self.prevStats, stats) + if err != nil { + return fmt.Errorf("wrong stats: %v", err) + } + if sample != nil { + self.sampler.Update(sample) + } + } + if stats.Memory != nil { + if self.maxMemUsage < stats.Memory.Usage { + self.maxMemUsage = stats.Memory.Usage + } + } + if self.recentStats.Len() >= self.numRecentStats { + self.recentStats.Remove(self.recentStats.Front()) + } + self.recentStats.PushBack(stats) + self.updatePrevStats(stats) + return nil +} + +func (self *containerStorage) RecentStats(numStats int) ([]*info.ContainerStats, error) { + self.lock.RLock() + defer self.lock.RUnlock() + if self.recentStats.Len() < numStats || numStats < 0 { + numStats = self.recentStats.Len() + } + ret := make([]*info.ContainerStats, 0, numStats) + e := self.recentStats.Front() + for i := 0; i < numStats; i++ { + data := e.Value.(*info.ContainerStats) + ret = append(ret, data) + e = e.Next() + if e == nil { + break + } + } + return ret, nil +} + +func (self *containerStorage) Samples(numSamples int) ([]*info.ContainerStatsSample, error) { + self.lock.RLock() + defer self.lock.RUnlock() + if self.sampler.Len() < numSamples || numSamples < 0 { + numSamples = self.sampler.Len() + } + ret := make([]*info.ContainerStatsSample, 0, numSamples) + + self.sampler.Map(func(d interface{}) { + if len(ret) >= numSamples { + return + } + sample := d.(*info.ContainerStatsSample) + ret = append(ret, sample) + }) + return ret, nil +} + +func (self *containerStorage) Percentiles(cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) { + samples, err := self.Samples(-1) + if err != nil { + return nil, err + } + ret := new(info.ContainerStatsPercentiles) + ret.FillPercentiles(samples, cpuPercentiles, memPercentiles) + ret.MaxMemoryUsage = self.maxMemUsage + return ret, nil +} + +func newContainerStore(ref info.ContainerReference, maxNumSamples, numRecentStats int) *containerStorage { + s := sampling.NewReservoirSampler(maxNumSamples) + return &containerStorage{ + ref: ref, + recentStats: list.New(), + sampler: s, + numRecentStats: numRecentStats, + } +} + +type InMemoryStorage struct { + lock sync.RWMutex + containerStorageMap map[string]*containerStorage + maxNumSamples int + numRecentStats int +} + +func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + var cstore *containerStorage + var ok bool + self.lock.Lock() + if cstore, ok = self.containerStorageMap[ref.Name]; !ok { + cstore = newContainerStore(ref, self.maxNumSamples, self.numRecentStats) + self.containerStorageMap[ref.Name] = cstore + } + self.lock.Unlock() + return cstore.AddStats(stats) +} + +func (self *InMemoryStorage) Samples(name string, numSamples int) ([]*info.ContainerStatsSample, error) { + var cstore *containerStorage + var ok bool + self.lock.RLock() + if cstore, ok = self.containerStorageMap[name]; !ok { + return nil, fmt.Errorf("unable to find data for container %v", name) + } + self.lock.RUnlock() + + return cstore.Samples(numSamples) +} + +func (self *InMemoryStorage) RecentStats(name string, numStats int) ([]*info.ContainerStats, error) { + var cstore *containerStorage + var ok bool + self.lock.RLock() + if cstore, ok = self.containerStorageMap[name]; !ok { + return nil, fmt.Errorf("unable to find data for container %v", name) + } + self.lock.RUnlock() + + return cstore.RecentStats(numStats) +} + +func (self *InMemoryStorage) Percentiles(name string, cpuPercentiles, memPercentiles []int) (*info.ContainerStatsPercentiles, error) { + var cstore *containerStorage + var ok bool + self.lock.RLock() + if cstore, ok = self.containerStorageMap[name]; !ok { + return nil, fmt.Errorf("unable to find data for container %v", name) + } + self.lock.RUnlock() + + return cstore.Percentiles(cpuPercentiles, memPercentiles) +} + +func New(maxNumSamples, numRecentStats int) storage.StorageDriver { + ret := &InMemoryStorage{ + containerStorageMap: make(map[string]*containerStorage, 32), + maxNumSamples: maxNumSamples, + numRecentStats: numRecentStats, + } + return ret +} diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go new file mode 100644 index 00000000..750505b8 --- /dev/null +++ b/storage/memory/memory_test.go @@ -0,0 +1,131 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/cadvisor/info" +) + +func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStats { + if len(cpu) != len(mem) { + panic("len(cpu) != len(mem)") + } + + ret := make([]*info.ContainerStats, len(cpu)) + currentTime := time.Now() + + var cpuTotalUsage uint64 = 0 + for i, cpuUsage := range cpu { + cpuTotalUsage += cpuUsage + stats := new(info.ContainerStats) + stats.Cpu = new(info.CpuStats) + stats.Memory = new(info.MemoryStats) + stats.Timestamp = currentTime + currentTime = currentTime.Add(duration) + + stats.Cpu.Usage.Total = cpuTotalUsage + stats.Cpu.Usage.User = stats.Cpu.Usage.Total + stats.Cpu.Usage.System = 0 + + stats.Memory.Usage = mem[i] + + ret[i] = stats + } + return ret +} + +func TestSampleCpuUsage(t *testing.T) { + // Number of samples + N := 10 + cpuTrace := make([]uint64, 0, N) + memTrace := make([]uint64, 0, N) + + // We need N+1 observations to get N samples + for i := 0; i < N+1; i++ { + cpuusage := uint64(rand.Intn(1000)) + memusage := uint64(rand.Intn(1000)) + cpuTrace = append(cpuTrace, cpuusage) + memTrace = append(memTrace, memusage) + } + + samplePeriod := 1 * time.Second + + storage := New(N, N) + + ref := info.ContainerReference{ + Name: "container", + } + + trace := buildTrace(cpuTrace, memTrace, samplePeriod) + + for _, stats := range trace { + storage.AddStats(ref, stats) + } + + samples, err := storage.Samples(ref.Name, N) + if err != nil { + t.Errorf("unable to sample stats: %v", err) + } + for _, sample := range samples { + if sample.Duration != samplePeriod { + t.Errorf("sample duration is %v, not %v", sample.Duration, samplePeriod) + } + cpuUsage := sample.Cpu.Usage + found := false + for _, u := range cpuTrace { + if u == cpuUsage { + found = true + } + } + if !found { + t.Errorf("unable to find cpu usage %v", cpuUsage) + } + } +} + +func TestMaxMemoryUsage(t *testing.T) { + N := 100 + memTrace := make([]uint64, N) + cpuTrace := make([]uint64, N) + for i := 0; i < N; i++ { + memTrace[i] = uint64(i + 1) + cpuTrace[i] = uint64(1) + } + + storage := New(N-10, N-10) + + ref := info.ContainerReference{ + Name: "container", + } + + trace := buildTrace(cpuTrace, memTrace, 1*time.Second) + + for _, stats := range trace { + storage.AddStats(ref, stats) + } + + percentiles, err := storage.Percentiles(ref.Name, []int{50}, []int{50}) + if err != nil { + t.Errorf("unable to call Percentiles(): %v", err) + } + maxUsage := uint64(N) + if percentiles.MaxMemoryUsage != maxUsage { + t.Fatalf("Max memory usage should be %v; received %v", maxUsage, percentiles.MaxMemoryUsage) + } +}