Add a flag to control the number of custom metrics scraped by collectors
This commit is contained in:
parent
8221b8cebc
commit
fb8efe0cf6
@ -1,8 +1,9 @@
|
||||
{
|
||||
"endpoint" : "http://localhost:8080/metrics",
|
||||
"polling_frequency" : 10,
|
||||
"polling_frequency" : 10,
|
||||
"metrics_config" : [
|
||||
"go_goroutines"
|
||||
"go_goroutines",
|
||||
"qps"
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -44,10 +44,14 @@ type collectorInfo struct {
|
||||
|
||||
//regular expresssions for all metrics
|
||||
regexps []*regexp.Regexp
|
||||
|
||||
// Limit for the number of srcaped metrics. If the count is higher,
|
||||
// no metrics will be returned.
|
||||
metricCountLimit int
|
||||
}
|
||||
|
||||
//Returns a new collector using the information extracted from the configfile
|
||||
func NewCollector(collectorName string, configFile []byte) (*GenericCollector, error) {
|
||||
func NewCollector(collectorName string, configFile []byte, metricCountLimit int) (*GenericCollector, error) {
|
||||
var configInJSON Config
|
||||
err := json.Unmarshal(configFile, &configInJSON)
|
||||
if err != nil {
|
||||
@ -83,12 +87,18 @@ func NewCollector(collectorName string, configFile []byte) (*GenericCollector, e
|
||||
minPollFrequency = minSupportedFrequency
|
||||
}
|
||||
|
||||
if len(configInJSON.MetricsConfig) > metricCountLimit {
|
||||
return nil, fmt.Errorf("Too many metrics defined: %d limit: %d", len(configInJSON.MetricsConfig), metricCountLimit)
|
||||
}
|
||||
|
||||
return &GenericCollector{
|
||||
name: collectorName,
|
||||
configFile: configInJSON,
|
||||
info: &collectorInfo{
|
||||
minPollingFrequency: minPollFrequency,
|
||||
regexps: regexprs},
|
||||
regexps: regexprs,
|
||||
metricCountLimit: metricCountLimit,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -134,6 +144,7 @@ func (collector *GenericCollector) Collect(metrics map[string][]v1.MetricVal) (t
|
||||
}
|
||||
|
||||
var errorSlice []error
|
||||
|
||||
for ind, metricConfig := range collector.configFile.MetricsConfig {
|
||||
matchString := collector.info.regexps[ind].FindStringSubmatch(string(pageContent))
|
||||
if matchString != nil {
|
||||
|
@ -44,7 +44,7 @@ func TestEmptyConfig(t *testing.T) {
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
_, err = NewCollector("tempCollector", configFile, 100)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -74,7 +74,7 @@ func TestConfigWithErrors(t *testing.T) {
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
_, err = NewCollector("tempCollector", configFile, 100)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -112,7 +112,7 @@ func TestConfigWithRegexErrors(t *testing.T) {
|
||||
configFile, err := ioutil.ReadFile("temp.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("tempCollector", configFile)
|
||||
_, err = NewCollector("tempCollector", configFile, 100)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -125,7 +125,7 @@ func TestConfig(t *testing.T) {
|
||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||
assert.NoError(err)
|
||||
|
||||
collector, err := NewCollector("nginx", configFile)
|
||||
collector, err := NewCollector("nginx", configFile, 100)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "nginx")
|
||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status")
|
||||
@ -139,7 +139,7 @@ func TestMetricCollection(t *testing.T) {
|
||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||
assert.NoError(err)
|
||||
|
||||
fakeCollector, err := NewCollector("nginx", configFile)
|
||||
fakeCollector, err := NewCollector("nginx", configFile, 100)
|
||||
assert.NoError(err)
|
||||
|
||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
@ -166,3 +166,14 @@ func TestMetricCollection(t *testing.T) {
|
||||
assert.Equal(metrics[metricNames[3]][0].IntValue, 2)
|
||||
assert.Equal(metrics[metricNames[3]][0].FloatValue, 0)
|
||||
}
|
||||
|
||||
func TestMetricCollectionLimit(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Collect nginx metrics from a fake nginx endpoint
|
||||
configFile, err := ioutil.ReadFile("config/sample_config.json")
|
||||
assert.NoError(err)
|
||||
|
||||
_, err = NewCollector("nginx", configFile, 1)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
@ -38,10 +39,14 @@ type PrometheusCollector struct {
|
||||
|
||||
// the metrics to gather (uses a map as a set)
|
||||
metricsSet map[string]bool
|
||||
|
||||
// Limit for the number of scaped metrics. If the count is higher,
|
||||
// no metrics will be returned.
|
||||
metricCountLimit int
|
||||
}
|
||||
|
||||
//Returns a new collector using the information extracted from the configfile
|
||||
func NewPrometheusCollector(collectorName string, configFile []byte) (*PrometheusCollector, error) {
|
||||
func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int) (*PrometheusCollector, error) {
|
||||
var configInJSON Prometheus
|
||||
err := json.Unmarshal(configFile, &configInJSON)
|
||||
if err != nil {
|
||||
@ -57,6 +62,10 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu
|
||||
minPollingFrequency = minSupportedFrequency
|
||||
}
|
||||
|
||||
if metricCountLimit < 0 {
|
||||
return nil, fmt.Errorf("Metric count limit must be greater than 0")
|
||||
}
|
||||
|
||||
var metricsSet map[string]bool
|
||||
if len(configInJSON.MetricsConfig) > 0 {
|
||||
metricsSet = make(map[string]bool, len(configInJSON.MetricsConfig))
|
||||
@ -65,12 +74,17 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu
|
||||
}
|
||||
}
|
||||
|
||||
if len(configInJSON.MetricsConfig) > metricCountLimit {
|
||||
return nil, fmt.Errorf("Too many metrics defined: %d limit %d", len(configInJSON.MetricsConfig), metricCountLimit)
|
||||
}
|
||||
|
||||
//TODO : Add checks for validity of config file (eg : Accurate JSON fields)
|
||||
return &PrometheusCollector{
|
||||
name: collectorName,
|
||||
pollingFrequency: minPollingFrequency,
|
||||
configFile: configInJSON,
|
||||
metricsSet: metricsSet,
|
||||
metricCountLimit: metricCountLimit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -148,6 +162,8 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
||||
var errorSlice []error
|
||||
lines := strings.Split(string(pageContent), "\n")
|
||||
|
||||
newMetrics := make(map[string][]v1.MetricVal)
|
||||
|
||||
for _, line := range lines {
|
||||
if line == "" {
|
||||
break
|
||||
@ -182,8 +198,15 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
||||
FloatValue: metVal,
|
||||
Timestamp: currentTime,
|
||||
}
|
||||
metrics[metName] = append(metrics[metName], metric)
|
||||
newMetrics[metName] = append(newMetrics[metName], metric)
|
||||
if len(newMetrics) > collector.metricCountLimit {
|
||||
return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect")
|
||||
}
|
||||
}
|
||||
}
|
||||
for key, val := range newMetrics {
|
||||
metrics[key] = append(metrics[key], val...)
|
||||
}
|
||||
|
||||
return nextCollectionTime, metrics, compileErrors(errorSlice)
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ func TestPrometheus(t *testing.T) {
|
||||
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||
@ -64,12 +64,40 @@ func TestPrometheus(t *testing.T) {
|
||||
assert.Equal(goRoutines[0].FloatValue, 16)
|
||||
}
|
||||
|
||||
func TestPrometheusMetricCountLimit(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 10)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||
|
||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
for i := 0; i < 30; i++ {
|
||||
fmt.Fprintf(w, "# HELP m%d Number of goroutines that currently exist.\n", i)
|
||||
fmt.Fprintf(w, "# TYPE m%d gauge\n", i)
|
||||
fmt.Fprintf(w, "m%d %d", i, i)
|
||||
}
|
||||
}))
|
||||
defer tempServer.Close()
|
||||
|
||||
collector.configFile.Endpoint = tempServer.URL
|
||||
metrics := map[string][]v1.MetricVal{}
|
||||
_, result, errMetric := collector.Collect(metrics)
|
||||
|
||||
assert.Error(errMetric)
|
||||
assert.Equal(len(metrics), 0)
|
||||
assert.Nil(result)
|
||||
}
|
||||
|
||||
func TestPrometheusFiltersMetrics(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics")
|
||||
@ -98,3 +126,12 @@ func TestPrometheusFiltersMetrics(t *testing.T) {
|
||||
goRoutines := metrics["go_goroutines"]
|
||||
assert.Equal(goRoutines[0].FloatValue, 16)
|
||||
}
|
||||
|
||||
func TestPrometheusFiltersMetricsCountLimit(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||
_, err = NewPrometheusCollector("Prometheus", configFile, 1)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log th
|
||||
var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader")
|
||||
var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
|
||||
var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")
|
||||
var applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)")
|
||||
|
||||
// The Manager interface defines operations for starting a manager and getting
|
||||
// container and machine information.
|
||||
@ -718,7 +719,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
||||
glog.V(3).Infof("Got config from %q: %q", v, configFile)
|
||||
|
||||
if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
|
||||
newCollector, err := collector.NewPrometheusCollector(k, configFile)
|
||||
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit)
|
||||
if err != nil {
|
||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
@ -729,7 +730,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
newCollector, err := collector.NewCollector(k, configFile)
|
||||
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit)
|
||||
if err != nil {
|
||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
@ -784,7 +785,6 @@ func (m *manager) createContainer(containerName string) error {
|
||||
err = m.registerCollectors(collectorConfigs, cont)
|
||||
if err != nil {
|
||||
glog.Infof("failed to register collectors for %q: %v", containerName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
|
||||
|
Loading…
Reference in New Issue
Block a user