Created a way to remove unused channels from an eventHandler's watchers list

This commit is contained in:
Katie Knister 2015-03-11 11:32:14 -07:00
parent bfaf70b255
commit 3c7e67991d
5 changed files with 84 additions and 38 deletions

View File

@ -133,7 +133,7 @@ func writeResult(res interface{}, w http.ResponseWriter) error {
} }
func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error { func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r *http.Request, m manager.Manager) error {
cn, ok := w.(http.CloseNotifier) cn, ok := w.(http.CloseNotifier)
if !ok { if !ok {
return errors.New("could not access http.CloseNotifier") return errors.New("could not access http.CloseNotifier")
@ -151,8 +151,10 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re
for { for {
select { select {
case <-cn.CloseNotify(): case <-cn.CloseNotify():
glog.V(3).Infof("Received CloseNotify event")
m.CloseEventChannel(eventChannel.GetWatchId())
return nil return nil
case ev := <-results: case ev := <-eventChannel.GetChannel():
glog.V(3).Infof("Received event from watch channel in api: %v", ev) glog.V(3).Infof("Received event from watch channel in api: %v", ev)
err := enc.Encode(ev) err := enc.Encode(ev)
if err != nil { if err != nil {

View File

@ -20,7 +20,6 @@ import (
"strconv" "strconv"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/google/cadvisor/events"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/manager" "github.com/google/cadvisor/manager"
@ -274,12 +273,11 @@ func (self *version1_3) HandleRequest(requestType string, request []string, m ma
} }
return writeResult(pastEvents, w) return writeResult(pastEvents, w)
} }
eventsChannel := make(chan *events.Event, 10) eventChannel, err := m.WatchForEvents(query)
err = m.WatchForEvents(query, eventsChannel)
if err != nil { if err != nil {
return err return err
} }
return streamResults(eventsChannel, w, r) return streamResults(eventChannel, w, r, m)
default: default:
return self.baseVersion.HandleRequest(requestType, request, m, w, r) return self.baseVersion.HandleRequest(requestType, request, m, w, r)
} }

View File

@ -20,6 +20,8 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/golang/glog"
) )
// EventManager is implemented by Events. It provides two ways to monitor // EventManager is implemented by Events. It provides two ways to monitor
@ -27,13 +29,15 @@ import (
type EventManager interface { type EventManager interface {
// Watch checks if events fed to it by the caller of AddEvent satisfy the // Watch checks if events fed to it by the caller of AddEvent satisfy the
// request and if so sends the event back to the caller on outChannel // request and if so sends the event back to the caller on outChannel
WatchEvents(outChannel chan *Event, request *Request) error WatchEvents(request *Request) (*EventChannel, error)
// GetEvents() returns a slice of all events detected that have passed // GetEvents() returns a slice of all events detected that have passed
// the *Request object parameters to the caller // the *Request object parameters to the caller
GetEvents(request *Request) (EventSlice, error) GetEvents(request *Request) (EventSlice, error)
// AddEvent allows the caller to add an event to an EventManager // AddEvent allows the caller to add an event to an EventManager
// object // object
AddEvent(e *Event) error AddEvent(e *Event) error
// Removes a watch instance from the EventManager's watchers map
StopWatch(watch_id int)
} }
// Events holds a slice of *Event objects with a potential field // Events holds a slice of *Event objects with a potential field
@ -47,11 +51,14 @@ type events struct {
// linked to different calls of WatchEvents. When new events are found that // linked to different calls of WatchEvents. When new events are found that
// satisfy the request of a given watch object in watchers, the event // satisfy the request of a given watch object in watchers, the event
// is sent over the channel to that caller of WatchEvents // is sent over the channel to that caller of WatchEvents
watchers []*watch watchers map[int]*watch
// lock that blocks eventlist from being accessed until a writer releases it // lock that blocks eventlist from being accessed until a writer releases it
eventsLock sync.RWMutex eventsLock sync.RWMutex
// lock that blocks watchers from being accessed until a writer releases it // lock that blocks watchers from being accessed until a writer releases it
watcherLock sync.RWMutex watcherLock sync.RWMutex
// receives notices when a watch event ends and needs to be removed from
// the watchers list
lastId int
} }
// initialized by a call to WatchEvents(), a watch struct will then be added // initialized by a call to WatchEvents(), a watch struct will then be added
@ -66,7 +73,10 @@ type watch struct {
request *Request request *Request
// a channel created by the caller through which events satisfying the // a channel created by the caller through which events satisfying the
// request are sent to the caller // request are sent to the caller
channel chan *Event eventChannel *EventChannel
// unique identifier of a watch that is used as a key in events' watchers
// map
id int
} }
// typedef of a slice of Event pointers // typedef of a slice of Event pointers
@ -127,11 +137,23 @@ const (
type EventDataInterface interface { type EventDataInterface interface {
} }
type EventChannel struct {
watchId int
channel chan *Event
}
func NewEventChannel(watchId int) *EventChannel {
return &EventChannel{
watchId: watchId,
channel: make(chan *Event, 10),
}
}
// returns a pointer to an initialized Events object // returns a pointer to an initialized Events object
func NewEventManager() *events { func NewEventManager() *events {
return &events{ return &events{
eventlist: make(EventSlice, 0), eventlist: make(EventSlice, 0),
watchers: []*watch{}, watchers: make(map[int]*watch),
} }
} }
@ -144,13 +166,21 @@ func NewRequest() *Request {
} }
// returns a pointer to an initialized watch object // returns a pointer to an initialized watch object
func newWatch(request *Request, outChannel chan *Event) *watch { func newWatch(request *Request, eventChannel *EventChannel) *watch {
return &watch{ return &watch{
request: request, request: request,
channel: outChannel, eventChannel: eventChannel,
} }
} }
func (self *EventChannel) GetChannel() chan *Event {
return self.channel
}
func (self *EventChannel) GetWatchId() int {
return self.watchId
}
// function necessary to implement the sort interface on the Events struct // function necessary to implement the sort interface on the Events struct
func (e EventSlice) Len() int { func (e EventSlice) Len() int {
return len(e) return len(e)
@ -234,16 +264,19 @@ func (self *events) GetEvents(request *Request) (EventSlice, error) {
// Request object it is fed to the channel. The StartTime and EndTime of the watch // Request object it is fed to the channel. The StartTime and EndTime of the watch
// request should be uninitialized because the purpose is to watch indefinitely // request should be uninitialized because the purpose is to watch indefinitely
// for events that will happen in the future // for events that will happen in the future
func (self *events) WatchEvents(outChannel chan *Event, request *Request) error { func (self *events) WatchEvents(request *Request) (*EventChannel, error) {
if !request.StartTime.IsZero() || !request.EndTime.IsZero() { if !request.StartTime.IsZero() || !request.EndTime.IsZero() {
return errors.New( return nil, errors.New(
"for a call to watch, request.StartTime and request.EndTime must be uninitialized") "for a call to watch, request.StartTime and request.EndTime must be uninitialized")
} }
newWatcher := newWatch(request, outChannel)
self.watcherLock.Lock() self.watcherLock.Lock()
defer self.watcherLock.Unlock() defer self.watcherLock.Unlock()
self.watchers = append(self.watchers, newWatcher) new_id := self.lastId + 1
return nil returnEventChannel := NewEventChannel(new_id)
newWatcher := newWatch(request, returnEventChannel)
self.watchers[new_id] = newWatcher
self.lastId = new_id
return returnEventChannel, nil
} }
// helper function to update the event manager's eventlist // helper function to update the event manager's eventlist
@ -255,8 +288,6 @@ func (self *events) updateEventList(e *Event) {
func (self *events) findValidWatchers(e *Event) []*watch { func (self *events) findValidWatchers(e *Event) []*watch {
watchesToSend := make([]*watch, 0) watchesToSend := make([]*watch, 0)
self.watcherLock.RLock()
defer self.watcherLock.RUnlock()
for _, watcher := range self.watchers { for _, watcher := range self.watchers {
watchRequest := watcher.request watchRequest := watcher.request
if checkIfEventSatisfiesRequest(watchRequest, e) { if checkIfEventSatisfiesRequest(watchRequest, e) {
@ -271,9 +302,23 @@ func (self *events) findValidWatchers(e *Event) []*watch {
// held by the manager if it satisfies the request keys of the channels // held by the manager if it satisfies the request keys of the channels
func (self *events) AddEvent(e *Event) error { func (self *events) AddEvent(e *Event) error {
self.updateEventList(e) self.updateEventList(e)
self.watcherLock.RLock()
defer self.watcherLock.RUnlock()
watchesToSend := self.findValidWatchers(e) watchesToSend := self.findValidWatchers(e)
for _, watchObject := range watchesToSend { for _, watchObject := range watchesToSend {
watchObject.channel <- e watchObject.eventChannel.GetChannel() <- e
} }
return nil return nil
} }
// Removes a watch instance from the EventManager's watchers map
func (self *events) StopWatch(watchId int) {
self.watcherLock.Lock()
defer self.watcherLock.Unlock()
_, ok := self.watchers[watchId]
if !ok {
glog.Errorf("Could not find watcher instance %v", watchId)
}
close(self.watchers[watchId].eventChannel.GetChannel())
delete(self.watchers, watchId)
}

View File

@ -17,6 +17,8 @@ package events
import ( import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
) )
func createOldTime(t *testing.T) time.Time { func createOldTime(t *testing.T) time.Time {
@ -103,8 +105,8 @@ func TestCheckIfIsSubcontainer(t *testing.T) {
func TestWatchEventsDetectsNewEvents(t *testing.T) { func TestWatchEventsDetectsNewEvents(t *testing.T) {
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t) myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
myRequest.EventType[TypeOom] = true myRequest.EventType[TypeOom] = true
outChannel := make(chan *Event, 10) returnEventChannel, err := myEventHolder.WatchEvents(myRequest)
myEventHolder.WatchEvents(outChannel, myRequest) assert.Nil(t, err)
myEventHolder.AddEvent(fakeEvent) myEventHolder.AddEvent(fakeEvent)
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
@ -114,19 +116,17 @@ func TestWatchEventsDetectsNewEvents(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
if time.Since(startTime) > (5 * time.Second) { if time.Since(startTime) > (5 * time.Second) {
t.Errorf("Took too long to receive all the events") t.Errorf("Took too long to receive all the events")
close(outChannel)
} }
}() }()
eventsFound := 0 eventsFound := 0
go func() { go func() {
for event := range outChannel { for event := range returnEventChannel.GetChannel() {
eventsFound += 1 eventsFound += 1
if eventsFound == 1 { if eventsFound == 1 {
ensureProperEventReturned(t, fakeEvent, event) ensureProperEventReturned(t, fakeEvent, event)
} else if eventsFound == 2 { } else if eventsFound == 2 {
ensureProperEventReturned(t, fakeEvent2, event) ensureProperEventReturned(t, fakeEvent2, event)
close(outChannel)
break break
} }
} }
@ -151,9 +151,7 @@ func TestGetEventsForOneEvent(t *testing.T) {
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 1, receivedEvents.Len()) checkNumberOfEvents(t, 1, receivedEvents.Len())
ensureProperEventReturned(t, fakeEvent2, receivedEvents[0]) ensureProperEventReturned(t, fakeEvent2, receivedEvents[0])
} }
@ -168,9 +166,7 @@ func TestGetEventsForTimePeriod(t *testing.T) {
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 1, receivedEvents.Len()) checkNumberOfEvents(t, 1, receivedEvents.Len())
ensureProperEventReturned(t, fakeEvent, receivedEvents[0]) ensureProperEventReturned(t, fakeEvent, receivedEvents[0])
@ -183,8 +179,6 @@ func TestGetEventsForNoTypeRequested(t *testing.T) {
myEventHolder.AddEvent(fakeEvent2) myEventHolder.AddEvent(fakeEvent2)
receivedEvents, err := myEventHolder.GetEvents(myRequest) receivedEvents, err := myEventHolder.GetEvents(myRequest)
if err != nil { assert.Nil(t, err)
t.Errorf("Failed to GetEvents: %v", err)
}
checkNumberOfEvents(t, 0, receivedEvents.Len()) checkNumberOfEvents(t, 0, receivedEvents.Len())
} }

View File

@ -83,10 +83,12 @@ type Manager interface {
GetFsInfo(label string) ([]v2.FsInfo, error) GetFsInfo(label string) ([]v2.FsInfo, error)
// Get events streamed through passedChannel that fit the request. // Get events streamed through passedChannel that fit the request.
WatchForEvents(request *events.Request, passedChannel chan *events.Event) error WatchForEvents(request *events.Request) (*events.EventChannel, error)
// Get past events that have been detected and that fit the request. // Get past events that have been detected and that fit the request.
GetPastEvents(request *events.Request) (events.EventSlice, error) GetPastEvents(request *events.Request) (events.EventSlice, error)
CloseEventChannel(watch_id int)
} }
// New takes a memory storage and returns a new manager. // New takes a memory storage and returns a new manager.
@ -891,11 +893,16 @@ func (self *manager) watchForNewOoms() error {
} }
// can be called by the api which will take events returned on the channel // can be called by the api which will take events returned on the channel
func (self *manager) WatchForEvents(request *events.Request, passedChannel chan *events.Event) error { func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
return self.eventHandler.WatchEvents(passedChannel, request) return self.eventHandler.WatchEvents(request)
} }
// can be called by the api which will return all events satisfying the request // can be called by the api which will return all events satisfying the request
func (self *manager) GetPastEvents(request *events.Request) (events.EventSlice, error) { func (self *manager) GetPastEvents(request *events.Request) (events.EventSlice, error) {
return self.eventHandler.GetEvents(request) return self.eventHandler.GetEvents(request)
} }
// called by the api when a client is no longer listening to the channel
func (self *manager) CloseEventChannel(watch_id int) {
self.eventHandler.StopWatch(watch_id)
}