From 28dfea761ccb8766800aa7ef35fb181efd1d5ebf Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Fri, 1 May 2015 10:42:17 -0700 Subject: [PATCH 1/2] Add events StoragePolicy. --- events/handler.go | 50 ++++++++++++++++++++++++++++++++---------- events/handler_test.go | 2 +- manager/manager.go | 4 ++-- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/events/handler.go b/events/handler.go index c0ac7fa2..a9146f86 100644 --- a/events/handler.go +++ b/events/handler.go @@ -100,10 +100,8 @@ type events struct { watcherLock sync.RWMutex // last allocated watch id. lastId int - // Max duration for which to keep events (per event type). - maxAge time.Duration - // Max number of events to keep (per event type). - maxNumEvents int + // Event storage policy. + storagePolicy StoragePolicy } // initialized by a call to WatchEvents(), a watch struct will then be added @@ -129,15 +127,34 @@ func NewEventChannel(watchId int) *EventChannel { } } +// Policy specifying how many events to store. +// MaxAge is the max duration for which to keep events. +// MaxNumEvents is the max number of events to keep (-1 for no limit). +type StoragePolicy struct { + // Default limits, used if a per-event limit is not set. + DefaultMaxAge time.Duration + DefaultMaxNumEvents int + + // Per-event type limits. + PerTypeMaxAge map[info.EventType]time.Duration + PerTypeMaxNumEvents map[info.EventType]int +} + +func DefaultStoragePolicy() StoragePolicy { + return StoragePolicy{ + DefaultMaxAge: 24 * time.Hour, + DefaultMaxNumEvents: 100000, + PerTypeMaxAge: make(map[info.EventType]time.Duration), + PerTypeMaxNumEvents: make(map[info.EventType]int), + } +} + // returns a pointer to an initialized Events object. -// eventMaxAge is the max duration for which to keep events per type. -// maxNumEvents is the max number of events to keep per type (-1 for no limit). 
-func NewEventManager(eventMaxAge time.Duration, maxNumEvents int) *events { +func NewEventManager(storagePolicy StoragePolicy) *events { return &events{ - eventStore: make(map[info.EventType]*utils.TimedStore, 0), - watchers: make(map[int]*watch), - maxAge: eventMaxAge, - maxNumEvents: maxNumEvents, + eventStore: make(map[info.EventType]*utils.TimedStore, 0), + watchers: make(map[int]*watch), + storagePolicy: storagePolicy, } } @@ -266,7 +283,16 @@ func (self *events) updateEventStore(e *info.Event) { self.eventsLock.Lock() defer self.eventsLock.Unlock() if _, ok := self.eventStore[e.EventType]; !ok { - self.eventStore[e.EventType] = utils.NewTimedStore(self.maxAge, self.maxNumEvents) + maxAge := self.storagePolicy.DefaultMaxAge + maxNumEvents := self.storagePolicy.DefaultMaxNumEvents + if age, ok := self.storagePolicy.PerTypeMaxAge[e.EventType]; ok { + maxAge = age + } + if numEvents, ok := self.storagePolicy.PerTypeMaxNumEvents[e.EventType]; ok { + maxNumEvents = numEvents + } + + self.eventStore[e.EventType] = utils.NewTimedStore(maxAge, maxNumEvents) } self.eventStore[e.EventType].Add(e.Timestamp, e) } diff --git a/events/handler_test.go b/events/handler_test.go index 82dd6970..65b38897 100644 --- a/events/handler_test.go +++ b/events/handler_test.go @@ -47,7 +47,7 @@ func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Eve fakeEvent := makeEvent(createOldTime(t), "/") fakeEvent2 := makeEvent(time.Now(), "/") - return NewEventManager(time.Hour, -1), NewRequest(), fakeEvent, fakeEvent2 + return NewEventManager(DefaultStoragePolicy()), NewRequest(), fakeEvent, fakeEvent2 } func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) { diff --git a/manager/manager.go b/manager/manager.go index b21fd275..857d1e08 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -136,8 +136,8 @@ func New(memoryStorage *memory.InMemoryStorage, sysfs sysfs.SysFs) (Manager, err newManager.versionInfo = *versionInfo 
glog.Infof("Version: %+v", newManager.versionInfo) - // TODO(vmarmol): Make configurable. - newManager.eventHandler = events.NewEventManager(24*time.Hour, 100000) + storagePolicy := events.DefaultStoragePolicy() + newManager.eventHandler = events.NewEventManager(storagePolicy) // Register Docker container factory. err = docker.Register(newManager, fsInfo) From a7a7aacebc31625d1817a77ec8524e3d038f9d39 Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Fri, 1 May 2015 11:28:54 -0700 Subject: [PATCH 2/2] Add flags to customize the events StoragePolicy. --- container/libcontainer/helpers.go | 145 +++++++++++++++++------------- manager/manager.go | 53 ++++++++++- 2 files changed, 135 insertions(+), 63 deletions(-) diff --git a/container/libcontainer/helpers.go b/container/libcontainer/helpers.go index 0ae84086..d5bf142a 100644 --- a/container/libcontainer/helpers.go +++ b/container/libcontainer/helpers.go @@ -100,14 +100,34 @@ func DockerStateDir(dockerRoot string) string { return path.Join(dockerRoot, "containers") } +func DiskStatsCopy0(major, minor uint64) *info.PerDiskStats { + disk := info.PerDiskStats{ + Major: major, + Minor: minor, + } + disk.Stats = make(map[string]uint64) + return &disk +} + +type DiskKey struct { + Major uint64 + Minor uint64 +} + +func DiskStatsCopy1(disk_stat map[DiskKey]*info.PerDiskStats) []info.PerDiskStats { + i := 0 + stat := make([]info.PerDiskStats, len(disk_stat)) + for _, disk := range disk_stat { + stat[i] = *disk + i++ + } + return stat +} + func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskStats) { if len(blkio_stats) == 0 { return } - type DiskKey struct { - Major uint64 - Minor uint64 - } disk_stat := make(map[DiskKey]*info.PerDiskStats) for i := range blkio_stats { major := blkio_stats[i].Major @@ -118,12 +138,7 @@ func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskSta } diskp, ok := disk_stat[disk_key] if !ok { - disk := info.PerDiskStats{ - Major: major, - Minor: minor, - 
} - disk.Stats = make(map[string]uint64) - diskp = &disk + diskp = DiskStatsCopy0(major, minor) disk_stat[disk_key] = diskp } op := blkio_stats[i].Op @@ -132,68 +147,76 @@ func DiskStatsCopy(blkio_stats []cgroups.BlkioStatEntry) (stat []info.PerDiskSta } diskp.Stats[op] = blkio_stats[i].Value } - i := 0 - stat = make([]info.PerDiskStats, len(disk_stat)) - for _, disk := range disk_stat { - stat[i] = *disk - i++ - } - return + return DiskStatsCopy1(disk_stat) } // Convert libcontainer stats to info.ContainerStats. +func toContainerStats0(s *cgroups.Stats, ret *info.ContainerStats) { + ret.Cpu.Usage.User = s.CpuStats.CpuUsage.UsageInUsermode + ret.Cpu.Usage.System = s.CpuStats.CpuUsage.UsageInKernelmode + n := len(s.CpuStats.CpuUsage.PercpuUsage) + ret.Cpu.Usage.PerCpu = make([]uint64, n) + + ret.Cpu.Usage.Total = 0 + for i := 0; i < n; i++ { + ret.Cpu.Usage.PerCpu[i] = s.CpuStats.CpuUsage.PercpuUsage[i] + ret.Cpu.Usage.Total += s.CpuStats.CpuUsage.PercpuUsage[i] + } +} + +func toContainerStats1(s *cgroups.Stats, ret *info.ContainerStats) { + ret.DiskIo.IoServiceBytes = DiskStatsCopy(s.BlkioStats.IoServiceBytesRecursive) + ret.DiskIo.IoServiced = DiskStatsCopy(s.BlkioStats.IoServicedRecursive) + ret.DiskIo.IoQueued = DiskStatsCopy(s.BlkioStats.IoQueuedRecursive) + ret.DiskIo.Sectors = DiskStatsCopy(s.BlkioStats.SectorsRecursive) + ret.DiskIo.IoServiceTime = DiskStatsCopy(s.BlkioStats.IoServiceTimeRecursive) + ret.DiskIo.IoWaitTime = DiskStatsCopy(s.BlkioStats.IoWaitTimeRecursive) + ret.DiskIo.IoMerged = DiskStatsCopy(s.BlkioStats.IoMergedRecursive) + ret.DiskIo.IoTime = DiskStatsCopy(s.BlkioStats.IoTimeRecursive) +} + +func toContainerStats2(s *cgroups.Stats, ret *info.ContainerStats) { + ret.Memory.Usage = s.MemoryStats.Usage + if v, ok := s.MemoryStats.Stats["pgfault"]; ok { + ret.Memory.ContainerData.Pgfault = v + ret.Memory.HierarchicalData.Pgfault = v + } + if v, ok := s.MemoryStats.Stats["pgmajfault"]; ok { + ret.Memory.ContainerData.Pgmajfault = v + 
ret.Memory.HierarchicalData.Pgmajfault = v + } + if v, ok := s.MemoryStats.Stats["total_inactive_anon"]; ok { + ret.Memory.WorkingSet = ret.Memory.Usage - v + if v, ok := s.MemoryStats.Stats["total_active_file"]; ok { + ret.Memory.WorkingSet -= v + } + } +} + +func toContainerStats3(libcontainerStats *libcontainer.Stats, ret *info.ContainerStats) { + // TODO(vmarmol): Handle multiple interfaces. + ret.Network.RxBytes = libcontainerStats.Interfaces[0].RxBytes + ret.Network.RxPackets = libcontainerStats.Interfaces[0].RxPackets + ret.Network.RxErrors = libcontainerStats.Interfaces[0].RxErrors + ret.Network.RxDropped = libcontainerStats.Interfaces[0].RxDropped + ret.Network.TxBytes = libcontainerStats.Interfaces[0].TxBytes + ret.Network.TxPackets = libcontainerStats.Interfaces[0].TxPackets + ret.Network.TxErrors = libcontainerStats.Interfaces[0].TxErrors + ret.Network.TxDropped = libcontainerStats.Interfaces[0].TxDropped +} + func toContainerStats(libcontainerStats *libcontainer.Stats) *info.ContainerStats { s := libcontainerStats.CgroupStats ret := new(info.ContainerStats) ret.Timestamp = time.Now() if s != nil { - ret.Cpu.Usage.User = s.CpuStats.CpuUsage.UsageInUsermode - ret.Cpu.Usage.System = s.CpuStats.CpuUsage.UsageInKernelmode - n := len(s.CpuStats.CpuUsage.PercpuUsage) - ret.Cpu.Usage.PerCpu = make([]uint64, n) - - ret.Cpu.Usage.Total = 0 - for i := 0; i < n; i++ { - ret.Cpu.Usage.PerCpu[i] = s.CpuStats.CpuUsage.PercpuUsage[i] - ret.Cpu.Usage.Total += s.CpuStats.CpuUsage.PercpuUsage[i] - } - - ret.DiskIo.IoServiceBytes = DiskStatsCopy(s.BlkioStats.IoServiceBytesRecursive) - ret.DiskIo.IoServiced = DiskStatsCopy(s.BlkioStats.IoServicedRecursive) - ret.DiskIo.IoQueued = DiskStatsCopy(s.BlkioStats.IoQueuedRecursive) - ret.DiskIo.Sectors = DiskStatsCopy(s.BlkioStats.SectorsRecursive) - ret.DiskIo.IoServiceTime = DiskStatsCopy(s.BlkioStats.IoServiceTimeRecursive) - ret.DiskIo.IoWaitTime = DiskStatsCopy(s.BlkioStats.IoWaitTimeRecursive) - ret.DiskIo.IoMerged = 
DiskStatsCopy(s.BlkioStats.IoMergedRecursive) - ret.DiskIo.IoTime = DiskStatsCopy(s.BlkioStats.IoTimeRecursive) - - ret.Memory.Usage = s.MemoryStats.Usage - if v, ok := s.MemoryStats.Stats["pgfault"]; ok { - ret.Memory.ContainerData.Pgfault = v - ret.Memory.HierarchicalData.Pgfault = v - } - if v, ok := s.MemoryStats.Stats["pgmajfault"]; ok { - ret.Memory.ContainerData.Pgmajfault = v - ret.Memory.HierarchicalData.Pgmajfault = v - } - if v, ok := s.MemoryStats.Stats["total_inactive_anon"]; ok { - ret.Memory.WorkingSet = ret.Memory.Usage - v - if v, ok := s.MemoryStats.Stats["total_active_file"]; ok { - ret.Memory.WorkingSet -= v - } - } + toContainerStats0(s, ret) + toContainerStats1(s, ret) + toContainerStats2(s, ret) } if len(libcontainerStats.Interfaces) > 0 { - // TODO(vmarmol): Handle multiple interfaces. - ret.Network.RxBytes = libcontainerStats.Interfaces[0].RxBytes - ret.Network.RxPackets = libcontainerStats.Interfaces[0].RxPackets - ret.Network.RxErrors = libcontainerStats.Interfaces[0].RxErrors - ret.Network.RxDropped = libcontainerStats.Interfaces[0].RxDropped - ret.Network.TxBytes = libcontainerStats.Interfaces[0].TxBytes - ret.Network.TxPackets = libcontainerStats.Interfaces[0].TxPackets - ret.Network.TxErrors = libcontainerStats.Interfaces[0].TxErrors - ret.Network.TxDropped = libcontainerStats.Interfaces[0].TxDropped + toContainerStats3(libcontainerStats, ret) } return ret } diff --git a/manager/manager.go b/manager/manager.go index 857d1e08..3d5e02e2 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -20,6 +20,7 @@ import ( "fmt" "path" "regexp" + "strconv" "strings" "sync" "time" @@ -42,6 +43,8 @@ import ( var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings") var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log the usage of the cAdvisor container") var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load 
reader") +var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types") +var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types") // The Manager interface defines operations for starting a manager and getting // container and machine information. @@ -136,8 +139,7 @@ func New(memoryStorage *memory.InMemoryStorage, sysfs sysfs.SysFs) (Manager, err newManager.versionInfo = *versionInfo glog.Infof("Version: %+v", newManager.versionInfo) - storagePolicy := events.DefaultStoragePolicy() - newManager.eventHandler = events.NewEventManager(storagePolicy) + newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy()) // Register Docker container factory. err = docker.Register(newManager, fsInfo) @@ -949,3 +951,50 @@ func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, erro func (self *manager) CloseEventChannel(watch_id int) { self.eventHandler.StopWatch(watch_id) } + +// Parses the events StoragePolicy from the flags. +func parseEventsStoragePolicy() events.StoragePolicy { + policy := events.DefaultStoragePolicy() + + // Parse max age. 
+ parts := strings.Split(*eventStorageAgeLimit, ",") + for _, part := range parts { + items := strings.Split(part, "=") + if len(items) != 2 { + glog.Warningf("Unknown event storage policy %q when parsing max age", part) + continue + } + dur, err := time.ParseDuration(items[1]) + if err != nil { + glog.Warningf("Unable to parse event max age duration %q: %v", items[1], err) + continue + } + if items[0] == "default" { + policy.DefaultMaxAge = dur + continue + } + policy.PerTypeMaxAge[info.EventType(items[0])] = dur + } + + // Parse max number. + parts = strings.Split(*eventStorageEventLimit, ",") + for _, part := range parts { + items := strings.Split(part, "=") + if len(items) != 2 { + glog.Warningf("Unknown event storage policy %q when parsing max event limit", part) + continue + } + val, err := strconv.Atoi(items[1]) + if err != nil { + glog.Warningf("Unable to parse integer from %q: %v", items[1], err) + continue + } + if items[0] == "default" { + policy.DefaultMaxNumEvents = val + continue + } + policy.PerTypeMaxNumEvents[info.EventType(items[0])] = val + } + + return policy +}