Merge pull request #1379 from mwringe/collector_httpclient
Update collectors to use a customized httpClient.
This commit is contained in:
commit
552d2578ec
34
cadvisor.go
34
cadvisor.go
@ -32,6 +32,7 @@ import (
|
||||
"github.com/google/cadvisor/utils/sysfs"
|
||||
"github.com/google/cadvisor/version"
|
||||
|
||||
"crypto/tls"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
@ -53,6 +54,9 @@ var allowDynamicHousekeeping = flag.Bool("allow_dynamic_housekeeping", true, "Wh
|
||||
|
||||
var enableProfiling = flag.Bool("profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
|
||||
var collectorCert = flag.String("collector_cert", "", "Collector's certificate, exposed to endpoints for certificate based authentication.")
|
||||
var collectorKey = flag.String("collector_key", "", "Key for the collector's certificate")
|
||||
|
||||
var (
|
||||
// Metrics to be ignored.
|
||||
// Tcp metrics are ignored by default.
|
||||
@ -118,7 +122,9 @@ func main() {
|
||||
glog.Fatalf("Failed to create a system interface: %s", err)
|
||||
}
|
||||
|
||||
containerManager, err := manager.New(memoryStorage, sysFs, *maxHousekeepingInterval, *allowDynamicHousekeeping, ignoreMetrics.MetricSet)
|
||||
collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)
|
||||
|
||||
containerManager, err := manager.New(memoryStorage, sysFs, *maxHousekeepingInterval, *allowDynamicHousekeeping, ignoreMetrics.MetricSet, &collectorHttpClient)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to create a Container Manager: %s", err)
|
||||
}
|
||||
@ -186,3 +192,29 @@ func installSignalHandler(containerManager manager.Manager) {
|
||||
os.Exit(0)
|
||||
}()
|
||||
}
|
||||
|
||||
func createCollectorHttpClient(collectorCert, collectorKey string) http.Client {
|
||||
//Enable accessing insecure endpoints. We should be able to access metrics from any endpoint
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
|
||||
if collectorCert != "" {
|
||||
if collectorKey == "" {
|
||||
glog.Fatal("The collector_key value must be specified if the collector_cert value is set.")
|
||||
}
|
||||
cert, err := tls.LoadX509KeyPair(collectorCert, collectorKey)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to use the collector certificate and key: %s", err)
|
||||
}
|
||||
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
tlsConfig.BuildNameToCertificate()
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
}
|
||||
|
||||
return http.Client{Transport: transport}
|
||||
}
|
||||
|
@ -37,6 +37,9 @@ type GenericCollector struct {
|
||||
|
||||
//holds information necessary to extract metrics
|
||||
info *collectorInfo
|
||||
|
||||
// The Http client to use when connecting to metric endpoints
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
type collectorInfo struct {
|
||||
@ -52,7 +55,7 @@ type collectorInfo struct {
|
||||
}
|
||||
|
||||
//Returns a new collector using the information extracted from the configfile
|
||||
func NewCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler) (*GenericCollector, error) {
|
||||
func NewCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler, httpClient *http.Client) (*GenericCollector, error) {
|
||||
var configInJSON Config
|
||||
err := json.Unmarshal(configFile, &configInJSON)
|
||||
if err != nil {
|
||||
@ -102,6 +105,7 @@ func NewCollector(collectorName string, configFile []byte, metricCountLimit int,
|
||||
regexps: regexprs,
|
||||
metricCountLimit: metricCountLimit,
|
||||
},
|
||||
httpClient: httpClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -134,7 +138,7 @@ func (collector *GenericCollector) Collect(metrics map[string][]v1.MetricVal) (t
|
||||
nextCollectionTime := currentTime.Add(time.Duration(collector.info.minPollingFrequency))
|
||||
|
||||
uri := collector.configFile.Endpoint.URL
|
||||
response, err := http.Get(uri)
|
||||
response, err := collector.httpClient.Get(uri)
|
||||
if err != nil {
|
||||
return nextCollectionTime, nil, err
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func TestEmptyConfig(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler)
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -77,7 +77,7 @@ func TestConfigWithErrors(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler)
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -116,7 +116,7 @@ func TestConfigWithRegexErrors(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler)
|
||||
_, err = NewCollector("tempCollector", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.Error(err)
|
||||
|
||||
assert.NoError(os.Remove("temp.json"))
|
||||
@ -130,7 +130,7 @@ func TestConfig(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
collector, err := NewCollector("nginx", configFile, 100, containerHandler)
|
||||
collector, err := NewCollector("nginx", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "nginx")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8000/nginx_status")
|
||||
@ -147,7 +147,7 @@ func TestEndpointConfig(t *testing.T) {
|
||||
"111.111.111.111",
|
||||
)
|
||||
|
||||
collector, err := NewCollector("nginx", configFile, 100, containerHandler)
|
||||
collector, err := NewCollector("nginx", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "nginx")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "https://111.111.111.111:8000/nginx_status")
|
||||
@ -162,7 +162,7 @@ func TestMetricCollection(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
fakeCollector, err := NewCollector("nginx", configFile, 100, containerHandler)
|
||||
fakeCollector, err := NewCollector("nginx", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
|
||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
@ -198,6 +198,6 @@ func TestMetricCollectionLimit(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
_, err = NewCollector("nginx", configFile, 1, containerHandler)
|
||||
_, err = NewCollector("nginx", configFile, 1, containerHandler, http.DefaultClient)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
@ -44,10 +44,13 @@ type PrometheusCollector struct {
|
||||
// Limit for the number of scaped metrics. If the count is higher,
|
||||
// no metrics will be returned.
|
||||
metricCountLimit int
|
||||
|
||||
// The Http client to use when connecting to metric endpoints
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
//Returns a new collector using the information extracted from the configfile
|
||||
func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler) (*PrometheusCollector, error) {
|
||||
func NewPrometheusCollector(collectorName string, configFile []byte, metricCountLimit int, containerHandler container.ContainerHandler, httpClient *http.Client) (*PrometheusCollector, error) {
|
||||
var configInJSON Prometheus
|
||||
err := json.Unmarshal(configFile, &configInJSON)
|
||||
if err != nil {
|
||||
@ -88,6 +91,7 @@ func NewPrometheusCollector(collectorName string, configFile []byte, metricCount
|
||||
configFile: configInJSON,
|
||||
metricsSet: metricsSet,
|
||||
metricCountLimit: metricCountLimit,
|
||||
httpClient: httpClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -111,7 +115,8 @@ func getMetricData(line string) string {
|
||||
|
||||
func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec {
|
||||
specs := []v1.MetricSpec{}
|
||||
response, err := http.Get(collector.configFile.Endpoint.URL)
|
||||
|
||||
response, err := collector.httpClient.Get(collector.configFile.Endpoint.URL)
|
||||
if err != nil {
|
||||
return specs
|
||||
}
|
||||
@ -157,7 +162,7 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
||||
nextCollectionTime := currentTime.Add(time.Duration(collector.pollingFrequency))
|
||||
|
||||
uri := collector.configFile.Endpoint.URL
|
||||
response, err := http.Get(uri)
|
||||
response, err := collector.httpClient.Get(uri)
|
||||
if err != nil {
|
||||
return nextCollectionTime, nil, err
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ func TestPrometheus(t *testing.T) {
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
|
||||
@ -87,7 +87,7 @@ func TestPrometheusEndpointConfig(t *testing.T) {
|
||||
"222.222.222.222",
|
||||
)
|
||||
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://222.222.222.222:8081/METRICS")
|
||||
@ -99,7 +99,7 @@ func TestPrometheusShortResponse(t *testing.T) {
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
|
||||
@ -122,7 +122,7 @@ func TestPrometheusMetricCountLimit(t *testing.T) {
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus.json")
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 10, containerHandler)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 10, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
|
||||
@ -151,7 +151,7 @@ func TestPrometheusFiltersMetrics(t *testing.T) {
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler)
|
||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
||||
assert.NoError(err)
|
||||
assert.Equal(collector.name, "Prometheus")
|
||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
|
||||
@ -187,6 +187,6 @@ func TestPrometheusFiltersMetricsCountLimit(t *testing.T) {
|
||||
//Create a prometheus collector using the config file 'sample_config_prometheus_filtered.json'
|
||||
configFile, err := ioutil.ReadFile("config/sample_config_prometheus_filtered.json")
|
||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||
_, err = NewPrometheusCollector("Prometheus", configFile, 1, containerHandler)
|
||||
_, err = NewPrometheusCollector("Prometheus", configFile, 1, containerHandler, http.DefaultClient)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/opencontainers/runc/libcontainer/cgroups"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
var globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
|
||||
@ -125,7 +126,7 @@ type Manager interface {
|
||||
}
|
||||
|
||||
// New takes a memory storage and returns a new manager.
|
||||
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, ignoreMetricsSet container.MetricSet) (Manager, error) {
|
||||
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, ignoreMetricsSet container.MetricSet, collectorHttpClient *http.Client) (Manager, error) {
|
||||
if memoryCache == nil {
|
||||
return nil, fmt.Errorf("manager requires memory storage")
|
||||
}
|
||||
@ -182,6 +183,7 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
|
||||
ignoreMetrics: ignoreMetricsSet,
|
||||
containerWatchers: []watcher.ContainerWatcher{},
|
||||
eventsChannel: eventsChannel,
|
||||
collectorHttpClient: collectorHttpClient,
|
||||
}
|
||||
|
||||
machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
|
||||
@ -226,6 +228,7 @@ type manager struct {
|
||||
ignoreMetrics container.MetricSet
|
||||
containerWatchers []watcher.ContainerWatcher
|
||||
eventsChannel chan watcher.ContainerEvent
|
||||
collectorHttpClient *http.Client
|
||||
}
|
||||
|
||||
// Start the container manager.
|
||||
@ -752,7 +755,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
||||
glog.V(3).Infof("Got config from %q: %q", v, configFile)
|
||||
|
||||
if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
|
||||
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler)
|
||||
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
|
||||
if err != nil {
|
||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
@ -763,7 +766,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler)
|
||||
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
|
||||
if err != nil {
|
||||
glog.Infof("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
|
||||
return err
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"github.com/google/cadvisor/info/v2"
|
||||
"github.com/google/cadvisor/utils/sysfs/fakesysfs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// TODO(vmarmol): Refactor these tests.
|
||||
@ -298,7 +299,7 @@ func TestDockerContainersInfo(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewNilManager(t *testing.T) {
|
||||
_, err := New(nil, nil, 60*time.Second, true, container.MetricSet{})
|
||||
_, err := New(nil, nil, 60*time.Second, true, container.MetricSet{}, http.DefaultClient)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected nil manager to return error")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user