Limit the number of events stored to 24h.
There is a TODO to make this configurable.
This commit is contained in:
parent
8197d35ea2
commit
62d670d386
@ -23,20 +23,21 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
|
"github.com/google/cadvisor/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventSlice []*info.Event
|
type byTimestamp []*info.Event
|
||||||
|
|
||||||
// functions necessary to implement the sort interface on the Events struct
|
// functions necessary to implement the sort interface on the Events struct
|
||||||
func (e EventSlice) Len() int {
|
func (e byTimestamp) Len() int {
|
||||||
return len(e)
|
return len(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e EventSlice) Swap(i, j int) {
|
func (e byTimestamp) Swap(i, j int) {
|
||||||
e[i], e[j] = e[j], e[i]
|
e[i], e[j] = e[j], e[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e EventSlice) Less(i, j int) bool {
|
func (e byTimestamp) Less(i, j int) bool {
|
||||||
return e[i].Timestamp.Before(e[j].Timestamp)
|
return e[i].Timestamp.Before(e[j].Timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +80,7 @@ type EventManager interface {
|
|||||||
// On successful registration, an EventChannel object is returned.
|
// On successful registration, an EventChannel object is returned.
|
||||||
WatchEvents(request *Request) (*EventChannel, error)
|
WatchEvents(request *Request) (*EventChannel, error)
|
||||||
// GetEvents() returns all detected events based on the filters specified in request.
|
// GetEvents() returns all detected events based on the filters specified in request.
|
||||||
GetEvents(request *Request) (EventSlice, error)
|
GetEvents(request *Request) ([]*info.Event, error)
|
||||||
// AddEvent allows the caller to add an event to an EventManager
|
// AddEvent allows the caller to add an event to an EventManager
|
||||||
// object
|
// object
|
||||||
AddEvent(e *info.Event) error
|
AddEvent(e *info.Event) error
|
||||||
@ -89,17 +90,18 @@ type EventManager interface {
|
|||||||
|
|
||||||
// events provides an implementation for the EventManager interface.
|
// events provides an implementation for the EventManager interface.
|
||||||
type events struct {
|
type events struct {
|
||||||
// eventList holds the complete set of events found over an
|
// eventStore holds the events by event type.
|
||||||
// EventManager events instantiation.
|
eventStore map[info.EventType]*utils.TimedStore
|
||||||
eventList EventSlice
|
|
||||||
// map of registered watchers keyed by watch id.
|
// map of registered watchers keyed by watch id.
|
||||||
watchers map[int]*watch
|
watchers map[int]*watch
|
||||||
// lock guarding the eventList.
|
// lock guarding the eventStore.
|
||||||
eventsLock sync.RWMutex
|
eventsLock sync.RWMutex
|
||||||
// lock guarding watchers.
|
// lock guarding watchers.
|
||||||
watcherLock sync.RWMutex
|
watcherLock sync.RWMutex
|
||||||
// last allocated watch id.
|
// last allocated watch id.
|
||||||
lastId int
|
lastId int
|
||||||
|
// Max duration for which to keep events.
|
||||||
|
maxAge time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialized by a call to WatchEvents(), a watch struct will then be added
|
// initialized by a call to WatchEvents(), a watch struct will then be added
|
||||||
@ -125,11 +127,13 @@ func NewEventChannel(watchId int) *EventChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns a pointer to an initialized Events object
|
// returns a pointer to an initialized Events object.
|
||||||
func NewEventManager() *events {
|
// eventMaxAge is the max duration for which to keep events.
|
||||||
|
func NewEventManager(eventMaxAge time.Duration) *events {
|
||||||
return &events{
|
return &events{
|
||||||
eventList: make(EventSlice, 0),
|
eventStore: make(map[info.EventType]*utils.TimedStore, 0),
|
||||||
watchers: make(map[int]*watch),
|
watchers: make(map[int]*watch),
|
||||||
|
maxAge: eventMaxAge,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,13 +163,13 @@ func (self *EventChannel) GetWatchId() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// sorts and returns up to the last MaxEventsReturned chronological elements
|
// sorts and returns up to the last MaxEventsReturned chronological elements
|
||||||
func getMaxEventsReturned(request *Request, eSlice EventSlice) EventSlice {
|
func getMaxEventsReturned(request *Request, eSlice []*info.Event) []*info.Event {
|
||||||
sort.Sort(eSlice)
|
sort.Sort(byTimestamp(eSlice))
|
||||||
n := request.MaxEventsReturned
|
n := request.MaxEventsReturned
|
||||||
if n >= eSlice.Len() || n <= 0 {
|
if n >= len(eSlice) || n <= 0 {
|
||||||
return eSlice
|
return eSlice
|
||||||
}
|
}
|
||||||
return eSlice[eSlice.Len()-n:]
|
return eSlice[len(eSlice)-n:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the request wants all subcontainers, this returns if the request's
|
// If the request wants all subcontainers, this returns if the request's
|
||||||
@ -194,7 +198,7 @@ func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if request.EventType[event.EventType] != true {
|
if !request.EventType[event.EventType] {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if request.ContainerName != "" {
|
if request.ContainerName != "" {
|
||||||
@ -203,18 +207,30 @@ func checkIfEventSatisfiesRequest(request *Request, event *info.Event) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// method of Events object that screens Event objects found in the eventList
|
// method of Events object that screens Event objects found in the eventStore
|
||||||
// attribute and if they fit the parameters passed by the Request object,
|
// attribute and if they fit the parameters passed by the Request object,
|
||||||
// adds it to a slice of *Event objects that is returned. If both MaxEventsReturned
|
// adds it to a slice of *Event objects that is returned. If both MaxEventsReturned
|
||||||
// and StartTime/EndTime are specified in the request object, then only
|
// and StartTime/EndTime are specified in the request object, then only
|
||||||
// up to the most recent MaxEventsReturned events in that time range are returned.
|
// up to the most recent MaxEventsReturned events in that time range are returned.
|
||||||
func (self *events) GetEvents(request *Request) (EventSlice, error) {
|
func (self *events) GetEvents(request *Request) ([]*info.Event, error) {
|
||||||
returnEventList := EventSlice{}
|
returnEventList := []*info.Event{}
|
||||||
self.eventsLock.RLock()
|
self.eventsLock.RLock()
|
||||||
defer self.eventsLock.RUnlock()
|
defer self.eventsLock.RUnlock()
|
||||||
for _, e := range self.eventList {
|
for eventType, fetch := range request.EventType {
|
||||||
if checkIfEventSatisfiesRequest(request, e) {
|
if !fetch {
|
||||||
returnEventList = append(returnEventList, e)
|
continue
|
||||||
|
}
|
||||||
|
evs, ok := self.eventStore[eventType]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res := evs.InTimeRange(request.StartTime, request.EndTime, request.MaxEventsReturned)
|
||||||
|
for _, in := range res {
|
||||||
|
e := in.(*info.Event)
|
||||||
|
if checkIfEventSatisfiesRequest(request, e) {
|
||||||
|
returnEventList = append(returnEventList, e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
returnEventList = getMaxEventsReturned(request, returnEventList)
|
returnEventList = getMaxEventsReturned(request, returnEventList)
|
||||||
@ -241,11 +257,14 @@ func (self *events) WatchEvents(request *Request) (*EventChannel, error) {
|
|||||||
return returnEventChannel, nil
|
return returnEventChannel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// helper function to update the event manager's eventList
|
// helper function to update the event manager's eventStore
|
||||||
func (self *events) updateEventList(e *info.Event) {
|
func (self *events) updateEventStore(e *info.Event) {
|
||||||
self.eventsLock.Lock()
|
self.eventsLock.Lock()
|
||||||
defer self.eventsLock.Unlock()
|
defer self.eventsLock.Unlock()
|
||||||
self.eventList = append(self.eventList, e)
|
if _, ok := self.eventStore[e.EventType]; !ok {
|
||||||
|
self.eventStore[e.EventType] = utils.NewTimedStore(self.maxAge)
|
||||||
|
}
|
||||||
|
self.eventStore[e.EventType].Add(e.Timestamp, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *events) findValidWatchers(e *info.Event) []*watch {
|
func (self *events) findValidWatchers(e *info.Event) []*watch {
|
||||||
@ -260,10 +279,10 @@ func (self *events) findValidWatchers(e *info.Event) []*watch {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// method of Events object that adds the argument Event object to the
|
// method of Events object that adds the argument Event object to the
|
||||||
// eventList. It also feeds the event to a set of watch channels
|
// eventStore. It also feeds the event to a set of watch channels
|
||||||
// held by the manager if it satisfies the request keys of the channels
|
// held by the manager if it satisfies the request keys of the channels
|
||||||
func (self *events) AddEvent(e *info.Event) error {
|
func (self *events) AddEvent(e *info.Event) error {
|
||||||
self.updateEventList(e)
|
self.updateEventStore(e)
|
||||||
self.watcherLock.RLock()
|
self.watcherLock.RLock()
|
||||||
defer self.watcherLock.RUnlock()
|
defer self.watcherLock.RUnlock()
|
||||||
watchesToSend := self.findValidWatchers(e)
|
watchesToSend := self.findValidWatchers(e)
|
||||||
|
@ -47,7 +47,7 @@ func initializeScenario(t *testing.T) (*events, *Request, *info.Event, *info.Eve
|
|||||||
fakeEvent := makeEvent(createOldTime(t), "/")
|
fakeEvent := makeEvent(createOldTime(t), "/")
|
||||||
fakeEvent2 := makeEvent(time.Now(), "/")
|
fakeEvent2 := makeEvent(time.Now(), "/")
|
||||||
|
|
||||||
return NewEventManager(), NewRequest(), fakeEvent, fakeEvent2
|
return NewEventManager(time.Hour), NewRequest(), fakeEvent, fakeEvent2
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) {
|
func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) {
|
||||||
@ -150,8 +150,8 @@ func TestAddEventAddsEventsToEventManager(t *testing.T) {
|
|||||||
|
|
||||||
myEventHolder.AddEvent(fakeEvent)
|
myEventHolder.AddEvent(fakeEvent)
|
||||||
|
|
||||||
checkNumberOfEvents(t, 1, myEventHolder.eventList.Len())
|
checkNumberOfEvents(t, 1, len(myEventHolder.eventStore))
|
||||||
ensureProperEventReturned(t, fakeEvent, myEventHolder.eventList[0])
|
ensureProperEventReturned(t, fakeEvent, myEventHolder.eventStore[info.EventOom].Get(0).(*info.Event))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetEventsForOneEvent(t *testing.T) {
|
func TestGetEventsForOneEvent(t *testing.T) {
|
||||||
@ -164,7 +164,7 @@ func TestGetEventsForOneEvent(t *testing.T) {
|
|||||||
|
|
||||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
checkNumberOfEvents(t, 1, receivedEvents.Len())
|
checkNumberOfEvents(t, 1, len(receivedEvents))
|
||||||
ensureProperEventReturned(t, fakeEvent2, receivedEvents[0])
|
ensureProperEventReturned(t, fakeEvent2, receivedEvents[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,7 +180,7 @@ func TestGetEventsForTimePeriod(t *testing.T) {
|
|||||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
checkNumberOfEvents(t, 1, receivedEvents.Len())
|
checkNumberOfEvents(t, 1, len(receivedEvents))
|
||||||
ensureProperEventReturned(t, fakeEvent, receivedEvents[0])
|
ensureProperEventReturned(t, fakeEvent, receivedEvents[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,5 +192,5 @@ func TestGetEventsForNoTypeRequested(t *testing.T) {
|
|||||||
|
|
||||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
checkNumberOfEvents(t, 0, receivedEvents.Len())
|
checkNumberOfEvents(t, 0, len(receivedEvents))
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,8 @@ func New(memoryStorage *memory.InMemoryStorage, sysfs sysfs.SysFs) (Manager, err
|
|||||||
newManager.versionInfo = *versionInfo
|
newManager.versionInfo = *versionInfo
|
||||||
glog.Infof("Version: %+v", newManager.versionInfo)
|
glog.Infof("Version: %+v", newManager.versionInfo)
|
||||||
|
|
||||||
newManager.eventHandler = events.NewEventManager()
|
// TODO(vmarmol): Make configurable.
|
||||||
|
newManager.eventHandler = events.NewEventManager(24 * time.Hour)
|
||||||
|
|
||||||
// Register Docker container factory.
|
// Register Docker container factory.
|
||||||
err = docker.Register(newManager, fsInfo)
|
err = docker.Register(newManager, fsInfo)
|
||||||
|
Loading…
Reference in New Issue
Block a user