Revised oomparser to not use all the cpu

This commit is contained in:
Katie Knister 2015-02-27 11:50:17 -08:00
parent 63a6b91554
commit 412f27798e
3 changed files with 23 additions and 28 deletions

View File

@ -178,10 +178,10 @@ func (self *manager) Start() error {
return err return err
} }
self.quitChannels = append(self.quitChannels, quitWatcher) self.quitChannels = append(self.quitChannels, quitWatcher)
// err = self.watchForNewOoms() err = self.watchForNewOoms()
// if err != nil { if err != nil {
// glog.Errorf("Failed to start OOM watcher, will not get OOM events: %v", err) glog.Errorf("Failed to start OOM watcher, will not get OOM events: %v", err)
// } }
// Look for new containers in the main housekeeping thread. // Look for new containers in the main housekeeping thread.
quitGlobalHousekeeping := make(chan error) quitGlobalHousekeeping := make(chan error)

View File

@ -89,12 +89,12 @@ func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, erro
} }
// uses regex to see if line is the start of a kernel oom log // uses regex to see if line is the start of a kernel oom log
func checkIfStartOfOomMessages(line string) (bool, error) { func checkIfStartOfOomMessages(line string) bool {
potential_oom_start := firstLineRegexp.MatchString(line) potential_oom_start := firstLineRegexp.MatchString(line)
if potential_oom_start { if potential_oom_start {
return true, nil return true
} }
return false, nil return false
} }
// opens a reader to grab new messages from the Reader object called outPipe // opens a reader to grab new messages from the Reader object called outPipe
@ -104,15 +104,14 @@ func checkIfStartOfOomMessages(line string) (bool, error) {
// lines are checked against a regexp to check for the pid, process name, etc. // lines are checked against a regexp to check for the pid, process name, etc.
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to // At the end of an oom message group, AnalyzeLines adds the new oomInstance to
// oomLog // oomLog
func (self *OomParser) analyzeLines(outPipe io.ReadCloser, outStream chan *OomInstance) { func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
ioreader := bufio.NewReader(outPipe) var line string
line, err := ioreader.ReadString('\n') var err error
for err == nil { for true {
in_oom_kernel_log, err := checkIfStartOfOomMessages(line) for line, err = ioreader.ReadString('\n'); err != nil && err == io.EOF; {
if err != nil { time.Sleep(100 * time.Millisecond)
glog.Errorf("%v", err)
continue
} }
in_oom_kernel_log := checkIfStartOfOomMessages(line)
if in_oom_kernel_log { if in_oom_kernel_log {
oomCurrentInstance := &OomInstance{ oomCurrentInstance := &OomInstance{
ContainerName: "/", ContainerName: "/",
@ -132,9 +131,7 @@ func (self *OomParser) analyzeLines(outPipe io.ReadCloser, outStream chan *OomIn
in_oom_kernel_log = false in_oom_kernel_log = false
outStream <- oomCurrentInstance outStream <- oomCurrentInstance
} }
line, err = ioreader.ReadString('\n')
} }
glog.Errorf("%v", err)
} }
// looks for system files that contain kernel messages and if one is found, sets // looks for system files that contain kernel messages and if one is found, sets
@ -162,8 +159,11 @@ func (self *OomParser) StreamOoms(outStream chan *OomInstance) error {
if err != nil { if err != nil {
return err return err
} }
ioreader := bufio.NewReader(file)
// Process the events received from the kernel.
go func() { go func() {
self.analyzeLines(file, outStream) self.analyzeLines(ioreader, outStream)
}() }()
return nil return nil
} }

View File

@ -15,6 +15,7 @@
package oomparser package oomparser
import ( import (
"bufio"
"os" "os"
"testing" "testing"
"time" "time"
@ -105,18 +106,11 @@ func TestGetProcessNamePid(t *testing.T) {
} }
func TestCheckIfStartOfMessages(t *testing.T) { func TestCheckIfStartOfMessages(t *testing.T) {
couldParseLine, err := checkIfStartOfOomMessages(endLine) couldParseLine := checkIfStartOfOomMessages(endLine)
if err != nil {
t.Errorf("bad line fed to checkIfStartOfMessages should yield no error, but had error %v", err)
}
if couldParseLine { if couldParseLine {
t.Errorf("bad line fed to checkIfStartOfMessages should return false but returned %v", couldParseLine) t.Errorf("bad line fed to checkIfStartOfMessages should return false but returned %v", couldParseLine)
} }
couldParseLine = checkIfStartOfOomMessages(startLine)
couldParseLine, err = checkIfStartOfOomMessages(startLine)
if err != nil {
t.Errorf("start line fed to checkIfStartOfMessages should yield no error, but had error %v", err)
}
if !couldParseLine { if !couldParseLine {
t.Errorf("start line fed to checkIfStartOfMessages should return true but returned %v", couldParseLine) t.Errorf("start line fed to checkIfStartOfMessages should return true but returned %v", couldParseLine)
} }
@ -140,12 +134,13 @@ func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *test
if err != nil { if err != nil {
t.Errorf("couldn't open test log: %v", err) t.Errorf("couldn't open test log: %v", err)
} }
ioreader := bufio.NewReader(file)
timeout := make(chan bool, 1) timeout := make(chan bool, 1)
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
timeout <- true timeout <- true
}() }()
go oomLog.analyzeLines(file, outStream) go oomLog.analyzeLines(ioreader, outStream)
select { select {
case oomInstance := <-outStream: case oomInstance := <-outStream:
if *oomCheckInstance != *oomInstance { if *oomCheckInstance != *oomInstance {