diff --git a/collector/config/sample_config_prometheus_filtered.json b/collector/config/sample_config_prometheus_filtered.json index 1db3a9d2..672f0759 100644 --- a/collector/config/sample_config_prometheus_filtered.json +++ b/collector/config/sample_config_prometheus_filtered.json @@ -1,8 +1,9 @@ { "endpoint" : "http://localhost:8080/metrics", - "polling_frequency" : 10, + "polling_frequency" : 10, "metrics_config" : [ - "go_goroutines" + "go_goroutines", + "qps" ] } diff --git a/collector/generic_collector.go b/collector/generic_collector.go index 7150057e..c155da74 100644 --- a/collector/generic_collector.go +++ b/collector/generic_collector.go @@ -44,10 +44,14 @@ type collectorInfo struct { //regular expresssions for all metrics regexps []*regexp.Regexp + + // Limit for the number of srcaped metrics. If the count is higher, + // no metrics will be returned. + metricCountLimit int } //Returns a new collector using the information extracted from the configfile -func NewCollector(collectorName string, configFile []byte) (*GenericCollector, error) { +func NewCollector(collectorName string, configFile []byte, metricCountLimit int) (*GenericCollector, error) { var configInJSON Config err := json.Unmarshal(configFile, &configInJSON) if err != nil { @@ -83,12 +87,18 @@ func NewCollector(collectorName string, configFile []byte) (*GenericCollector, e minPollFrequency = minSupportedFrequency } + if len(configInJSON.MetricsConfig) > metricCountLimit { + return nil, fmt.Errorf("Too many metrics defined: %d limit: %d", len(configInJSON.MetricsConfig), metricCountLimit) + } + return &GenericCollector{ name: collectorName, configFile: configInJSON, info: &collectorInfo{ minPollingFrequency: minPollFrequency, - regexps: regexprs}, + regexps: regexprs, + metricCountLimit: metricCountLimit, + }, }, nil } @@ -134,6 +144,7 @@ func (collector *GenericCollector) Collect(metrics map[string][]v1.MetricVal) (t } var errorSlice []error + for ind, metricConfig := range collector.configFile.MetricsConfig { matchString := collector.info.regexps[ind].FindStringSubmatch(string(pageContent)) if matchString != nil { diff --git a/collector/generic_collector_test.go b/collector/generic_collector_test.go index 68ee4c97..114f70c2 100644 --- a/collector/generic_collector_test.go +++ b/collector/generic_collector_test.go @@ -44,7 +44,7 @@ func TestEmptyConfig(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile) + _, err = NewCollector("tempCollector", configFile, 100) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -74,7 +74,7 @@ func TestConfigWithErrors(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile) + _, err = NewCollector("tempCollector", configFile, 100) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -112,7 +112,7 @@ func TestConfigWithRegexErrors(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile) + _, err = NewCollector("tempCollector", configFile, 100) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -125,7 +125,7 @@ func TestConfig(t *testing.T) { configFile, err := ioutil.ReadFile("config/sample_config.json") assert.NoError(err) - collector, err := NewCollector("nginx", configFile) + collector, err := NewCollector("nginx", configFile, 100) assert.NoError(err) assert.Equal(collector.name, "nginx") assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status") @@ -139,7 +139,7 @@ func TestMetricCollection(t *testing.T) { configFile, err := ioutil.ReadFile("config/sample_config.json") assert.NoError(err) - fakeCollector, err := NewCollector("nginx", configFile) + fakeCollector, err := NewCollector("nginx", configFile, 100) assert.NoError(err) tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -166,3 +166,14 @@ func TestMetricCollection(t *testing.T) { assert.Equal(metrics[metricNames[3]][0].IntValue, 2) assert.Equal(metrics[metricNames[3]][0].FloatValue, 0) } + +func TestMetricCollectionLimit(t *testing.T) { + assert := assert.New(t) + + //Collect nginx metrics from a fake nginx endpoint + configFile, err := ioutil.ReadFile("config/sample_config.json") + assert.NoError(err) + + _, err = NewCollector("nginx", configFile, 1) + assert.Error(err) +} diff --git a/collector/prometheus_collector.go b/collector/prometheus_collector.go index aa91644c..03707f11 100644 --- a/collector/prometheus_collector.go +++ b/collector/prometheus_collector.go @@ -16,6 +16,7 @@ package collector import ( "encoding/json" + "fmt" "io/ioutil" "math" "net/http" @@ -38,10 +39,14 @@ type PrometheusCollector struct { // the metrics to gather (uses a map as a set) metricsSet map[string]bool + + // Limit for the number of scaped metrics. If the count is higher, + // no metrics will be returned. + metricCountLimit int } //Returns a new collector using the information extracted from the configfile -func NewPrometheusCollector(collectorName string, configFile []byte) (*PrometheusCollector, error) { +func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int) (*PrometheusCollector, error) { var configInJSON Prometheus err := json.Unmarshal(configFile, &configInJSON) if err != nil { @@ -57,6 +62,10 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu minPollingFrequency = minSupportedFrequency } + if metricCountLimit < 0 { + return nil, fmt.Errorf("Metric count limit must be greater than 0") + } + var metricsSet map[string]bool if len(configInJSON.MetricsConfig) > 0 { metricsSet = make(map[string]bool, len(configInJSON.MetricsConfig)) @@ -65,12 +74,17 @@ func NewPrometheusCollector(collectorName string, configFile []byte) (*Prometheu } } + if len(configInJSON.MetricsConfig) > metricCountLimit { + return nil, fmt.Errorf("Too many metrics defined: %d limit %d", len(configInJSON.MetricsConfig), metricCountLimit) + } + //TODO : Add checks for validity of config file (eg : Accurate JSON fields) return &PrometheusCollector{ name: collectorName, pollingFrequency: minPollingFrequency, configFile: configInJSON, metricsSet: metricsSet, + metricCountLimit: metricCountLimit, }, nil } @@ -148,6 +162,8 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) var errorSlice []error lines := strings.Split(string(pageContent), "\n") + newMetrics := make(map[string][]v1.MetricVal) + for _, line := range lines { if line == "" { break @@ -182,8 +198,15 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) FloatValue: metVal, Timestamp: currentTime, } - metrics[metName] = append(metrics[metName], metric) + newMetrics[metName] = append(newMetrics[metName], metric) + if len(newMetrics) > collector.metricCountLimit { + return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect") + } } } + for key, val := range newMetrics { + metrics[key] = append(metrics[key], val...) + } + return nextCollectionTime, metrics, compileErrors(errorSlice) } diff --git a/collector/prometheus_collector_test.go b/collector/prometheus_collector_test.go index 6384a941..73e3da7c 100644 --- a/collector/prometheus_collector_test.go +++ b/collector/prometheus_collector_test.go @@ -31,7 +31,7 @@ func TestPrometheus(t *testing.T) { //Create a prometheus collector using the config file 'sample_config_prometheus.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json") - collector, err := NewPrometheusCollector("Prometheus", configFile) + collector, err := NewPrometheusCollector("Prometheus", configFile, 100) assert.NoError(err) assert.Equal(collector.name, "Prometheus") assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") @@ -64,12 +64,40 @@ func TestPrometheus(t *testing.T) { assert.Equal(goRoutines[0].FloatValue, 16) } +func TestPrometheusMetricCountLimit(t *testing.T) { + assert := assert.New(t) + + //Create a prometheus collector using the config file 'sample_config_prometheus.json' + configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json") + collector, err := NewPrometheusCollector("Prometheus", configFile, 10) + assert.NoError(err) + assert.Equal(collector.name, "Prometheus") + assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") + + tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for i := 0; i < 30; i++ { + fmt.Fprintf(w, "# HELP m%d Number of goroutines that currently exist.\n", i) + fmt.Fprintf(w, "# TYPE m%d gauge\n", i) + fmt.Fprintf(w, "m%d %d", i, i) + } + })) + defer tempServer.Close() + + collector.configFile.Endpoint = tempServer.URL + metrics := map[string][]v1.MetricVal{} + _, result, errMetric := collector.Collect(metrics) + + assert.Error(errMetric) + assert.Equal(len(metrics), 0) + assert.Nil(result) +} + func TestPrometheusFiltersMetrics(t *testing.T) { assert := assert.New(t) //Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json") - collector, err := NewPrometheusCollector("Prometheus", configFile) + collector, err := NewPrometheusCollector("Prometheus", configFile, 100) assert.NoError(err) assert.Equal(collector.name, "Prometheus") assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") @@ -98,3 +126,12 @@ func TestPrometheusFiltersMetrics(t *testing.T) { goRoutines := metrics["go_goroutines"] assert.Equal(goRoutines[0].FloatValue, 16) } + +func TestPrometheusFiltersMetricsCountLimit(t *testing.T) { + assert := assert.New(t) + + //Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json' + configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json") + _, err = NewPrometheusCollector("Prometheus", configFile, 1) + assert.Error(err) +} diff --git a/manager/manager.go b/manager/manager.go index 5e4d9daf..f2bea0c2 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -48,6 +48,7 @@ var logCadvisorUsage = flag.Bool("log_cadvisor_usage", false, "Whether to log th var enableLoadReader = flag.Bool("enable_load_reader", false, "Whether to enable cpu load reader") var eventStorageAgeLimit = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types") var eventStorageEventLimit = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types") +var applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)") // The Manager interface defines operations for starting a manager and getting // container and machine information. @@ -718,7 +719,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c glog.V(3).Infof("Got config from %q: %q", v, configFile) if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") { - newCollector, err := collector.NewPrometheusCollector(k, configFile) + newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit) if err != nil { glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) return err @@ -729,7 +730,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c return err } } else { - newCollector, err := collector.NewCollector(k, configFile) + newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit) if err != nil { glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) return err @@ -784,7 +785,6 @@ func (m *manager) createContainer(containerName string) error { err = m.registerCollectors(collectorConfigs, cont) if err != nil { glog.Infof("failed to register collectors for %q: %v", containerName, err) - return err } // Add the container name and all its aliases. The aliases must be within the namespace of the factory.