diff --git a/client/client.go b/client/client.go index 113e1168..6313b746 100644 --- a/client/client.go +++ b/client/client.go @@ -59,7 +59,7 @@ func (self *Client) EventStaticInfo(name string) (einfo []*info.Event, err error // Streams all events that occur that satisfy the request into the channel // that is passed -func (self *Client) EventStreamingInfo(name string, einfo chan interface{}) (err error) { +func (self *Client) EventStreamingInfo(name string, einfo chan *info.Event) (err error) { u := self.eventsInfoUrl(name) if err = self.getEventStreamingData(u, einfo); err != nil { return @@ -191,7 +191,7 @@ func (self *Client) httpGetJsonData(data, postData interface{}, url, infoName st return nil } -func (self *Client) getEventStreamingData(url string, einfo chan interface{}) error { +func (self *Client) getEventStreamingData(url string, einfo chan *info.Event) error { req, err := http.NewRequest("GET", url, nil) if err != nil { return err @@ -205,7 +205,7 @@ func (self *Client) getEventStreamingData(url string, einfo chan interface{}) er } dec := json.NewDecoder(resp.Body) - var m interface{} + var m *info.Event for { err := dec.Decode(&m) if err != nil { diff --git a/client/clientexample/main.go b/client/clientexample/main.go index 1e1bd0dd..1d38666a 100644 --- a/client/clientexample/main.go +++ b/client/clientexample/main.go @@ -19,6 +19,7 @@ import ( "github.com/golang/glog" "github.com/google/cadvisor/client" + info "github.com/google/cadvisor/info/v1" ) func staticClientExample() { @@ -43,7 +44,7 @@ func streamingClientExample() { glog.Errorf("tried to make client and got error %v", err) return } - einfo := make(chan interface{}) + einfo := make(chan *info.Event) go func() { err = streamingClient.EventStreamingInfo("?oom_events=true", einfo) if err != nil { diff --git a/integration/framework/framework.go b/integration/framework/framework.go index 63d7d7a5..2dab54b8 100644 --- a/integration/framework/framework.go +++ b/integration/framework/framework.go @@ -111,11 +111,13 @@ type DockerActions interface { // Run(DockerRunArgs{Image: "busybox"}, "ping", "www.google.com") // -> docker run busybox ping www.google.com Run(args DockerRunArgs, cmd ...string) string + RunStress(args DockerRunArgs, cmd ...string) } type ShellActions interface { // Runs a specified command and arguments. Returns the stdout and stderr. Run(cmd string, args ...string) (string, string) + RunStress(cmd string, args ...string) (string, string) } type CadvisorActions interface { @@ -212,6 +214,8 @@ type DockerRunArgs struct { // Arguments to the Docker CLI. Args []string + + InnerArgs []string } // TODO(vmarmol): Use the Docker remote API. @@ -236,6 +240,21 @@ func (self dockerActions) Run(args DockerRunArgs, cmd ...string) string { return containerId } +func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) { + dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M"}, args.Args...), args.Image), args.InnerArgs...), cmd...) + + self.fm.Shell().RunStress("sudo", dockerCommand...) + + if len(args.Args) < 2 { + self.fm.T().Fatalf("need 2 arguments in DockerRunArgs %v to get the name but have %v", args, len(args.Args)) + } + containerId := args.Args[1] + + self.fm.cleanups = append(self.fm.cleanups, func() { + self.fm.Shell().Run("sudo", "docker", "rm", "-f", containerId) + }) +} + func (self shellActions) Run(command string, args ...string) (string, string) { var cmd *exec.Cmd if self.fm.Hostname().Host == "localhost" { @@ -257,6 +276,27 @@ func (self shellActions) Run(command string, args ...string) (string, string) { return stdout.String(), stderr.String() } +func (self shellActions) RunStress(command string, args ...string) (string, string) { + var cmd *exec.Cmd + if self.fm.Hostname().Host == "localhost" { + // Just run locally. + cmd = exec.Command(command, args...) + } else { + // We must SSH to the remote machine and run the command. + cmd = exec.Command("gcutil", append([]string{"ssh", self.fm.Hostname().GceInstanceName, command}, args...)...) + } + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + self.fm.T().Logf("Ran %q %v in %q and received error: %q. Stdout: %q, Stderr: %s", command, args, self.fm.Hostname().Host, err, stdout.String(), stderr.String()) + return "", "" + } + return stdout.String(), stderr.String() +} + // Runs retryFunc until no error is returned. After dur time the last error is returned. // Note that the function does not timeout the execution of retryFunc when the limit is reached. func RetryForDuration(retryFunc func() error, dur time.Duration) error { diff --git a/integration/tests/api/event_test.go b/integration/tests/api/event_test.go new file mode 100644 index 00000000..5b9d00d9 --- /dev/null +++ b/integration/tests/api/event_test.go @@ -0,0 +1,75 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "testing" + "time" + + info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/integration/framework" + "github.com/google/cadvisor/utils/oomparser" + "github.com/stretchr/testify/require" +) + +func TestStreamingEventInformationIsReturned(t *testing.T) { + fm := framework.New(t) + defer fm.Cleanup() + + einfo := make(chan *info.Event) + go func() { + err := fm.Cadvisor().Client().EventStreamingInfo("?oom_events=true", einfo) + t.Logf("Started event streaming with error %v", err) + require.NoError(t, err) + }() + + containerName := fmt.Sprintf("test-basic-docker-container-%d", os.Getpid()) + fm.Docker().RunStress(framework.DockerRunArgs{ + Image: "bernardo/stress", + Args: []string{"--name", containerName}, + InnerArgs: []string{ + "--vm", strconv.FormatUint(4, 10), + "--vm-keep", + "-q", + "-m", strconv.FormatUint(1000, 10), + "--timeout", strconv.FormatUint(10, 10), + }, + }) + + timeout := make(chan bool, 1) + go func() { + time.Sleep(60 * time.Second) + timeout <- true + }() + + select { + case ev := <-einfo: + if ev.EventType == 0 { + marshaledData, err := json.Marshal(ev.EventData) + require.Nil(t, err) + var oomEvent *oomparser.OomInstance + err = json.Unmarshal(marshaledData, &oomEvent) + require.Nil(t, err) + require.True(t, oomEvent.ProcessName == "stress") + } + case <-timeout: + t.Fatal( + "timeout happened before event was detected") + } +}