added locks to make events library threadsafe

This commit is contained in:
Katie Knister 2015-02-18 16:14:16 -08:00
parent d3d4bb8dbc
commit 9c68c949e2

View File

@ -18,6 +18,7 @@ import (
"errors" "errors"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
) )
@ -47,6 +48,10 @@ type events struct {
// satisfy the request of a given watch object in watchers, the event // satisfy the request of a given watch object in watchers, the event
// is sent over the channel to that caller of WatchEvents // is sent over the channel to that caller of WatchEvents
watchers []*watch watchers []*watch
// lock that blocks eventlist from being accessed until a writer releases it
eventsLock sync.RWMutex
// lock that blocks watchers from being accessed until a writer releases it
watcherLock sync.RWMutex
} }
// initialized by a call to WatchEvents(), a watch struct will then be added // initialized by a call to WatchEvents(), a watch struct will then be added
@ -213,6 +218,8 @@ func checkIfEventSatisfiesRequest(request *Request, event *Event) bool {
// up to the most recent MaxEventsReturned events in that time range are returned. // up to the most recent MaxEventsReturned events in that time range are returned.
func (self *events) GetEvents(request *Request) (EventSlice, error) { func (self *events) GetEvents(request *Request) (EventSlice, error) {
returnEventList := EventSlice{} returnEventList := EventSlice{}
self.eventsLock.RLock()
defer self.eventsLock.RUnlock()
for _, e := range self.eventlist { for _, e := range self.eventlist {
if checkIfEventSatisfiesRequest(request, e) { if checkIfEventSatisfiesRequest(request, e) {
returnEventList = append(returnEventList, e) returnEventList = append(returnEventList, e)
@ -233,21 +240,40 @@ func (self *events) WatchEvents(outChannel chan *Event, request *Request) error
"for a call to watch, request.StartTime and request.EndTime must be uninitialized") "for a call to watch, request.StartTime and request.EndTime must be uninitialized")
} }
newWatcher := newWatch(request, outChannel) newWatcher := newWatch(request, outChannel)
self.watcherLock.Lock()
defer self.watcherLock.Unlock()
self.watchers = append(self.watchers, newWatcher) self.watchers = append(self.watchers, newWatcher)
return nil return nil
} }
// helper function to update the event manager's eventlist
func (self *events) updateEventList(e *Event) {
self.eventsLock.Lock()
defer self.eventsLock.Unlock()
self.eventlist = append(self.eventlist, e)
}
func (self *events) findValidWatchers(e *Event) []*watch {
watchesToSend := make([]*watch, 0)
self.watcherLock.RLock()
defer self.watcherLock.RUnlock()
for _, watcher := range self.watchers {
watchRequest := watcher.request
if checkIfEventSatisfiesRequest(watchRequest, e) {
watchesToSend = append(watchesToSend, watcher)
}
}
return watchesToSend
}
// method of Events object that adds the argument Event object to the // method of Events object that adds the argument Event object to the
// eventlist. It also feeds the event to a set of watch channels // eventlist. It also feeds the event to a set of watch channels
// held by the manager if it satisfies the request keys of the channels // held by the manager if it satisfies the request keys of the channels
func (self *events) AddEvent(e *Event) error { func (self *events) AddEvent(e *Event) error {
self.eventlist = append(self.eventlist, e) self.updateEventList(e)
for _, watcher := range self.watchers { watchesToSend := self.findValidWatchers(e)
watchRequest := watcher.request for _, watchObject := range watchesToSend {
inChannel := watcher.channel watchObject.channel <- e
if checkIfEventSatisfiesRequest(watchRequest, e) {
inChannel <- e
}
} }
return nil return nil
} }