adding systemd oomparser functionality

This commit is contained in:
Katie Knister 2015-03-18 11:17:06 -07:00
parent cf13f4d5ee
commit 21e96ed61c
4 changed files with 55 additions and 88 deletions

View File

@ -868,10 +868,8 @@ func (self *manager) watchForNewOoms() error {
if err != nil {
return err
}
err = oomLog.StreamOoms(outStream)
if err != nil {
return err
}
go oomLog.StreamOoms(outStream)
go func() {
for oomInstance := range outStream {
newEvent := &events.Event{
@ -883,7 +881,7 @@ func (self *manager) watchForNewOoms() error {
glog.V(1).Infof("Created an oom event: %v", newEvent)
err := self.eventHandler.AddEvent(newEvent)
if err != nil {
glog.Errorf("Failed to add event %v, got error: %v", newEvent, err)
glog.Errorf("failed to add event %v, got error: %v", newEvent, err)
}
}
}()

View File

@ -31,10 +31,7 @@ func main() {
if err != nil {
glog.Infof("Couldn't make a new oomparser. %v", err)
} else {
err := oomLog.StreamOoms(outStream)
if err != nil {
glog.Errorf("%v", err)
}
go oomLog.StreamOoms(outStream)
// demonstration of how to get oomLog's list of oomInstances or access
// the user-declared oomInstance channel, here called outStream
for oomInstance := range outStream {

View File

@ -16,9 +16,10 @@ package oomparser
import (
"bufio"
"fmt"
"errors"
"io"
"os"
"os/exec"
"path"
"regexp"
"strconv"
@ -37,7 +38,7 @@ var firstLineRegexp *regexp.Regexp = regexp.MustCompile(
// struct to hold file from which we obtain OomInstances
type OomParser struct {
systemFile string
ioreader *bufio.Reader
}
// struct that contains information related to an OOM kill instance
@ -123,19 +124,18 @@ func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) {
}
}
// Calls goroutine for analyzeLinesHelper, which feeds it complete lines.
// Calls goroutine for readLinesFromFile, which feeds it complete lines.
// Lines are checked against a regexp to check for the pid, process name, etc.
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to
// At the end of an oom message group, StreamOoms adds the new oomInstance to
// oomLog
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
func (self *OomParser) StreamOoms(outStream chan *OomInstance) {
lineChannel := make(chan string, 10)
go func() {
readLinesFromFile(lineChannel, ioreader)
readLinesFromFile(lineChannel, self.ioreader)
}()
for line := range lineChannel {
in_oom_kernel_log := checkIfStartOfOomMessages(line)
if in_oom_kernel_log {
oomCurrentInstance := &OomInstance{
ContainerName: "/",
@ -153,12 +153,37 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI
line = <-lineChannel
}
in_oom_kernel_log = false
glog.V(1).Infof("Sending an oomInstance: %v", oomCurrentInstance)
outStream <- oomCurrentInstance
}
}
glog.Infof("exiting analyzeLines")
}
func callJournalctl() (io.ReadCloser, error) {
cmd := exec.Command("journalctl", "-f")
readcloser, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
return readcloser, err
}
func trySystemd() (*OomParser, error) {
readcloser, err := callJournalctl()
if err != nil {
return nil, err
}
glog.V(1).Infof("oomparser using systemd")
return &OomParser{
ioreader: bufio.NewReader(readcloser),
}, nil
}
// looks for system files that contain kernel messages and if one is found, sets
// the systemFile attribute of the OomParser object
func getSystemFile() (string, error) {
@ -169,37 +194,23 @@ func getSystemFile() (string, error) {
} else if utils.FileExists(varLogSyslog) {
return varLogSyslog, nil
}
return "", fmt.Errorf("neither %s nor %s exists from which to read kernel errors", varLogMessages, varLogSyslog)
}
// calls a go routine that populates self.OomInstances and fills the argument
// channel with OomInstance objects as they are read from the file.
// opens the OomParser's systemFile which was set in getSystemFile
// to look for OOM messages by calling AnalyzeLines. Takes in the argument
// outStream, which is passed in by the user and passed to AnalyzeLines.
// OomInstance objects are added to outStream when they are found by
// AnalyzeLines
func (self *OomParser) StreamOoms(outStream chan *OomInstance) error {
file, err := os.Open(self.systemFile)
if err != nil {
return err
}
ioreader := bufio.NewReader(file)
// Process the events received from the kernel.
go func() {
self.analyzeLines(ioreader, outStream)
}()
return nil
return "", errors.New("neither " + varLogSyslog + " nor " + varLogMessages + " exists from which to read kernel errors")
}
// initializes an OomParser object and calls getSystemFile to set the systemFile
// attribute. Returns and OomParser object and an error
func New() (*OomParser, error) {
systemFileName, err := getSystemFile()
systemFile, err := getSystemFile()
if err != nil {
return nil, err
glog.V(1).Infof("received error %v when calling getSystemFile", err)
return trySystemd()
}
file, err := os.Open(systemFile)
if err != nil {
glog.V(1).Infof("received error %v when opening file", err)
return trySystemd()
}
return &OomParser{
systemFile: systemFileName}, nil
ioreader: bufio.NewReader(file),
}, nil
}

View File

@ -116,44 +116,6 @@ func TestCheckIfStartOfMessages(t *testing.T) {
}
}
func TestAnalyzeLinesContainerOom(t *testing.T) {
expectedContainerOomInstance := createExpectedContainerOomInstance(t)
helpTestAnalyzeLines(expectedContainerOomInstance, containerLogFile, t)
}
func TestAnalyzeLinesSystemOom(t *testing.T) {
expectedSystemOomInstance := createExpectedSystemOomInstance(t)
helpTestAnalyzeLines(expectedSystemOomInstance, systemLogFile, t)
}
func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *testing.T) {
outStream := make(chan *OomInstance)
oomLog := new(OomParser)
oomLog.systemFile = sysFile
file, err := os.Open(oomLog.systemFile)
if err != nil {
t.Errorf("couldn't open test log: %v", err)
}
ioreader := bufio.NewReader(file)
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
go oomLog.analyzeLines(ioreader, outStream)
select {
case oomInstance := <-outStream:
if *oomCheckInstance != *oomInstance {
t.Errorf("wrong instance returned. Expected %v and got %v",
oomCheckInstance, oomInstance)
t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName)
}
case <-timeout:
t.Error(
"timeout happened before oomInstance was found in test file")
}
}
func TestStreamOomsContainer(t *testing.T) {
expectedContainerOomInstance := createExpectedContainerOomInstance(t)
helpTestStreamOoms(expectedContainerOomInstance, containerLogFile, t)
@ -166,18 +128,14 @@ func TestStreamOomsSystem(t *testing.T) {
func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testing.T) {
outStream := make(chan *OomInstance)
oomLog := new(OomParser)
oomLog.systemFile = sysFile
oomLog := mockOomParser(sysFile, t)
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
err := oomLog.StreamOoms(outStream)
if err != nil {
t.Errorf("had an error opening file: %v", err)
}
go oomLog.StreamOoms(outStream)
select {
case oomInstance := <-outStream:
@ -191,9 +149,12 @@ func helpTestStreamOoms(oomCheckInstance *OomInstance, sysFile string, t *testin
}
}
func TestNew(t *testing.T) {
_, err := New()
func mockOomParser(sysFile string, t *testing.T) *OomParser {
file, err := os.Open(sysFile)
if err != nil {
t.Errorf("function New() had error %v", err)
t.Errorf("had an error opening file: %v", err)
}
return &OomParser{
ioreader: bufio.NewReader(file),
}
}