remove StatsPercentiles in container handler

This commit is contained in:
Nan Deng 2014-06-17 12:55:51 -07:00
parent 7f5e082d2c
commit 3718b139b3
14 changed files with 29 additions and 557 deletions

View File

@ -19,34 +19,27 @@ import (
"fmt"
"log"
"net/http"
"time"
"github.com/google/cadvisor/api"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/docker"
"github.com/google/cadvisor/container/lmctfy"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/manager"
"github.com/google/cadvisor/pages"
"github.com/google/cadvisor/pages/static"
"github.com/google/cadvisor/storage/memory"
)
var argPort = flag.Int("port", 8080, "port to listen")
var argSampleSize = flag.Int("samples", 1024, "number of samples we want to keep")
var argResetPeriod = flag.Duration("reset_period", 2*time.Hour, "period to reset the samples")
var argHistoryDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep")
func main() {
flag.Parse()
// XXX(dengnan): Should we allow users to specify which sampler they want to use?
container.SetStatsParameter(&container.StatsParameter{
Sampler: "uniform",
NumSamples: *argSampleSize,
ResetPeriod: *argResetPeriod,
})
storage := memory.New(*argSampleSize, *argHistoryDuration)
// TODO(monnand): Add stats writer for manager
containerManager, err := manager.New(nil)
containerManager, err := manager.New(storage)
if err != nil {
log.Fatalf("Failed to create a Container Manager: %s", err)
}

View File

@ -14,11 +14,7 @@
package container
import (
"fmt"
"github.com/google/cadvisor/info"
)
import "github.com/google/cadvisor/info"
// Listing types.
const (
@ -36,12 +32,4 @@ type ContainerHandler interface {
ListContainers(listType ListType) ([]info.ContainerReference, error)
ListThreads(listType ListType) ([]int, error)
ListProcesses(listType ListType) ([]int, error)
StatsPercentiles() (*info.ContainerStatsPercentiles, error)
}
type NoStatsSummary struct {
}
func (self *NoStatsSummary) StatsPercentiles() (*info.ContainerStatsPercentiles, error) {
return nil, fmt.Errorf("This method (StatsSummary) should never be called")
}

View File

@ -93,12 +93,11 @@ func Register(factory info.MachineInfoFactory, paths ...string) error {
f := &dockerFactory{
machineInfoFactory: factory,
}
cf := container.AddStatsSummaryToFactory(f)
for _, p := range paths {
if p != "/" && p != "/docker" {
return fmt.Errorf("%v cannot be managed by docker", p)
}
container.RegisterContainerHandlerFactory(p, cf)
container.RegisterContainerHandlerFactory(p, f)
}
return nil
}

View File

@ -37,7 +37,6 @@ type dockerContainerHandler struct {
name string
aliases []string
machineInfoFactory info.MachineInfoFactory
container.NoStatsSummary
}
func newDockerContainerHandler(

View File

@ -23,7 +23,6 @@ import (
type containerListFilter struct {
filter func(string) bool
handler ContainerHandler
NoStatsSummary
}
func (self *containerListFilter) ContainerReference() (info.ContainerReference, error) {

View File

@ -24,7 +24,6 @@ import (
type mockContainerHandler struct {
mock.Mock
NoStatsSummary
}
func (self *mockContainerHandler) GetSpec() (*info.ContainerSpec, error) {

View File

@ -26,7 +26,7 @@ func Register(paths ...string) error {
if _, err := exec.LookPath("lmctfy"); err != nil {
return errors.New("cannot find lmctfy")
}
f := container.AddStatsSummaryToFactory(&lmctfyFactory{})
f := &lmctfyFactory{}
for _, path := range paths {
log.Printf("register lmctfy under %v", path)
container.RegisterContainerHandlerFactory(path, f)

View File

@ -31,7 +31,6 @@ import (
type lmctfyContainerHandler struct {
// Container name
Name string
container.NoStatsSummary
}
const (

View File

@ -1,115 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import (
"fmt"
"sync"
"time"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/sampling"
)
type samplerFactor interface {
String() string
NewSampler(*StatsParameter) (sampling.Sampler, error)
}
type samplerManager struct {
factoryMap map[string]samplerFactor
lock sync.RWMutex
}
func (self *samplerManager) Register(factory samplerFactor) {
self.lock.Lock()
defer self.lock.Unlock()
if self.factoryMap == nil {
self.factoryMap = make(map[string]samplerFactor, 3)
}
self.factoryMap[factory.String()] = factory
}
func (self *samplerManager) NewSampler(param *StatsParameter) (sampling.Sampler, error) {
self.lock.RLock()
defer self.lock.RUnlock()
if f, ok := self.factoryMap[param.Sampler]; ok {
return f.NewSampler(param)
}
return nil, fmt.Errorf("unknown sampler %v", param.Sampler)
}
var globalSamplerManager samplerManager
func NewSampler(param *StatsParameter) (sampling.Sampler, error) {
return globalSamplerManager.NewSampler(param)
}
type reservoirSamplerFactory struct {
}
func (self *reservoirSamplerFactory) String() string {
return "uniform"
}
func (self *reservoirSamplerFactory) NewSampler(param *StatsParameter) (sampling.Sampler, error) {
s := sampling.NewReservoirSampler(param.NumSamples)
if param.ResetPeriod.Seconds() > 1.0 {
s = sampling.NewPeriodicallyResetSampler(param.ResetPeriod, s)
}
return s, nil
}
type esSamplerFactory struct {
startTime time.Time
}
func (self *esSamplerFactory) String() string {
return "weighted"
}
func (self *esSamplerFactory) NewSampler(param *StatsParameter) (sampling.Sampler, error) {
s := sampling.NewESSampler(param.NumSamples, func(d interface{}) float64 {
stats := d.(*info.ContainerStats)
delta := self.startTime.Sub(stats.Timestamp)
return delta.Seconds()
})
if param.ResetPeriod.Seconds() > 1.0 {
s = sampling.NewPeriodicallyResetSampler(param.ResetPeriod, s)
}
return s, nil
}
type chainSamplerFactory struct {
}
func (self *chainSamplerFactory) String() string {
return "window"
}
func (self *chainSamplerFactory) NewSampler(param *StatsParameter) (sampling.Sampler, error) {
s := sampling.NewChainSampler(param.NumSamples, param.WindowSize)
if param.ResetPeriod.Seconds() > 1.0 {
s = sampling.NewPeriodicallyResetSampler(param.ResetPeriod, s)
}
return s, nil
}
func init() {
globalSamplerManager.Register(&reservoirSamplerFactory{})
globalSamplerManager.Register(&esSamplerFactory{time.Now()})
globalSamplerManager.Register(&chainSamplerFactory{})
}

View File

@ -1,143 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import (
"fmt"
"sync"
"time"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/sampling"
)
type percentilesContainerHandlerWrapper struct {
handler ContainerHandler
prevStats *info.ContainerStats
numStats uint64
maxMemUsage uint64
sampler sampling.Sampler
lock sync.Mutex
}
func (self *percentilesContainerHandlerWrapper) GetSpec() (*info.ContainerSpec, error) {
return self.handler.GetSpec()
}
func (self *percentilesContainerHandlerWrapper) ContainerReference() (info.ContainerReference, error) {
return self.handler.ContainerReference()
}
func (self *percentilesContainerHandlerWrapper) updatePrevStats(stats *info.ContainerStats) {
if stats == nil || stats.Cpu == nil || stats.Memory == nil {
// discard incomplete stats
self.prevStats = nil
return
}
if self.prevStats == nil {
self.prevStats = &info.ContainerStats{
Cpu: &info.CpuStats{},
Memory: &info.MemoryStats{},
}
}
// make a deep copy.
self.prevStats.Timestamp = stats.Timestamp
*self.prevStats.Cpu = *stats.Cpu
self.prevStats.Cpu.Usage.PerCpu = make([]uint64, len(stats.Cpu.Usage.PerCpu))
for i, perCpu := range stats.Cpu.Usage.PerCpu {
self.prevStats.Cpu.Usage.PerCpu[i] = perCpu
}
*self.prevStats.Memory = *stats.Memory
}
func (self *percentilesContainerHandlerWrapper) GetStats() (*info.ContainerStats, error) {
stats, err := self.handler.GetStats()
if err != nil {
return nil, err
}
if stats == nil {
return nil, fmt.Errorf("container handler returns a nil error and a nil stats")
}
if stats.Timestamp.IsZero() {
return nil, fmt.Errorf("container handler did not set timestamp")
}
self.lock.Lock()
defer self.lock.Unlock()
if self.prevStats != nil {
sample, err := info.NewSample(self.prevStats, stats)
if err != nil {
return nil, fmt.Errorf("wrong stats: %v", err)
}
if sample != nil {
self.sampler.Update(sample)
}
}
self.updatePrevStats(stats)
self.numStats++
if stats.Memory != nil {
if stats.Memory.Usage > self.maxMemUsage {
self.maxMemUsage = stats.Memory.Usage
}
}
return stats, nil
}
func (self *percentilesContainerHandlerWrapper) ListContainers(listType ListType) ([]info.ContainerReference, error) {
return self.handler.ListContainers(listType)
}
func (self *percentilesContainerHandlerWrapper) ListThreads(listType ListType) ([]int, error) {
return self.handler.ListThreads(listType)
}
func (self *percentilesContainerHandlerWrapper) ListProcesses(listType ListType) ([]int, error) {
return self.handler.ListProcesses(listType)
}
func (self *percentilesContainerHandlerWrapper) StatsPercentiles() (*info.ContainerStatsPercentiles, error) {
self.lock.Lock()
defer self.lock.Unlock()
samples := make([]*info.ContainerStatsSample, 0, self.sampler.Len())
self.sampler.Map(func(d interface{}) {
stats := d.(*info.ContainerStatsSample)
samples = append(samples, stats)
})
ret := info.NewPercentiles(
samples,
[]int{50, 80, 90, 95, 99},
[]int{50, 80, 90, 95, 99},
)
ret.MaxMemoryUsage = self.maxMemUsage
return ret, nil
}
type StatsParameter struct {
Sampler string
NumSamples int
WindowSize int
ResetPeriod time.Duration
}
func AddStatsSummary(handler ContainerHandler, parameter *StatsParameter) (ContainerHandler, error) {
sampler, err := NewSampler(parameter)
if err != nil {
return nil, err
}
return &percentilesContainerHandlerWrapper{
handler: handler,
sampler: sampler,
}, nil
}

View File

@ -1,188 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/google/cadvisor/info"
)
type mockContainer struct {
}
func (self *mockContainer) GetSpec() (*info.ContainerSpec, error) {
return nil, nil
}
func (self *mockContainer) ListContainers(listType ListType) ([]info.ContainerReference, error) {
return nil, nil
}
func (self *mockContainer) ListThreads(listType ListType) ([]int, error) {
return nil, nil
}
func (self *mockContainer) ListProcesses(listType ListType) ([]int, error) {
return nil, nil
}
func TestMaxMemoryUsage(t *testing.T) {
N := 100
memTrace := make([]uint64, N)
for i := 0; i < N; i++ {
memTrace[i] = uint64(i + 1)
}
handler, err := AddStatsSummary(
containerWithTrace(1*time.Second, nil, memTrace),
&StatsParameter{
Sampler: "uniform",
NumSamples: 10,
},
)
if err != nil {
t.Error(err)
}
maxUsage := uint64(N)
for i := 0; i < N; i++ {
_, err := handler.GetStats()
if err != nil {
t.Errorf("Error when get stats: %v", err)
continue
}
}
summary, err := handler.StatsPercentiles()
if err != nil {
t.Fatalf("Error when get summary: %v", err)
}
if summary.MaxMemoryUsage != maxUsage {
t.Fatalf("Max memory usage should be %v; received %v", maxUsage, summary.MaxMemoryUsage)
}
}
type replayTrace struct {
NoStatsSummary
mockContainer
cpuTrace []uint64
memTrace []uint64
totalUsage uint64
currenttime time.Time
duration time.Duration
lock sync.Mutex
}
func containerWithTrace(duration time.Duration, cpuUsages []uint64, memUsages []uint64) ContainerHandler {
return &replayTrace{
duration: duration,
cpuTrace: cpuUsages,
memTrace: memUsages,
currenttime: time.Now(),
}
}
func (self *replayTrace) ContainerReference() (info.ContainerReference, error) {
return info.ContainerReference{
Name: "replay",
}, nil
}
func (self *replayTrace) GetStats() (*info.ContainerStats, error) {
stats := new(info.ContainerStats)
stats.Cpu = new(info.CpuStats)
stats.Memory = new(info.MemoryStats)
if len(self.memTrace) > 0 {
stats.Memory.Usage = self.memTrace[0]
self.memTrace = self.memTrace[1:]
}
self.lock.Lock()
defer self.lock.Unlock()
cpuTrace := self.totalUsage
if len(self.cpuTrace) > 0 {
cpuTrace += self.cpuTrace[0]
self.cpuTrace = self.cpuTrace[1:]
}
self.totalUsage = cpuTrace
stats.Timestamp = self.currenttime
self.currenttime = self.currenttime.Add(self.duration)
stats.Cpu.Usage.Total = cpuTrace
stats.Cpu.Usage.PerCpu = []uint64{cpuTrace}
stats.Cpu.Usage.User = cpuTrace
stats.Cpu.Usage.System = 0
return stats, nil
}
func TestSampleCpuUsage(t *testing.T) {
// Number of samples
N := 10
cpuTrace := make([]uint64, 0, N)
memTrace := make([]uint64, 0, N)
// We need N+1 observations to get N samples
for i := 0; i < N+1; i++ {
cpuusage := uint64(rand.Intn(1000))
memusage := uint64(rand.Intn(1000))
cpuTrace = append(cpuTrace, cpuusage)
memTrace = append(memTrace, memusage)
}
samplePeriod := 1 * time.Second
handler, err := AddStatsSummary(
containerWithTrace(samplePeriod, cpuTrace, memTrace),
&StatsParameter{
// Use uniform sampler with sample size of N, so that
// we will be guaranteed to store the first N samples.
Sampler: "uniform",
NumSamples: N,
},
)
if err != nil {
t.Error(err)
}
// request stats/observation N+1 times, so that there will be N samples
for i := 0; i < N+1; i++ {
_, err = handler.GetStats()
if err != nil {
t.Fatal(err)
}
}
_, err = handler.StatsPercentiles()
if err != nil {
t.Fatal(err)
}
hs := handler.(*percentilesContainerHandlerWrapper)
hs.sampler.Map(func(d interface{}) {
sample := d.(*info.ContainerStatsSample)
if sample.Duration != samplePeriod {
t.Errorf("sample duration is %v, not %v", sample.Duration, samplePeriod)
}
cpuUsage := sample.Cpu.Usage
found := false
for _, u := range cpuTrace {
if u == cpuUsage {
found = true
}
}
if !found {
t.Errorf("unable to find cpu usage %v", cpuUsage)
}
})
}

View File

@ -1,48 +0,0 @@
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import "fmt"
type statsSummaryFactory struct {
factory ContainerHandlerFactory
}
func (self *statsSummaryFactory) String() string {
return fmt.Sprintf("%v/stats", self.factory)
}
var globalStatsParameter StatsParameter
func (self *statsSummaryFactory) NewContainerHandler(name string) (ContainerHandler, error) {
h, err := self.factory.NewContainerHandler(name)
if err != nil {
return nil, err
}
return AddStatsSummary(h, &globalStatsParameter)
}
// This is a decorator for container factory. If the container handler created
// by a container factory does not implement stats summary method, then the factory
// could be decorated with this structure.
func AddStatsSummaryToFactory(factory ContainerHandlerFactory) ContainerHandlerFactory {
return &statsSummaryFactory{
factory: factory,
}
}
func SetStatsParameter(param *StatsParameter) {
globalStatsParameter = *param
}

View File

@ -17,8 +17,6 @@
package manager
import (
"container/list"
"flag"
"log"
"sync"
"time"
@ -28,8 +26,6 @@ import (
"github.com/google/cadvisor/storage"
)
var historyDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep")
// Internal mirror of the external data structure.
type containerStat struct {
Timestamp time.Time
@ -37,10 +33,8 @@ type containerStat struct {
}
type containerInfo struct {
info.ContainerReference
Subcontainers []info.ContainerReference
Spec *info.ContainerSpec
Stats *list.List
StatsPercentiles *info.ContainerStatsPercentiles
Subcontainers []info.ContainerReference
Spec *info.ContainerSpec
}
type containerData struct {
@ -99,7 +93,6 @@ func NewContainerData(containerName string, driver storage.StorageDriver) (*cont
}
cont.info.Name = ref.Name
cont.info.Aliases = ref.Aliases
cont.info.Stats = list.New()
cont.storageDriver = driver
cont.stop = make(chan bool, 1)
@ -164,23 +157,6 @@ func (c *containerData) updateStats() error {
return err
}
}
summary, err := c.handler.StatsPercentiles()
if err != nil {
return err
}
timestamp := time.Now()
// Remove the front if we go over.
c.lock.Lock()
defer c.lock.Unlock()
if c.info.Stats.Len() >= *historyDuration {
c.info.Stats.Remove(c.info.Stats.Front())
}
c.info.Stats.PushBack(&containerStat{
Timestamp: timestamp,
Data: stats,
})
c.info.StatsPercentiles = summary
return nil
}

View File

@ -124,6 +124,24 @@ func (m *manager) GetContainerInfo(containerName string) (*info.ContainerInfo, e
return nil, err
}
var percentiles *info.ContainerStatsPercentiles
var samples []*info.ContainerStatsSample
if m.storageDriver != nil {
// XXX(monnand): These numbers should not be hard coded
percentiles, err = m.storageDriver.Percentiles(
cinfo.Name,
[]int{50, 80, 90, 99},
[]int{50, 80, 90, 99},
)
if err != nil {
return nil, err
}
samples, err = m.storageDriver.Samples(cinfo.Name, 1024)
if err != nil {
return nil, err
}
}
// Make a copy of the info for the user.
ret := &info.ContainerInfo{
ContainerReference: info.ContainerReference{
@ -132,7 +150,8 @@ func (m *manager) GetContainerInfo(containerName string) (*info.ContainerInfo, e
},
Subcontainers: cinfo.Subcontainers,
Spec: cinfo.Spec,
StatsPercentiles: cinfo.StatsPercentiles,
StatsPercentiles: percentiles,
Samples: samples,
}
// Set default value to an actual value
@ -142,11 +161,6 @@ func (m *manager) GetContainerInfo(containerName string) (*info.ContainerInfo, e
ret.Spec.Memory.Limit = uint64(m.machineInfo.MemoryCapacity)
}
}
ret.Stats = make([]*info.ContainerStats, 0, cinfo.Stats.Len())
for e := cinfo.Stats.Front(); e != nil; e = e.Next() {
data := e.Value.(*containerStat)
ret.Stats = append(ret.Stats, data.Data)
}
return ret, nil
}