Merge pull request #672 from vmarmol/limit
Add flags for event storage limits
This commit is contained in:
commit
58c49b562e
@ -100,14 +100,34 @@ func DockerStateDir(dockerRoot string) string {
|
||||
return path.Join(dockerRoot, "containers")
|
||||
}
|
||||
|
||||
func DiskStatsCopy0(major, minor uint64) *info.PerDiskStats {
|
||||
disk := info.PerDiskStats{
|
||||
Major: major,
|
||||
Minor: minor,
|
||||
}
|
||||
disk.Stats = make(map[string]uint64)
|
||||
return &disk
|
||||
}
|
||||
|
||||
type DiskKey struct {
|
||||
Major uint64
|
||||
Minor uint64
|
||||
}
|
||||
|
||||
func DiskStatsCopy1(disk_stat map[DiskKey]*info.PerDiskStats) []info.PerDiskStats {
|
||||
i := 0
|
||||
stat := make([]info.PerDiskStats, len(disk_stat))
|
||||
for _, disk := range disk_stat {
|
||||
stat[i] = *disk
|
||||
i++
|
||||
}
|
||||
return stat
|
||||
}
|
||||
|
||||
func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskStats) {
|
||||
if len(blkio_stats) == 0 {
|
||||
return
|
||||
}
|
||||
type DiskKey struct {
|
||||
Major uint64
|
||||
Minor uint64
|
||||
}
|
||||
disk_stat := make(map[DiskKey]*info.PerDiskStats)
|
||||
for i := range blkio_stats {
|
||||
major := blkio_stats[i].Major
|
||||
@ -118,12 +138,7 @@ func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskSta
|
||||
}
|
||||
diskp, ok := disk_stat[disk_key]
|
||||
if !ok {
|
||||
disk := info.PerDiskStats{
|
||||
Major: major,
|
||||
Minor: minor,
|
||||
}
|
||||
disk.Stats = make(map[string]uint64)
|
||||
diskp = &disk
|
||||
diskp = DiskStatsCopy0(major, minor)
|
||||
disk_stat[disk_key] = diskp
|
||||
}
|
||||
op := blkio_stats[i].Op
|
||||
@ -132,68 +147,76 @@ func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskSta
|
||||
}
|
||||
diskp.Stats[op] = blkio_stats[i].Value
|
||||
}
|
||||
i := 0
|
||||
stat = make([]info.PerDiskStats, len(disk_stat))
|
||||
for _, disk := range disk_stat {
|
||||
stat[i] = *disk
|
||||
i++
|
||||
}
|
||||
return
|
||||
return DiskStatsCopy1(disk_stat)
|
||||
}
|
||||
|
||||
// Convert libcontainer stats to info.ContainerStats.
|
||||
func toContainerStats0(s *cgroups.Stats, ret *info.ContainerStats) {
|
||||
ret.Cpu.Usage.User = s.CpuStats.CpuUsage.UsageInUsermode
|
||||
ret.Cpu.Usage.System = s.CpuStats.CpuUsage.UsageInKernelmode
|
||||
n := len(s.CpuStats.CpuUsage.PercpuUsage)
|
||||
ret.Cpu.Usage.PerCpu = make([]uint64, n)
|
||||
|
||||
ret.Cpu.Usage.Total = 0
|
||||
for i := 0; i < n; i++ {
|
||||
ret.Cpu.Usage.PerCpu[i] = s.CpuStats.CpuUsage.PercpuUsage[i]
|
||||
ret.Cpu.Usage.Total += s.CpuStats.CpuUsage.PercpuUsage[i]
|
||||
}
|
||||
}
|
||||
|
||||
func toContainerStats1(s *cgroups.Stats, ret *info.ContainerStats) {
|
||||
ret.DiskIo.IoServiceBytes = DiskStatsCopy(s.BlkioStats.IoServiceBytesRecursive)
|
||||
ret.DiskIo.IoServiced = DiskStatsCopy(s.BlkioStats.IoServicedRecursive)
|
||||
ret.DiskIo.IoQueued = DiskStatsCopy(s.BlkioStats.IoQueuedRecursive)
|
||||
ret.DiskIo.Sectors = DiskStatsCopy(s.BlkioStats.SectorsRecursive)
|
||||
ret.DiskIo.IoServiceTime = DiskStatsCopy(s.BlkioStats.IoServiceTimeRecursive)
|
||||
ret.DiskIo.IoWaitTime = DiskStatsCopy(s.BlkioStats.IoWaitTimeRecursive)
|
||||
ret.DiskIo.IoMerged = DiskStatsCopy(s.BlkioStats.IoMergedRecursive)
|
||||
ret.DiskIo.IoTime = DiskStatsCopy(s.BlkioStats.IoTimeRecursive)
|
||||
}
|
||||
|
||||
func toContainerStats2(s *cgroups.Stats, ret *info.ContainerStats) {
|
||||
ret.Memory.Usage = s.MemoryStats.Usage
|
||||
if v, ok := s.MemoryStats.Stats["pgfault"]; ok {
|
||||
ret.Memory.ContainerData.Pgfault = v
|
||||
ret.Memory.HierarchicalData.Pgfault = v
|
||||
}
|
||||
if v, ok := s.MemoryStats.Stats["pgmajfault"]; ok {
|
||||
ret.Memory.ContainerData.Pgmajfault = v
|
||||
ret.Memory.HierarchicalData.Pgmajfault = v
|
||||
}
|
||||
if v, ok := s.MemoryStats.Stats["total_inactive_anon"]; ok {
|
||||
ret.Memory.WorkingSet = ret.Memory.Usage - v
|
||||
if v, ok := s.MemoryStats.Stats["total_active_file"]; ok {
|
||||
ret.Memory.WorkingSet -= v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func toContainerStats3(libcontainerStats *libcontainer.Stats, ret *info.ContainerStats) {
|
||||
// TODO(vmarmol): Handle multiple interfaces.
|
||||
ret.Network.RxBytes = libcontainerStats.Interfaces[0].RxBytes
|
||||
ret.Network.RxPackets = libcontainerStats.Interfaces[0].RxPackets
|
||||
ret.Network.RxErrors = libcontainerStats.Interfaces[0].RxErrors
|
||||
ret.Network.RxDropped = libcontainerStats.Interfaces[0].RxDropped
|
||||
ret.Network.TxBytes = libcontainerStats.Interfaces[0].TxBytes
|
||||
ret.Network.TxPackets = libcontainerStats.Interfaces[0].TxPackets
|
||||
ret.Network.TxErrors = libcontainerStats.Interfaces[0].TxErrors
|
||||
ret.Network.TxDropped = libcontainerStats.Interfaces[0].TxDropped
|
||||
}
|
||||
|
||||
func toContainerStats(libcontainerStats *libcontainer.Stats) *info.ContainerStats {
|
||||
s := libcontainerStats.CgroupStats
|
||||
ret := new(info.ContainerStats)
|
||||
ret.Timestamp = time.Now()
|
||||
|
||||
if s != nil {
|
||||
ret.Cpu.Usage.User = s.CpuStats.CpuUsage.UsageInUsermode
|
||||
ret.Cpu.Usage.System = s.CpuStats.CpuUsage.UsageInKernelmode
|
||||
n := len(s.CpuStats.CpuUsage.PercpuUsage)
|
||||
ret.Cpu.Usage.PerCpu = make([]uint64, n)
|
||||
|
||||
ret.Cpu.Usage.Total = 0
|
||||
for i := 0; i < n; i++ {
|
||||
ret.Cpu.Usage.PerCpu[i] = s.CpuStats.CpuUsage.PercpuUsage[i]
|
||||
ret.Cpu.Usage.Total += s.CpuStats.CpuUsage.PercpuUsage[i]
|
||||
}
|
||||
|
||||
ret.DiskIo.IoServiceBytes = DiskStatsCopy(s.BlkioStats.IoServiceBytesRecursive)
|
||||
ret.DiskIo.IoServiced = DiskStatsCopy(s.BlkioStats.IoServicedRecursive)
|
||||
ret.DiskIo.IoQueued = DiskStatsCopy(s.BlkioStats.IoQueuedRecursive)
|
||||
ret.DiskIo.Sectors = DiskStatsCopy(s.BlkioStats.SectorsRecursive)
|
||||
ret.DiskIo.IoServiceTime = DiskStatsCopy(s.BlkioStats.IoServiceTimeRecursive)
|
||||
ret.DiskIo.IoWaitTime = DiskStatsCopy(s.BlkioStats.IoWaitTimeRecursive)
|
||||
ret.DiskIo.IoMerged = DiskStatsCopy(s.BlkioStats.IoMergedRecursive)
|
||||
ret.DiskIo.IoTime = DiskStatsCopy(s.BlkioStats.IoTimeRecursive)
|
||||
|
||||
ret.Memory.Usage = s.MemoryStats.Usage
|
||||
if v, ok := s.MemoryStats.Stats["pgfault"]; ok {
|
||||
ret.Memory.ContainerData.Pgfault = v
|
||||
ret.Memory.HierarchicalData.Pgfault = v
|
||||
}
|
||||
if v, ok := s.MemoryStats.Stats["pgmajfault"]; ok {
|
||||
ret.Memory.ContainerData.Pgmajfault = v
|
||||
ret.Memory.HierarchicalData.Pgmajfault = v
|
||||
}
|
||||
if v, ok := s.MemoryStats.Stats["total_inactive_anon"]; ok {
|
||||
ret.Memory.WorkingSet = ret.Memory.Usage - v
|
||||
if v, ok := s.MemoryStats.Stats["total_active_file"]; ok {
|
||||
ret.Memory.WorkingSet -= v
|
||||
}
|
||||
}
|
||||
toContainerStats0(s, ret)
|
||||
toContainerStats1(s, ret)
|
||||
toContainerStats2(s, ret)
|
||||
}
|
||||
if len(libcontainerStats.Interfaces) > 0 {
|
||||
// TODO(vmarmol): Handle multiple interfaces.
|
||||
ret.Network.RxBytes = libcontainerStats.Interfaces[0].RxBytes
|
||||
ret.Network.RxPackets = libcontainerStats.Interfaces[0].RxPackets
|
||||
ret.Network.RxErrors = libcontainerStats.Interfaces[0].RxErrors
|
||||
ret.Network.RxDropped = libcontainerStats.Interfaces[0].RxDropped
|
||||
ret.Network.TxBytes = libcontainerStats.Interfaces[0].TxBytes
|
||||
ret.Network.TxPackets = libcontainerStats.Interfaces[0].TxPackets
|
||||
ret.Network.TxErrors = libcontainerStats.Interfaces[0].TxErrors
|
||||
ret.Network.TxDropped = libcontainerStats.Interfaces[0].TxDropped
|
||||
toContainerStats3(libcontainerStats, ret)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
@ -100,10 +100,8 @@ type events struct {
|
||||
watcherLock sync.RWMutex
|
||||
// last allocated watch id.
|
||||
lastId int
|
||||
// Max duration for which to keep events (per event type).
|
||||
maxAge time.Duration
|
||||
// Max number of events to keep (per event type).
|
||||
maxNumEvents int
|
||||
// Event storage policy.
|
||||
storagePolicy StoragePolicy
|
||||
}
|
||||
|
||||
// initialized by a call to WatchEvents(), a watch struct will then be added
|
||||
@ -129,15 +127,34 @@ func NewEventChannel(watchId int) *EventChannel {
|
||||
}
|
||||
}
|
||||
|
||||
// Policy specifying how many events to store.
|
||||
// MaxAge is the max duration for which to keep events.
|
||||
// MaxNumEvents is the max number of events to keep (-1 for no limit).
|
||||
type StoragePolicy struct {
|
||||
// Defaults limites, used if a per-event limit is not set.
|
||||
DefaultMaxAge time.Duration
|
||||
DefaultMaxNumEvents int
|
||||
|
||||
// Per-event type limits.
|
||||
PerTypeMaxAge map[info.EventType]time.Duration
|
||||
PerTypeMaxNumEvents map[info.EventType]int
|
||||
}
|
||||
|
||||
func DefaultStoragePolicy() StoragePolicy {
|
||||
return StoragePolicy{
|
||||
DefaultMaxAge: 24 * time.Hour,
|
||||
DefaultMaxNumEvents: 100000,
|
||||
PerTypeMaxAge: make(map[info.EventType]time.Duration),
|
||||
PerTypeMaxNumEvents: make(map[info.EventType]int),
|
||||
}
|
||||
}
|
||||
|
||||
// returns a pointer to an initialized Events object.
|
||||
// eventMaxAge is the max duration for which to keep events per type.
|
||||
// maxNumEvents is the max number of events to keep per type (-1 for no limit).
|
||||
func NewEventManager(eventMaxAge time.Duration, maxNumEvents int) *events {
|
||||
func NewEventManager(storagePolicy StoragePolicy) *events {
|
||||
return &events{
|
||||
eventStore: make(map[info.EventType]*utils.TimedStore, 0),
|
||||
watchers: make(map[int]*watch),
|
||||
maxAge: eventMaxAge,
|
||||
maxNumEvents: maxNumEvents,
|
||||
eventStore: make(map[info.EventType]*utils.TimedStore, 0),
|
||||
watchers: make(map[int]*watch),
|
||||
storagePolicy: storagePolicy,
|
||||
}
|
||||
}
|
||||
|
||||
@ -266,7 +283,16 @@ func (self *events) updateEventStore(e *info.Event) {
|
||||
self.eventsLock.Lock()
|
||||
defer self.eventsLock.Unlock()
|
||||
if _, ok := self.eventStore[e.EventType]; !ok {
|
||||
self.eventStore[e.EventType] = utils.NewTimedStore(self.maxAge, self.maxNumEvents)
|
||||
maxAge := self.storagePolicy.DefaultMaxAge
|
||||
maxNumEvents := self.storagePolicy.DefaultMaxNumEvents
|
||||
if age, ok := self.storagePolicy.PerTypeMaxAge[e.EventType]; ok {
|
||||
maxAge = age
|
||||
}
|
||||
if numEvents, ok := self.storagePolicy.PerTypeMaxNumEvents[e.EventType]; ok {
|
||||
maxNumEvents = numEvents
|
||||
}
|
||||
|
||||
self.eventStore[e.EventType] = utils.NewTimedStore(maxAge, maxNumEvents)
|
||||
}
|
||||
self.eventStore[e.EventType].Add(e.Timestamp, e)
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Eve
|
||||
fakeEvent := makeEvent(createOldTime(t), "/")
|
||||
fakeEvent2 := makeEvent(time.Now(), "/")
|
||||
|
||||
return NewEventManager(time.Hour, -1), NewRequest(), fakeEvent, fakeEvent2
|
||||
return NewEventManager(DefaultStoragePolicy()), NewRequest(), fakeEvent, fakeEvent2
|
||||
}
|
||||
|
||||
func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -42,6 +43,8 @@ import (
|
||||
var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
|
||||
var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log the usage of the cAdvisor container")
|
||||
var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader")
|
||||
var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
|
||||
var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")
|
||||
|
||||
// The Manager interface defines operations for starting a manager and getting
|
||||
// container and machine information.
|
||||
@ -136,8 +139,7 @@ func New(memoryStorage *memory.InMemoryStorage, sysfs sysfs.SysFs) (Manager, err
|
||||
newManager.versionInfo = *versionInfo
|
||||
glog.Infof("Version: %+v", newManager.versionInfo)
|
||||
|
||||
// TODO(vmarmol): Make configurable.
|
||||
newManager.eventHandler = events.NewEventManager(24*time.Hour, 100000)
|
||||
newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy())
|
||||
|
||||
// Register Docker container factory.
|
||||
err = docker.Register(newManager, fsInfo)
|
||||
@ -949,3 +951,50 @@ func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, erro
|
||||
func (self *manager) CloseEventChannel(watch_id int) {
|
||||
self.eventHandler.StopWatch(watch_id)
|
||||
}
|
||||
|
||||
// Parses the events StoragePolicy from the flags.
|
||||
func parseEventsStoragePolicy() events.StoragePolicy {
|
||||
policy := events.DefaultStoragePolicy()
|
||||
|
||||
// Parse max age.
|
||||
parts := strings.Split(*eventStorageAgeLimit, ",")
|
||||
for _, part := range parts {
|
||||
items := strings.Split(part, "=")
|
||||
if len(items) != 2 {
|
||||
glog.Warningf("Unknown event storage policy %q when parsing max age", part)
|
||||
continue
|
||||
}
|
||||
dur, err := time.ParseDuration(items[1])
|
||||
if err != nil {
|
||||
glog.Warningf("Unable to parse event max age duration %q: %v", items[1], err)
|
||||
continue
|
||||
}
|
||||
if items[0] == "default" {
|
||||
policy.DefaultMaxAge = dur
|
||||
continue
|
||||
}
|
||||
policy.PerTypeMaxAge[info.EventType(items[0])] = dur
|
||||
}
|
||||
|
||||
// Parse max number.
|
||||
parts = strings.Split(*eventStorageEventLimit, ",")
|
||||
for _, part := range parts {
|
||||
items := strings.Split(part, "=")
|
||||
if len(items) != 2 {
|
||||
glog.Warningf("Unknown event storage policy %q when parsing max event limit", part)
|
||||
continue
|
||||
}
|
||||
val, err := strconv.Atoi(items[1])
|
||||
if err != nil {
|
||||
glog.Warningf("Unable to parse integer from %q: %v", items[1], err)
|
||||
continue
|
||||
}
|
||||
if items[0] == "default" {
|
||||
policy.DefaultMaxNumEvents = val
|
||||
continue
|
||||
}
|
||||
policy.PerTypeMaxNumEvents[info.EventType(items[0])] = val
|
||||
}
|
||||
|
||||
return policy
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user