diff --git a/api/handler.go b/api/handler.go index baf67116..e9cd9368 100644 --- a/api/handler.go +++ b/api/handler.go @@ -151,9 +151,9 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re for { select { case <-cn.CloseNotify(): - glog.Infof("Client stopped listening") return nil case ev := <-results: + glog.V(3).Infof("Received event from watch channel in api: %v", ev) err := enc.Encode(ev) if err != nil { glog.Errorf("error encoding message %+v for result stream: %v", ev, err) diff --git a/manager/manager.go b/manager/manager.go index 6b7006d9..cb2c477b 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -695,6 +695,7 @@ func (self *manager) watchForNewContainers(quit chan error) error { } func (self *manager) watchForNewOoms() error { + glog.Infof("Started watching for new ooms in manager") outStream := make(chan *oomparser.OomInstance, 10) oomLog, err := oomparser.New() if err != nil { diff --git a/utils/oomparser/oomparser.go b/utils/oomparser/oomparser.go index 54835915..874c6a69 100644 --- a/utils/oomparser/oomparser.go +++ b/utils/oomparser/oomparser.go @@ -66,19 +66,18 @@ func getContainerName(line string, currentOomInstance *OomInstance) error { // gets the pid, name, and date from a line and adds it to oomInstance func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, error) { reList := lastLineRegexp.FindStringSubmatch(line) + if reList == nil { return false, nil } const longForm = "Jan _2 15:04:05 2006" stringYear := strconv.Itoa(time.Now().Year()) - linetime, err := time.Parse(longForm, reList[1]+" "+stringYear) + linetime, err := time.ParseInLocation(longForm, reList[1]+" "+stringYear, time.Local) if err != nil { return false, err } + currentOomInstance.TimeOfDeath = linetime - if err != nil { - return false, err - } pid, err := strconv.Atoi(reList[2]) if err != nil { return false, err @@ -97,28 +96,53 @@ func checkIfStartOfOomMessages(line string) bool { return false } -// opens a reader to grab new messages from the Reader object called outPipe -// opened in PopulateOomInformation. It reads line by line splitting on -// the "\n" character. Checks if line might be start or end of an oom message -// log. Then the -// lines are checked against a regexp to check for the pid, process name, etc. -// At the end of an oom message group, AnalyzeLines adds the new oomInstance to -// oomLog -func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { +// reads the file and sends only complete lines over a channel to analyzeLines. +// Should prevent EOF errors that occur when lines are read before being fully +// written to the log. It reads line by line splitting on +// the "\n" character. +func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) { + linefragment := "" var line string var err error for true { - for line, err = ioreader.ReadString('\n'); err != nil && err == io.EOF; { + line, err = ioreader.ReadString('\n') + if err == io.EOF { + if line != "" { + linefragment += line + } time.Sleep(100 * time.Millisecond) + } else if err == nil { + if linefragment != "" { + line = linefragment + line + linefragment = "" + } + lineChannel <- line + } else if err != nil && err != io.EOF { + glog.Errorf("exiting analyzeLinesHelper with error %v", err) } + } +} + +// Calls goroutine for analyzeLinesHelper, which feeds it complete lines. +// Lines are checked against a regexp to check for the pid, process name, etc. +// At the end of an oom message group, AnalyzeLines adds the new oomInstance to +// oomLog +func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { + lineChannel := make(chan string, 10) + go func() { + readLinesFromFile(lineChannel, ioreader) + }() + + for line := range lineChannel { in_oom_kernel_log := checkIfStartOfOomMessages(line) + if in_oom_kernel_log { oomCurrentInstance := &OomInstance{ ContainerName: "/", } finished := false - for err == nil && !finished { - err = getContainerName(line, oomCurrentInstance) + for !finished { + err := getContainerName(line, oomCurrentInstance) if err != nil { glog.Errorf("%v", err) } @@ -126,12 +150,13 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI if err != nil { glog.Errorf("%v", err) } - line, err = ioreader.ReadString('\n') + line = <-lineChannel } in_oom_kernel_log = false outStream <- oomCurrentInstance } } + glog.Infof("exiting analyzeLines") } // looks for system files that contain kernel messages and if one is found, sets diff --git a/utils/oomparser/oomparser_test.go b/utils/oomparser/oomparser_test.go index 93835861..1df9d98f 100644 --- a/utils/oomparser/oomparser_test.go +++ b/utils/oomparser/oomparser_test.go @@ -29,7 +29,7 @@ const systemLogFile = "systemOomExampleLog.txt" func createExpectedContainerOomInstance(t *testing.T) *OomInstance { const longForm = "Jan _2 15:04:05 2006" - deathTime, err := time.Parse(longForm, "Jan 5 15:19:27 2015") + deathTime, err := time.ParseInLocation(longForm, "Jan 5 15:19:27 2015", time.Local) if err != nil { t.Fatalf("could not parse expected time when creating expected container oom instance. Had error %v", err) return nil @@ -44,7 +44,7 @@ func createExpectedContainerOomInstance(t *testing.T) *OomInstance { func createExpectedSystemOomInstance(t *testing.T) *OomInstance { const longForm = "Jan _2 15:04:05 2006" - deathTime, err := time.Parse(longForm, "Jan 28 19:58:45 2015") + deathTime, err := time.ParseInLocation(longForm, "Jan 28 19:58:45 2015", time.Local) if err != nil { t.Fatalf("could not parse expected time when creating expected system oom instance. Had error %v", err) return nil @@ -86,7 +86,7 @@ func TestGetProcessNamePid(t *testing.T) { } const longForm = "Jan _2 15:04:05 2006" - correctTime, err := time.Parse(longForm, "Jan 21 22:01:49 2015") + correctTime, err := time.ParseInLocation(longForm, "Jan 21 22:01:49 2015", time.Local) couldParseLine, err = getProcessNamePid(endLine, currentOomInstance) if err != nil { t.Errorf("good line fed to getProcessNamePid should yield no error, but had error %v", err) @@ -146,6 +146,7 @@ func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *test if *oomCheckInstance != *oomInstance { t.Errorf("wrong instance returned. Expected %v and got %v", oomCheckInstance, oomInstance) + t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName) } case <-timeout: t.Error(