Created an events handler library
This commit is contained in:
parent
4d252057ee
commit
97a6836f89
253
events/handler.go
Normal file
253
events/handler.go
Normal file
@ -0,0 +1,253 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EventManager is implemented by Events. It provides two ways to monitor
|
||||
// events and one way to add events
|
||||
type EventManager interface {
|
||||
// Watch checks if events fed to it by the caller of AddEvent satisfy the
|
||||
// request and if so sends the event back to the caller on outChannel
|
||||
WatchEvents(outChannel chan *Event, request *Request) error
|
||||
// GetEvents() returns a slice of all events detected that have passed
|
||||
// the *Request object parameters to the caller
|
||||
GetEvents(request *Request) (EventSlice, error)
|
||||
// AddEvent allows the caller to add an event to an EventManager
|
||||
// object
|
||||
AddEvent(e *Event) error
|
||||
}
|
||||
|
||||
// Events holds a slice of *Event objects with a potential field
|
||||
// that caps the number of events held. It is an implementation of the
|
||||
// EventManager interface
|
||||
type events struct {
|
||||
// eventlist holds the complete set of events found over an
|
||||
// EventManager events instantiation.
|
||||
eventlist EventSlice
|
||||
// the slice of watch pointers allows the EventManager access to channels
|
||||
// linked to different calls of WatchEvents. When new events are found that
|
||||
// satisfy the request of a given watch object in watchers, the event
|
||||
// is sent over the channel to that caller of WatchEvents
|
||||
watchers []*watch
|
||||
}
|
||||
|
||||
// initialized by a call to WatchEvents(), a watch struct will then be added
|
||||
// to the events slice of *watch objects. When AddEvent() finds an event that
|
||||
// satisfies the request parameter of a watch object in events.watchers,
|
||||
// it will send that event out over the watch object's channel. The caller that
|
||||
// called WatchEvents will receive the event over the channel provided to
|
||||
// WatchEvents
|
||||
type watch struct {
|
||||
// request specifies all the parameters that events sent through the
|
||||
// channel must satisfy. Specified by the creator of the watch object
|
||||
request *Request
|
||||
// a channel created by the caller through which events satisfying the
|
||||
// request are sent to the caller
|
||||
channel chan *Event
|
||||
}
|
||||
|
||||
// typedef of a slice of Event pointers
|
||||
type EventSlice []*Event
|
||||
|
||||
// Event contains information general to events such as the time at which they
|
||||
// occurred, their specific type, and the actual event. Event types are
|
||||
// differentiated by the EventType field of Event.
|
||||
type Event struct {
|
||||
// the absolute container name for which the event occurred
|
||||
ContainerName string
|
||||
// the time at which the event occurred
|
||||
Timestamp time.Time
|
||||
// the type of event. EventType is an enumerated type
|
||||
EventType EventType
|
||||
// the original event object and all of its extraneous data, ex. an
|
||||
// OomInstance
|
||||
EventData EventDataInterface
|
||||
}
|
||||
|
||||
// Request holds a set of parameters by which Event objects may be screened.
|
||||
// The caller may want events that occurred within a specific timeframe
|
||||
// or of a certain type, which may be specified in the *Request object
|
||||
// they pass to an EventManager function
|
||||
type Request struct {
|
||||
// events falling before StartTime do not satisfy the request. StartTime
|
||||
// must be left blank in calls to WatchEvents
|
||||
StartTime time.Time
|
||||
// events falling after EndTime do not satisfy the request. EndTime
|
||||
// must be left blank in calls to WatchEvents
|
||||
EndTime time.Time
|
||||
// EventType is a map that specifies the type(s) of events wanted
|
||||
EventType map[EventType]bool
|
||||
// allows the caller to put a limit on how many
|
||||
// events they receive. If there are more events than MaxEventsReturned
|
||||
// then the most chronologically recent events in the time period
|
||||
// specified are returned
|
||||
MaxEventsReturned int
|
||||
// the absolute container name for which the event occurred
|
||||
ContainerName string
|
||||
// if IncludeSubcontainers is false, only events occurring in the specific
|
||||
// container, and not the subcontainers, will be returned
|
||||
IncludeSubcontainers bool
|
||||
}
|
||||
|
||||
// EventType is an enumerated type which lists the categories under which
|
||||
// events may fall. The Event field EventType is populated by this enum.
|
||||
type EventType int
|
||||
|
||||
const (
|
||||
TypeOom EventType = iota
|
||||
TypeContainerCreation
|
||||
TypeContainerDeletion
|
||||
)
|
||||
|
||||
// a general interface which populates the Event field EventData. The actual
|
||||
// object, such as an OomInstance, is set as an Event's EventData
|
||||
type EventDataInterface interface {
|
||||
}
|
||||
|
||||
// returns a pointer to an initialized Events object
|
||||
func NewEventManager() *events {
|
||||
return &events{
|
||||
eventlist: make(EventSlice, 0),
|
||||
watchers: []*watch{},
|
||||
}
|
||||
}
|
||||
|
||||
// returns a pointer to an initialized Request object
|
||||
func NewRequest() *Request {
|
||||
return &Request{
|
||||
EventType: map[EventType]bool{},
|
||||
IncludeSubcontainers: false,
|
||||
}
|
||||
}
|
||||
|
||||
// returns a pointer to an initialized watch object
|
||||
func newWatch(request *Request, outChannel chan *Event) *watch {
|
||||
return &watch{
|
||||
request: request,
|
||||
channel: outChannel,
|
||||
}
|
||||
}
|
||||
|
||||
// function necessary to implement the sort interface on the Events struct
|
||||
func (e EventSlice) Len() int {
|
||||
return len(e)
|
||||
}
|
||||
|
||||
// function necessary to implement the sort interface on the Events struct
|
||||
func (e EventSlice) Swap(i, j int) {
|
||||
e[i], e[j] = e[j], e[i]
|
||||
}
|
||||
|
||||
// function necessary to implement the sort interface on the Events struct
|
||||
func (e EventSlice) Less(i, j int) bool {
|
||||
return e[i].Timestamp.Before(e[j].Timestamp)
|
||||
}
|
||||
|
||||
// sorts and returns up to the last MaxEventsReturned chronological elements
|
||||
func getMaxEventsReturned(request *Request, eSlice EventSlice) EventSlice {
|
||||
sort.Sort(eSlice)
|
||||
n := request.MaxEventsReturned
|
||||
if n >= eSlice.Len() || n <= 0 {
|
||||
return eSlice
|
||||
}
|
||||
return eSlice[eSlice.Len()-n:]
|
||||
}
|
||||
|
||||
// If the request wants all subcontainers, this returns if the request's
|
||||
// container path is a prefix of the event container path. Otherwise,
|
||||
// it checks that the container paths of the event and request are
|
||||
// equivalent
|
||||
func checkIfIsSubcontainer(request *Request, event *Event) bool {
|
||||
if request.IncludeSubcontainers == true {
|
||||
return strings.HasPrefix(event.ContainerName+"/", request.ContainerName+"/")
|
||||
}
|
||||
return event.ContainerName == request.ContainerName
|
||||
}
|
||||
|
||||
// determines if an event occurs within the time set in the request object and is the right type
|
||||
func checkIfEventSatisfiesRequest(request *Request, event *Event) bool {
|
||||
startTime := request.StartTime
|
||||
endTime := request.EndTime
|
||||
eventTime := event.Timestamp
|
||||
if !startTime.IsZero() {
|
||||
if startTime.After(eventTime) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if !endTime.IsZero() {
|
||||
if endTime.Before(eventTime) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if request.EventType[event.EventType] != true {
|
||||
return false
|
||||
}
|
||||
if request.ContainerName != "" {
|
||||
return checkIfIsSubcontainer(request, event)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// method of Events object that screens Event objects found in the eventlist
|
||||
// attribute and if they fit the parameters passed by the Request object,
|
||||
// adds it to a slice of *Event objects that is returned. If both MaxEventsReturned
|
||||
// and StartTime/EndTime are specified in the request object, then only
|
||||
// up to the most recent MaxEventsReturned events in that time range are returned.
|
||||
func (self *events) GetEvents(request *Request) (EventSlice, error) {
|
||||
returnEventList := EventSlice{}
|
||||
for _, e := range self.eventlist {
|
||||
if checkIfEventSatisfiesRequest(request, e) {
|
||||
returnEventList = append(returnEventList, e)
|
||||
}
|
||||
}
|
||||
returnEventList = getMaxEventsReturned(request, returnEventList)
|
||||
return returnEventList, nil
|
||||
}
|
||||
|
||||
// method of Events object that maintains an *Event channel passed by the user.
|
||||
// When an event is added by AddEvents that satisfies the parameters in the passed
|
||||
// Request object it is fed to the channel. The StartTime and EndTime of the watch
|
||||
// request should be uninitialized because the purpose is to watch indefinitely
|
||||
// for events that will happen in the future
|
||||
func (self *events) WatchEvents(outChannel chan *Event, request *Request) error {
|
||||
if !request.StartTime.IsZero() || !request.EndTime.IsZero() {
|
||||
return errors.New(
|
||||
"for a call to watch, request.StartTime and request.EndTime must be uninitialized")
|
||||
}
|
||||
newWatcher := newWatch(request, outChannel)
|
||||
self.watchers = append(self.watchers, newWatcher)
|
||||
return nil
|
||||
}
|
||||
|
||||
// method of Events object that adds the argument Event object to the
|
||||
// eventlist. It also feeds the event to a set of watch channels
|
||||
// held by the manager if it satisfies the request keys of the channels
|
||||
func (self *events) AddEvent(e *Event) error {
|
||||
self.eventlist = append(self.eventlist, e)
|
||||
for _, watcher := range self.watchers {
|
||||
watchRequest := watcher.request
|
||||
inChannel := watcher.channel
|
||||
if checkIfEventSatisfiesRequest(watchRequest, e) {
|
||||
inChannel <- e
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
190
events/handler_test.go
Normal file
190
events/handler_test.go
Normal file
@ -0,0 +1,190 @@
|
||||
// Copyright 2015 Google Inc. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func createOldTime(t *testing.T) time.Time {
|
||||
const longForm = "Jan 2, 2006 at 3:04pm (MST)"
|
||||
linetime, err := time.Parse(longForm, "Feb 3, 2013 at 7:54pm (PST)")
|
||||
if err != nil {
|
||||
t.Fatalf("could not format time.Time object")
|
||||
} else {
|
||||
return linetime
|
||||
}
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
// used to convert an OomInstance to an Event object
|
||||
func makeEvent(inTime time.Time, containerName string) *Event {
|
||||
return &Event{
|
||||
ContainerName: containerName,
|
||||
Timestamp: inTime,
|
||||
EventType: TypeOom,
|
||||
}
|
||||
}
|
||||
|
||||
// returns EventManager and Request to use in tests
|
||||
func initializeScenario(t *testing.T) (*events, *Request, *Event, *Event) {
|
||||
fakeEvent := makeEvent(createOldTime(t), "/")
|
||||
fakeEvent2 := makeEvent(time.Now(), "/")
|
||||
|
||||
return NewEventManager(), NewRequest(), fakeEvent, fakeEvent2
|
||||
}
|
||||
|
||||
func checkNumberOfEvents(t *testing.T, numEventsExpected int, numEventsReceived int) {
|
||||
if numEventsReceived != numEventsExpected {
|
||||
t.Fatalf("Expected to return %v events but received %v",
|
||||
numEventsExpected, numEventsReceived)
|
||||
}
|
||||
}
|
||||
|
||||
func ensureProperEventReturned(t *testing.T, expectedEvent *Event, eventObjectFound *Event) {
|
||||
if eventObjectFound != expectedEvent {
|
||||
t.Errorf("Expected to find test object %v but found a different object: %v",
|
||||
expectedEvent, eventObjectFound)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckIfIsSubcontainer(t *testing.T) {
|
||||
myRequest := NewRequest()
|
||||
myRequest.ContainerName = "/root"
|
||||
|
||||
sameContainerEvent := &Event{
|
||||
ContainerName: "/root",
|
||||
}
|
||||
subContainerEvent := &Event{
|
||||
ContainerName: "/root/subdir",
|
||||
}
|
||||
differentContainerEvent := &Event{
|
||||
ContainerName: "/root-completely-different-container",
|
||||
}
|
||||
|
||||
if !checkIfIsSubcontainer(myRequest, sameContainerEvent) {
|
||||
t.Errorf("should have found %v and %v had the same container name",
|
||||
myRequest, sameContainerEvent)
|
||||
}
|
||||
if checkIfIsSubcontainer(myRequest, subContainerEvent) {
|
||||
t.Errorf("should have found %v and %v had different containers",
|
||||
myRequest, subContainerEvent)
|
||||
}
|
||||
|
||||
myRequest.IncludeSubcontainers = true
|
||||
|
||||
if !checkIfIsSubcontainer(myRequest, sameContainerEvent) {
|
||||
t.Errorf("should have found %v and %v had the same container",
|
||||
myRequest.ContainerName, sameContainerEvent.ContainerName)
|
||||
}
|
||||
if !checkIfIsSubcontainer(myRequest, subContainerEvent) {
|
||||
t.Errorf("should have found %v was a subcontainer of %v",
|
||||
subContainerEvent.ContainerName, myRequest.ContainerName)
|
||||
}
|
||||
if checkIfIsSubcontainer(myRequest, differentContainerEvent) {
|
||||
t.Errorf("should have found %v and %v had different containers",
|
||||
myRequest.ContainerName, differentContainerEvent.ContainerName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchEventsDetectsNewEvents(t *testing.T) {
|
||||
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
|
||||
myRequest.EventType[TypeOom] = true
|
||||
outChannel := make(chan *Event, 10)
|
||||
myEventHolder.WatchEvents(outChannel, myRequest)
|
||||
|
||||
myEventHolder.AddEvent(fakeEvent)
|
||||
myEventHolder.AddEvent(fakeEvent2)
|
||||
|
||||
startTime := time.Now()
|
||||
go func() {
|
||||
time.Sleep(5 * time.Second)
|
||||
if time.Since(startTime) > (5 * time.Second) {
|
||||
t.Errorf("Took too long to receive all the events")
|
||||
close(outChannel)
|
||||
}
|
||||
}()
|
||||
|
||||
eventsFound := 0
|
||||
go func() {
|
||||
for event := range outChannel {
|
||||
eventsFound += 1
|
||||
if eventsFound == 1 {
|
||||
ensureProperEventReturned(t, fakeEvent, event)
|
||||
} else if eventsFound == 2 {
|
||||
ensureProperEventReturned(t, fakeEvent2, event)
|
||||
close(outChannel)
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestAddEventAddsEventsToEventManager(t *testing.T) {
|
||||
myEventHolder, _, fakeEvent, _ := initializeScenario(t)
|
||||
|
||||
myEventHolder.AddEvent(fakeEvent)
|
||||
|
||||
checkNumberOfEvents(t, 1, myEventHolder.eventlist.Len())
|
||||
ensureProperEventReturned(t, fakeEvent, myEventHolder.eventlist[0])
|
||||
}
|
||||
|
||||
func TestGetEventsForOneEvent(t *testing.T) {
|
||||
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
|
||||
myRequest.MaxEventsReturned = 1
|
||||
myRequest.EventType[TypeOom] = true
|
||||
|
||||
myEventHolder.AddEvent(fakeEvent)
|
||||
myEventHolder.AddEvent(fakeEvent2)
|
||||
|
||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to GetEvents: %v", err)
|
||||
}
|
||||
checkNumberOfEvents(t, 1, receivedEvents.Len())
|
||||
ensureProperEventReturned(t, fakeEvent2, receivedEvents[0])
|
||||
}
|
||||
|
||||
func TestGetEventsForTimePeriod(t *testing.T) {
|
||||
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
|
||||
myRequest.StartTime = createOldTime(t).Add(-1 * time.Second * 10)
|
||||
myRequest.EndTime = createOldTime(t).Add(time.Second * 10)
|
||||
myRequest.EventType[TypeOom] = true
|
||||
|
||||
myEventHolder.AddEvent(fakeEvent)
|
||||
myEventHolder.AddEvent(fakeEvent2)
|
||||
|
||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to GetEvents: %v", err)
|
||||
}
|
||||
|
||||
checkNumberOfEvents(t, 1, receivedEvents.Len())
|
||||
ensureProperEventReturned(t, fakeEvent, receivedEvents[0])
|
||||
}
|
||||
|
||||
func TestGetEventsForNoTypeRequested(t *testing.T) {
|
||||
myEventHolder, myRequest, fakeEvent, fakeEvent2 := initializeScenario(t)
|
||||
|
||||
myEventHolder.AddEvent(fakeEvent)
|
||||
myEventHolder.AddEvent(fakeEvent2)
|
||||
|
||||
receivedEvents, err := myEventHolder.GetEvents(myRequest)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to GetEvents: %v", err)
|
||||
}
|
||||
checkNumberOfEvents(t, 0, receivedEvents.Len())
|
||||
}
|
Loading…
Reference in New Issue
Block a user