diff --git a/events/handler.go b/events/handler.go index 9d026503..dc955b64 100644 --- a/events/handler.go +++ b/events/handler.go @@ -23,20 +23,21 @@ import ( "github.com/golang/glog" info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/utils" ) -type EventSlice []*info.Event +type byTimestamp []*info.Event // functions necessary to implement the sort interface on the Events struct -func (e EventSlice) Len() int { +func (e byTimestamp) Len() int { return len(e) } -func (e EventSlice) Swap(i, j int) { +func (e byTimestamp) Swap(i, j int) { e[i], e[j] = e[j], e[i] } -func (e EventSlice) Less(i, j int) bool { +func (e byTimestamp) Less(i, j int) bool { return e[i].Timestamp.Before(e[j].Timestamp) } @@ -79,7 +80,7 @@ type EventManager interface { // On successful registration, an EventChannel object is returned. WatchEvents(request *Request) (*EventChannel, error) // GetEvents() returns all detected events based on the filters specified in request. - GetEvents(request *Request) (EventSlice, error) + GetEvents(request *Request) ([]*info.Event, error) // AddEvent allows the caller to add an event to an EventManager // object AddEvent(e *info.Event) error @@ -89,17 +90,18 @@ type EventManager interface { // events provides an implementation for the EventManager interface. type events struct { - // eventList holds the complete set of events found over an - // EventManager events instantiation. - eventList EventSlice + // eventStore holds the events by event type. + eventStore map[info.EventType]*utils.TimedStore // map of registered watchers keyed by watch id. watchers map[int]*watch - // lock guarding the eventList. + // lock guarding the eventStore. eventsLock sync.RWMutex // lock guarding watchers. watcherLock sync.RWMutex // last allocated watch id. lastId int + // Max duration for which to keep events. + maxAge time.Duration } // initialized by a call to WatchEvents(), a watch struct will then be added @@ -125,11 +127,13 @@ func NewEventChannel(watchId int) *EventChannel { } } -// returns a pointer to an initialized Events object -func NewEventManager() *events { +// returns a pointer to an initialized Events object. +// eventMaxAge is the max duration for which to keep events. +func NewEventManager(eventMaxAge time.Duration) *events { return &events{ - eventList: make(EventSlice, 0), - watchers: make(map[int]*watch), + eventStore: make(map[info.EventType]*utils.TimedStore, 0), + watchers: make(map[int]*watch), + maxAge: eventMaxAge, } } @@ -159,13 +163,13 @@ func (self *EventChannel) GetWatchId() int { } // sorts and returns up to the last MaxEventsReturned chronological elements -func getMaxEventsReturned(request *Request, eSlice EventSlice) EventSlice { - sort.Sort(eSlice) +func getMaxEventsReturned(request *Request, eSlice []*info.Event) []*info.Event { + sort.Sort(byTimestamp(eSlice)) n := request.MaxEventsReturned - if n >= eSlice.Len() || n <= 0 { + if n >= len(eSlice) || n <= 0 { return eSlice } - return eSlice[eSlice.Len()-n:] + return eSlice[len(eSlice)-n:] } // If the request wants all subcontainers, this returns if the request's @@ -194,7 +198,7 @@ func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool { return false } } - if request.EventType[event.EventType] != true { + if !request.EventType[event.EventType] { return false } if request.ContainerName != "" { @@ -203,18 +207,30 @@ func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool { return true } -// method of Events object that screens Event objects found in the eventList +// method of Events object that screens Event objects found in the eventStore // attribute and if they fit the parameters passed by the Request object, // adds it to a slice of *Event objects that is returned. If both MaxEventsReturned // and StartTime/EndTime are specified in the request object, then only // up to the most recent MaxEventsReturned events in that time range are returned. -func (self *events) GetEvents(request *Request) (EventSlice, error) { - returnEventList := EventSlice{} +func (self *events) GetEvents(request *Request) ([]*info.Event, error) { + returnEventList := []*info.Event{} self.eventsLock.RLock() defer self.eventsLock.RUnlock() - for _, e := range self.eventList { - if checkIfEventSatisfiesRequest(request, e) { - returnEventList = append(returnEventList, e) + for eventType, fetch := range request.EventType { + if !fetch { + continue + } + evs, ok := self.eventStore[eventType] + if !ok { + continue + } + + res := evs.InTimeRange(request.StartTime, request.EndTime, request.MaxEventsReturned) + for _, in := range res { + e := in.(*info.Event) + if checkIfEventSatisfiesRequest(request, e) { + returnEventList = append(returnEventList, e) + } } } returnEventList = getMaxEventsReturned(request, returnEventList) @@ -241,11 +257,14 @@ func (self *events) WatchEvents(request *Request) (*EventChannel, error) { return returnEventChannel, nil } -// helper function to update the event manager's eventList -func (self *events) updateEventList(e *info.Event) { +// helper function to update the event manager's eventStore +func (self *events) updateEventStore(e *info.Event) { self.eventsLock.Lock() defer self.eventsLock.Unlock() - self.eventList = append(self.eventList, e) + if _, ok := self.eventStore[e.EventType]; !ok { + self.eventStore[e.EventType] = utils.NewTimedStore(self.maxAge) + } + self.eventStore[e.EventType].Add(e.Timestamp, e) } func (self *events) findValidWatchers(e *info.Event) []*watch { @@ -260,10 +279,10 @@ func (self *events) findValidWatchers(e *info.Event) []*watch { } // method of Events object that adds the argument Event object to the -// eventList. It also feeds the event to a set of watch channels +// eventStore. It also feeds the event to a set of watch channels // held by the manager if it satisfies the request keys of the channels func (self *events) AddEvent(e *info.Event) error { - self.updateEventList(e) + self.updateEventStore(e) self.watcherLock.RLock() defer self.watcherLock.RUnlock() watchesToSend := self.findValidWatchers(e) diff --git a/events/handler_test.go b/events/handler_test.go index f9f074b4..108acf11 100644 --- a/events/handler_test.go +++ b/events/handler_test.go @@ -47,7 +47,7 @@ func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Eve fakeEvent := makeEvent(createOldTime(t), "/") fakeEvent2 := makeEvent(time.Now(), "/") - return NewEventManager(), NewRequest(), fakeEvent, fakeEvent2 + return NewEventManager(time.Hour), NewRequest(), fakeEvent, fakeEvent2 } func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) { @@ -150,8 +150,8 @@ func TestAddEventAddsEventsToEventManager(t *testing.T) { myEventHolder.AddEvent(fakeEvent) - checkNumberOfEvents(t, 1, myEventHolder.eventList.Len()) - ensureProperEventReturned(t, fakeEvent, myEventHolder.eventList[0]) + checkNumberOfEvents(t, 1, len(myEventHolder.eventStore)) + ensureProperEventReturned(t, fakeEvent, myEventHolder.eventStore[info.EventOom].Get(0).(*info.Event)) } func TestGetEventsForOneEvent(t *testing.T) { @@ -164,7 +164,7 @@ func TestGetEventsForOneEvent(t *testing.T) { receivedEvents, err := myEventHolder.GetEvents(myRequest) assert.Nil(t, err) - checkNumberOfEvents(t, 1, receivedEvents.Len()) + checkNumberOfEvents(t, 1, len(receivedEvents)) ensureProperEventReturned(t, fakeEvent2, receivedEvents[0]) } @@ -180,7 +180,7 @@ func TestGetEventsForTimePeriod(t *testing.T) { receivedEvents, err := myEventHolder.GetEvents(myRequest) assert.Nil(t, err) - checkNumberOfEvents(t, 1, receivedEvents.Len()) + checkNumberOfEvents(t, 1, len(receivedEvents)) ensureProperEventReturned(t, fakeEvent, receivedEvents[0]) } @@ -192,5 +192,5 @@ func TestGetEventsForNoTypeRequested(t *testing.T) { receivedEvents, err := myEventHolder.GetEvents(myRequest) assert.Nil(t, err) - checkNumberOfEvents(t, 0, receivedEvents.Len()) + checkNumberOfEvents(t, 0, len(receivedEvents)) } diff --git a/manager/manager.go b/manager/manager.go index 37868653..da458fa9 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -133,7 +133,8 @@ func New(memoryStorage *memory.InMemoryStorage, sysfs sysfs.SysFs) (Manager, err newManager.versionInfo = *versionInfo glog.Infof("Version: %+v", newManager.versionInfo) - newManager.eventHandler = events.NewEventManager() + // TODO(vmarmol): Make configurable. + newManager.eventHandler = events.NewEventManager(24 * time.Hour) // Register Docker container factory. err = docker.Register(newManager, fsInfo)