From 6ef612f21e042dcbec189ab253741f94e47804d7 Mon Sep 17 00:00:00 2001 From: Matt Wringe Date: Thu, 14 Jul 2016 10:36:53 -0400 Subject: [PATCH] Update collectors to be able to directly access containers by their ip address. --- collector/config.go | 42 +++++++++++++++- .../config/sample_config_endpoint_config.json | 38 +++++++++++++++ ...ple_config_prometheus_endpoint_config.json | 10 ++++ collector/generic_collector.go | 7 ++- collector/generic_collector_test.go | 40 ++++++++++++---- collector/prometheus_collector.go | 9 ++-- collector/prometheus_collector_test.go | 48 ++++++++++++++----- collector/util.go | 26 ++++++++++ manager/manager.go | 4 +- 9 files changed, 194 insertions(+), 30 deletions(-) create mode 100644 collector/config/sample_config_endpoint_config.json create mode 100644 collector/config/sample_config_prometheus_endpoint_config.json create mode 100644 collector/util.go diff --git a/collector/config.go b/collector/config.go index 637f6406..75063d93 100644 --- a/collector/config.go +++ b/collector/config.go @@ -17,12 +17,13 @@ package collector import ( "time" + "encoding/json" "github.com/google/cadvisor/info/v1" ) type Config struct { //the endpoint to hit to scrape metrics - Endpoint string `json:"endpoint"` + Endpoint EndpointConfig `json:"endpoint"` //holds information about different metrics that can be collected MetricsConfig []MetricConfig `json:"metrics_config"` @@ -52,7 +53,7 @@ type MetricConfig struct { type Prometheus struct { //the endpoint to hit to scrape metrics - Endpoint string `json:"endpoint"` + Endpoint EndpointConfig `json:"endpoint"` //the frequency at which metrics should be collected PollingFrequency time.Duration `json:"polling_frequency"` @@ -60,3 +61,40 @@ type Prometheus struct { //holds names of different metrics that can be collected MetricsConfig []string `json:"metrics_config"` } + +type EndpointConfig struct { + // The full URL of the endpoint to reach + URL string + // A configuration in which an actual URL is constructed from, using the container's ip address + URLConfig URLConfig +} + +type URLConfig struct { + // the protocol to use for connecting to the endpoint. Eg 'http' or 'https' + Protocol string `json:"protocol"` + + // the port to use for connecting to the endpoint. Eg '8778' + Port json.Number `json:"port"` + + // the path to use for the endpoint. Eg '/metrics' + Path string `json:"path"` +} + +func (ec *EndpointConfig) UnmarshalJSON(b []byte) error { + url := "" + config := URLConfig{ + Protocol: "http", + Port: "8000", + } + + if err := json.Unmarshal(b, &url); err == nil { + ec.URL = url + return nil + } + err := json.Unmarshal(b, &config) + if err == nil { + ec.URLConfig = config + return nil + } + return err +} diff --git a/collector/config/sample_config_endpoint_config.json b/collector/config/sample_config_endpoint_config.json new file mode 100644 index 00000000..706f6ae3 --- /dev/null +++ b/collector/config/sample_config_endpoint_config.json @@ -0,0 +1,38 @@ +{ + "endpoint" : { + "protocol": "https", + "port": 8000, + "path": "/nginx_status" + }, + "metrics_config" : [ + { "name" : "activeConnections", + "metric_type" : "gauge", + "units" : "number of active connections", + "data_type" : "int", + "polling_frequency" : 10, + "regex" : "Active connections: ([0-9]+)" + }, + { "name" : "reading", + "metric_type" : "gauge", + "units" : "number of reading connections", + "data_type" : "int", + "polling_frequency" : 10, + "regex" : "Reading: ([0-9]+) .*" + }, + { "name" : "writing", + "metric_type" : "gauge", + "data_type" : "int", + "units" : "number of writing connections", + "polling_frequency" : 10, + "regex" : ".*Writing: ([0-9]+).*" + }, + { "name" : "waiting", + "metric_type" : "gauge", + "units" : "number of waiting connections", + "data_type" : "int", + "polling_frequency" : 10, + "regex" : ".*Waiting: ([0-9]+)" + } + ] + +} diff --git a/collector/config/sample_config_prometheus_endpoint_config.json b/collector/config/sample_config_prometheus_endpoint_config.json new file mode 100644 index 00000000..675aeb48 --- /dev/null +++ b/collector/config/sample_config_prometheus_endpoint_config.json @@ -0,0 +1,10 @@ +{ + "endpoint" : { + "protocol": "http", + "port": 8081, + "path": "/METRICS" + }, + "polling_frequency" : 10, + "metrics_config" : [ + ] +} \ No newline at end of file diff --git a/collector/generic_collector.go b/collector/generic_collector.go index c155da74..73071fc1 100644 --- a/collector/generic_collector.go +++ b/collector/generic_collector.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/google/cadvisor/container" "github.com/google/cadvisor/info/v1" ) @@ -51,13 +52,15 @@ type collectorInfo struct { } //Returns a new collector using the information extracted from the configfile -func NewCollector(collectorName string, configFile []byte, metricCountLimit int) (*GenericCollector, error) { +func NewCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler) (*GenericCollector, error) { var configInJSON Config err := json.Unmarshal(configFile, &configInJSON) if err != nil { return nil, err } + configInJSON.Endpoint.configure(containerHandler) + //TODO : Add checks for validity of config file (eg : Accurate JSON fields) if len(configInJSON.MetricsConfig) == 0 { @@ -130,7 +133,7 @@ func (collector *GenericCollector) Collect(metrics map[string][]v1.MetricVal) (t currentTime := time.Now() nextCollectionTime := currentTime.Add(time.Duration(collector.info.minPollingFrequency)) - uri := collector.configFile.Endpoint + uri := collector.configFile.Endpoint.URL response, err := http.Get(uri) if err != nil { return nextCollectionTime, nil, err diff --git a/collector/generic_collector_test.go b/collector/generic_collector_test.go index 114f70c2..8ff3fd79 100644 --- a/collector/generic_collector_test.go +++ b/collector/generic_collector_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/cadvisor/info/v1" + containertest "github.com/google/cadvisor/container/testing" "github.com/stretchr/testify/assert" ) @@ -44,7 +45,8 @@ func TestEmptyConfig(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + _, err = NewCollector("tempCollector", configFile, 100, containerHandler) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -74,7 +76,8 @@ func TestConfigWithErrors(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + _, err = NewCollector("tempCollector", configFile, 100, containerHandler) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -112,7 +115,8 @@ func TestConfigWithRegexErrors(t *testing.T) { configFile, err := ioutil.ReadFile("temp.json") assert.NoError(err) - _, err = NewCollector("tempCollector", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + _, err = NewCollector("tempCollector", configFile, 100, containerHandler) assert.Error(err) assert.NoError(os.Remove("temp.json")) @@ -125,10 +129,28 @@ func TestConfig(t *testing.T) { configFile, err := ioutil.ReadFile("config/sample_config.json") assert.NoError(err) - collector, err := NewCollector("nginx", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + collector, err := NewCollector("nginx", configFile, 100, containerHandler) assert.NoError(err) assert.Equal(collector.name, "nginx") - assert.Equal(collector.configFile.Endpoint, "http://localhost:8000/nginx_status") + assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8000/nginx_status") + assert.Equal(collector.configFile.MetricsConfig[0].Name, "activeConnections") +} + +func TestEndpointConfig(t *testing.T) { + assert := assert.New(t) + configFile, err := ioutil.ReadFile("config/sample_config_endpoint_config.json") + assert.NoError(err) + + containerHandler := containertest.NewMockContainerHandler("mockContainer") + containerHandler.On("GetContainerIPAddress").Return( + "111.111.111.111", + ) + + collector, err := NewCollector("nginx", configFile, 100, containerHandler) + assert.NoError(err) + assert.Equal(collector.name, "nginx") + assert.Equal(collector.configFile.Endpoint.URL, "https://111.111.111.111:8000/nginx_status") assert.Equal(collector.configFile.MetricsConfig[0].Name, "activeConnections") } @@ -139,7 +161,8 @@ func TestMetricCollection(t *testing.T) { configFile, err := ioutil.ReadFile("config/sample_config.json") assert.NoError(err) - fakeCollector, err := NewCollector("nginx", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + fakeCollector, err := NewCollector("nginx", configFile, 100, containerHandler) assert.NoError(err) tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -147,7 +170,7 @@ func TestMetricCollection(t *testing.T) { fmt.Fprintln(w, "5 5 32\nReading: 0 Writing: 1 Waiting: 2") })) defer tempServer.Close() - fakeCollector.configFile.Endpoint = tempServer.URL + fakeCollector.configFile.Endpoint.URL = tempServer.URL metrics := map[string][]v1.MetricVal{} _, metrics, errMetric := fakeCollector.Collect(metrics) @@ -174,6 +197,7 @@ func TestMetricCollectionLimit(t *testing.T) { configFile, err := ioutil.ReadFile("config/sample_config.json") assert.NoError(err) - _, err = NewCollector("nginx", configFile, 1) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + _, err = NewCollector("nginx", configFile, 1, containerHandler) assert.Error(err) } diff --git a/collector/prometheus_collector.go b/collector/prometheus_collector.go index 0022d026..a2beabf6 100644 --- a/collector/prometheus_collector.go +++ b/collector/prometheus_collector.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/google/cadvisor/container" "github.com/google/cadvisor/info/v1" ) @@ -46,13 +47,15 @@ type PrometheusCollector struct { } //Returns a new collector using the information extracted from the configfile -func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int) (*PrometheusCollector, error) { +func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler) (*PrometheusCollector, error) { var configInJSON Prometheus err := json.Unmarshal(configFile, &configInJSON) if err != nil { return nil, err } + configInJSON.Endpoint.configure(containerHandler) + minPollingFrequency := configInJSON.PollingFrequency // Minimum supported frequency is 1s @@ -108,7 +111,7 @@ func getMetricData(line string) string { func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec { specs := []v1.MetricSpec{} - response, err := http.Get(collector.configFile.Endpoint) + response, err := http.Get(collector.configFile.Endpoint.URL) if err != nil { return specs } @@ -153,7 +156,7 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) currentTime := time.Now() nextCollectionTime := currentTime.Add(time.Duration(collector.pollingFrequency)) - uri := collector.configFile.Endpoint + uri := collector.configFile.Endpoint.URL response, err := http.Get(uri) if err != nil { return nextCollectionTime, nil, err diff --git a/collector/prometheus_collector_test.go b/collector/prometheus_collector_test.go index 9b408bf0..fbb63210 100644 --- a/collector/prometheus_collector_test.go +++ b/collector/prometheus_collector_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/cadvisor/info/v1" + containertest "github.com/google/cadvisor/container/testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -32,10 +33,11 @@ func TestPrometheus(t *testing.T) { //Create a prometheus collector using the config file 'sample_config_prometheus.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json") - collector, err := NewPrometheusCollector("Prometheus", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler) assert.NoError(err) assert.Equal(collector.name, "Prometheus") - assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") + assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics") tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -54,7 +56,7 @@ func TestPrometheus(t *testing.T) { defer tempServer.Close() - collector.configFile.Endpoint = tempServer.URL + collector.configFile.Endpoint.URL = tempServer.URL var spec []v1.MetricSpec require.NotPanics(t, func() { spec = collector.GetSpec() }) @@ -75,15 +77,32 @@ func TestPrometheus(t *testing.T) { assert.Equal(goRoutines[0].FloatValue, 16) } +func TestPrometheusEndpointConfig(t *testing.T) { + assert := assert.New(t) + + //Create a prometheus collector using the config file 'sample_config_prometheus.json' + configFile, err := ioutil.ReadFile("config/sample_config_prometheus_endpoint_config.json") + containerHandler := containertest.NewMockContainerHandler("mockContainer") + containerHandler.On("GetContainerIPAddress").Return( + "222.222.222.222", + ) + + collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler) + assert.NoError(err) + assert.Equal(collector.name, "Prometheus") + assert.Equal(collector.configFile.Endpoint.URL, "http://222.222.222.222:8081/METRICS") +} + func TestPrometheusShortResponse(t *testing.T) { assert := assert.New(t) //Create a prometheus collector using the config file 'sample_config_prometheus.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json") - collector, err := NewPrometheusCollector("Prometheus", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler) assert.NoError(err) assert.Equal(collector.name, "Prometheus") - assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") + assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics") tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { text := "# HELP empty_metric A metric without any values" @@ -92,7 +111,7 @@ func TestPrometheusShortResponse(t *testing.T) { defer tempServer.Close() - collector.configFile.Endpoint = tempServer.URL + collector.configFile.Endpoint.URL = tempServer.URL assert.NotPanics(func() { collector.GetSpec() }) } @@ -102,10 +121,11 @@ func TestPrometheusMetricCountLimit(t *testing.T) { //Create a prometheus collector using the config file 'sample_config_prometheus.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json") - collector, err := NewPrometheusCollector("Prometheus", configFile, 10) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + collector, err := NewPrometheusCollector("Prometheus", configFile, 10, containerHandler) assert.NoError(err) assert.Equal(collector.name, "Prometheus") - assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") + assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics") tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { for i := 0; i < 30; i++ { @@ -116,7 +136,7 @@ func TestPrometheusMetricCountLimit(t *testing.T) { })) defer tempServer.Close() - collector.configFile.Endpoint = tempServer.URL + collector.configFile.Endpoint.URL = tempServer.URL metrics := map[string][]v1.MetricVal{} _, result, errMetric := collector.Collect(metrics) @@ -130,10 +150,11 @@ func TestPrometheusFiltersMetrics(t *testing.T) { //Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json") - collector, err := NewPrometheusCollector("Prometheus", configFile, 100) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler) assert.NoError(err) assert.Equal(collector.name, "Prometheus") - assert.Equal(collector.configFile.Endpoint, "http://localhost:8080/metrics") + assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics") tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -149,7 +170,7 @@ func TestPrometheusFiltersMetrics(t *testing.T) { defer tempServer.Close() - collector.configFile.Endpoint = tempServer.URL + collector.configFile.Endpoint.URL = tempServer.URL metrics := map[string][]v1.MetricVal{} _, metrics, errMetric := collector.Collect(metrics) @@ -165,6 +186,7 @@ func TestPrometheusFiltersMetricsCountLimit(t *testing.T) { //Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json' configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json") - _, err = NewPrometheusCollector("Prometheus", configFile, 1) + containerHandler := containertest.NewMockContainerHandler("mockContainer") + _, err = NewPrometheusCollector("Prometheus", configFile, 1, containerHandler) assert.Error(err) } diff --git a/collector/util.go b/collector/util.go new file mode 100644 index 00000000..b8814d13 --- /dev/null +++ b/collector/util.go @@ -0,0 +1,26 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import "github.com/google/cadvisor/container" + +func (endpointConfig *EndpointConfig) configure(containerHandler container.ContainerHandler) { + //If the exact URL was not specified, generate it based on the ip address of the container. + endpoint := endpointConfig + if endpoint.URL == "" { + ipAddress := containerHandler.GetContainerIPAddress() + endpointConfig.URL = endpoint.URLConfig.Protocol + "://" + ipAddress + ":" + endpoint.URLConfig.Port.String() + endpoint.URLConfig.Path + } +} diff --git a/manager/manager.go b/manager/manager.go index 60bde205..27c388a7 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -752,7 +752,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c glog.V(3).Infof("Got config from %q: %q", v, configFile) if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") { - newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit) + newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler) if err != nil { glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) return err @@ -763,7 +763,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c return err } } else { - newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit) + newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler) if err != nil { glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err) return err