Improvements to events integration test
This commit is contained in:
parent
a1c688751c
commit
ee4bdc2698
@ -151,7 +151,7 @@ func streamResults(eventChannel *events.EventChannel, w http.ResponseWriter, r *
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
glog.V(3).Infof("Received CloseNotify event")
|
||||
glog.V(3).Infof("Received CloseNotify event. About to return from api/handler:streamResults")
|
||||
m.CloseEventChannel(eventChannel.GetWatchId())
|
||||
return nil
|
||||
case ev := <-eventChannel.GetChannel():
|
||||
|
@ -205,14 +205,16 @@ func (self *Client) getEventStreamingData(url string, einfo chan *info.Event) er
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
var m *info.Event
|
||||
var m *info.Event = &info.Event{}
|
||||
for {
|
||||
err := dec.Decode(&m)
|
||||
err := dec.Decode(m)
|
||||
glog.V(3).Infof("received m as %v", m)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
// if called without &stream=true will not be able to parse event and will trigger fatal
|
||||
glog.Fatalf("Received error %v", err)
|
||||
}
|
||||
einfo <- m
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func staticClientExample() {
|
||||
glog.Errorf("tried to make client and got error %v", err)
|
||||
return
|
||||
}
|
||||
einfo, err := staticClient.EventStaticInfo("?oom_events=true&historical=true")
|
||||
einfo, err := staticClient.EventStaticInfo("?oom_events=true")
|
||||
if err != nil {
|
||||
glog.Errorf("got error retrieving event info: %v", err)
|
||||
return
|
||||
@ -38,7 +38,7 @@ func staticClientExample() {
|
||||
}
|
||||
}
|
||||
|
||||
func streamingClientExample() {
|
||||
func streamingClientExample(url string) {
|
||||
streamingClient, err := client.NewClient("http://localhost:8080/")
|
||||
if err != nil {
|
||||
glog.Errorf("tried to make client and got error %v", err)
|
||||
@ -46,14 +46,14 @@ func streamingClientExample() {
|
||||
}
|
||||
einfo := make(chan *info.Event)
|
||||
go func() {
|
||||
err = streamingClient.EventStreamingInfo("?oom_events=true", einfo)
|
||||
err = streamingClient.EventStreamingInfo(url, einfo)
|
||||
if err != nil {
|
||||
glog.Errorf("got error retrieving event info: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
for ev := range einfo {
|
||||
glog.Infof("streaming einfo: %v", ev)
|
||||
glog.Infof("streaming einfo: %v\n", ev)
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,5 +61,5 @@ func streamingClientExample() {
|
||||
func main() {
|
||||
flag.Parse()
|
||||
staticClientExample()
|
||||
streamingClientExample()
|
||||
streamingClientExample("?creation_events=true&stream=true")
|
||||
}
|
||||
|
@ -270,6 +270,7 @@ func (self *events) AddEvent(e *info.Event) error {
|
||||
for _, watchObject := range watchesToSend {
|
||||
watchObject.eventChannel.GetChannel() <- e
|
||||
}
|
||||
glog.V(1).Infof("Added event %v", e)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -111,7 +111,7 @@ type DockerActions interface {
|
||||
// Run(DockerRunArgs{Image: "busybox"}, "ping", "www.google.com")
|
||||
// -> docker run busybox ping www.google.com
|
||||
Run(args DockerRunArgs, cmd ...string) string
|
||||
RunStress(args DockerRunArgs, cmd ...string)
|
||||
RunStress(args DockerRunArgs, cmd ...string) string
|
||||
}
|
||||
|
||||
type ShellActions interface {
|
||||
@ -240,19 +240,22 @@ func (self dockerActions) Run(args DockerRunArgs, cmd ...string) string {
|
||||
return containerId
|
||||
}
|
||||
|
||||
func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) {
|
||||
dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M"}, args.Args...), args.Image), args.InnerArgs...), cmd...)
|
||||
func (self dockerActions) RunStress(args DockerRunArgs, cmd ...string) string {
|
||||
dockerCommand := append(append(append(append([]string{"docker", "run", "-m=4M", "-d", "-t", "-i"}, args.Args...), args.Image), args.InnerArgs...), cmd...)
|
||||
|
||||
self.fm.Shell().RunStress("sudo", dockerCommand...)
|
||||
output, _ := self.fm.Shell().RunStress("sudo", dockerCommand...)
|
||||
|
||||
if len(args.Args) < 2 {
|
||||
self.fm.T().Fatalf("need 2 arguments in DockerRunArgs %v to get the name but have %v", args, len(args.Args))
|
||||
// The last line is the container ID.
|
||||
if len(output) < 1 {
|
||||
self.fm.T().Fatalf("need 1 arguments in output %v to get the name but have %v", output, len(output))
|
||||
}
|
||||
containerId := args.Args[1]
|
||||
elements := strings.Fields(output)
|
||||
containerId := elements[len(elements)-1]
|
||||
|
||||
self.fm.cleanups = append(self.fm.cleanups, func() {
|
||||
self.fm.Shell().Run("sudo", "docker", "rm", "-f", containerId)
|
||||
})
|
||||
return containerId
|
||||
}
|
||||
|
||||
func (self shellActions) Run(command string, args ...string) (string, string) {
|
||||
@ -292,7 +295,7 @@ func (self shellActions) RunStress(command string, args ...string) (string, stri
|
||||
err := cmd.Run()
|
||||
if err != nil {
|
||||
self.fm.T().Logf("Ran %q %v in %q and received error: %q. Stdout: %q, Stderr: %s", command, args, self.fm.Hostname().Host, err, stdout.String(), stderr.String())
|
||||
return "", ""
|
||||
return stdout.String(), stderr.String()
|
||||
}
|
||||
return stdout.String(), stderr.String()
|
||||
}
|
||||
|
@ -15,16 +15,15 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
info "github.com/google/cadvisor/info/v1"
|
||||
"github.com/google/cadvisor/integration/framework"
|
||||
"github.com/google/cadvisor/utils/oomparser"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -34,13 +33,13 @@ func TestStreamingEventInformationIsReturned(t *testing.T) {
|
||||
|
||||
einfo := make(chan *info.Event)
|
||||
go func() {
|
||||
err := fm.Cadvisor().Client().EventStreamingInfo("?oom_events=true", einfo)
|
||||
t.Logf("Started event streaming with error %v", err)
|
||||
err := fm.Cadvisor().Client().EventStreamingInfo("?oom_events=true&stream=true", einfo)
|
||||
t.Logf("tried to stream events but got error %v", err)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
containerName := fmt.Sprintf("test-basic-docker-container-%d", os.Getpid())
|
||||
fm.Docker().RunStress(framework.DockerRunArgs{
|
||||
containerId := fm.Docker().RunStress(framework.DockerRunArgs{
|
||||
Image: "bernardo/stress",
|
||||
Args: []string{"--name", containerName},
|
||||
InnerArgs: []string{
|
||||
@ -52,24 +51,49 @@ func TestStreamingEventInformationIsReturned(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
waitForStreamingEvent(containerId, "?deletion_events=true&stream=true", t, fm, info.EventContainerDeletion)
|
||||
waitForStaticEvent(containerId, "?creation_events=true", t, fm, info.EventContainerCreation)
|
||||
}
|
||||
|
||||
func waitForStreamingEvent(containerId string, urlRequest string, t *testing.T, fm framework.Framework, typeEvent info.EventType) {
|
||||
timeout := make(chan bool, 1)
|
||||
go func() {
|
||||
time.Sleep(60 * time.Second)
|
||||
timeout <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case ev := <-einfo:
|
||||
if ev.EventType == 0 {
|
||||
marshaledData, err := json.Marshal(ev.EventData)
|
||||
require.Nil(t, err)
|
||||
var oomEvent *oomparser.OomInstance
|
||||
err = json.Unmarshal(marshaledData, &oomEvent)
|
||||
require.Nil(t, err)
|
||||
require.True(t, oomEvent.ProcessName == "stress")
|
||||
einfo := make(chan *info.Event)
|
||||
go func() {
|
||||
err := fm.Cadvisor().Client().EventStreamingInfo(urlRequest, einfo)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case ev := <-einfo:
|
||||
if ev.EventType == typeEvent {
|
||||
if strings.Contains(strings.Trim(ev.ContainerName, "/ "), strings.Trim(containerId, "/ ")) {
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal(
|
||||
"timeout happened before destruction event was detected")
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal(
|
||||
"timeout happened before event was detected")
|
||||
}
|
||||
}
|
||||
|
||||
func waitForStaticEvent(containerId string, urlRequest string, t *testing.T, fm framework.Framework, typeEvent info.EventType) {
|
||||
einfo, err := fm.Cadvisor().Client().EventStaticInfo(urlRequest)
|
||||
require.NoError(t, err)
|
||||
|
||||
found := false
|
||||
for _, ev := range einfo {
|
||||
if ev.EventType == typeEvent {
|
||||
if strings.Contains(strings.Trim(ev.ContainerName, "/ "), strings.Trim(containerId, "/ ")) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
require.True(t, found)
|
||||
}
|
||||
|
@ -889,7 +889,7 @@ func (self *manager) watchForNewOoms() error {
|
||||
},
|
||||
},
|
||||
}
|
||||
glog.V(1).Infof("Created an oom event: %v", newEvent)
|
||||
glog.V(2).Infof("Created an oom event: %v", newEvent)
|
||||
err := self.eventHandler.AddEvent(newEvent)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to add event %v, got error: %v", newEvent, err)
|
||||
|
@ -153,7 +153,6 @@ func (self *OomParser) StreamOoms(outStream chan *OomInstance) {
|
||||
line = <-lineChannel
|
||||
}
|
||||
in_oom_kernel_log = false
|
||||
glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance)
|
||||
outStream <- oomCurrentInstance
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user