Merge pull request #1103 from mwielgus/cm-limit
Add a flag to control the number of custom metrics scraped by collectors
This commit is contained in:
commit
6639697283
@ -2,7 +2,8 @@
|
|||||||
"endpoint" : "http://localhost:8080/metrics",
|
"endpoint" : "http://localhost:8080/metrics",
|
||||||
"polling_frequency" : 10,
|
"polling_frequency" : 10,
|
||||||
"metrics_config" : [
|
"metrics_config" : [
|
||||||
"go_goroutines"
|
"go_goroutines",
|
||||||
|
"qps"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,10 +44,14 @@ type collectorInfo struct {
|
|||||||
|
|
||||||
//regular expresssions for all metrics
|
//regular expresssions for all metrics
|
||||||
regexps []*regexp.Regexp
|
regexps []*regexp.Regexp
|
||||||
|
|
||||||
|
// Limit for the number of srcaped metrics. If the count is higher,
|
||||||
|
// no metrics will be returned.
|
||||||
|
metricCountLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
//Returns a new collector using the information extracted from the configfile
|
//Returns a new collector using the information extracted from the configfile
|
||||||
func NewCollector(collectorName string, configFile []byte) (*GenericCollector, error) {
|
func NewCollector(collectorName string, configFile []byte, metricCountLimit int) (*GenericCollector, error) {
|
||||||
var configInJSON Config
|
var configInJSON Config
|
||||||
err := json.Unmarshal(configFile, &configInJSON)
|
err := json.Unmarshal(configFile, &configInJSON)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -83,12 +87,18 @@ func NewCollector(collectorName string, configFile []byte) (*GenericCollector, e
|
|||||||
minPollFrequency = minSupportedFrequency
|
minPollFrequency = minSupportedFrequency
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(configInJSON.MetricsConfig) > metricCountLimit {
|
||||||
|
return nil, fmt.Errorf("Too many metrics defined: %d limit: %d", len(configInJSON.MetricsConfig), metricCountLimit)
|
||||||
|
}
|
||||||
|
|
||||||
return &GenericCollector{
|
return &GenericCollector{
|
||||||
name: collectorName,
|
name: collectorName,
|
||||||
configFile: configInJSON,
|
configFile: configInJSON,
|
||||||
info: &collectorInfo{
|
info: &collectorInfo{
|
||||||
minPollingFrequency: minPollFrequency,
|
minPollingFrequency: minPollFrequency,
|
||||||
regexps: regexprs},
|
regexps: regexprs,
|
||||||
|
metricCountLimit: metricCountLimit,
|
||||||
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,6 +144,7 @@ func (collector *GenericCollector) Collect(metrics map[string][]v1.MetricVal) (t
|
|||||||
}
|
}
|
||||||
|
|
||||||
var errorSlice []error
|
var errorSlice []error
|
||||||
|
|
||||||
for ind, metricConfig := range collector.configFile.MetricsConfig {
|
for ind, metricConfig := range collector.configFile.MetricsConfig {
|
||||||
matchString := collector.info.regexps[ind].FindStringSubmatch(string(pageContent))
|
matchString := collector.info.regexps[ind].FindStringSubmatch(string(pageContent))
|
||||||
if matchString != nil {
|
if matchString != nil {
|
||||||
|
@ -44,7 +44,7 @@ func TestEmptyConfig(t *testing.T) {
|
|||||||
configFile, err := ioutil.ReadFile("temp.json")
|
configFile, err := ioutil.ReadFile("temp.json")
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
_, err = NewCollector("tempCollector", configFile)
|
_, err = NewCollector("tempCollector", configFile, 100)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
|
|
||||||
assert.NoError(os.Remove("temp.json"))
|
assert.NoError(os.Remove("temp.json"))
|
||||||
@ -74,7 +74,7 @@ func TestConfigWithErrors(t *testing.T) {
|
|||||||
configFile, err := ioutil.ReadFile("temp.json")
|
configFile, err := ioutil.ReadFile("temp.json")
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
_, err = NewCollector("tempCollector", configFile)
|
_, err = NewCollector("tempCollector", configFile, 100)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
|
|
||||||
assert.NoError(os.Remove("temp.json"))
|
assert.NoError(os.Remove("temp.json"))
|
||||||
@ -112,7 +112,7 @@ func TestConfigWithRegexErrors(t *testing.T) {
|
|||||||
configFile, err := ioutil.ReadFile("temp.json")
|
configFile, err := ioutil.ReadFile("temp.json")
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
_, err = NewCollector("tempCollector", configFile)
|
_, err = NewCollector("tempCollector", configFile, 100)
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
|
|
||||||
assert.NoError(os.Remove("temp.json"))
|
assert.NoError(os.Remove("temp.json"))
|
||||||
@ -125,7 +125,7 @@ func TestConfig(t *testing.T) {
|
|||||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
collector, err := NewCollector("nginx", configFile)
|
collector, err := NewCollector("nginx", configFile, 100)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(collector.name, "nginx")
|
assert.Equal(collector.name, "nginx")
|
||||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status")
|
assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status")
|
||||||
@ -139,7 +139,7 @@ func TestMetricCollection(t *testing.T) {
|
|||||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
fakeCollector, err := NewCollector("nginx", configFile)
|
fakeCollector, err := NewCollector("nginx", configFile, 100)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
|
|
||||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -166,3 +166,14 @@ func TestMetricCollection(t *testing.T) {
|
|||||||
assert.Equal(metrics[metricNames[3]][0].IntValue, 2)
|
assert.Equal(metrics[metricNames[3]][0].IntValue, 2)
|
||||||
assert.Equal(metrics[metricNames[3]][0].FloatValue, 0)
|
assert.Equal(metrics[metricNames[3]][0].FloatValue, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricCollectionLimit(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
//Collect nginx metrics from a fake nginx endpoint
|
||||||
|
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||||
|
assert.NoError(err)
|
||||||
|
|
||||||
|
_, err = NewCollector("nginx", configFile, 1)
|
||||||
|
assert.Error(err)
|
||||||
|
}
|
||||||
|
@ -16,6 +16,7 @@ package collector
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -38,10 +39,14 @@ type PrometheusCollector struct {
|
|||||||
|
|
||||||
// the metrics to gather (uses a map as a set)
|
// the metrics to gather (uses a map as a set)
|
||||||
metricsSet map[string]bool
|
metricsSet map[string]bool
|
||||||
|
|
||||||
|
// Limit for the number of scaped metrics. If the count is higher,
|
||||||
|
// no metrics will be returned.
|
||||||
|
metricCountLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
//Returns a new collector using the information extracted from the configfile
|
//Returns a new collector using the information extracted from the configfile
|
||||||
func NewPrometheusCollector(collectorName string, configFile []byte) (*PrometheusCollector, error) {
|
func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int) (*PrometheusCollector, error) {
|
||||||
var configInJSON Prometheus
|
var configInJSON Prometheus
|
||||||
err := json.Unmarshal(configFile, &configInJSON)
|
err := json.Unmarshal(configFile, &configInJSON)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -57,6 +62,10 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu
|
|||||||
minPollingFrequency = minSupportedFrequency
|
minPollingFrequency = minSupportedFrequency
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if metricCountLimit < 0 {
|
||||||
|
return nil, fmt.Errorf("Metric count limit must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
var metricsSet map[string]bool
|
var metricsSet map[string]bool
|
||||||
if len(configInJSON.MetricsConfig) > 0 {
|
if len(configInJSON.MetricsConfig) > 0 {
|
||||||
metricsSet = make(map[string]bool, len(configInJSON.MetricsConfig))
|
metricsSet = make(map[string]bool, len(configInJSON.MetricsConfig))
|
||||||
@ -65,12 +74,17 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(configInJSON.MetricsConfig) > metricCountLimit {
|
||||||
|
return nil, fmt.Errorf("Too many metrics defined: %d limit %d", len(configInJSON.MetricsConfig), metricCountLimit)
|
||||||
|
}
|
||||||
|
|
||||||
//TODO : Add checks for validity of config file (eg : Accurate JSON fields)
|
//TODO : Add checks for validity of config file (eg : Accurate JSON fields)
|
||||||
return &PrometheusCollector{
|
return &PrometheusCollector{
|
||||||
name: collectorName,
|
name: collectorName,
|
||||||
pollingFrequency: minPollingFrequency,
|
pollingFrequency: minPollingFrequency,
|
||||||
configFile: configInJSON,
|
configFile: configInJSON,
|
||||||
metricsSet: metricsSet,
|
metricsSet: metricsSet,
|
||||||
|
metricCountLimit: metricCountLimit,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,6 +162,8 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
|||||||
var errorSlice []error
|
var errorSlice []error
|
||||||
lines := strings.Split(string(pageContent), "\n")
|
lines := strings.Split(string(pageContent), "\n")
|
||||||
|
|
||||||
|
newMetrics := make(map[string][]v1.MetricVal)
|
||||||
|
|
||||||
for _, line := range lines {
|
for _, line := range lines {
|
||||||
if line == "" {
|
if line == "" {
|
||||||
break
|
break
|
||||||
@ -182,8 +198,15 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
|||||||
FloatValue: metVal,
|
FloatValue: metVal,
|
||||||
Timestamp: currentTime,
|
Timestamp: currentTime,
|
||||||
}
|
}
|
||||||
metrics[metName] = append(metrics[metName], metric)
|
newMetrics[metName] = append(newMetrics[metName], metric)
|
||||||
|
if len(newMetrics) > collector.metricCountLimit {
|
||||||
|
return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
for key, val := range newMetrics {
|
||||||
|
metrics[key] = append(metrics[key], val...)
|
||||||
|
}
|
||||||
|
|
||||||
return nextCollectionTime, metrics, compileErrors(errorSlice)
|
return nextCollectionTime, metrics, compileErrors(errorSlice)
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func TestPrometheus(t *testing.T) {
|
|||||||
|
|
||||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||||
collector, err := NewPrometheusCollector("Prometheus", configFile)
|
collector, err := NewPrometheusCollector("Prometheus", configFile, 100)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(collector.name, "Prometheus")
|
assert.Equal(collector.name, "Prometheus")
|
||||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||||
@ -64,12 +64,40 @@ func TestPrometheus(t *testing.T) {
|
|||||||
assert.Equal(goRoutines[0].FloatValue, 16)
|
assert.Equal(goRoutines[0].FloatValue, 16)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPrometheusMetricCountLimit(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||||
|
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||||
|
collector, err := NewPrometheusCollector("Prometheus", configFile, 10)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(collector.name, "Prometheus")
|
||||||
|
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||||
|
|
||||||
|
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
for i := 0; i < 30; i++ {
|
||||||
|
fmt.Fprintf(w, "# HELP m%d Number of goroutines that currently exist.\n", i)
|
||||||
|
fmt.Fprintf(w, "# TYPE m%d gauge\n", i)
|
||||||
|
fmt.Fprintf(w, "m%d %d", i, i)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer tempServer.Close()
|
||||||
|
|
||||||
|
collector.configFile.Endpoint = tempServer.URL
|
||||||
|
metrics := map[string][]v1.MetricVal{}
|
||||||
|
_, result, errMetric := collector.Collect(metrics)
|
||||||
|
|
||||||
|
assert.Error(errMetric)
|
||||||
|
assert.Equal(len(metrics), 0)
|
||||||
|
assert.Nil(result)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPrometheusFiltersMetrics(t *testing.T) {
|
func TestPrometheusFiltersMetrics(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
||||||
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||||
collector, err := NewPrometheusCollector("Prometheus", configFile)
|
collector, err := NewPrometheusCollector("Prometheus", configFile, 100)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(collector.name, "Prometheus")
|
assert.Equal(collector.name, "Prometheus")
|
||||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||||
@ -98,3 +126,12 @@ func TestPrometheusFiltersMetrics(t *testing.T) {
|
|||||||
goRoutines := metrics["go_goroutines"]
|
goRoutines := metrics["go_goroutines"]
|
||||||
assert.Equal(goRoutines[0].FloatValue, 16)
|
assert.Equal(goRoutines[0].FloatValue, 16)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPrometheusFiltersMetricsCountLimit(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||||
|
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||||
|
_, err = NewPrometheusCollector("Prometheus", configFile, 1)
|
||||||
|
assert.Error(err)
|
||||||
|
}
|
||||||
|
@ -48,6 +48,7 @@ var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log th
|
|||||||
var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader")
|
var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader")
|
||||||
var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
|
var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
|
||||||
var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")
|
var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")
|
||||||
|
var applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)")
|
||||||
|
|
||||||
// The Manager interface defines operations for starting a manager and getting
|
// The Manager interface defines operations for starting a manager and getting
|
||||||
// container and machine information.
|
// container and machine information.
|
||||||
@ -718,7 +719,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
|||||||
glog.V(3).Infof("Got config from %q: %q", v, configFile)
|
glog.V(3).Infof("Got config from %q: %q", v, configFile)
|
||||||
|
|
||||||
if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
|
if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
|
||||||
newCollector, err := collector.NewPrometheusCollector(k, configFile)
|
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||||
return err
|
return err
|
||||||
@ -729,7 +730,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
newCollector, err := collector.NewCollector(k, configFile)
|
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||||
return err
|
return err
|
||||||
@ -784,7 +785,6 @@ func (m *manager) createContainer(containerName string) error {
|
|||||||
err = m.registerCollectors(collectorConfigs, cont)
|
err = m.registerCollectors(collectorConfigs, cont)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Infof("failed to register collectors for %q: %v", containerName, err)
|
glog.Infof("failed to register collectors for %q: %v", containerName, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
|
// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
|
||||||
|
Loading…
Reference in New Issue
Block a user