diff --git a/api/handler.go b/api/handler.go index 8f61c212..865c9170 100644 --- a/api/handler.go +++ b/api/handler.go @@ -17,6 +17,7 @@ package api import ( "encoding/json" + "errors" "fmt" "io" "net/http" @@ -50,7 +51,6 @@ func RegisterHandlers(m manager.Manager) error { http.Error(w, err.Error(), 500) } }) - return nil } @@ -115,7 +115,9 @@ func handleRequest(supportedApiVersions map[string]ApiVersion, m manager.Manager if len(requestArgs) > 0 && requestArgs[0] == "" { requestArgs = requestArgs[1:] } + return versionHandler.HandleRequest(requestType, requestArgs, m, w, r) + } func writeResult(res interface{}, w http.ResponseWriter) error { @@ -130,6 +132,36 @@ func writeResult(res interface{}, w http.ResponseWriter) error { } +func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error { + cn, ok := w.(http.CloseNotifier) + if !ok { + return errors.New("could not access http.CloseNotifier") + } + flusher, ok := w.(http.Flusher) + if !ok { + return errors.New("could not access http.Flusher") + } + + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + enc := json.NewEncoder(w) + for { + select { + case <-cn.CloseNotify(): + glog.Infof("Client stopped listening") + return nil + case ev := <-results: + err := enc.Encode(ev) + if err != nil { + glog.Errorf("error encoding message %+v for result stream: %v", ev, err) + } + flusher.Flush() + } + } +} + func getContainerInfoRequest(body io.ReadCloser) (*info.ContainerInfoRequest, error) { var query info.ContainerInfoRequest diff --git a/api/versions.go b/api/versions.go index 072b4551..1553ddf9 100644 --- a/api/versions.go +++ b/api/versions.go @@ -262,30 +262,19 @@ func (self *version1_3) HandleRequest(requestType string, request []string, m ma return err } glog.V(2).Infof("Api - Events(%v)", query) - if eventsFromAllTime { - allEvents, err := m.GetPastEvents(query) + pastEvents, err := m.GetPastEvents(query) if err != nil { return err } - return writeResult(allEvents, w) - } else { - // every time URL is entered to watch, a channel is created here - eventChannel := make(chan *events.Event, 10) - err = m.WatchForEvents(query, eventChannel) - - defer close(eventChannel) - currentEventSet := make(events.EventSlice, 0) - for ev := range eventChannel { - // todo: implement write-as-received writeResult method - currentEventSet = append(currentEventSet, ev) - err = writeResult(currentEventSet, w) - if err != nil { - return err - } - } + return writeResult(pastEvents, w) } - return nil + eventsChannel := make(chan *events.Event, 10) + err = m.WatchForEvents(query, eventsChannel) + if err != nil { + return err + } + return streamResults(eventsChannel, w, r) default: return self.baseVersion.HandleRequest(requestType, request, m, w, r) }