Add logic to read custom metric config files from container root.
Docker does not provide the rootfs path through docker inspect or statefile and the path is dependent on the storage driver being used. Instead of enumerating the storage drivers, we pick a pid from the container and get the config from /proc/pid/root. Although a bit expensive, this method works for non-docker containers too.
This commit is contained in:
parent
5853f97295
commit
a123fd72d8
@ -22,6 +22,8 @@ import (
|
||||
"github.com/google/cadvisor/info/v1"
|
||||
)
|
||||
|
||||
const metricLabelPrefix = "io.cadvisor.metric."
|
||||
|
||||
type GenericCollectorManager struct {
|
||||
Collectors []*collectorData
|
||||
NextCollectionTime time.Time
|
||||
@ -40,6 +42,17 @@ func NewCollectorManager() (CollectorManager, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func GetCollectorConfigs(labels map[string]string) map[string]string {
|
||||
configs := map[string]string{}
|
||||
for k, v := range labels {
|
||||
if strings.HasPrefix(k, metricLabelPrefix) {
|
||||
name := strings.TrimPrefix(k, metricLabelPrefix)
|
||||
configs[name] = v
|
||||
}
|
||||
}
|
||||
return configs
|
||||
}
|
||||
|
||||
func (cm *GenericCollectorManager) RegisterCollector(collector Collector) error {
|
||||
cm.Collectors = append(cm.Collectors, &collectorData{
|
||||
collector: collector,
|
||||
|
@ -47,14 +47,9 @@ type collectorInfo struct {
|
||||
}
|
||||
|
||||
//Returns a new collector using the information extracted from the configfile
|
||||
func NewCollector(collectorName string, configfile string) (*GenericCollector, error) {
|
||||
configFile, err := ioutil.ReadFile(configfile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func NewCollector(collectorName string, configFile []byte) (*GenericCollector, error) {
|
||||
var configInJSON Config
|
||||
err = json.Unmarshal(configFile, &configInJSON)
|
||||
err := json.Unmarshal(configFile, &configInJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -40,7 +40,10 @@ func TestEmptyConfig(t *testing.T) {
|
||||
//Create a temporary config file 'temp.json' with invalid json format
|
||||
assert.NoError(ioutil.WriteFile("temp.json", []byte(emptyConfig), 0777))
|
||||
|
||||
_, err := NewCollector("tempCollector", "temp.json")
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -67,8 +70,10 @@ func TestConfigWithErrors(t *testing.T) {
|
||||
|
||||
//Create a temporary config file 'temp.json' with invalid json format
|
||||
assert.NoError(ioutil.WriteFile("temp.json", []byte(invalid), 0777))
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err := NewCollector("tempCollector", "temp.json")
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -103,7 +108,10 @@ func TestConfigWithRegexErrors(t *testing.T) {
|
||||
//Create a temporary config file 'temp.json'
|
||||
assert.NoError(ioutil.WriteFile("temp.json", []byte(invalid), 0777))
|
||||
|
||||
_, err := NewCollector("tempCollector", "temp.json")
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -113,7 +121,10 @@ func TestConfig(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Create an nginx collector using the config file 'sample_config.json'
|
||||
collector, err := NewCollector("nginx", "config/sample_config.json")
|
||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||
assert.NoError(err)
|
||||
|
||||
collector, err := NewCollector("nginx", configFile)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "nginx")
|
||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status")
|
||||
@ -124,7 +135,10 @@ func TestMetricCollection(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Collect nginx metrics from a fake nginx endpoint
|
||||
fakeCollector, err := NewCollector("nginx", "config/sample_config.json")
|
||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||
assert.NoError(err)
|
||||
|
||||
fakeCollector, err := NewCollector("nginx", configFile)
|
||||
assert.NoError(err)
|
||||
|
||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -73,6 +73,9 @@ type ContainerHandler interface {
|
||||
// Returns absolute cgroup path for the requested resource.
|
||||
GetCgroupPath(resource string) (string, error)
|
||||
|
||||
// Returns container labels, if available.
|
||||
GetContainerLabels() map[string]string
|
||||
|
||||
// Returns whether the container still exists.
|
||||
Exists() bool
|
||||
}
|
||||
|
@ -332,6 +332,10 @@ func (self *dockerContainerHandler) ListThreads(listType container.ListType) ([]
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (self *dockerContainerHandler) GetContainerLabels() map[string]string {
|
||||
return self.labels
|
||||
}
|
||||
|
||||
func (self *dockerContainerHandler) ListProcesses(listType container.ListType) ([]int, error) {
|
||||
return containerLibcontainer.GetProcesses(self.cgroupManager)
|
||||
}
|
||||
|
@ -95,6 +95,11 @@ func (self *MockContainerHandler) GetCgroupPath(path string) (string, error) {
|
||||
return args.Get(0).(string), args.Error(1)
|
||||
}
|
||||
|
||||
func (self *MockContainerHandler) GetContainerLabels() map[string]string {
|
||||
args := self.Called()
|
||||
return args.Get(0).(map[string]string)
|
||||
}
|
||||
|
||||
type FactoryForMockContainerHandler struct {
|
||||
Name string
|
||||
PrepareContainerHandlerFunc func(name string, handler *MockContainerHandler)
|
||||
|
@ -356,6 +356,10 @@ func (self *rawContainerHandler) GetCgroupPath(resource string) (string, error)
|
||||
return path, nil
|
||||
}
|
||||
|
||||
func (self *rawContainerHandler) GetContainerLabels() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
// Lists all directories under "path" and outputs the results as children of "parent".
|
||||
func listDirectories(dirpath string, parent string, recursive bool, output map[string]struct{}) error {
|
||||
// Ignore if this hierarchy does not exist.
|
||||
|
@ -17,8 +17,10 @@ package manager
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os/exec"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
@ -136,11 +138,32 @@ func (c *containerData) getCgroupPath(cgroups string) (string, error) {
|
||||
return string(matches[1]), nil
|
||||
}
|
||||
|
||||
func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace bool) ([]v2.ProcessInfo, error) {
|
||||
// report all processes for root.
|
||||
isRoot := c.info.Name == "/"
|
||||
// TODO(rjnagal): Take format as an option?
|
||||
format := "user,pid,ppid,stime,pcpu,pmem,rss,vsz,stat,time,comm,cgroup"
|
||||
// Returns contents of a file inside the container root.
|
||||
// Takes in a path relative to container root.
|
||||
func (c *containerData) ReadFile(filepath string, inHostNamespace bool) ([]byte, error) {
|
||||
pids, err := c.getContainerPids(inHostNamespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO(rjnagal): Optimize by just reading container's cgroup.proc file when in host namespace.
|
||||
rootfs := "/"
|
||||
if !inHostNamespace {
|
||||
rootfs = "/rootfs"
|
||||
}
|
||||
for _, pid := range pids {
|
||||
filePath := path.Join(rootfs, "/proc", pid, "/root", filepath)
|
||||
glog.V(3).Infof("Trying path %q", filePath)
|
||||
data, err := ioutil.ReadFile(filePath)
|
||||
if err == nil {
|
||||
return data, err
|
||||
}
|
||||
}
|
||||
// No process paths could be found. Declare config non-existent.
|
||||
return nil, fmt.Errorf("file %q does not exist.", filepath)
|
||||
}
|
||||
|
||||
// Return output for ps command in host /proc with specified format
|
||||
func (c *containerData) getPsOutput(inHostNamespace bool, format string) ([]byte, error) {
|
||||
args := []string{}
|
||||
command := "ps"
|
||||
if !inHostNamespace {
|
||||
@ -148,11 +171,53 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
|
||||
args = append(args, "/rootfs", "ps")
|
||||
}
|
||||
args = append(args, "-e", "-o", format)
|
||||
expectedFields := 12
|
||||
out, err := exec.Command(command, args...).Output()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute %q command: %v", command, err)
|
||||
}
|
||||
return out, err
|
||||
}
|
||||
|
||||
// Get pids of processes in this container.
|
||||
// A slightly lighterweight call than GetProcessList if other details are not required.
|
||||
func (c *containerData) getContainerPids(inHostNamespace bool) ([]string, error) {
|
||||
format := "pid,cgroup"
|
||||
out, err := c.getPsOutput(inHostNamespace, format)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expectedFields := 2
|
||||
lines := strings.Split(string(out), "\n")
|
||||
pids := []string{}
|
||||
for _, line := range lines[1:] {
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) < expectedFields {
|
||||
return nil, fmt.Errorf("expected at least %d fields, found %d: output: %q", expectedFields, len(fields), line)
|
||||
}
|
||||
pid := fields[0]
|
||||
cgroup, err := c.getCgroupPath(fields[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse cgroup path from %q: %v", fields[1], err)
|
||||
}
|
||||
if c.info.Name == cgroup {
|
||||
pids = append(pids, pid)
|
||||
}
|
||||
}
|
||||
return pids, nil
|
||||
}
|
||||
|
||||
func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace bool) ([]v2.ProcessInfo, error) {
|
||||
// report all processes for root.
|
||||
isRoot := c.info.Name == "/"
|
||||
format := "user,pid,ppid,stime,pcpu,pmem,rss,vsz,stat,time,comm,cgroup"
|
||||
out, err := c.getPsOutput(inHostNamespace, format)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expectedFields := 12
|
||||
processes := []v2.ProcessInfo{}
|
||||
lines := strings.Split(string(out), "\n")
|
||||
for _, line := range lines[1:] {
|
||||
@ -189,7 +254,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
|
||||
}
|
||||
cgroup, err := c.getCgroupPath(fields[11])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse cgroup path from %q: %v", fields[10], err)
|
||||
return nil, fmt.Errorf("could not parse cgroup path from %q: %v", fields[11], err)
|
||||
}
|
||||
// Remove the ps command we just ran from cadvisor container.
|
||||
// Not necessary, but makes the cadvisor page look cleaner.
|
||||
|
@ -689,6 +689,28 @@ func (m *manager) GetProcessList(containerName string, options v2.RequestOptions
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *containerData) error {
|
||||
for k, v := range collectorConfigs {
|
||||
configFile, err := cont.ReadFile(v, m.inHostNamespace)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read config file %q for config %q, container %q: %v", k, v, cont.info.Name, err)
|
||||
}
|
||||
glog.V(3).Infof("Got config from %q: %q", v, configFile)
|
||||
|
||||
newCollector, err := collector.NewCollector(k, configFile)
|
||||
if err != nil {
|
||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
}
|
||||
err = cont.collectorManager.RegisterCollector(newCollector)
|
||||
if err != nil {
|
||||
glog.Infof("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a container.
|
||||
func (m *manager) createContainer(containerName string) error {
|
||||
handler, accept, err := container.NewContainerHandler(containerName)
|
||||
@ -700,17 +722,26 @@ func (m *manager) createContainer(containerName string) error {
|
||||
glog.V(4).Infof("ignoring container %q", containerName)
|
||||
return nil
|
||||
}
|
||||
// TODO(vmarmol): Register collectors.
|
||||
collectorManager, err := collector.NewCollectorManager()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
|
||||
cont, err := newContainerData(containerName, m.memoryCache, handler, m.loadReader, logUsage, collectorManager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add collectors
|
||||
labels := handler.GetContainerLabels()
|
||||
collectorConfigs := collector.GetCollectorConfigs(labels)
|
||||
err = m.registerCollectors(collectorConfigs, cont)
|
||||
if err != nil {
|
||||
glog.Infof("failed to register collectors for %q: %v", containerName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Add to the containers map.
|
||||
alreadyExists := func() bool {
|
||||
m.containersLock.Lock()
|
||||
|
Loading…
Reference in New Issue
Block a user