diff --git a/info/v2/container.go b/info/v2/container.go index 792db22f..d32f571c 100644 --- a/info/v2/container.go +++ b/info/v2/container.go @@ -236,6 +236,9 @@ type RequestOptions struct { Count int `json:"count"` // Whether to include stats for child subcontainers. Recursive bool `json:"recursive"` + // Update stats if they are older than MaxAge + // nil indicates no update, and 0 will always trigger an update. + MaxAge *time.Duration `json:"max_age"` } type ProcessInfo struct { diff --git a/info/v2/conversion.go b/info/v2/conversion.go index b137d30e..97fb463b 100644 --- a/info/v2/conversion.go +++ b/info/v2/conversion.go @@ -206,9 +206,6 @@ func InstCpuStats(last, cur *v1.ContainerStats) (*CpuInstStats, error) { return nil, fmt.Errorf("different number of cpus") } timeDelta := cur.Timestamp.Sub(last.Timestamp) - if timeDelta <= 100*time.Millisecond { - return nil, fmt.Errorf("time delta unexpectedly small") - } // Nanoseconds to gain precision and avoid having zero seconds if the // difference between the timestamps is just under a second timeDeltaNs := uint64(timeDelta.Nanoseconds()) diff --git a/info/v2/conversion_test.go b/info/v2/conversion_test.go index 092f7e3c..4b9a47e7 100644 --- a/info/v2/conversion_test.go +++ b/info/v2/conversion_test.go @@ -241,16 +241,6 @@ func TestInstCpuStats(t *testing.T) { }, nil, }, - // Unexpectedly small time delta - { - &v1.ContainerStats{ - Timestamp: time.Unix(100, 0), - }, - &v1.ContainerStats{ - Timestamp: time.Unix(100, 0).Add(30 * time.Millisecond), - }, - nil, - }, // Different number of cpus { &v1.ContainerStats{ diff --git a/manager/container.go b/manager/container.go index 14ff4f62..5dd1191e 100644 --- a/manager/container.go +++ b/manager/container.go @@ -40,6 +40,8 @@ import ( units "github.com/docker/go-units" "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/util/clock" ) // Housekeeping interval. @@ -65,8 +67,11 @@ type containerData struct { housekeepingInterval time.Duration maxHousekeepingInterval time.Duration allowDynamicHousekeeping bool - lastUpdatedTime time.Time + infoLastUpdatedTime time.Time + statsLastUpdatedTime time.Time lastErrorTime time.Time + // used to track time + clock clock.Clock // Decay value used for load average smoothing. Interval length of 10 seconds is used. loadDecay float64 @@ -77,6 +82,9 @@ type containerData struct { // Tells the container to stop. stop chan bool + // Tells the container to immediately collect stats + onDemandChan chan chan struct{} + // Runs custom metric collectors. collectorManager collector.CollectorManager @@ -110,16 +118,43 @@ func (c *containerData) Stop() error { } func (c *containerData) allowErrorLogging() bool { - if time.Since(c.lastErrorTime) > time.Minute { - c.lastErrorTime = time.Now() + if c.clock.Since(c.lastErrorTime) > time.Minute { + c.lastErrorTime = c.clock.Now() return true } return false } +// OnDemandHousekeeping performs housekeeping on the container and blocks until it has completed. +// It is designed to be used in conjunction with periodic housekeeping, and will cause the timer for +// periodic housekeeping to reset. This should be used sparingly, as calling OnDemandHousekeeping frequently +// can have serious performance costs. +func (c *containerData) OnDemandHousekeeping(maxAge time.Duration) { + if c.clock.Since(c.statsLastUpdatedTime) > maxAge { + housekeepingFinishedChan := make(chan struct{}) + c.onDemandChan <- housekeepingFinishedChan + select { + case <-c.stop: + case <-housekeepingFinishedChan: + } + } +} + +// notifyOnDemand notifies all calls to OnDemandHousekeeping that housekeeping is finished +func (c *containerData) notifyOnDemand() { + for { + select { + case finishedChan := <-c.onDemandChan: + close(finishedChan) + default: + return + } + } +} + func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, error) { // Get spec and subcontainers. - if time.Since(c.lastUpdatedTime) > 5*time.Second { + if c.clock.Since(c.infoLastUpdatedTime) > 5*time.Second { err := c.updateSpec() if err != nil { return nil, err @@ -130,7 +165,7 @@ func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, return nil, err } } - c.lastUpdatedTime = time.Now() + c.infoLastUpdatedTime = c.clock.Now() } // Make a copy of the info for the user. c.lock.Lock() @@ -310,7 +345,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace return processes, nil } -func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool) (*containerData, error) { +func newContainerData(containerName string, memoryCache *memory.InMemoryCache, handler container.ContainerHandler, logUsage bool, collectorManager collector.CollectorManager, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, clock clock.Clock) (*containerData, error) { if memoryCache == nil { return nil, fmt.Errorf("nil memory storage") } @@ -332,6 +367,8 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h loadAvg: -1.0, // negative value indicates uninitialized. stop: make(chan bool, 1), collectorManager: collectorManager, + onDemandChan: make(chan chan struct{}, 100), + clock: clock, } cont.info.ContainerReference = ref @@ -362,7 +399,7 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h } // Determine when the next housekeeping should occur. -func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Time { +func (self *containerData) nextHousekeepingInterval() time.Duration { if self.allowDynamicHousekeeping { var empty time.Time stats, err := self.memoryCache.RecentStats(self.info.Name, empty, empty, 2) @@ -385,7 +422,7 @@ func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Tim } } - return lastHousekeeping.Add(jitter(self.housekeepingInterval, 1.0)) + return jitter(self.housekeepingInterval, 1.0) } // TODO(vmarmol): Implement stats collecting as a custom collector. @@ -411,24 +448,19 @@ func (c *containerData) housekeeping() { // Housekeep every second. glog.V(3).Infof("Start housekeeping for container %q\n", c.info.Name) - lastHousekeeping := time.Now() + houseKeepingTimer := c.clock.NewTimer(0 * time.Second) + defer houseKeepingTimer.Stop() for { - select { - case <-c.stop: - // Stop housekeeping when signaled. + if !c.housekeepingTick(houseKeepingTimer.C(), longHousekeeping) { return - default: - // Perform housekeeping. - start := time.Now() - c.housekeepingTick() - - // Log if housekeeping took too long. - duration := time.Since(start) - if duration >= longHousekeeping { - glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration) + } + // Stop and drain the timer so that it is safe to reset it + if !houseKeepingTimer.Stop() { + select { + case <-houseKeepingTimer.C(): + default: } } - // Log usage if asked to do so. if c.logUsage { const numSamples = 60 @@ -455,26 +487,35 @@ func (c *containerData) housekeeping() { glog.Infof("[%s] %.3f cores (average: %.3f cores), %s of memory", c.info.Name, instantUsageInCores, usageInCores, usageInHuman) } } - - next := c.nextHousekeeping(lastHousekeeping) - - // Schedule the next housekeeping. Sleep until that time. - if time.Now().Before(next) { - time.Sleep(next.Sub(time.Now())) - } else { - next = time.Now() - } - lastHousekeeping = next + houseKeepingTimer.Reset(c.nextHousekeepingInterval()) } } -func (c *containerData) housekeepingTick() { +func (c *containerData) housekeepingTick(timer <-chan time.Time, longHousekeeping time.Duration) bool { + select { + case <-c.stop: + // Stop housekeeping when signaled. + return false + case finishedChan := <-c.onDemandChan: + // notify the calling function once housekeeping has completed + defer close(finishedChan) + case <-timer: + } + start := c.clock.Now() err := c.updateStats() if err != nil { if c.allowErrorLogging() { glog.Infof("Failed to update stats for container \"%s\": %s", c.info.Name, err) } } + // Log if housekeeping took too long. + duration := c.clock.Since(start) + if duration >= longHousekeeping { + glog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration) + } + c.notifyOnDemand() + c.statsLastUpdatedTime = c.clock.Now() + return true } func (c *containerData) updateSpec() error { @@ -550,7 +591,7 @@ func (c *containerData) updateStats() error { var customStatsErr error cm := c.collectorManager.(*collector.GenericCollectorManager) if len(cm.Collectors) > 0 { - if cm.NextCollectionTime.Before(time.Now()) { + if cm.NextCollectionTime.Before(c.clock.Now()) { customStats, err := c.updateCustomStats() if customStats != nil { stats.CustomMetrics = customStats diff --git a/manager/container_test.go b/manager/container_test.go index ab90e379..abad92c3 100644 --- a/manager/container_test.go +++ b/manager/container_test.go @@ -19,6 +19,7 @@ package manager import ( "fmt" "reflect" + "sync" "testing" "time" @@ -33,30 +34,34 @@ import ( "github.com/mindprince/gonvml" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/util/clock" ) -const containerName = "/container" +const ( + containerName = "/container" + testLongHousekeeping = time.Second +) // Create a containerData instance for a test. -func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache) { +func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache, *clock.FakeClock) { mockHandler := containertest.NewMockContainerHandler(containerName) mockHandler.On("GetSpec").Return( spec, nil, ) memoryCache := memory.New(60, nil) - ret, err := newContainerData(containerName, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true) + fakeClock := clock.NewFakeClock(time.Now()) + ret, err := newContainerData(containerName, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true, fakeClock) if err != nil { t.Fatal(err) } - return ret, mockHandler, memoryCache + return ret, mockHandler, memoryCache, fakeClock } // Create a containerData instance for a test and add a default GetSpec mock. -func newTestContainerData(t *testing.T) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache) { - spec := itest.GenerateRandomContainerSpec(4) - ret, mockHandler, memoryCache := setupContainerData(t, spec) - return ret, mockHandler, memoryCache +func newTestContainerData(t *testing.T) (*containerData, *containertest.MockContainerHandler, *memory.InMemoryCache, *clock.FakeClock) { + return setupContainerData(t, itest.GenerateRandomContainerSpec(4)) } func TestUpdateSubcontainers(t *testing.T) { @@ -65,7 +70,7 @@ func TestUpdateSubcontainers(t *testing.T) { {Name: "/container/abcd"}, {Name: "/container/something"}, } - cd, mockHandler, _ := newTestContainerData(t) + cd, mockHandler, _, _ := newTestContainerData(t) mockHandler.On("ListContainers", container.ListSelf).Return( subcontainers, nil, @@ -96,7 +101,7 @@ func TestUpdateSubcontainers(t *testing.T) { } func TestUpdateSubcontainersWithError(t *testing.T) { - cd, mockHandler, _ := newTestContainerData(t) + cd, mockHandler, _, _ := newTestContainerData(t) mockHandler.On("ListContainers", container.ListSelf).Return( []info.ContainerReference{}, fmt.Errorf("some error"), @@ -109,7 +114,7 @@ func TestUpdateSubcontainersWithError(t *testing.T) { } func TestUpdateSubcontainersWithErrorOnDeadContainer(t *testing.T) { - cd, mockHandler, _ := newTestContainerData(t) + cd, mockHandler, _, _ := newTestContainerData(t) mockHandler.On("ListContainers", container.ListSelf).Return( []info.ContainerReference{}, fmt.Errorf("some error"), @@ -131,7 +136,7 @@ func TestUpdateStats(t *testing.T) { statsList := itest.GenerateRandomStats(1, 4, 1*time.Second) stats := statsList[0] - cd, mockHandler, memoryCache := newTestContainerData(t) + cd, mockHandler, memoryCache, _ := newTestContainerData(t) mockHandler.On("GetStats").Return( stats, nil, @@ -148,7 +153,7 @@ func TestUpdateStats(t *testing.T) { func TestUpdateSpec(t *testing.T) { spec := itest.GenerateRandomContainerSpec(4) - cd, mockHandler, _ := newTestContainerData(t) + cd, mockHandler, _, _ := newTestContainerData(t) mockHandler.On("GetSpec").Return( spec, nil, @@ -169,7 +174,7 @@ func TestGetInfo(t *testing.T) { {Name: "/container/abcd"}, {Name: "/container/something"}, } - cd, mockHandler, _ := setupContainerData(t, spec) + cd, mockHandler, _, _ := setupContainerData(t, spec) mockHandler.On("ListContainers", container.ListSelf).Return( subcontainers, nil, @@ -209,7 +214,7 @@ func TestGetInfo(t *testing.T) { } func TestUpdateNvidiaStats(t *testing.T) { - cd, _, _ := newTestContainerData(t) + cd, _, _, _ := newTestContainerData(t) stats := info.ContainerStats{} // When there are no devices, we should not get an error and stats should not change. @@ -226,3 +231,60 @@ func TestUpdateNvidiaStats(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, info.ContainerStats{}, stats) } + +func TestOnDemandHousekeeping(t *testing.T) { + statsList := itest.GenerateRandomStats(1, 4, 1*time.Second) + stats := statsList[0] + + cd, mockHandler, memoryCache, fakeClock := newTestContainerData(t) + mockHandler.On("GetStats").Return(stats, nil) + defer cd.Stop() + + // 0 seconds should always trigger an update + go cd.OnDemandHousekeeping(0 * time.Second) + cd.housekeepingTick(fakeClock.NewTimer(time.Minute).C(), testLongHousekeeping) + + fakeClock.Step(2 * time.Second) + + // This should return without requiring a housekeepingTick because stats have been updated recently enough + cd.OnDemandHousekeeping(3 * time.Second) + + go cd.OnDemandHousekeeping(1 * time.Second) + cd.housekeepingTick(fakeClock.NewTimer(time.Minute).C(), testLongHousekeeping) + + checkNumStats(t, memoryCache, 2) + mockHandler.AssertExpectations(t) +} + +func TestConcurrentOnDemandHousekeeping(t *testing.T) { + statsList := itest.GenerateRandomStats(1, 4, 1*time.Second) + stats := statsList[0] + + cd, mockHandler, memoryCache, fakeClock := newTestContainerData(t) + mockHandler.On("GetStats").Return(stats, nil) + defer cd.Stop() + + numConcurrentCalls := 5 + var waitForHousekeeping sync.WaitGroup + waitForHousekeeping.Add(numConcurrentCalls) + onDemandCache := []chan struct{}{} + for i := 0; i < numConcurrentCalls; i++ { + go func() { + cd.OnDemandHousekeeping(0 * time.Second) + waitForHousekeeping.Done() + }() + // Wait for work to be queued + onDemandCache = append(onDemandCache, <-cd.onDemandChan) + } + // Requeue work: + for _, ch := range onDemandCache { + cd.onDemandChan <- ch + } + + go cd.housekeepingTick(fakeClock.NewTimer(time.Minute).C(), testLongHousekeeping) + // Ensure that all queued calls return with only a single call to housekeepingTick + waitForHousekeeping.Wait() + + checkNumStats(t, memoryCache, 1) + mockHandler.AssertExpectations(t) +} diff --git a/manager/manager.go b/manager/manager.go index 365a213d..01482022 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -50,6 +50,7 @@ import ( "github.com/golang/glog" "github.com/opencontainers/runc/libcontainer/cgroups" + "k8s.io/apimachinery/pkg/util/clock" ) var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") @@ -713,6 +714,18 @@ func (self *manager) getRequestedContainers(containerName string, options v2.Req default: return containersMap, fmt.Errorf("invalid request type %q", options.IdType) } + if options.MaxAge != nil { + // update stats for all containers in containersMap + var waitGroup sync.WaitGroup + waitGroup.Add(len(containersMap)) + for _, container := range containersMap { + go func(cont *containerData) { + cont.OnDemandHousekeeping(*options.MaxAge) + waitGroup.Done() + }(container) + } + waitGroup.Wait() + } return containersMap, nil } @@ -810,6 +823,8 @@ func (m *manager) Exists(containerName string) bool { func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) { // override recursive. Only support single container listing. options.Recursive = false + // override MaxAge. ProcessList does not require updated stats. + options.MaxAge = nil conts, err := m.getRequestedContainers(containerName, options) if err != nil { return nil, err @@ -925,7 +940,7 @@ func (m *manager) createContainerLocked(containerName string, watchSource watche } logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer - cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping) + cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{}) if err != nil { return err } diff --git a/manager/manager_test.go b/manager/manager_test.go index 88e16106..82482e15 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/utils/sysfs/fakesysfs" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/clock" ) // TODO(vmarmol): Refactor these tests. @@ -59,7 +60,7 @@ func createManagerAndAddContainers( spec, nil, ).Once() - cont, err := newContainerData(name, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true) + cont, err := newContainerData(name, memoryCache, mockHandler, false, &collector.GenericCollectorManager{}, 60*time.Second, true, clock.NewFakeClock(time.Now())) if err != nil { t.Fatal(err) } @@ -261,7 +262,7 @@ func TestGetContainerInfoV2Failure(t *testing.T) { handlerMap[failing].GetSpec() // Use up default GetSpec call, and replace below handlerMap[failing].On("GetSpec").Return(info.ContainerSpec{}, mockErr) handlerMap[failing].On("Exists").Return(true) - m.containers[namespacedContainerName{Name: failing}].lastUpdatedTime = time.Time{} // Force GetSpec. + m.containers[namespacedContainerName{Name: failing}].infoLastUpdatedTime = time.Time{} // Force GetSpec. infos, err := m.GetContainerInfoV2("/", options) if err == nil {