on demand metrics

This commit is contained in:
David Ashpole 2017-11-20 14:51:04 -08:00
parent fd43dc16ba
commit 3d6ad6dd86
7 changed files with 174 additions and 65 deletions

View File

@ -236,6 +236,9 @@ type RequestOptions struct {
Count int `json:"count"`
// Whether to include stats for child subcontainers.
Recursive bool `json:"recursive"`
// Update stats if they are older than MaxAge
// nil indicates no update, and 0 will always trigger an update.
MaxAge *time.Duration `json:"max_age"`
}
type ProcessInfo struct {

View File

@ -206,9 +206,6 @@ func InstCpuStats(last, cur *v1.ContainerStats) (*CpuInstStats, error) {
return nil, fmt.Errorf("different number of cpus")
}
timeDelta := cur.Timestamp.Sub(last.Timestamp)
if timeDelta <= 100*time.Millisecond {
return nil, fmt.Errorf("time delta unexpectedly small")
}
// Nanoseconds to gain precision and avoid having zero seconds if the
// difference between the timestamps is just under a second
timeDeltaNs := uint64(timeDelta.Nanoseconds())

View File

@ -241,16 +241,6 @@ func TestInstCpuStats(t *testing.T) {
},
nil,
},
// Unexpectedly small time delta
{
&v1.ContainerStats{
Timestamp: time.Unix(100, 0),
},
&v1.ContainerStats{
Timestamp: time.Unix(100, 0).Add(30 * time.Millisecond),
},
nil,
},
// Different number of cpus
{
&v1.ContainerStats{

View File

@ -40,6 +40,8 @@ import (
units "github.com/docker/go-units"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/clock"
)
// Housekeeping interval.
@ -65,8 +67,11 @@ type containerData struct {
housekeepingInterval time.Duration
maxHousekeepingInterval time.Duration
allowDynamicHousekeeping bool
lastUpdatedTime time.Time
infoLastUpdatedTime time.Time
statsLastUpdatedTime time.Time
lastErrorTime time.Time
// used to track time
clock clock.Clock
// Decay value used for load average smoothing. Interval length of 10 seconds is used.
loadDecay float64
@ -77,6 +82,9 @@ type containerData struct {
// Tells the container to stop.
stop chan bool
// Tells the container to immediately collect stats
onDemandChan chan chan struct{}
// Runs custom metric collectors.
collectorManager collector.CollectorManager
@ -110,16 +118,43 @@ func (c *containerData) Stop() error {
}
func (c *containerData) allowErrorLogging() bool {
if time.Since(c.lastErrorTime) > time.Minute {
c.lastErrorTime = time.Now()
if c.clock.Since(c.lastErrorTime) > time.Minute {
c.lastErrorTime = c.clock.Now()
return true
}
return false
}
// OnDemandHousekeeping requests an immediate housekeeping pass for the container and blocks
// until that pass has completed or a stop signal is received. When the stats were refreshed
// no longer than maxAge ago, it returns immediately without requesting anything.
// It is meant to be used alongside periodic housekeeping, whose timer is reset after every pass.
// Use it sparingly: calling OnDemandHousekeeping frequently can have serious performance costs.
//
// NOTE(review): statsLastUpdatedTime is read here without holding c.lock while the housekeeping
// goroutine writes it; confirm whether that access needs synchronization.
func (c *containerData) OnDemandHousekeeping(maxAge time.Duration) {
	// Stats are still fresh enough; nothing to do.
	if c.clock.Since(c.statsLastUpdatedTime) <= maxAge {
		return
	}
	// Queue a completion channel; the housekeeping loop closes it once the pass is done.
	done := make(chan struct{})
	c.onDemandChan <- done
	select {
	case <-done:
	case <-c.stop:
	}
}
// notifyOnDemand releases every OnDemandHousekeeping caller currently queued on onDemandChan
// by closing its completion channel. It does not block: it returns as soon as a poll finds
// the queue empty.
func (c *containerData) notifyOnDemand() {
	for {
		select {
		case waiter := <-c.onDemandChan:
			close(waiter)
			continue
		default:
		}
		// Nothing was queued on the last poll.
		return
	}
}
func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, error) {
// Get spec and subcontainers.
if time.Since(c.lastUpdatedTime) > 5*time.Second {
if c.clock.Since(c.infoLastUpdatedTime) > 5*time.Second {
err := c.updateSpec()
if err != nil {
return nil, err
@ -130,7 +165,7 @@ func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo,
return nil, err
}
}
c.lastUpdatedTime = time.Now()
c.infoLastUpdatedTime = c.clock.Now()
}
// Make a copy of the info for the user.
c.lock.Lock()
@ -310,7 +345,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
return processes, nil
}
func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool) (*containerData, error) {
func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, clock clock.Clock) (*containerData, error) {
if memoryCache == nil {
return nil, fmt.Errorf("nil memory storage")
}
@ -332,6 +367,8 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
loadAvg: -1.0, // negative value indicates uninitialized.
stop: make(chan bool, 1),
collectorManager: collectorManager,
onDemandChan: make(chan chan struct{}, 100),
clock: clock,
}
cont.info.ContainerReference = ref
@ -362,7 +399,7 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
}
// Determine when the next housekeeping should occur.
func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Time {
func (self *containerData) nextHousekeepingInterval() time.Duration {
if self.allowDynamicHousekeeping {
var empty time.Time
stats, err := self.memoryCache.RecentStats(self.info.Name, empty, empty, 2)
@ -385,7 +422,7 @@ func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Tim
}
}
return lastHousekeeping.Add(jitter(self.housekeepingInterval, 1.0))
return jitter(self.housekeepingInterval, 1.0)
}
// TODO(vmarmol): Implement stats collecting as a custom collector.
@ -411,24 +448,19 @@ func (c *containerData) housekeeping() {
// Housekeep every second.
glog.V(3).Infof("Start housekeeping for container %q\n", c.info.Name)
lastHousekeeping := time.Now()
houseKeepingTimer := c.clock.NewTimer(0 * time.Second)
defer houseKeepingTimer.Stop()
for {
select {
case <-c.stop:
// Stop housekeeping when signaled.
if !c.housekeepingTick(houseKeepingTimer.C(), longHousekeeping) {
return
default:
// Perform housekeeping.
start := time.Now()
c.housekeepingTick()
// Log if housekeeping took too long.
duration := time.Since(start)
if duration >= longHousekeeping {
glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration)
}
// Stop and drain the timer so that it is safe to reset it
if !houseKeepingTimer.Stop() {
select {
case <-houseKeepingTimer.C():
default:
}
}
// Log usage if asked to do so.
if c.logUsage {
const numSamples = 60
@ -455,26 +487,35 @@ func (c *containerData) housekeeping() {
glog.Infof("[%s] %.3f cores (average: %.3f cores), %s of memory", c.info.Name, instantUsageInCores, usageInCores, usageInHuman)
}
}
next := c.nextHousekeeping(lastHousekeeping)
// Schedule the next housekeeping. Sleep until that time.
if time.Now().Before(next) {
time.Sleep(next.Sub(time.Now()))
} else {
next = time.Now()
}
lastHousekeeping = next
houseKeepingTimer.Reset(c.nextHousekeepingInterval())
}
}
func (c *containerData) housekeepingTick() {
// housekeepingTick waits for the next housekeeping trigger and then performs one stats update.
//
// It blocks until one of the following happens:
//   - a value arrives on c.stop: nothing is updated and false is returned;
//   - an on-demand request arrives on c.onDemandChan: its channel is closed when this tick returns;
//   - the periodic timer channel fires.
//
// After the update, every other on-demand request that queued up in the meantime is released as
// well, so several concurrent requests are served by a single update. A tick that takes at least
// longHousekeeping is logged at verbosity 3. It returns true whenever a pass ran, even if
// updating the stats failed.
func (c *containerData) housekeepingTick(timer <-chan time.Time, longHousekeeping time.Duration) bool {
	select {
	case <-c.stop:
		// Stop housekeeping when signaled.
		return false
	case finishedChan := <-c.onDemandChan:
		// Notify the calling function once housekeeping has completed.
		defer close(finishedChan)
	case <-timer:
	}

	start := c.clock.Now()
	// Failures are rate-limited by allowErrorLogging so a broken container does not flood the log.
	if err := c.updateStats(); err != nil && c.allowErrorLogging() {
		glog.Infof("Failed to update stats for container \"%s\": %s", c.info.Name, err)
	}

	// Log if housekeeping took too long.
	duration := c.clock.Since(start)
	if duration >= longHousekeeping {
		glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration)
	}

	// Record the refresh time before releasing any waiter: closing a waiter's channel is what
	// orders this write before the waiter's next read of statsLastUpdatedTime. Writing it after
	// notifyOnDemand would let a released caller observe the stale timestamp.
	c.statsLastUpdatedTime = c.clock.Now()
	c.notifyOnDemand()
	return true
}
func (c *containerData) updateSpec() error {
@ -550,7 +591,7 @@ func (c *containerData) updateStats() error {
var customStatsErr error
cm := c.collectorManager.(*collector.GenericCollectorManager)
if len(cm.Collectors) > 0 {
if cm.NextCollectionTime.Before(time.Now()) {
if cm.NextCollectionTime.Before(c.clock.Now()) {
customStats, err := c.updateCustomStats()
if customStats != nil {
stats.CustomMetrics = customStats

View File

@ -19,6 +19,7 @@ package manager
import (
"fmt"
"reflect"
"sync"
"testing"
"time"
@ -33,30 +34,34 @@ import (
"github.com/mindprince/gonvml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/clock"
)
const containerName = "/container"
const (
containerName = "/container"
testLongHousekeeping = time.Second
)
// Create a containerData instance for a test.
func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache) {
func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache, *clock.FakeClock) {
mockHandler := containertest.NewMockContainerHandler(containerName)
mockHandler.On("GetSpec").Return(
spec,
nil,
)
memoryCache := memory.New(60, nil)
ret, err := newContainerData(containerName, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true)
fakeClock := clock.NewFakeClock(time.Now())
ret, err := newContainerData(containerName, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true, fakeClock)
if err != nil {
t.Fatal(err)
}
return ret, mockHandler, memoryCache
return ret, mockHandler, memoryCache, fakeClock
}
// Create a containerData instance for a test and add a default GetSpec mock.
func newTestContainerData(t *testing.T) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache) {
spec := itest.GenerateRandomContainerSpec(4)
ret, mockHandler, memoryCache := setupContainerData(t, spec)
return ret, mockHandler, memoryCache
func newTestContainerData(t *testing.T) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache, *clock.FakeClock) {
return setupContainerData(t, itest.GenerateRandomContainerSpec(4))
}
func TestUpdateSubcontainers(t *testing.T) {
@ -65,7 +70,7 @@ func TestUpdateSubcontainers(t *testing.T) {
{Name: "/container/abcd"},
{Name: "/container/something"},
}
cd, mockHandler, _ := newTestContainerData(t)
cd, mockHandler, _, _ := newTestContainerData(t)
mockHandler.On("ListContainers", container.ListSelf).Return(
subcontainers,
nil,
@ -96,7 +101,7 @@ func TestUpdateSubcontainers(t *testing.T) {
}
func TestUpdateSubcontainersWithError(t *testing.T) {
cd, mockHandler, _ := newTestContainerData(t)
cd, mockHandler, _, _ := newTestContainerData(t)
mockHandler.On("ListContainers", container.ListSelf).Return(
[]info.ContainerReference{},
fmt.Errorf("some error"),
@ -109,7 +114,7 @@ func TestUpdateSubcontainersWithError(t *testing.T) {
}
func TestUpdateSubcontainersWithErrorOnDeadContainer(t *testing.T) {
cd, mockHandler, _ := newTestContainerData(t)
cd, mockHandler, _, _ := newTestContainerData(t)
mockHandler.On("ListContainers", container.ListSelf).Return(
[]info.ContainerReference{},
fmt.Errorf("some error"),
@ -131,7 +136,7 @@ func TestUpdateStats(t *testing.T) {
statsList := itest.GenerateRandomStats(1, 4, 1*time.Second)
stats := statsList[0]
cd, mockHandler, memoryCache := newTestContainerData(t)
cd, mockHandler, memoryCache, _ := newTestContainerData(t)
mockHandler.On("GetStats").Return(
stats,
nil,
@ -148,7 +153,7 @@ func TestUpdateStats(t *testing.T) {
func TestUpdateSpec(t *testing.T) {
spec := itest.GenerateRandomContainerSpec(4)
cd, mockHandler, _ := newTestContainerData(t)
cd, mockHandler, _, _ := newTestContainerData(t)
mockHandler.On("GetSpec").Return(
spec,
nil,
@ -169,7 +174,7 @@ func TestGetInfo(t *testing.T) {
{Name: "/container/abcd"},
{Name: "/container/something"},
}
cd, mockHandler, _ := setupContainerData(t, spec)
cd, mockHandler, _, _ := setupContainerData(t, spec)
mockHandler.On("ListContainers", container.ListSelf).Return(
subcontainers,
nil,
@ -209,7 +214,7 @@ func TestGetInfo(t *testing.T) {
}
func TestUpdateNvidiaStats(t *testing.T) {
cd, _, _ := newTestContainerData(t)
cd, _, _, _ := newTestContainerData(t)
stats := info.ContainerStats{}
// When there are no devices, we should not get an error and stats should not change.
@ -226,3 +231,60 @@ func TestUpdateNvidiaStats(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, info.ContainerStats{}, stats)
}
// TestOnDemandHousekeeping checks that an on-demand request only leads to a stats update
// when the cached stats are older than the requested max age.
func TestOnDemandHousekeeping(t *testing.T) {
	stats := itest.GenerateRandomStats(1, 4, 1*time.Second)[0]
	cd, mockHandler, memoryCache, fakeClock := newTestContainerData(t)
	mockHandler.On("GetStats").Return(stats, nil)
	defer cd.Stop()

	// runTick performs a single housekeeping tick. Its timer is set a minute out on the fake
	// clock, which this test only advances by two seconds.
	runTick := func() {
		cd.housekeepingTick(fakeClock.NewTimer(time.Minute).C(), testLongHousekeeping)
	}

	// A max age of 0 seconds should always trigger an update.
	go cd.OnDemandHousekeeping(0)
	runTick()

	fakeClock.Step(2 * time.Second)

	// Stats were updated recently enough for a 3s max age, so this returns without a tick.
	cd.OnDemandHousekeeping(3 * time.Second)

	// A 1s max age is exceeded, so this request needs another tick to complete.
	go cd.OnDemandHousekeeping(1 * time.Second)
	runTick()

	checkNumStats(t, memoryCache, 2)
	mockHandler.AssertExpectations(t)
}
// TestConcurrentOnDemandHousekeeping checks that several on-demand requests queued at the same
// time are all served by a single housekeeping tick.
func TestConcurrentOnDemandHousekeeping(t *testing.T) {
	stats := itest.GenerateRandomStats(1, 4, 1*time.Second)[0]
	cd, mockHandler, memoryCache, fakeClock := newTestContainerData(t)
	mockHandler.On("GetStats").Return(stats, nil)
	defer cd.Stop()

	const numConcurrentCalls = 5
	var callers sync.WaitGroup
	callers.Add(numConcurrentCalls)

	queued := make([]chan struct{}, 0, numConcurrentCalls)
	for len(queued) < numConcurrentCalls {
		go func() {
			defer callers.Done()
			cd.OnDemandHousekeeping(0)
		}()
		// Take each request off the queue as it arrives so we know the caller has enqueued it.
		queued = append(queued, <-cd.onDemandChan)
	}

	// Put all the requests back so that they are pending at the same time.
	for _, request := range queued {
		cd.onDemandChan <- request
	}

	go cd.housekeepingTick(fakeClock.NewTimer(time.Minute).C(), testLongHousekeeping)

	// Every queued caller must return after only a single call to housekeepingTick.
	callers.Wait()
	checkNumStats(t, memoryCache, 1)
	mockHandler.AssertExpectations(t)
}

View File

@ -50,6 +50,7 @@ import (
"github.com/golang/glog"
"github.com/opencontainers/runc/libcontainer/cgroups"
"k8s.io/apimachinery/pkg/util/clock"
)
var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
@ -713,6 +714,18 @@ func (self *manager) getRequestedContainers(containerName string, options v2.Req
default:
return containersMap, fmt.Errorf("invalid request type %q", options.IdType)
}
if options.MaxAge != nil {
// update stats for all containers in containersMap
var waitGroup sync.WaitGroup
waitGroup.Add(len(containersMap))
for _, container := range containersMap {
go func(cont *containerData) {
cont.OnDemandHousekeeping(*options.MaxAge)
waitGroup.Done()
}(container)
}
waitGroup.Wait()
}
return containersMap, nil
}
@ -810,6 +823,8 @@ func (m *manager) Exists(containerName string) bool {
func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) {
// override recursive. Only support single container listing.
options.Recursive = false
// override MaxAge. ProcessList does not require updated stats.
options.MaxAge = nil
conts, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
@ -925,7 +940,7 @@ func (m *manager) createContainerLocked(containerName string, watchSource watche
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping)
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
if err != nil {
return err
}

View File

@ -35,6 +35,7 @@ import (
"github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/utils/sysfs/fakesysfs"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/clock"
)
// TODO(vmarmol): Refactor these tests.
@ -59,7 +60,7 @@ func createManagerAndAddContainers(
spec,
nil,
).Once()
cont, err := newContainerData(name, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true)
cont, err := newContainerData(name, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true, clock.NewFakeClock(time.Now()))
if err != nil {
t.Fatal(err)
}
@ -261,7 +262,7 @@ func TestGetContainerInfoV2Failure(t *testing.T) {
handlerMap[failing].GetSpec() // Use up default GetSpec call, and replace below
handlerMap[failing].On("GetSpec").Return(info.ContainerSpec{}, mockErr)
handlerMap[failing].On("Exists").Return(true)
m.containers[namespacedContainerName{Name: failing}].lastUpdatedTime = time.Time{} // Force GetSpec.
m.containers[namespacedContainerName{Name: failing}].infoLastUpdatedTime = time.Time{} // Force GetSpec.
infos, err := m.GetContainerInfoV2("/", options)
if err == nil {