From 21e96ed61c01cdc9082ec0effa51c3114302165f Mon Sep 17 00:00:00 2001 From: Katie Knister Date: Wed, 18 Mar 2015 11:17:06 -0700 Subject: [PATCH] adding systemd oomparser functionality --- manager/manager.go | 8 +- .../oomparser/{oominfo => oomexample}/main.go | 5 +- utils/oomparser/oomparser.go | 75 +++++++++++-------- utils/oomparser/oomparser_test.go | 55 ++------------ 4 files changed, 55 insertions(+), 88 deletions(-) rename utils/oomparser/{oominfo => oomexample}/main.go (93%) diff --git a/manager/manager.go b/manager/manager.go index 74dab108..8a71a16b 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -868,10 +868,8 @@ func (self *manager) watchForNewOoms() error { if err != nil { return err } - err = oomLog.StreamOoms(outStream) - if err != nil { - return err - } + go oomLog.StreamOoms(outStream) + go func() { for oomInstance := range outStream { newEvent := &events.Event{ @@ -883,7 +881,7 @@ func (self *manager) watchForNewOoms() error { glog.V(1).Infof("Created an oom event: %v", newEvent) err := self.eventHandler.AddEvent(newEvent) if err != nil { - glog.Errorf("Failed to add event %v, got error: %v", newEvent, err) + glog.Errorf("failed to add event %v, got error: %v", newEvent, err) } } }() diff --git a/utils/oomparser/oominfo/main.go b/utils/oomparser/oomexample/main.go similarity index 93% rename from utils/oomparser/oominfo/main.go rename to utils/oomparser/oomexample/main.go index 208d9d90..8667d328 100644 --- a/utils/oomparser/oominfo/main.go +++ b/utils/oomparser/oomexample/main.go @@ -31,10 +31,7 @@ func main() { if err != nil { glog.Infof("Couldn't make a new oomparser. %v", err) } else { - err := oomLog.StreamOoms(outStream) - if err != nil { - glog.Errorf("%v", err) - } + go oomLog.StreamOoms(outStream) // demonstration of how to get oomLog's list of oomInstances or access // the user-declared oomInstance channel, here called outStream for oomInstance := range outStream { diff --git a/utils/oomparser/oomparser.go b/utils/oomparser/oomparser.go index 874c6a69..84936e08 100644 --- a/utils/oomparser/oomparser.go +++ b/utils/oomparser/oomparser.go @@ -16,9 +16,10 @@ package oomparser import ( "bufio" - "fmt" + "errors" "io" "os" + "os/exec" "path" "regexp" "strconv" @@ -37,7 +38,7 @@ var firstLineRegexp *regexp.Regexp = regexp.MustCompile( // struct to hold file from which we obtain OomInstances type OomParser struct { - systemFile string + ioreader *bufio.Reader } // struct that contains information related to an OOM kill instance @@ -123,19 +124,18 @@ func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) { } } -// Calls goroutine for analyzeLinesHelper, which feeds it complete lines. +// Calls goroutine for readLinesFromFile, which feeds it complete lines. // Lines are checked against a regexp to check for the pid, process name, etc. -// At the end of an oom message group, AnalyzeLines adds the new oomInstance to +// At the end of an oom message group, StreamOoms adds the new oomInstance to // oomLog -func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) { +func (self *OomParser) StreamOoms(outStream chan *OomInstance) { lineChannel := make(chan string, 10) go func() { - readLinesFromFile(lineChannel, ioreader) + readLinesFromFile(lineChannel, self.ioreader) }() for line := range lineChannel { in_oom_kernel_log := checkIfStartOfOomMessages(line) - if in_oom_kernel_log { oomCurrentInstance := &OomInstance{ ContainerName: "/", @@ -153,12 +153,37 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI line = <-lineChannel } in_oom_kernel_log = false + glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance) outStream <- oomCurrentInstance } } glog.Infof("exiting analyzeLines") } +func callJournalctl() (io.ReadCloser, error) { + cmd := exec.Command("journalctl", "-f") + readcloser, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + if err := cmd.Start(); err != nil { + return nil, err + } + return readcloser, err +} + +func trySystemd() (*OomParser, error) { + readcloser, err := callJournalctl() + if err != nil { + return nil, err + } + glog.V(1).Infof("oomparser using systemd") + return &OomParser{ + ioreader: bufio.NewReader(readcloser), + }, nil + +} + // looks for system files that contain kernel messages and if one is found, sets // the systemFile attribute of the OomParser object func getSystemFile() (string, error) { @@ -169,37 +194,23 @@ func getSystemFile() (string, error) { } else if utils.FileExists(varLogSyslog) { return varLogSyslog, nil } - return "", fmt.Errorf("neither %s nor %s exists from which to read kernel errors", varLogMessages, varLogSyslog) -} - -// calls a go routine that populates self.OomInstances and fills the argument -// channel with OomInstance objects as they are read from the file. -// opens the OomParser's systemFile which was set in getSystemFile -// to look for OOM messages by calling AnalyzeLines. Takes in the argument -// outStream, which is passed in by the user and passed to AnalyzeLines. -// OomInstance objects are added to outStream when they are found by -// AnalyzeLines -func (self *OomParser) StreamOoms(outStream chan *OomInstance) error { - file, err := os.Open(self.systemFile) - if err != nil { - return err - } - ioreader := bufio.NewReader(file) - - // Process the events received from the kernel. - go func() { - self.analyzeLines(ioreader, outStream) - }() - return nil + return "", errors.New("neither " + varLogSyslog + " nor " + varLogMessages + " exists from which to read kernel errors") } // initializes an OomParser object and calls getSystemFile to set the systemFile // attribute. Returns and OomParser object and an error func New() (*OomParser, error) { - systemFileName, err := getSystemFile() + systemFile, err := getSystemFile() if err != nil { - return nil, err + glog.V(1).Infof("received error %v when calling getSystemFile", err) + return trySystemd() + } + file, err := os.Open(systemFile) + if err != nil { + glog.V(1).Infof("received error %v when opening file", err) + return trySystemd() } return &OomParser{ - systemFile: systemFileName}, nil + ioreader: bufio.NewReader(file), + }, nil } diff --git a/utils/oomparser/oomparser_test.go b/utils/oomparser/oomparser_test.go index 1df9d98f..4cb6a6d8 100644 --- a/utils/oomparser/oomparser_test.go +++ b/utils/oomparser/oomparser_test.go @@ -116,44 +116,6 @@ func TestCheckIfStartOfMessages(t *testing.T) { } } -func TestAnalyzeLinesContainerOom(t *testing.T) { - expectedContainerOomInstance := createExpectedContainerOomInstance(t) - helpTestAnalyzeLines(expectedContainerOomInstance, containerLogFile, t) -} - -func TestAnalyzeLinesSystemOom(t *testing.T) { - expectedSystemOomInstance := createExpectedSystemOomInstance(t) - helpTestAnalyzeLines(expectedSystemOomInstance, systemLogFile, t) -} - -func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *testing.T) { - outStream := make(chan *OomInstance) - oomLog := new(OomParser) - oomLog.systemFile = sysFile - file, err := os.Open(oomLog.systemFile) - if err != nil { - t.Errorf("couldn't open test log: %v", err) - } - ioreader := bufio.NewReader(file) - timeout := make(chan bool, 1) - go func() { - time.Sleep(1 * time.Second) - timeout <- true - }() - go oomLog.analyzeLines(ioreader, outStream) - select { - case oomInstance := <-outStream: - if *oomCheckInstance != *oomInstance { - t.Errorf("wrong instance returned. Expected %v and got %v", - oomCheckInstance, oomInstance) - t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName) - } - case <-timeout: - t.Error( - "timeout happened before oomInstance was found in test file") - } -} - func TestStreamOomsContainer(t *testing.T) { expectedContainerOomInstance := createExpectedContainerOomInstance(t) helpTestStreamOoms(expectedContainerOomInstance, containerLogFile, t) @@ -166,18 +128,14 @@ func TestStreamOomsSystem(t *testing.T) { func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testing.T) { outStream := make(chan *OomInstance) - oomLog := new(OomParser) - oomLog.systemFile = sysFile + oomLog := mockOomParser(sysFile, t) timeout := make(chan bool, 1) go func() { time.Sleep(1 * time.Second) timeout <- true }() - err := oomLog.StreamOoms(outStream) - if err != nil { - t.Errorf("had an error opening file: %v", err) - } + go oomLog.StreamOoms(outStream) select { case oomInstance := <-outStream: @@ -191,9 +149,12 @@ func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testin } } -func TestNew(t *testing.T) { - _, err := New() +func mockOomParser(sysFile string, t *testing.T) *OomParser { + file, err := os.Open(sysFile) if err != nil { - t.Errorf("function New() had error %v", err) + t.Errorf("had an error opening file: %v", err) + } + return &OomParser{ + ioreader: bufio.NewReader(file), } }