Merge pull request #838 from rjnagal/docker
Add custom metrics to spec.
This commit is contained in:
commit
ef41402a39
@ -372,13 +372,21 @@ func (self *version2_0) HandleRequest(requestType string, request []string, m ma
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
contMetrics := make(map[string][][]info.Metric, 0)
|
||||
metrics := [][]info.Metric{}
|
||||
specs, err := m.GetContainerSpec(containerName, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
contMetrics := make(map[string]map[string][]info.MetricVal, 0)
|
||||
for _, cont := range conts {
|
||||
metrics := map[string][]info.MetricVal{}
|
||||
contStats := convertStats(cont)
|
||||
spec := specs[cont.Name]
|
||||
for _, contStat := range contStats {
|
||||
metric := contStat.CustomMetrics
|
||||
metrics = append(metrics, metric)
|
||||
for _, ms := range spec.CustomMetrics {
|
||||
if contStat.HasCustomMetrics && !contStat.CustomMetrics[ms.Name].Timestamp.IsZero() {
|
||||
metrics[ms.Name] = append(metrics[ms.Name], contStat.CustomMetrics[ms.Name])
|
||||
}
|
||||
}
|
||||
}
|
||||
contMetrics[containerName] = metrics
|
||||
}
|
||||
|
@ -61,20 +61,29 @@ func (cm *GenericCollectorManager) RegisterCollector(collector Collector) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *GenericCollectorManager) Collect() (time.Time, []v1.Metric, error) {
|
||||
func (cm *GenericCollectorManager) GetSpec() ([]v1.MetricSpec, error) {
|
||||
metricSpec := []v1.MetricSpec{}
|
||||
for _, c := range cm.Collectors {
|
||||
specs := c.collector.GetSpec()
|
||||
metricSpec = append(metricSpec, specs...)
|
||||
}
|
||||
|
||||
return metricSpec, nil
|
||||
}
|
||||
|
||||
func (cm *GenericCollectorManager) Collect() (time.Time, map[string]v1.MetricVal, error) {
|
||||
var errors []error
|
||||
|
||||
// Collect from all collectors that are ready.
|
||||
var next time.Time
|
||||
var metrics []v1.Metric
|
||||
metrics := map[string]v1.MetricVal{}
|
||||
for _, c := range cm.Collectors {
|
||||
if c.nextCollectionTime.Before(time.Now()) {
|
||||
nextCollection, newMetrics, err := c.collector.Collect()
|
||||
var err error
|
||||
c.nextCollectionTime, metrics, err = c.collector.Collect(metrics)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
metrics = append(metrics, newMetrics...)
|
||||
c.nextCollectionTime = nextCollection
|
||||
}
|
||||
|
||||
// Keep track of the next collector that will be ready.
|
||||
|
@ -28,15 +28,19 @@ type fakeCollector struct {
|
||||
collectedFrom int
|
||||
}
|
||||
|
||||
func (fc *fakeCollector) Collect() (time.Time, []v1.Metric, error) {
|
||||
func (fc *fakeCollector) Collect(metric map[string]v1.MetricVal) (time.Time, map[string]v1.MetricVal, error) {
|
||||
fc.collectedFrom++
|
||||
return fc.nextCollectionTime, []v1.Metric{}, fc.err
|
||||
return fc.nextCollectionTime, metric, fc.err
|
||||
}
|
||||
|
||||
func (fc *fakeCollector) Name() string {
|
||||
return "fake-collector"
|
||||
}
|
||||
|
||||
func (fc *fakeCollector) GetSpec() []v1.MetricSpec {
|
||||
return []v1.MetricSpec{}
|
||||
}
|
||||
|
||||
func TestCollect(t *testing.T) {
|
||||
cm := &GenericCollectorManager{}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"github.com/google/cadvisor/info/v1"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -32,7 +33,7 @@ type MetricConfig struct {
|
||||
Name string `json:"name"`
|
||||
|
||||
//enum type for the metric type
|
||||
MetricType MetricType `json:"metric_type"`
|
||||
MetricType v1.MetricType `json:"metric_type"`
|
||||
|
||||
//data type of the metric (eg: integer, string)
|
||||
Units string `json:"units"`
|
||||
@ -43,11 +44,3 @@ type MetricConfig struct {
|
||||
//the regular expression that can be used to extract the metric
|
||||
Regex string `json:"regex"`
|
||||
}
|
||||
|
||||
// MetricType is an enum type that lists the possible type of the metric
|
||||
type MetricType string
|
||||
|
||||
const (
|
||||
Counter MetricType = "counter"
|
||||
Gauge MetricType = "gauge"
|
||||
)
|
||||
|
@ -27,7 +27,11 @@ func (fkm *FakeCollectorManager) RegisterCollector(collector Collector) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fkm *FakeCollectorManager) Collect() (time.Time, []v1.Metric, error) {
|
||||
var zero time.Time
|
||||
return zero, []v1.Metric{}, nil
|
||||
func (fkm *FakeCollectorManager) GetSpec() ([]v1.MetricSpec, error) {
|
||||
return []v1.MetricSpec{}, nil
|
||||
}
|
||||
|
||||
func (fkm *FakeCollectorManager) Collect(metric map[string]v1.MetricVal) (time.Time, map[string]v1.MetricVal, error) {
|
||||
var zero time.Time
|
||||
return zero, metric, nil
|
||||
}
|
||||
|
@ -97,8 +97,24 @@ func (collector *GenericCollector) Name() string {
|
||||
return collector.name
|
||||
}
|
||||
|
||||
func (collector *GenericCollector) configToSpec(config MetricConfig) v1.MetricSpec {
|
||||
return v1.MetricSpec{
|
||||
Name: config.Name,
|
||||
Type: config.MetricType,
|
||||
}
|
||||
}
|
||||
|
||||
func (collector *GenericCollector) GetSpec() []v1.MetricSpec {
|
||||
specs := []v1.MetricSpec{}
|
||||
for _, metricConfig := range collector.configFile.MetricsConfig {
|
||||
spec := collector.configToSpec(metricConfig)
|
||||
specs = append(specs, spec)
|
||||
}
|
||||
return specs
|
||||
}
|
||||
|
||||
//Returns collected metrics and the next collection time of the collector
|
||||
func (collector *GenericCollector) Collect() (time.Time, []v1.Metric, error) {
|
||||
func (collector *GenericCollector) Collect(metrics map[string]v1.MetricVal) (time.Time, map[string]v1.MetricVal, error) {
|
||||
currentTime := time.Now()
|
||||
nextCollectionTime := currentTime.Add(time.Duration(collector.info.minPollingFrequency))
|
||||
|
||||
@ -115,9 +131,7 @@ func (collector *GenericCollector) Collect() (time.Time, []v1.Metric, error) {
|
||||
return nextCollectionTime, nil, err
|
||||
}
|
||||
|
||||
metrics := make([]v1.Metric, len(collector.configFile.MetricsConfig))
|
||||
var errorSlice []error
|
||||
|
||||
for ind, metricConfig := range collector.configFile.MetricsConfig {
|
||||
matchString := collector.info.regexps[ind].FindStringSubmatch(string(pageContent))
|
||||
if matchString != nil {
|
||||
@ -126,16 +140,16 @@ func (collector *GenericCollector) Collect() (time.Time, []v1.Metric, error) {
|
||||
if err != nil {
|
||||
errorSlice = append(errorSlice, err)
|
||||
}
|
||||
metrics[ind].FloatPoints = []v1.FloatPoint{
|
||||
{Value: regVal, Timestamp: currentTime},
|
||||
metrics[metricConfig.Name] = v1.MetricVal{
|
||||
FloatValue: regVal, Timestamp: currentTime,
|
||||
}
|
||||
} else if metricConfig.Units == "integer" || metricConfig.Units == "int" {
|
||||
regVal, err := strconv.ParseInt(strings.TrimSpace(matchString[1]), 10, 64)
|
||||
if err != nil {
|
||||
errorSlice = append(errorSlice, err)
|
||||
}
|
||||
metrics[ind].IntPoints = []v1.IntPoint{
|
||||
{Value: regVal, Timestamp: currentTime},
|
||||
metrics[metricConfig.Name] = v1.MetricVal{
|
||||
IntValue: regVal, Timestamp: currentTime,
|
||||
}
|
||||
|
||||
} else {
|
||||
@ -144,14 +158,6 @@ func (collector *GenericCollector) Collect() (time.Time, []v1.Metric, error) {
|
||||
} else {
|
||||
errorSlice = append(errorSlice, fmt.Errorf("No match found for regexp: %v for metric '%v' in config", metricConfig.Regex, metricConfig.Name))
|
||||
}
|
||||
|
||||
metrics[ind].Name = metricConfig.Name
|
||||
if metricConfig.MetricType == "gauge" {
|
||||
metrics[ind].Type = v1.MetricGauge
|
||||
} else if metricConfig.MetricType == "counter" {
|
||||
metrics[ind].Type = v1.MetricCumulative
|
||||
}
|
||||
}
|
||||
|
||||
return nextCollectionTime, metrics, compileErrors(errorSlice)
|
||||
}
|
||||
|
@ -148,15 +148,20 @@ func TestMetricCollection(t *testing.T) {
|
||||
defer tempServer.Close()
|
||||
fakeCollector.configFile.Endpoint = tempServer.URL
|
||||
|
||||
_, metrics, errMetric := fakeCollector.Collect()
|
||||
metrics := map[string]v1.MetricVal{}
|
||||
_, metrics, errMetric := fakeCollector.Collect(metrics)
|
||||
assert.NoError(errMetric)
|
||||
assert.Equal(metrics[0].Name, "activeConnections")
|
||||
assert.Equal(metrics[0].Type, v1.MetricGauge)
|
||||
assert.Nil(metrics[0].FloatPoints)
|
||||
assert.Equal(metrics[1].Name, "reading")
|
||||
assert.Equal(metrics[2].Name, "writing")
|
||||
assert.Equal(metrics[3].Name, "waiting")
|
||||
|
||||
//Assert: Number of active connections = Number of connections reading + Number of connections writing + Number of connections waiting
|
||||
assert.Equal(metrics[0].IntPoints[0].Value, (metrics[1].IntPoints[0].Value)+(metrics[2].IntPoints[0].Value)+(metrics[3].IntPoints[0].Value))
|
||||
metricNames := []string{"activeConnections", "reading", "writing", "waiting"}
|
||||
// activeConnections = 3
|
||||
assert.Equal(metrics[metricNames[0]].IntValue, 3)
|
||||
assert.Equal(metrics[metricNames[0]].FloatValue, 0)
|
||||
// reading = 0
|
||||
assert.Equal(metrics[metricNames[1]].IntValue, 0)
|
||||
assert.Equal(metrics[metricNames[1]].FloatValue, 0)
|
||||
// writing = 1
|
||||
assert.Equal(metrics[metricNames[2]].IntValue, 1)
|
||||
assert.Equal(metrics[metricNames[2]].FloatValue, 0)
|
||||
// waiting = 2
|
||||
assert.Equal(metrics[metricNames[3]].IntValue, 2)
|
||||
assert.Equal(metrics[metricNames[3]].FloatValue, 0)
|
||||
}
|
||||
|
@ -27,7 +27,10 @@ type Collector interface {
|
||||
// Returns the next time this collector should be collected from.
|
||||
// Next collection time is always returned, even when an error occurs.
|
||||
// A collection time of zero means no more collection.
|
||||
Collect() (time.Time, []v1.Metric, error)
|
||||
Collect(map[string]v1.MetricVal) (time.Time, map[string]v1.MetricVal, error)
|
||||
|
||||
// Return spec for all metrics associated with this collector
|
||||
GetSpec() []v1.MetricSpec
|
||||
|
||||
// Name of this collector.
|
||||
Name() string
|
||||
@ -42,5 +45,8 @@ type CollectorManager interface {
|
||||
// at which a collector will be ready to collect from.
|
||||
// Next collection time is always returned, even when an error occurs.
|
||||
// A collection time of zero means no more collection.
|
||||
Collect() (time.Time, []v1.Metric, error)
|
||||
Collect() (time.Time, map[string]v1.MetricVal, error)
|
||||
|
||||
// Get metric spec from all registered collectors.
|
||||
GetSpec() ([]v1.MetricSpec, error)
|
||||
}
|
||||
|
@ -59,7 +59,8 @@ type ContainerSpec struct {
|
||||
// HasDiskIo when true, indicates that DiskIo stats will be available.
|
||||
HasDiskIo bool `json:"has_diskio"`
|
||||
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
CustomMetrics []MetricSpec `json:"custom_metrics,omitempty"`
|
||||
}
|
||||
|
||||
// Container reference contains enough information to uniquely identify a container
|
||||
@ -426,7 +427,7 @@ type ContainerStats struct {
|
||||
TaskStats LoadStats `json:"task_stats,omitempty"`
|
||||
|
||||
//Custom metrics from all collectors
|
||||
CustomMetrics []Metric `json:"custom_metrics,omitempty"`
|
||||
CustomMetrics map[string]MetricVal `json:"custom_metrics,omitempty"`
|
||||
}
|
||||
|
||||
func timeEq(t1, t2 time.Time, tolerance time.Duration) bool {
|
||||
|
@ -32,38 +32,21 @@ const (
|
||||
MetricDelta = "delta"
|
||||
)
|
||||
|
||||
// An exported metric.
|
||||
type Metric struct {
|
||||
// Spec for custom metric.
|
||||
type MetricSpec struct {
|
||||
// The name of the metric.
|
||||
Name string `json:"name"`
|
||||
|
||||
// Type of the metric.
|
||||
Type MetricType `json:"type"`
|
||||
|
||||
// Metadata associated with this metric.
|
||||
Labels map[string]string
|
||||
|
||||
// Value of the metric. Only one of these values will be
|
||||
// available according to the output type of the metric.
|
||||
// If no values are available, there are no data points.
|
||||
IntPoints []IntPoint `json:"int_points,omitempty"`
|
||||
FloatPoints []FloatPoint `json:"float_points,omitempty"`
|
||||
}
|
||||
|
||||
// An integer metric data point.
|
||||
type IntPoint struct {
|
||||
// An exported metric.
|
||||
type MetricVal struct {
|
||||
// Time at which the metric was queried
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
|
||||
// The value of the metric at this point.
|
||||
Value int64 `json:"value"`
|
||||
}
|
||||
|
||||
// A float metric data point.
|
||||
type FloatPoint struct {
|
||||
// Time at which the metric was queried
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
|
||||
// The value of the metric at this point.
|
||||
Value float64 `json:"value"`
|
||||
IntValue int64 `json:"int_value,omitempty"`
|
||||
FloatValue float64 `json:"float_value,omitempty"`
|
||||
}
|
||||
|
@ -73,7 +73,8 @@ type ContainerSpec struct {
|
||||
HasMemory bool `json:"has_memory"`
|
||||
Memory MemorySpec `json:"memory,omitempty"`
|
||||
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
CustomMetrics []v1.MetricSpec `json:"custom_metrics,omitempty"`
|
||||
|
||||
// Following resources have no associated spec, but are being isolated.
|
||||
HasNetwork bool `json:"has_network"`
|
||||
@ -102,9 +103,9 @@ type ContainerStats struct {
|
||||
// Task load statistics
|
||||
HasLoad bool `json:"has_load"`
|
||||
Load v1.LoadStats `json:"load_stats,omitempty"`
|
||||
//Custom statistics
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
CustomMetrics []v1.Metric `json:"custom_metrics,omitempty"`
|
||||
// Custom Metrics
|
||||
HasCustomMetrics bool `json:"has_custom_metrics"`
|
||||
CustomMetrics map[string]v1.MetricVal `json:"custom_metrics,omitempty"`
|
||||
}
|
||||
|
||||
type Percentiles struct {
|
||||
|
@ -439,6 +439,12 @@ func (c *containerData) updateSpec() error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
customMetrics, err := c.collectorManager.GetSpec()
|
||||
if len(customMetrics) > 0 {
|
||||
spec.HasCustomMetrics = true
|
||||
spec.CustomMetrics = customMetrics
|
||||
}
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.info.Spec = spec
|
||||
@ -523,7 +529,7 @@ func (c *containerData) updateStats() error {
|
||||
return customStatsErr
|
||||
}
|
||||
|
||||
func (c *containerData) updateCustomStats() ([]info.Metric, error) {
|
||||
func (c *containerData) updateCustomStats() (map[string]info.MetricVal, error) {
|
||||
_, customStats, customStatsErr := c.collectorManager.Collect()
|
||||
if customStatsErr != nil {
|
||||
if !c.handler.Exists() {
|
||||
|
@ -375,12 +375,13 @@ func (self *manager) GetContainerSpec(containerName string, options v2.RequestOp
|
||||
func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
|
||||
specV1 := self.getAdjustedSpec(cinfo)
|
||||
specV2 := v2.ContainerSpec{
|
||||
CreationTime: specV1.CreationTime,
|
||||
HasCpu: specV1.HasCpu,
|
||||
HasMemory: specV1.HasMemory,
|
||||
HasFilesystem: specV1.HasFilesystem,
|
||||
HasNetwork: specV1.HasNetwork,
|
||||
HasDiskIo: specV1.HasDiskIo,
|
||||
CreationTime: specV1.CreationTime,
|
||||
HasCpu: specV1.HasCpu,
|
||||
HasMemory: specV1.HasMemory,
|
||||
HasFilesystem: specV1.HasFilesystem,
|
||||
HasNetwork: specV1.HasNetwork,
|
||||
HasDiskIo: specV1.HasDiskIo,
|
||||
HasCustomMetrics: specV1.HasCustomMetrics,
|
||||
}
|
||||
if specV1.HasCpu {
|
||||
specV2.Cpu.Limit = specV1.Cpu.Limit
|
||||
@ -392,6 +393,9 @@ func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
|
||||
specV2.Memory.Reservation = specV1.Memory.Reservation
|
||||
specV2.Memory.SwapLimit = specV1.Memory.SwapLimit
|
||||
}
|
||||
if specV1.HasCustomMetrics {
|
||||
specV2.CustomMetrics = specV1.CustomMetrics
|
||||
}
|
||||
specV2.Aliases = cinfo.Aliases
|
||||
specV2.Namespace = cinfo.Namespace
|
||||
return specV2
|
||||
|
Loading…
Reference in New Issue
Block a user