Fixed some bugs in oomparser streaming
This commit is contained in:
parent
770eae1875
commit
78f3029943
@ -151,9 +151,9 @@ func streamResults(results chan *events.Event, w http.ResponseWriter, r *http.Re
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
glog.Infof("Client stopped listening")
|
||||
return nil
|
||||
case ev := <-results:
|
||||
glog.V(3).Infof("Received event from watch channel in api: %v", ev)
|
||||
err := enc.Encode(ev)
|
||||
if err != nil {
|
||||
glog.Errorf("error encoding message %+v for result stream: %v", ev, err)
|
||||
|
@ -695,6 +695,7 @@ func (self *manager) watchForNewContainers(quit chan error) error {
|
||||
}
|
||||
|
||||
func (self *manager) watchForNewOoms() error {
|
||||
glog.Infof("Started watching for new ooms in manager")
|
||||
outStream := make(chan *oomparser.OomInstance, 10)
|
||||
oomLog, err := oomparser.New()
|
||||
if err != nil {
|
||||
|
@ -66,19 +66,18 @@ func getContainerName(line string, currentOomInstance *OomInstance) error {
|
||||
// gets the pid, name, and date from a line and adds it to oomInstance
|
||||
func getProcessNamePid(line string, currentOomInstance *OomInstance) (bool, error) {
|
||||
reList := lastLineRegexp.FindStringSubmatch(line)
|
||||
|
||||
if reList == nil {
|
||||
return false, nil
|
||||
}
|
||||
const longForm = "Jan _2 15:04:05 2006"
|
||||
stringYear := strconv.Itoa(time.Now().Year())
|
||||
linetime, err := time.Parse(longForm, reList[1]+" "+stringYear)
|
||||
linetime, err := time.ParseInLocation(longForm, reList[1]+" "+stringYear, time.Local)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
currentOomInstance.TimeOfDeath = linetime
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
pid, err := strconv.Atoi(reList[2])
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -97,28 +96,53 @@ func checkIfStartOfOomMessages(line string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// opens a reader to grab new messages from the Reader object called outPipe
|
||||
// opened in PopulateOomInformation. It reads line by line splitting on
|
||||
// the "\n" character. Checks if line might be start or end of an oom message
|
||||
// log. Then the
|
||||
// lines are checked against a regexp to check for the pid, process name, etc.
|
||||
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to
|
||||
// oomLog
|
||||
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
|
||||
// reads the file and sends only complete lines over a channel to analyzeLines.
|
||||
// Should prevent EOF errors that occur when lines are read before being fully
|
||||
// written to the log. It reads line by line splitting on
|
||||
// the "\n" character.
|
||||
func readLinesFromFile(lineChannel chan string, ioreader *bufio.Reader) {
|
||||
linefragment := ""
|
||||
var line string
|
||||
var err error
|
||||
for true {
|
||||
for line, err = ioreader.ReadString('\n'); err != nil && err == io.EOF; {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
line, err = ioreader.ReadString('\n')
|
||||
if err == io.EOF {
|
||||
if line != "" {
|
||||
linefragment += line
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
} else if err == nil {
|
||||
if linefragment != "" {
|
||||
line = linefragment + line
|
||||
linefragment = ""
|
||||
}
|
||||
lineChannel <- line
|
||||
} else if err != nil && err != io.EOF {
|
||||
glog.Errorf("exiting analyzeLinesHelper with error %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calls goroutine for analyzeLinesHelper, which feeds it complete lines.
|
||||
// Lines are checked against a regexp to check for the pid, process name, etc.
|
||||
// At the end of an oom message group, AnalyzeLines adds the new oomInstance to
|
||||
// oomLog
|
||||
func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomInstance) {
|
||||
lineChannel := make(chan string, 10)
|
||||
go func() {
|
||||
readLinesFromFile(lineChannel, ioreader)
|
||||
}()
|
||||
|
||||
for line := range lineChannel {
|
||||
in_oom_kernel_log := checkIfStartOfOomMessages(line)
|
||||
|
||||
if in_oom_kernel_log {
|
||||
oomCurrentInstance := &OomInstance{
|
||||
ContainerName: "/",
|
||||
}
|
||||
finished := false
|
||||
for err == nil && !finished {
|
||||
err = getContainerName(line, oomCurrentInstance)
|
||||
for !finished {
|
||||
err := getContainerName(line, oomCurrentInstance)
|
||||
if err != nil {
|
||||
glog.Errorf("%v", err)
|
||||
}
|
||||
@ -126,12 +150,13 @@ func (self *OomParser) analyzeLines(ioreader *bufio.Reader, outStream chan *OomI
|
||||
if err != nil {
|
||||
glog.Errorf("%v", err)
|
||||
}
|
||||
line, err = ioreader.ReadString('\n')
|
||||
line = <-lineChannel
|
||||
}
|
||||
in_oom_kernel_log = false
|
||||
outStream <- oomCurrentInstance
|
||||
}
|
||||
}
|
||||
glog.Infof("exiting analyzeLines")
|
||||
}
|
||||
|
||||
// looks for system files that contain kernel messages and if one is found, sets
|
||||
|
@ -29,7 +29,7 @@ const systemLogFile = "systemOomExampleLog.txt"
|
||||
|
||||
func createExpectedContainerOomInstance(t *testing.T) *OomInstance {
|
||||
const longForm = "Jan _2 15:04:05 2006"
|
||||
deathTime, err := time.Parse(longForm, "Jan 5 15:19:27 2015")
|
||||
deathTime, err := time.ParseInLocation(longForm, "Jan 5 15:19:27 2015", time.Local)
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse expected time when creating expected container oom instance. Had error %v", err)
|
||||
return nil
|
||||
@ -44,7 +44,7 @@ func createExpectedContainerOomInstance(t *testing.T) *OomInstance {
|
||||
|
||||
func createExpectedSystemOomInstance(t *testing.T) *OomInstance {
|
||||
const longForm = "Jan _2 15:04:05 2006"
|
||||
deathTime, err := time.Parse(longForm, "Jan 28 19:58:45 2015")
|
||||
deathTime, err := time.ParseInLocation(longForm, "Jan 28 19:58:45 2015", time.Local)
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse expected time when creating expected system oom instance. Had error %v", err)
|
||||
return nil
|
||||
@ -86,7 +86,7 @@ func TestGetProcessNamePid(t *testing.T) {
|
||||
}
|
||||
|
||||
const longForm = "Jan _2 15:04:05 2006"
|
||||
correctTime, err := time.Parse(longForm, "Jan 21 22:01:49 2015")
|
||||
correctTime, err := time.ParseInLocation(longForm, "Jan 21 22:01:49 2015", time.Local)
|
||||
couldParseLine, err = getProcessNamePid(endLine, currentOomInstance)
|
||||
if err != nil {
|
||||
t.Errorf("good line fed to getProcessNamePid should yield no error, but had error %v", err)
|
||||
@ -146,6 +146,7 @@ func helpTestAnalyzeLines(oomCheckInstance *OomInstance, sysFile string, t *test
|
||||
if *oomCheckInstance != *oomInstance {
|
||||
t.Errorf("wrong instance returned. Expected %v and got %v",
|
||||
oomCheckInstance, oomInstance)
|
||||
t.Errorf("Container of one was %v and the other %v", oomCheckInstance.ContainerName, oomInstance.ContainerName)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Error(
|
||||
|
Loading…
Reference in New Issue
Block a user