Merge pull request #574 from kateknister/master

Fixed some bugs in oomparser streaming
This commit is contained in:
Victor Marmol 2015-03-11 09:14:07 -07:00
commit 5eeb6fdd40
4 changed files with 47 additions and 20 deletions

View File

@ -151,9 +151,9 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re
for { for {
select { select {
case <-cn.CloseNotify(): case <-cn.CloseNotify():
glog.Infof("Client stopped listening")
return nil return nil
case ev := <-results: case ev := <-results:
glog.V(3).Infof("Received event from watch channel in api: %v", ev)
err := enc.Encode(ev) err := enc.Encode(ev)
if err != nil { if err != nil {
glog.Errorf("error encoding message %+v for result stream: %v", ev, err) glog.Errorf("error encoding message %+v for result stream: %v", ev, err)

View File

@ -746,6 +746,7 @@ func (self *manager) watchForNewContainers(quit chan error) error {
} }
func (self *manager) watchForNewOoms() error { func (self *manager) watchForNewOoms() error {
glog.Infof("Started watching for new ooms in manager")
outStream := make(chan *oomparser.OomInstance, 10) outStream := make(chan *oomparser.OomInstance, 10)
oomLog, err := oomparser.New() oomLog, err := oomparser.New()
if err != nil { if err != nil {

View File

@ -66,19 +66,18 @@ func getContainerName(line string, currentOomInstance *OomInstance) error {
// gets the pid, name, and date from a line and adds it to oomInstance // gets the pid, name, and date from a line and adds it to oomInstance
func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, error) { func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, error) {
reList := lastLineRegexp.FindStringSubmatch(line) reList := lastLineRegexp.FindStringSubmatch(line)
if reList == nil { if reList == nil {
return false, nil return false, nil
} }
const longForm = "Jan _2 15:04:05 2006" const longForm = "Jan _2 15:04:05 2006"
stringYear := strconv.Itoa(time.Now().Year()) stringYear := strconv.Itoa(time.Now().Year())
linetime, err := time.Parse(longForm, reList[1]+" "+stringYear) linetime, err := time.ParseInLocation(longForm, reList[1]+" "+stringYear, time.Local)
if err != nil { if err != nil {
return false, err return false, err
} }
currentOomInstance.TimeOfDeath = linetime currentOomInstance.TimeOfDeath = linetime
if err != nil {
return false, err
}
pid, err := strconv.Atoi(reList[2]) pid, err := strconv.Atoi(reList[2])
if err != nil { if err != nil {
return false, err return false, err
@ -97,28 +96,53 @@ func checkIfStartOfOomMessages(line string) bool {
return false return false
} }
// opens a reader to grab new messages from the Reader object called outPipe // reads the file and sends only complete lines over a channel to analyzeLines.
// opened in PopulateOomInformation. It reads line by line splitting on // Should prevent EOF errors that occur when lines are read before being fully
// the "\n" character. Checks if line might be start or end of an oom message // written to the log. It reads line by line splitting on
// log. Then the // the "\n" character.
// lines are checked against a regexp to check for the pid, process name, etc. func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) {
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to linefragment := ""
// oomLog
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
var line string var line string
var err error var err error
for true { for true {
for line, err = ioreader.ReadString('\n'); err != nil && err == io.EOF; { line, err = ioreader.ReadString('\n')
time.Sleep(100 * time.Millisecond) if err == io.EOF {
if line != "" {
linefragment += line
} }
time.Sleep(100 * time.Millisecond)
} else if err == nil {
if linefragment != "" {
line = linefragment + line
linefragment = ""
}
lineChannel <- line
} else if err != nil && err != io.EOF {
glog.Errorf("exiting analyzeLinesHelper with error %v", err)
}
}
}
// Calls goroutine for analyzeLinesHelper, which feeds it complete lines.
// Lines are checked against a regexp to check for the pid, process name, etc.
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to
// oomLog
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
lineChannel := make(chan string, 10)
go func() {
readLinesFromFile(lineChannel, ioreader)
}()
for line := range lineChannel {
in_oom_kernel_log := checkIfStartOfOomMessages(line) in_oom_kernel_log := checkIfStartOfOomMessages(line)
if in_oom_kernel_log { if in_oom_kernel_log {
oomCurrentInstance := &OomInstance{ oomCurrentInstance := &OomInstance{
ContainerName: "/", ContainerName: "/",
} }
finished := false finished := false
for err == nil && !finished { for !finished {
err = getContainerName(line, oomCurrentInstance) err := getContainerName(line, oomCurrentInstance)
if err != nil { if err != nil {
glog.Errorf("%v", err) glog.Errorf("%v", err)
} }
@ -126,12 +150,13 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI
if err != nil { if err != nil {
glog.Errorf("%v", err) glog.Errorf("%v", err)
} }
line, err = ioreader.ReadString('\n') line = <-lineChannel
} }
in_oom_kernel_log = false in_oom_kernel_log = false
outStream <- oomCurrentInstance outStream <- oomCurrentInstance
} }
} }
glog.Infof("exiting analyzeLines")
} }
// looks for system files that contain kernel messages and if one is found, sets // looks for system files that contain kernel messages and if one is found, sets

View File

@ -29,7 +29,7 @@ const systemLogFile = "systemOomExampleLog.txt"
func createExpectedContainerOomInstance(t *testing.T) *OomInstance { func createExpectedContainerOomInstance(t *testing.T) *OomInstance {
const longForm = "Jan _2 15:04:05 2006" const longForm = "Jan _2 15:04:05 2006"
deathTime, err := time.Parse(longForm, "Jan 5 15:19:27 2015") deathTime, err := time.ParseInLocation(longForm, "Jan 5 15:19:27 2015", time.Local)
if err != nil { if err != nil {
t.Fatalf("could not parse expected time when creating expected container oom instance. Had error %v", err) t.Fatalf("could not parse expected time when creating expected container oom instance. Had error %v", err)
return nil return nil
@ -44,7 +44,7 @@ func createExpectedContainerOomInstance(t *testing.T) *OomInstance {
func createExpectedSystemOomInstance(t *testing.T) *OomInstance { func createExpectedSystemOomInstance(t *testing.T) *OomInstance {
const longForm = "Jan _2 15:04:05 2006" const longForm = "Jan _2 15:04:05 2006"
deathTime, err := time.Parse(longForm, "Jan 28 19:58:45 2015") deathTime, err := time.ParseInLocation(longForm, "Jan 28 19:58:45 2015", time.Local)
if err != nil { if err != nil {
t.Fatalf("could not parse expected time when creating expected system oom instance. Had error %v", err) t.Fatalf("could not parse expected time when creating expected system oom instance. Had error %v", err)
return nil return nil
@ -86,7 +86,7 @@ func TestGetProcessNamePid(t *testing.T) {
} }
const longForm = "Jan _2 15:04:05 2006" const longForm = "Jan _2 15:04:05 2006"
correctTime, err := time.Parse(longForm, "Jan 21 22:01:49 2015") correctTime, err := time.ParseInLocation(longForm, "Jan 21 22:01:49 2015", time.Local)
couldParseLine, err = getProcessNamePid(endLine, currentOomInstance) couldParseLine, err = getProcessNamePid(endLine, currentOomInstance)
if err != nil { if err != nil {
t.Errorf("good line fed to getProcessNamePid should yield no error, but had error %v", err) t.Errorf("good line fed to getProcessNamePid should yield no error, but had error %v", err)
@ -146,6 +146,7 @@ func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *test
if *oomCheckInstance != *oomInstance { if *oomCheckInstance != *oomInstance {
t.Errorf("wrong instance returned. Expected %v and got %v", t.Errorf("wrong instance returned. Expected %v and got %v",
oomCheckInstance, oomInstance) oomCheckInstance, oomInstance)
t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName)
} }
case <-timeout: case <-timeout:
t.Error( t.Error(