From 9c68c949e2992df06879990eb39327acafe1b4d2 Mon Sep 17 00:00:00 2001 From: Katie Knister Date: Wed, 18 Feb 2015 16:14:16 -0800 Subject: [PATCH] added locks to make events library threadsafe --- events/handler.go | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/events/handler.go b/events/handler.go index 4a59ff0d..d7a57bbd 100644 --- a/events/handler.go +++ b/events/handler.go @@ -18,6 +18,7 @@ import ( "errors" "sort" "strings" + "sync" "time" ) @@ -47,6 +48,10 @@ type events struct { // satisfy the request of a given watch object in watchers, the event // is sent over the channel to that caller of WatchEvents watchers []*watch + // lock that blocks eventlist from being accessed until a writer releases it + eventsLock sync.RWMutex + // lock that blocks watchers from being accessed until a writer releases it + watcherLock sync.RWMutex } // initialized by a call to WatchEvents(), a watch struct will then be added @@ -213,6 +218,8 @@ func checkIfEventSatisfiesRequest(request *Request, event *Event) bool { // up to the most recent MaxEventsReturned events in that time range are returned. func (self *events) GetEvents(request *Request) (EventSlice, error) { returnEventList := EventSlice{} + self.eventsLock.RLock() + defer self.eventsLock.RUnlock() for _, e := range self.eventlist { if checkIfEventSatisfiesRequest(request, e) { returnEventList = append(returnEventList, e) @@ -233,21 +240,40 @@ func (self *events) WatchEvents(outChannel chan *Event, request *Request) error "for a call to watch, request.StartTime and request.EndTime must be uninitialized") } newWatcher := newWatch(request, outChannel) + self.watcherLock.Lock() + defer self.watcherLock.Unlock() self.watchers = append(self.watchers, newWatcher) return nil } +// helper function to update the event manager's eventlist +func (self *events) updateEventList(e *Event) { + self.eventsLock.Lock() + defer self.eventsLock.Unlock() + self.eventlist = append(self.eventlist, e) +} + +func (self *events) findValidWatchers(e *Event) []*watch { + watchesToSend := make([]*watch, 0) + self.watcherLock.RLock() + defer self.watcherLock.RUnlock() + for _, watcher := range self.watchers { + watchRequest := watcher.request + if checkIfEventSatisfiesRequest(watchRequest, e) { + watchesToSend = append(watchesToSend, watcher) + } + } + return watchesToSend +} + // method of Events object that adds the argument Event object to the // eventlist. It also feeds the event to a set of watch channels // held by the manager if it satisfies the request keys of the channels func (self *events) AddEvent(e *Event) error { - self.eventlist = append(self.eventlist, e) - for _, watcher := range self.watchers { - watchRequest := watcher.request - inChannel := watcher.channel - if checkIfEventSatisfiesRequest(watchRequest, e) { - inChannel <- e - } + self.updateEventList(e) + watchesToSend := self.findValidWatchers(e) + for _, watchObject := range watchesToSend { + watchObject.channel <- e } return nil }