diff --git a/collector/prometheus_collector.go b/collector/prometheus_collector.go index 37ee44c8..7f4102f6 100644 --- a/collector/prometheus_collector.go +++ b/collector/prometheus_collector.go @@ -15,15 +15,18 @@ package collector import ( + "bytes" "encoding/json" "fmt" - "io/ioutil" - "math" + "io" "net/http" - "strconv" - "strings" + "sort" "time" + rawmodel "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + "github.com/google/cadvisor/container" "github.com/google/cadvisor/info/v1" ) @@ -100,62 +103,104 @@ func (collector *PrometheusCollector) Name() string { return collector.name } -func getMetricData(line string) string { - fields := strings.Fields(line) - data := fields[3] - if len(fields) > 4 { - for i := range fields { - if i > 3 { - data = data + "_" + fields[i] - } - } - } - return strings.TrimSpace(data) -} - func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec { - specs := []v1.MetricSpec{} response, err := collector.httpClient.Get(collector.configFile.Endpoint.URL) if err != nil { - return specs + return nil } defer response.Body.Close() - pageContent, err := ioutil.ReadAll(response.Body) - if err != nil { - return specs + if response.StatusCode != http.StatusOK { + return nil } - lines := strings.Split(string(pageContent), "\n") - lineCount := len(lines) - for i, line := range lines { - if strings.HasPrefix(line, "# HELP") { - if i+2 >= lineCount { - break - } + dec := expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header)) - stopIndex := strings.IndexAny(lines[i+2], "{ ") - if stopIndex == -1 { - continue - } + var specs []v1.MetricSpec - name := strings.TrimSpace(lines[i+2][0:stopIndex]) - if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok { - continue - } - spec := v1.MetricSpec{ - Name: name, - Type: v1.MetricType(getMetricData(lines[i+1])), - Format: "float", - Units: getMetricData(lines[i]), - } - specs = append(specs, 
spec) + for { + d := rawmodel.MetricFamily{} + if err = dec.Decode(&d); err != nil { + break } + name := d.GetName() + if len(name) == 0 { + continue + } + // If metrics to collect is specified, skip any metrics not in the list to collect. + if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok { + continue + } + + spec := v1.MetricSpec{ + Name: name, + Type: metricType(d.GetType()), + Format: v1.FloatType, + } + specs = append(specs, spec) } + + if err != nil && err != io.EOF { + return nil + } + return specs } +// metricType converts Prometheus metric type to cadvisor metric type. +// If there is no mapping then just return the name of the Prometheus metric type. +func metricType(t rawmodel.MetricType) v1.MetricType { + switch t { + case rawmodel.MetricType_COUNTER: + return v1.MetricCumulative + case rawmodel.MetricType_GAUGE: + return v1.MetricGauge + default: + return v1.MetricType(t.String()) + } +} + +type prometheusLabels []*rawmodel.LabelPair + +func labelSetToLabelPairs(labels model.Metric) prometheusLabels { + var promLabels prometheusLabels + for k, v := range labels { + name := string(k) + value := string(v) + promLabels = append(promLabels, &rawmodel.LabelPair{Name: &name, Value: &value}) + } + return promLabels +} + +func (s prometheusLabels) Len() int { return len(s) } +func (s prometheusLabels) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +// byName implements sort.Interface by providing Less and using the Len and +// Swap methods of the embedded prometheusLabels value. 
+type byName struct{ prometheusLabels } + +func (s byName) Less(i, j int) bool { + return s.prometheusLabels[i].GetName() < s.prometheusLabels[j].GetName() +} + +func prometheusLabelSetToCadvisorLabel(promLabels model.Metric) string { + labels := labelSetToLabelPairs(promLabels) + sort.Sort(byName{labels}) + var b bytes.Buffer + + for i, l := range labels { + if i > 0 { + b.WriteString("\xff") + } + b.WriteString(l.GetName()) + b.WriteString("=") + b.WriteString(l.GetValue()) + } + + return string(b.Bytes()) +} + //Returns collected metrics and the next collection time of the collector func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) (time.Time, map[string][]v1.MetricVal, error) { currentTime := time.Now() @@ -168,59 +213,61 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) } defer response.Body.Close() - pageContent, err := ioutil.ReadAll(response.Body) - if err != nil { - return nextCollectionTime, nil, err + if response.StatusCode != http.StatusOK { + return nextCollectionTime, nil, fmt.Errorf("server returned HTTP status %s", response.Status) } - var errorSlice []error - lines := strings.Split(string(pageContent), "\n") + sdec := expfmt.SampleDecoder{ + Dec: expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header)), + Opts: &expfmt.DecodeOptions{ + Timestamp: model.TimeFromUnixNano(currentTime.UnixNano()), + }, + } - newMetrics := make(map[string][]v1.MetricVal) - - for _, line := range lines { - if line == "" { + var ( + // 50 is chosen as a reasonable guesstimate at a number of metrics we can + // expect from virtually any endpoint to try to save allocations. 
+ decSamples = make(model.Vector, 0, 50) + newMetrics = make(map[string][]v1.MetricVal) + ) + for { + if err = sdec.Decode(&decSamples); err != nil { break } - if !strings.HasPrefix(line, "# HELP") && !strings.HasPrefix(line, "# TYPE") { - var metLabel string - startLabelIndex := strings.Index(line, "{") - spaceIndex := strings.Index(line, " ") - if startLabelIndex == -1 { - startLabelIndex = spaceIndex - } - metName := strings.TrimSpace(line[0:startLabelIndex]) + for _, sample := range decSamples { + metName := string(sample.Metric[model.MetricNameLabel]) + if len(metName) == 0 { + continue + } + // If metrics to collect is specified, skip any metrics not in the list to collect. if _, ok := collector.metricsSet[metName]; collector.metricsSet != nil && !ok { continue } - - if startLabelIndex+1 <= spaceIndex-1 { - metLabel = strings.TrimSpace(line[(startLabelIndex + 1):(spaceIndex - 1)]) - } - - metVal, err := strconv.ParseFloat(line[spaceIndex+1:], 64) - if err != nil { - errorSlice = append(errorSlice, err) - } - if math.IsNaN(metVal) { - metVal = 0 - } + // TODO Handle multiple labels nicer. Prometheus metrics can have multiple + // labels, cadvisor only accepts a single string for the metric label. + label := prometheusLabelSetToCadvisorLabel(sample.Metric) metric := v1.MetricVal{ - Label: metLabel, - FloatValue: metVal, - Timestamp: currentTime, + FloatValue: float64(sample.Value), + Timestamp: sample.Timestamp.Time(), + Label: label, } newMetrics[metName] = append(newMetrics[metName], metric) if len(newMetrics) > collector.metricCountLimit { return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect") } } + decSamples = decSamples[:0] } + + if err != nil && err != io.EOF { + return nextCollectionTime, nil, err + } + for key, val := range newMetrics { metrics[key] = append(metrics[key], val...) 
} - return nextCollectionTime, metrics, compileErrors(errorSlice) + return nextCollectionTime, metrics, nil } diff --git a/collector/prometheus_collector_test.go b/collector/prometheus_collector_test.go index a31cee66..67926fbf 100644 --- a/collector/prometheus_collector_test.go +++ b/collector/prometheus_collector_test.go @@ -36,21 +36,29 @@ func TestPrometheus(t *testing.T) { containerHandler := containertest.NewMockContainerHandler("mockContainer") collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient) assert.NoError(err) - assert.Equal(collector.name, "Prometheus") - assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics") + assert.Equal("Prometheus", collector.name) + assert.Equal("http://localhost:8080/metrics", collector.configFile.Endpoint.URL) tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n" - text += "# TYPE go_gc_duration_seconds summary\n" - text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n" - text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n" - text += "# HELP go_goroutines Number of goroutines that currently exist.\n" - text += "# TYPE go_goroutines gauge\n" - text += "go_goroutines 16\n" - text += "# HELP empty_metric A metric without any values\n" - text += "# TYPE empty_metric counter\n" - text += "\n" + text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05 +go_gc_duration_seconds{quantile="1"} 0.000499764 +go_gc_duration_seconds_sum 1.7560473e+07 +go_gc_duration_seconds_count 2693 +# HELP go_goroutines Number of goroutines that currently exist. 
+# TYPE go_goroutines gauge +go_goroutines 16 +# HELP empty_metric A metric without any values +# TYPE empty_metric counter +# HELP metric_with_spaces_in_label A metric with spaces in a label. +# TYPE metric_with_spaces_in_label gauge +metric_with_spaces_in_label{name="Network Agent"} 72 +# HELP metric_with_multiple_labels A metric with multiple labels. +# TYPE metric_with_multiple_labels gauge +metric_with_multiple_labels{label1="One", label2="Two", label3="Three"} 81 +` fmt.Fprintln(w, text) })) @@ -60,9 +68,18 @@ func TestPrometheus(t *testing.T) { var spec []v1.MetricSpec require.NotPanics(t, func() { spec = collector.GetSpec() }) - assert.Len(spec, 2) - assert.Equal(spec[0].Name, "go_gc_duration_seconds") - assert.Equal(spec[1].Name, "go_goroutines") + assert.Len(spec, 4) + specNames := make(map[string]struct{}, 3) + for _, s := range spec { + specNames[s.Name] = struct{}{} + } + expectedSpecNames := map[string]struct{}{ + "go_gc_duration_seconds": {}, + "go_goroutines": {}, + "metric_with_spaces_in_label": {}, + "metric_with_multiple_labels": {}, + } + assert.Equal(expectedSpecNames, specNames) metrics := map[string][]v1.MetricVal{} _, metrics, errMetric := collector.Collect(metrics) @@ -70,11 +87,28 @@ func TestPrometheus(t *testing.T) { assert.NoError(errMetric) go_gc_duration := metrics["go_gc_duration_seconds"] - assert.Equal(go_gc_duration[0].FloatValue, 5.8348000000000004e-05) - assert.Equal(go_gc_duration[1].FloatValue, 0.000499764) + assert.Equal(5.8348000000000004e-05, go_gc_duration[0].FloatValue) + assert.Equal("__name__=go_gc_duration_seconds\xffquantile=0", go_gc_duration[0].Label) + assert.Equal(0.000499764, go_gc_duration[1].FloatValue) + assert.Equal("__name__=go_gc_duration_seconds\xffquantile=1", go_gc_duration[1].Label) + go_gc_duration_sum := metrics["go_gc_duration_seconds_sum"] + assert.Equal(1.7560473e+07, go_gc_duration_sum[0].FloatValue) + assert.Equal("__name__=go_gc_duration_seconds_sum", go_gc_duration_sum[0].Label) + 
go_gc_duration_count := metrics["go_gc_duration_seconds_count"] + assert.Equal(2693, go_gc_duration_count[0].FloatValue) + assert.Equal("__name__=go_gc_duration_seconds_count", go_gc_duration_count[0].Label) goRoutines := metrics["go_goroutines"] - assert.Equal(goRoutines[0].FloatValue, 16) + assert.Equal(16, goRoutines[0].FloatValue) + assert.Equal("__name__=go_goroutines", goRoutines[0].Label) + + metricWithSpaces := metrics["metric_with_spaces_in_label"] + assert.Equal(72, metricWithSpaces[0].FloatValue) + assert.Equal("__name__=metric_with_spaces_in_label\xffname=Network Agent", metricWithSpaces[0].Label) + + metricWithMultipleLabels := metrics["metric_with_multiple_labels"] + assert.Equal(81, metricWithMultipleLabels[0].FloatValue) + assert.Equal("__name__=metric_with_multiple_labels\xfflabel1=One\xfflabel2=Two\xfflabel3=Three", metricWithMultipleLabels[0].Label) } func TestPrometheusEndpointConfig(t *testing.T) { @@ -158,13 +192,16 @@ func TestPrometheusFiltersMetrics(t *testing.T) { tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n" - text += "# TYPE go_gc_duration_seconds summary\n" - text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n" - text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n" - text += "# HELP go_goroutines Number of goroutines that currently exist.\n" - text += "# TYPE go_goroutines gauge\n" - text += "go_goroutines 16" + text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05 +go_gc_duration_seconds{quantile="1"} 0.000499764 +go_gc_duration_seconds_sum 1.7560473e+07 +go_gc_duration_seconds_count 2693 +# HELP go_goroutines Number of goroutines that currently exist. 
+# TYPE go_goroutines gauge +go_goroutines 16 +` fmt.Fprintln(w, text) })) diff --git a/info/v1/metric.go b/info/v1/metric.go index 90fd9e49..30c23ed1 100644 --- a/info/v1/metric.go +++ b/info/v1/metric.go @@ -26,10 +26,10 @@ const ( MetricGauge MetricType = "gauge" // A counter-like value that is only expected to increase. - MetricCumulative = "cumulative" + MetricCumulative MetricType = "cumulative" // Rate over a time period. - MetricDelta = "delta" + MetricDelta MetricType = "delta" ) // DataType for metric being exported. @@ -37,7 +37,7 @@ type DataType string const ( IntType DataType = "int" - FloatType = "float" + FloatType DataType = "float" ) // Spec for custom metric. diff --git a/manager/manager_test.go b/manager/manager_test.go index 2229de9b..efa33816 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "net/http" + "github.com/google/cadvisor/cache/memory" "github.com/google/cadvisor/collector" "github.com/google/cadvisor/container" @@ -33,7 +35,6 @@ import ( "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/utils/sysfs/fakesysfs" "github.com/stretchr/testify/assert" - "net/http" ) // TODO(vmarmol): Refactor these tests.