Set of fixes for cpu load calculations:

. Remove counting of task group entities from sched_debug: a runnable
  child group shows up as one runnable entity in its parent's nr_running
  (worked sketch after the refresh() hunk below).
. Phase-shift the monitoring thread 500ms from the other container
  housekeeping threads so it does not catch them all mid-wakeup.
. Rely on /proc/loadavg for root load (format sketched before the diff).
. Paper over sched_debug atomicity issues by clamping negative counts (WIP).
. Stop counting the monitoring thread itself as load.

Getting better, but still a bit off from the ideal load :(
Rohit Jnagal 2015-01-29 20:50:44 +00:00
parent ef2d9d1c76
commit 62b02a6b94

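Before the diff, a quick illustration of the root-load source: the fourth field of /proc/loadavg is "runnable/total" scheduling entities, so the runnable count can be read directly instead of summed out of sched_debug. A minimal standalone sketch of that idea (mine, not the committed code, which follows in the diff):

package main

import (
	"fmt"
	"io/ioutil"
	"strconv"
	"strings"
)

func rootLoad() (int64, error) {
	// /proc/loadavg looks like: "0.31 0.22 0.18 2/341 9177",
	// where "2/341" is runnable entities / total entities.
	out, err := ioutil.ReadFile("/proc/loadavg")
	if err != nil {
		return -1, err
	}
	fields := strings.Fields(string(out))
	if len(fields) < 4 {
		return -1, fmt.Errorf("unexpected /proc/loadavg contents: %q", out)
	}
	running := strings.Split(fields[3], "/")[0]
	n, err := strconv.ParseInt(running, 10, 64)
	if err != nil {
		return -1, err
	}
	// The reader itself is running while it reads, so discount it,
	// as the commit does.
	return n - 1, nil
}

func main() {
	n, err := rootLoad()
	fmt.Println(n, err)
}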

@@ -19,8 +19,8 @@ import (
 	"io/ioutil"
 	"path"
 	"regexp"
+	"sort"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -36,13 +36,16 @@ const (
 var (
 	// Scans cpu number, task group name, and number of running threads.
 	// TODO(rjnagal): cpu number is only used for debug. Remove it later.
 	schedRegExp = regexp.MustCompile(`cfs_rq\[([0-9]+)\]:(.*)\n(?:.*\n)*?.*nr_running.*: ([0-9]+)`)
+	selfCgroupRegExp = regexp.MustCompile(`cpu.*:(.*)\n`)
+	procLoadAvgRegExp = regexp.MustCompile(` ([0-9]+)/`)
 	pollInterval = 1 * time.Second
 )
 
 type SchedReader struct {
 	quitChan      chan error // Used to cleanly shutdown housekeeping.
 	lastErrorTime time.Time  // Limit errors to one per minute.
+	selfCgroup    string     // Cgroup that cAdvisor is running under.
 	dataLock      sync.RWMutex
 	load          map[string]int // Load per container. Guarded by dataLock.
 }
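For reference, sample inputs these regexes expect. The samples are from memory and exact /proc layouts vary by kernel version, so treat them as illustrative. A /proc/sched_debug cfs_rq block looks roughly like:

	cfs_rq[0]:/docker/abcd
	  .exec_clock                    : 1091.506936
	  .nr_running                    : 2

schedRegExp captures the cpu number (0), the task group (/docker/abcd), and its runnable count (2) from such blocks. selfCgroupRegExp picks the cpu controller line out of /proc/self/cgroup, e.g. "3:cpu:/docker/abcd", capturing "/docker/abcd". procLoadAvgRegExp captures the runnable-entity count ("2") from a /proc/loadavg line such as "0.31 0.22 0.18 2/341 9177".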
@@ -66,6 +69,9 @@ func (self *SchedReader) Stop() {
 // there is a chance of sometimes picking the last cycle's data. We can solve that by
 // calling this housekeeping from globalhousekeeping if its an issue.
 func (self *SchedReader) housekeep() {
+	// We start all housekeeping threads around the same time.
+	// Phase shift the load reader thread so it does not poll all housekeeping threads whenever it wakes up.
+	time.Sleep(500 * time.Millisecond)
 	ticker := time.Tick(pollInterval)
 	for {
 		select {
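The 500ms sleep added above is the whole phase-shift fix: every housekeeping goroutine starts at roughly the same instant and ticks on the same 1s period, so an un-shifted load reader would always wake exactly when the other threads are runnable and count them as load. A toy standalone demonstration of the offset pattern (my sketch, not cAdvisor code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// An unrelated periodic housekeeping loop, started first.
	go func() {
		for range time.Tick(time.Second) {
			fmt.Println("housekeeping wakes")
		}
	}()
	// Offset the sampler by half a period so the two loops never
	// wake at the same instant and observe each other as runnable.
	time.Sleep(500 * time.Millisecond)
	for range time.Tick(time.Second) {
		fmt.Println("load sampler wakes")
	}
}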
@@ -106,22 +112,55 @@ func (self *SchedReader) refresh() {
 			}
 			continue
 		}
-		glog.V(2).Infof("Load for %q on cpu %s: %d", cgroup, cpu, numRunning)
-		if strings.HasPrefix(cgroup, "/autogroup") {
-			// collapse all autogroups to root.
-			cgroup = "/"
+		glog.V(3).Infof("Load for %q on cpu %s: %d", cgroup, cpu, numRunning)
+		if numRunning == 0 {
+			continue
 		}
 		load[cgroup] += int(numRunning)
-		// Walk up the path and add load to all parent containers.
-		for cgroup != "/" {
-			cgroup = path.Dir(cgroup)
-			if cgroup == "." {
-				cgroup = "/"
-			}
-			load[cgroup] += int(numRunning)
+		// A runnable task group shows up as one runnable entity in its
+		// parent's count; deduct it so only real tasks are counted.
+		if cgroup != "/" {
+			parent := getParent(cgroup)
+			load[parent] -= 1
 		}
 	}
-	glog.V(2).Infof("New loads : %+v", load)
+	glog.V(3).Infof("New non-hierarchical loads : %+v", load)
+	// Sort the keys in reverse so children are folded into parents first.
+	var cgroups sort.StringSlice
+	for c := range load {
+		cgroups = append(cgroups, c)
+	}
+	sort.Sort(sort.Reverse(cgroups))
+	for _, c := range cgroups {
+		// Add this task group's processes to its parent.
+		if c != "/" {
+			parent := getParent(c)
+			load[parent] += load[c]
+		}
+		// Sometimes we catch a sched dump in the middle of an update.
+		// TODO(rjnagal): Look into why the task hierarchy isn't fully filled sometimes.
+		if load[c] < 0 {
+			load[c] = 0
+		}
+	}
+	// Take this cAdvisor thread off the load calculation.
+	if self.selfCgroup != "" && load[self.selfCgroup] >= 1 {
+		load[self.selfCgroup] -= 1
+		// Deduct from all parents.
+		p := self.selfCgroup
+		for p != "/" {
+			p = getParent(p)
+			if load[p] >= 1 {
+				load[p] -= 1
+			}
+		}
+	}
+	glog.V(3).Infof("Derived task group loads : %+v", load)
+	rootLoad, err := getRootLoad()
+	if err != nil {
+		glog.Infof("failed to get root load: %v", err)
+	} else {
+		load["/"] = int(rootLoad)
+	}
 	self.dataLock.Lock()
 	defer self.dataLock.Unlock()
 	self.load = load
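To see the two passes in refresh() end to end, here is a standalone sketch with a made-up hierarchy (my toy example, not part of the commit): /a runs one task of its own plus the runnable group entity for /a/b, so sched_debug reports nr_running=2 for /a, while /a/b runs two tasks.

package main

import (
	"fmt"
	"path"
	"sort"
)

// parentOf mirrors the commit's getParent helper.
func parentOf(c string) string {
	p := path.Dir(c)
	if p == "." {
		p = "/"
	}
	return p
}

func main() {
	// Raw nr_running per cfs_rq from a hypothetical sched_debug dump.
	load := map[string]int{"/a": 2, "/a/b": 2}

	// Pass 1 (the `load[parent] -= 1` step): a runnable child group is
	// itself one runnable entity in its parent, so take it back off.
	load[parentOf("/a/b")]-- // /a: 2 -> 1 real task

	// Pass 2: reverse-sorted keys fold children into parents first.
	var keys []string
	for c := range load {
		keys = append(keys, c)
	}
	sort.Sort(sort.Reverse(sort.StringSlice(keys)))
	for _, c := range keys {
		if c != "/" {
			load[parentOf(c)] += load[c]
		}
		if load[c] < 0 {
			load[c] = 0 // clamp sched_debug mid-update glitches
		}
	}
	fmt.Println(load) // map[/:3 /a:3 /a/b:2]
}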
@@ -142,9 +181,53 @@ func (self *SchedReader) allowErrorLogging() bool {
 	return false
 }
 
+func getSelfCgroup() (string, error) {
+	out, err := ioutil.ReadFile("/proc/self/cgroup")
+	if err != nil {
+		return "", fmt.Errorf("failed to read cgroup path for cAdvisor: %v", err)
+	}
+	matches := selfCgroupRegExp.FindSubmatch(out)
+	if len(matches) != 2 {
+		return "", fmt.Errorf("could not find cpu cgroup path in %q", string(out))
+	}
+	return string(matches[1]), nil
+}
+
+func getRootLoad() (int64, error) {
+	loadFile := "/proc/loadavg"
+	out, err := ioutil.ReadFile(loadFile)
+	if err != nil {
+		return -1, fmt.Errorf("failed to get load from %q: %v", loadFile, err)
+	}
+	matches := procLoadAvgRegExp.FindSubmatch(out)
+	if len(matches) != 2 {
+		return -1, fmt.Errorf("could not find cpu load in %q", string(out))
+	}
+	numRunning, err := strconv.ParseInt(string(matches[1]), 10, 64)
+	if err != nil {
+		return -1, fmt.Errorf("could not parse number of running processes from %q: %v", matches[1], err)
+	}
+	// Don't count the process reading /proc/loadavg; it is always running.
+	numRunning -= 1
+	return numRunning, nil
+}
+
+// Return the parent cgroup name given an absolute cgroup name.
+func getParent(c string) string {
+	parent := path.Dir(c)
+	if parent == "." {
+		parent = "/"
+	}
+	return parent
+}
+
 func New() (*SchedReader, error) {
 	if !utils.FileExists(schedDebugPath) {
 		return nil, fmt.Errorf("sched debug file %q not accessible", schedDebugPath)
 	}
-	return &SchedReader{}, nil
+	selfCgroup, err := getSelfCgroup()
+	if err != nil {
+		glog.Infof("failed to get cgroup for cAdvisor: %v", err)
+	}
+	return &SchedReader{selfCgroup: selfCgroup}, nil
 }