From 412f27798e07c10cf2a7dfcfe1e5421423c6efe0 Mon Sep 17 00:00:00 2001 From: Katie Knister Date: Fri, 27 Feb 2015 11:50:17 -0800 Subject: [PATCH] Revised oomparser to not use all the cpu --- manager/manager.go | 8 ++++---- utils/oomparser/oomparser.go | 28 ++++++++++++++-------------- utils/oomparser/oomparser_test.go | 15 +++++---------- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 20d82d74..9858d44d 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -178,10 +178,10 @@ func (self *manager) Start() error { return err } self.quitChannels = append(self.quitChannels, quitWatcher) - // err = self.watchForNewOoms() - // if err != nil { - // glog.Errorf("Failed to start OOM watcher, will not get OOM events: %v", err) - // } + err = self.watchForNewOoms() + if err != nil { + glog.Errorf("Failed to start OOM watcher, will not get OOM events: %v", err) + } // Look for new containers in the main housekeeping thread. quitGlobalHousekeeping := make(chan error) diff --git a/utils/oomparser/oomparser.go b/utils/oomparser/oomparser.go index 9e331b79..54835915 100644 --- a/utils/oomparser/oomparser.go +++ b/utils/oomparser/oomparser.go @@ -89,12 +89,12 @@ func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, erro } // uses regex to see if line is the start of a kernel oom log -func checkIfStartOfOomMessages(line string) (bool, error) { +func checkIfStartOfOomMessages(line string) bool { potential_oom_start := firstLineRegexp.MatchString(line) if potential_oom_start { - return true, nil + return true } - return false, nil + return false } // opens a reader to grab new messages from the Reader object called outPipe @@ -104,15 +104,14 @@ func checkIfStartOfOomMessages(line string) (bool, error) { // lines are checked against a regexp to check for the pid, process name, etc. // At the end of an oom message group, AnalyzeLines adds the new oomInstance to // oomLog -func (self *OomParser) analyzeLines(outPipe io.ReadCloser, outStream chan *OomInstance) { - ioreader := bufio.NewReader(outPipe) - line, err := ioreader.ReadString('\n') - for err == nil { - in_oom_kernel_log, err := checkIfStartOfOomMessages(line) - if err != nil { - glog.Errorf("%v", err) - continue +func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { + var line string + var err error + for true { + for line, err = ioreader.ReadString('\n'); err != nil && err == io.EOF; { + time.Sleep(100 * time.Millisecond) } + in_oom_kernel_log := checkIfStartOfOomMessages(line) if in_oom_kernel_log { oomCurrentInstance := &OomInstance{ ContainerName: "/", @@ -132,9 +131,7 @@ func (self *OomParser) analyzeLines(outPipe io.ReadCloser, outStream chan *OomIn in_oom_kernel_log = false outStream <- oomCurrentInstance } - line, err = ioreader.ReadString('\n') } - glog.Errorf("%v", err) } // looks for system files that contain kernel messages and if one is found, sets @@ -162,8 +159,11 @@ func (self *OomParser) StreamOoms(outStream chan *OomInstance) error { if err != nil { return err } + ioreader := bufio.NewReader(file) + + // Process the events received from the kernel. go func() { - self.analyzeLines(file, outStream) + self.analyzeLines(ioreader, outStream) }() return nil } diff --git a/utils/oomparser/oomparser_test.go b/utils/oomparser/oomparser_test.go index 61f67875..93835861 100644 --- a/utils/oomparser/oomparser_test.go +++ b/utils/oomparser/oomparser_test.go @@ -15,6 +15,7 @@ package oomparser import ( + "bufio" "os" "testing" "time" @@ -105,18 +106,11 @@ func TestGetProcessNamePid(t *testing.T) { } func TestCheckIfStartOfMessages(t *testing.T) { - couldParseLine, err := checkIfStartOfOomMessages(endLine) - if err != nil { - t.Errorf("bad line fed to checkIfStartOfMessages should yield no error, but had error %v", err) - } + couldParseLine := checkIfStartOfOomMessages(endLine) if couldParseLine { t.Errorf("bad line fed to checkIfStartOfMessages should return false but returned %v", couldParseLine) } - - couldParseLine, err = checkIfStartOfOomMessages(startLine) - if err != nil { - t.Errorf("start line fed to checkIfStartOfMessages should yield no error, but had error %v", err) - } + couldParseLine = checkIfStartOfOomMessages(startLine) if !couldParseLine { t.Errorf("start line fed to checkIfStartOfMessages should return true but returned %v", couldParseLine) } @@ -140,12 +134,13 @@ func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *test if err != nil { t.Errorf("couldn't open test log: %v", err) } + ioreader := bufio.NewReader(file) timeout := make(chan bool, 1) go func() { time.Sleep(1 * time.Second) timeout <- true }() - go oomLog.analyzeLines(file, outStream) + go oomLog.analyzeLines(ioreader, outStream) select { case oomInstance := <-outStream: if *oomCheckInstance != *oomInstance {