From 3c7e67991d35fce7fa7b55db92dddaf14ee84c2e Mon Sep 17 00:00:00 2001 From: Katie Knister Date: Wed, 11 Mar 2015 11:32:14 -0700 Subject: [PATCH] Created a way to remove unused channels from an eventHandler's watchers list --- api/handler.go | 6 ++-- api/versions.go | 6 ++-- events/handler.go | 75 +++++++++++++++++++++++++++++++++--------- events/handler_test.go | 22 +++++-------- manager/manager.go | 13 ++++++-- 5 files changed, 84 insertions(+), 38 deletions(-) diff --git a/api/handler.go b/api/handler.go index 703d3c57..d42525bb 100644 --- a/api/handler.go +++ b/api/handler.go @@ -133,7 +133,7 @@ func writeResult(res interface{}, w http.ResponseWriter) error { } -func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error { +func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r *http.Request, m manager.Manager) error { cn, ok := w.(http.CloseNotifier) if !ok { return errors.New("could not access http.CloseNotifier") @@ -151,8 +151,10 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re for { select { case <-cn.CloseNotify(): + glog.V(3).Infof("Received CloseNotify event") + m.CloseEventChannel(eventChannel.GetWatchId()) return nil - case ev := <-results: + case ev := <-eventChannel.GetChannel(): glog.V(3).Infof("Received event from watch channel in api: %v", ev) err := enc.Encode(ev) if err != nil { diff --git a/api/versions.go b/api/versions.go index 545aae71..57c0133b 100644 --- a/api/versions.go +++ b/api/versions.go @@ -20,7 +20,6 @@ import ( "strconv" "github.com/golang/glog" - "github.com/google/cadvisor/events" info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/manager" @@ -274,12 +273,11 @@ func (self *version1_3) HandleRequest(requestType string, request []string, m ma } return writeResult(pastEvents, w) } - eventsChannel := make(chan *events.Event, 10) - err = m.WatchForEvents(query, eventsChannel) + eventChannel, err := m.WatchForEvents(query) if err != nil { return err } - return streamResults(eventsChannel, w, r) + return streamResults(eventChannel, w, r, m) default: return self.baseVersion.HandleRequest(requestType, request, m, w, r) } diff --git a/events/handler.go b/events/handler.go index eb586e53..cb6a56c5 100644 --- a/events/handler.go +++ b/events/handler.go @@ -20,6 +20,8 @@ import ( "strings" "sync" "time" + + "github.com/golang/glog" ) // EventManager is implemented by Events. It provides two ways to monitor @@ -27,13 +29,15 @@ import ( type EventManager interface { // Watch checks if events fed to it by the caller of AddEvent satisfy the // request and if so sends the event back to the caller on outChannel - WatchEvents(outChannel chan *Event, request *Request) error + WatchEvents(request *Request) (*EventChannel, error) // GetEvents() returns a slice of all events detected that have passed // the *Request object parameters to the caller GetEvents(request *Request) (EventSlice, error) // AddEvent allows the caller to add an event to an EventManager // object AddEvent(e *Event) error + // Removes a watch instance from the EventManager's watchers map + StopWatch(watch_id int) } // Events holds a slice of *Event objects with a potential field @@ -47,11 +51,14 @@ type events struct { // linked to different calls of WatchEvents. When new events are found that // satisfy the request of a given watch object in watchers, the event // is sent over the channel to that caller of WatchEvents - watchers []*watch + watchers map[int]*watch // lock that blocks eventlist from being accessed until a writer releases it eventsLock sync.RWMutex // lock that blocks watchers from being accessed until a writer releases it watcherLock sync.RWMutex + // receives notices when a watch event ends and needs to be removed from + // the watchers list + lastId int } // initialized by a call to WatchEvents(), a watch struct will then be added @@ -66,7 +73,10 @@ type watch struct { request *Request // a channel created by the caller through which events satisfying the // request are sent to the caller - channel chan *Event + eventChannel *EventChannel + // unique identifier of a watch that is used as a key in events' watchers + // map + id int } // typedef of a slice of Event pointers @@ -127,11 +137,23 @@ const ( type EventDataInterface interface { } +type EventChannel struct { + watchId int + channel chan *Event +} + +func NewEventChannel(watchId int) *EventChannel { + return &EventChannel{ + watchId: watchId, + channel: make(chan *Event, 10), + } +} + // returns a pointer to an initialized Events object func NewEventManager() *events { return &events{ eventlist: make(EventSlice, 0), - watchers: []*watch{}, + watchers: make(map[int]*watch), } } @@ -144,13 +166,21 @@ func NewRequest() *Request { } // returns a pointer to an initialized watch object -func newWatch(request *Request, outChannel chan *Event) *watch { +func newWatch(request *Request, eventChannel *EventChannel) *watch { return &watch{ - request: request, - channel: outChannel, + request: request, + eventChannel: eventChannel, } } +func (self *EventChannel) GetChannel() chan *Event { + return self.channel +} + +func (self *EventChannel) GetWatchId() int { + return self.watchId +} + // function necessary to implement the sort interface on the Events struct func (e EventSlice) Len() int { return len(e) @@ -234,16 +264,19 @@ func (self *events) GetEvents(request *Request) (EventSlice, error) { // Request object it is fed to the channel. The StartTime and EndTime of the watch // request should be uninitialized because the purpose is to watch indefinitely // for events that will happen in the future -func (self *events) WatchEvents(outChannel chan *Event, request *Request) error { +func (self *events) WatchEvents(request *Request) (*EventChannel, error) { if !request.StartTime.IsZero() || !request.EndTime.IsZero() { - return errors.New( + return nil, errors.New( "for a call to watch, request.StartTime and request.EndTime must be uninitialized") } - newWatcher := newWatch(request, outChannel) self.watcherLock.Lock() defer self.watcherLock.Unlock() - self.watchers = append(self.watchers, newWatcher) - return nil + new_id := self.lastId + 1 + returnEventChannel := NewEventChannel(new_id) + newWatcher := newWatch(request, returnEventChannel) + self.watchers[new_id] = newWatcher + self.lastId = new_id + return returnEventChannel, nil } // helper function to update the event manager's eventlist @@ -255,8 +288,6 @@ func (self *events) updateEventList(e *Event) { func (self *events) findValidWatchers(e *Event) []*watch { watchesToSend := make([]*watch, 0) - self.watcherLock.RLock() - defer self.watcherLock.RUnlock() for _, watcher := range self.watchers { watchRequest := watcher.request if checkIfEventSatisfiesRequest(watchRequest, e) { @@ -271,9 +302,23 @@ func (self *events) findValidWatchers(e *Event) []*watch { // held by the manager if it satisfies the request keys of the channels func (self *events) AddEvent(e *Event) error { self.updateEventList(e) + self.watcherLock.RLock() + defer self.watcherLock.RUnlock() watchesToSend := self.findValidWatchers(e) for _, watchObject := range watchesToSend { - watchObject.channel <- e + watchObject.eventChannel.GetChannel() <- e } return nil } + +// Removes a watch instance from the EventManager's watchers map +func (self *events) StopWatch(watchId int) { + self.watcherLock.Lock() + defer self.watcherLock.Unlock() + _, ok := self.watchers[watchId] + if !ok { + glog.Errorf("Could not find watcher instance %v", watchId) + } + close(self.watchers[watchId].eventChannel.GetChannel()) + delete(self.watchers, watchId) +} diff --git a/events/handler_test.go b/events/handler_test.go index c1bf4cac..7ff9fbef 100644 --- a/events/handler_test.go +++ b/events/handler_test.go @@ -17,6 +17,8 @@ package events import ( "testing" "time" + + "github.com/stretchr/testify/assert" ) func createOldTime(t *testing.T) time.Time { @@ -103,8 +105,8 @@ func TestCheckIfIsSubcontainer(t *testing.T) { func TestWatchEventsDetectsNewEvents(t *testing.T) { myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myRequest.EventType[TypeOom] = true - outChannel := make(chan *Event, 10) - myEventHolder.WatchEvents(outChannel, myRequest) + returnEventChannel, err := myEventHolder.WatchEvents(myRequest) + assert.Nil(t, err) myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent2) @@ -114,19 +116,17 @@ func TestWatchEventsDetectsNewEvents(t *testing.T) { time.Sleep(5 * time.Second) if time.Since(startTime) > (5 * time.Second) { t.Errorf("Took too long to receive all the events") - close(outChannel) } }() eventsFound := 0 go func() { - for event := range outChannel { + for event := range returnEventChannel.GetChannel() { eventsFound += 1 if eventsFound == 1 { ensureProperEventReturned(t, fakeEvent, event) } else if eventsFound == 2 { ensureProperEventReturned(t, fakeEvent2, event) - close(outChannel) break } } @@ -151,9 +151,7 @@ func TestGetEventsForOneEvent(t *testing.T) { myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 1, receivedEvents.Len()) ensureProperEventReturned(t, fakeEvent2, receivedEvents[0]) } @@ -168,9 +166,7 @@ func TestGetEventsForTimePeriod(t *testing.T) { myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 1, receivedEvents.Len()) ensureProperEventReturned(t, fakeEvent, receivedEvents[0]) @@ -183,8 +179,6 @@ func TestGetEventsForNoTypeRequested(t *testing.T) { myEventHolder.AddEvent(fakeEvent2) receivedEvents, err := myEventHolder.GetEvents(myRequest) - if err != nil { - t.Errorf("Failed to GetEvents: %v", err) - } + assert.Nil(t, err) checkNumberOfEvents(t, 0, receivedEvents.Len()) } diff --git a/manager/manager.go b/manager/manager.go index 74dab108..11580871 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -83,10 +83,12 @@ type Manager interface { GetFsInfo(label string) ([]v2.FsInfo, error) // Get events streamed through passedChannel that fit the request. - WatchForEvents(request *events.Request, passedChannel chan *events.Event) error + WatchForEvents(request *events.Request) (*events.EventChannel, error) // Get past events that have been detected and that fit the request. GetPastEvents(request *events.Request) (events.EventSlice, error) + + CloseEventChannel(watch_id int) } // New takes a memory storage and returns a new manager. @@ -891,11 +893,16 @@ func (self *manager) watchForNewOoms() error { } // can be called by the api which will take events returned on the channel -func (self *manager) WatchForEvents(request *events.Request, passedChannel chan *events.Event) error { - return self.eventHandler.WatchEvents(passedChannel, request) +func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) { + return self.eventHandler.WatchEvents(request) } // can be called by the api which will return all events satisfying the request func (self *manager) GetPastEvents(request *events.Request) (events.EventSlice, error) { return self.eventHandler.GetEvents(request) } + +// called by the api when a client is no longer listening to the channel +func (self *manager) CloseEventChannel(watch_id int) { + self.eventHandler.StopWatch(watch_id) +}