Merge pull request #569 from kateknister/apiBranch

Added streaming to api
This commit is contained in:
Victor Marmol 2015-03-09 18:29:31 -07:00
commit 4d9cdc0426
2 changed files with 41 additions and 20 deletions

View File

@ -17,6 +17,7 @@ package api
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@ -50,7 +51,6 @@ func RegisterHandlers(m manager.Manager) error {
http.Error(w, err.Error(), 500)
}
})
return nil
}
@ -115,7 +115,9 @@ func handleRequest(supportedApiVersions map[string]ApiVersion, m manager.Manager
if len(requestArgs) > 0 && requestArgs[0] == "" {
requestArgs = requestArgs[1:]
}
return versionHandler.HandleRequest(requestType, requestArgs, m, w, r)
}
func writeResult(res interface{}, w http.ResponseWriter) error {
@ -130,6 +132,36 @@ func writeResult(res interface{}, w http.ResponseWriter) error {
}
func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Request) error {
cn, ok := w.(http.CloseNotifier)
if !ok {
return errors.New("could not access http.CloseNotifier")
}
flusher, ok := w.(http.Flusher)
if !ok {
return errors.New("could not access http.Flusher")
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
enc := json.NewEncoder(w)
for {
select {
case <-cn.CloseNotify():
glog.Infof("Client stopped listening")
return nil
case ev := <-results:
err := enc.Encode(ev)
if err != nil {
glog.Errorf("error encoding message %+v for result stream: %v", ev, err)
}
flusher.Flush()
}
}
}
func getContainerInfoRequest(body io.ReadCloser) (*info.ContainerInfoRequest, error) {
var query info.ContainerInfoRequest

View File

@ -262,30 +262,19 @@ func (self *version1_3) HandleRequest(requestType string, request []string, m ma
return err
}
glog.V(2).Infof("Api - Events(%v)", query)
if eventsFromAllTime {
allEvents, err := m.GetPastEvents(query)
pastEvents, err := m.GetPastEvents(query)
if err != nil {
return err
}
return writeResult(allEvents, w)
} else {
// every time URL is entered to watch, a channel is created here
eventChannel := make(chan *events.Event, 10)
err = m.WatchForEvents(query, eventChannel)
defer close(eventChannel)
currentEventSet := make(events.EventSlice, 0)
for ev := range eventChannel {
// todo: implement write-as-received writeResult method
currentEventSet = append(currentEventSet, ev)
err = writeResult(currentEventSet, w)
if err != nil {
return err
}
}
return writeResult(pastEvents, w)
}
return nil
eventsChannel := make(chan *events.Event, 10)
err = m.WatchForEvents(query, eventsChannel)
if err != nil {
return err
}
return streamResults(eventsChannel, w, r)
default:
return self.baseVersion.HandleRequest(requestType, request, m, w, r)
}