Merge pull request #1473 from jimmidyson/prom-collector-label-spaces
Switch to Prometheus decoders for Prometheus custom metric endpoint parsing
This commit is contained in:
commit
0c6b72de19
@ -15,15 +15,18 @@
|
|||||||
package collector
|
package collector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io"
|
||||||
"math"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"sort"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
rawmodel "github.com/prometheus/client_model/go"
|
||||||
|
"github.com/prometheus/common/expfmt"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/google/cadvisor/container"
|
"github.com/google/cadvisor/container"
|
||||||
"github.com/google/cadvisor/info/v1"
|
"github.com/google/cadvisor/info/v1"
|
||||||
)
|
)
|
||||||
@ -100,62 +103,104 @@ func (collector *PrometheusCollector) Name() string {
|
|||||||
return collector.name
|
return collector.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMetricData(line string) string {
|
|
||||||
fields := strings.Fields(line)
|
|
||||||
data := fields[3]
|
|
||||||
if len(fields) > 4 {
|
|
||||||
for i := range fields {
|
|
||||||
if i > 3 {
|
|
||||||
data = data + "_" + fields[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return strings.TrimSpace(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec {
|
func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec {
|
||||||
specs := []v1.MetricSpec{}
|
|
||||||
|
|
||||||
response, err := collector.httpClient.Get(collector.configFile.Endpoint.URL)
|
response, err := collector.httpClient.Get(collector.configFile.Endpoint.URL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return specs
|
return nil
|
||||||
}
|
}
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
|
|
||||||
pageContent, err := ioutil.ReadAll(response.Body)
|
if response.StatusCode != http.StatusOK {
|
||||||
if err != nil {
|
return nil
|
||||||
return specs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lines := strings.Split(string(pageContent), "\n")
|
dec := expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header))
|
||||||
lineCount := len(lines)
|
|
||||||
for i, line := range lines {
|
|
||||||
if strings.HasPrefix(line, "# HELP") {
|
|
||||||
if i+2 >= lineCount {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
stopIndex := strings.IndexAny(lines[i+2], "{ ")
|
var specs []v1.MetricSpec
|
||||||
if stopIndex == -1 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
name := strings.TrimSpace(lines[i+2][0:stopIndex])
|
for {
|
||||||
if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok {
|
d := rawmodel.MetricFamily{}
|
||||||
continue
|
if err = dec.Decode(&d); err != nil {
|
||||||
}
|
break
|
||||||
spec := v1.MetricSpec{
|
|
||||||
Name: name,
|
|
||||||
Type: v1.MetricType(getMetricData(lines[i+1])),
|
|
||||||
Format: "float",
|
|
||||||
Units: getMetricData(lines[i]),
|
|
||||||
}
|
|
||||||
specs = append(specs, spec)
|
|
||||||
}
|
}
|
||||||
|
name := d.GetName()
|
||||||
|
if len(name) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// If metrics to collect is specified, skip any metrics not in the list to collect.
|
||||||
|
if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
spec := v1.MetricSpec{
|
||||||
|
Name: name,
|
||||||
|
Type: metricType(d.GetType()),
|
||||||
|
Format: v1.FloatType,
|
||||||
|
}
|
||||||
|
specs = append(specs, spec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
return specs
|
return specs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// metricType converts Prometheus metric type to cadvisor metric type.
|
||||||
|
// If there is no mapping then just return the name of the Prometheus metric type.
|
||||||
|
func metricType(t rawmodel.MetricType) v1.MetricType {
|
||||||
|
switch t {
|
||||||
|
case rawmodel.MetricType_COUNTER:
|
||||||
|
return v1.MetricCumulative
|
||||||
|
case rawmodel.MetricType_GAUGE:
|
||||||
|
return v1.MetricGauge
|
||||||
|
default:
|
||||||
|
return v1.MetricType(t.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type prometheusLabels []*rawmodel.LabelPair
|
||||||
|
|
||||||
|
func labelSetToLabelPairs(labels model.Metric) prometheusLabels {
|
||||||
|
var promLabels prometheusLabels
|
||||||
|
for k, v := range labels {
|
||||||
|
name := string(k)
|
||||||
|
value := string(v)
|
||||||
|
promLabels = append(promLabels, &rawmodel.LabelPair{Name: &name, Value: &value})
|
||||||
|
}
|
||||||
|
return promLabels
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s prometheusLabels) Len() int { return len(s) }
|
||||||
|
func (s prometheusLabels) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||||
|
|
||||||
|
// byName implements sort.Interface by providing Less and using the Len and
|
||||||
|
// Swap methods of the embedded prometheusLabels value.
|
||||||
|
type byName struct{ prometheusLabels }
|
||||||
|
|
||||||
|
func (s byName) Less(i, j int) bool {
|
||||||
|
return s.prometheusLabels[i].GetName() < s.prometheusLabels[j].GetName()
|
||||||
|
}
|
||||||
|
|
||||||
|
func prometheusLabelSetToCadvisorLabel(promLabels model.Metric) string {
|
||||||
|
labels := labelSetToLabelPairs(promLabels)
|
||||||
|
sort.Sort(byName{labels})
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
for i, l := range labels {
|
||||||
|
if i > 0 {
|
||||||
|
b.WriteString("\xff")
|
||||||
|
}
|
||||||
|
b.WriteString(l.GetName())
|
||||||
|
b.WriteString("=")
|
||||||
|
b.WriteString(l.GetValue())
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(b.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
//Returns collected metrics and the next collection time of the collector
|
//Returns collected metrics and the next collection time of the collector
|
||||||
func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) (time.Time, map[string][]v1.MetricVal, error) {
|
func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) (time.Time, map[string][]v1.MetricVal, error) {
|
||||||
currentTime := time.Now()
|
currentTime := time.Now()
|
||||||
@ -168,59 +213,61 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
|
|||||||
}
|
}
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
|
|
||||||
pageContent, err := ioutil.ReadAll(response.Body)
|
if response.StatusCode != http.StatusOK {
|
||||||
if err != nil {
|
return nextCollectionTime, nil, fmt.Errorf("server returned HTTP status %s", response.Status)
|
||||||
return nextCollectionTime, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var errorSlice []error
|
sdec := expfmt.SampleDecoder{
|
||||||
lines := strings.Split(string(pageContent), "\n")
|
Dec: expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header)),
|
||||||
|
Opts: &expfmt.DecodeOptions{
|
||||||
|
Timestamp: model.TimeFromUnixNano(currentTime.UnixNano()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
newMetrics := make(map[string][]v1.MetricVal)
|
var (
|
||||||
|
// 50 is chosen as a reasonable guesstimate at a number of metrics we can
|
||||||
for _, line := range lines {
|
// expect from virtually any endpoint to try to save allocations.
|
||||||
if line == "" {
|
decSamples = make(model.Vector, 0, 50)
|
||||||
|
newMetrics = make(map[string][]v1.MetricVal)
|
||||||
|
)
|
||||||
|
for {
|
||||||
|
if err = sdec.Decode(&decSamples); err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(line, "# HELP") && !strings.HasPrefix(line, "# TYPE") {
|
|
||||||
var metLabel string
|
|
||||||
startLabelIndex := strings.Index(line, "{")
|
|
||||||
spaceIndex := strings.Index(line, " ")
|
|
||||||
if startLabelIndex == -1 {
|
|
||||||
startLabelIndex = spaceIndex
|
|
||||||
}
|
|
||||||
|
|
||||||
metName := strings.TrimSpace(line[0:startLabelIndex])
|
for _, sample := range decSamples {
|
||||||
|
metName := string(sample.Metric[model.MetricNameLabel])
|
||||||
|
if len(metName) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// If metrics to collect is specified, skip any metrics not in the list to collect.
|
||||||
if _, ok := collector.metricsSet[metName]; collector.metricsSet != nil && !ok {
|
if _, ok := collector.metricsSet[metName]; collector.metricsSet != nil && !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// TODO Handle multiple labels nicer. Prometheus metrics can have multiple
|
||||||
if startLabelIndex+1 <= spaceIndex-1 {
|
// labels, cadvisor only accepts a single string for the metric label.
|
||||||
metLabel = strings.TrimSpace(line[(startLabelIndex + 1):(spaceIndex - 1)])
|
label := prometheusLabelSetToCadvisorLabel(sample.Metric)
|
||||||
}
|
|
||||||
|
|
||||||
metVal, err := strconv.ParseFloat(line[spaceIndex+1:], 64)
|
|
||||||
if err != nil {
|
|
||||||
errorSlice = append(errorSlice, err)
|
|
||||||
}
|
|
||||||
if math.IsNaN(metVal) {
|
|
||||||
metVal = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
metric := v1.MetricVal{
|
metric := v1.MetricVal{
|
||||||
Label: metLabel,
|
FloatValue: float64(sample.Value),
|
||||||
FloatValue: metVal,
|
Timestamp: sample.Timestamp.Time(),
|
||||||
Timestamp: currentTime,
|
Label: label,
|
||||||
}
|
}
|
||||||
newMetrics[metName] = append(newMetrics[metName], metric)
|
newMetrics[metName] = append(newMetrics[metName], metric)
|
||||||
if len(newMetrics) > collector.metricCountLimit {
|
if len(newMetrics) > collector.metricCountLimit {
|
||||||
return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect")
|
return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
decSamples = decSamples[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return nextCollectionTime, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
for key, val := range newMetrics {
|
for key, val := range newMetrics {
|
||||||
metrics[key] = append(metrics[key], val...)
|
metrics[key] = append(metrics[key], val...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nextCollectionTime, metrics, compileErrors(errorSlice)
|
return nextCollectionTime, metrics, nil
|
||||||
}
|
}
|
||||||
|
@ -36,21 +36,29 @@ func TestPrometheus(t *testing.T) {
|
|||||||
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
containerHandler := containertest.NewMockContainerHandler("mockContainer")
|
||||||
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
assert.Equal(collector.name, "Prometheus")
|
assert.Equal("Prometheus", collector.name)
|
||||||
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
|
assert.Equal("http://localhost:8080/metrics", collector.configFile.Endpoint.URL)
|
||||||
|
|
||||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n"
|
text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
|
||||||
text += "# TYPE go_gc_duration_seconds summary\n"
|
# TYPE go_gc_duration_seconds summary
|
||||||
text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n"
|
go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05
|
||||||
text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n"
|
go_gc_duration_seconds{quantile="1"} 0.000499764
|
||||||
text += "# HELP go_goroutines Number of goroutines that currently exist.\n"
|
go_gc_duration_seconds_sum 1.7560473e+07
|
||||||
text += "# TYPE go_goroutines gauge\n"
|
go_gc_duration_seconds_count 2693
|
||||||
text += "go_goroutines 16\n"
|
# HELP go_goroutines Number of goroutines that currently exist.
|
||||||
text += "# HELP empty_metric A metric without any values\n"
|
# TYPE go_goroutines gauge
|
||||||
text += "# TYPE empty_metric counter\n"
|
go_goroutines 16
|
||||||
text += "\n"
|
# HELP empty_metric A metric without any values
|
||||||
|
# TYPE empty_metric counter
|
||||||
|
# HELP metric_with_spaces_in_label A metric with spaces in a label.
|
||||||
|
# TYPE metric_with_spaces_in_label gauge
|
||||||
|
metric_with_spaces_in_label{name="Network Agent"} 72
|
||||||
|
# HELP metric_with_multiple_labels A metric with multiple labels.
|
||||||
|
# TYPE metric_with_multiple_labels gauge
|
||||||
|
metric_with_multiple_labels{label1="One", label2="Two", label3="Three"} 81
|
||||||
|
`
|
||||||
fmt.Fprintln(w, text)
|
fmt.Fprintln(w, text)
|
||||||
}))
|
}))
|
||||||
|
|
||||||
@ -60,9 +68,18 @@ func TestPrometheus(t *testing.T) {
|
|||||||
|
|
||||||
var spec []v1.MetricSpec
|
var spec []v1.MetricSpec
|
||||||
require.NotPanics(t, func() { spec = collector.GetSpec() })
|
require.NotPanics(t, func() { spec = collector.GetSpec() })
|
||||||
assert.Len(spec, 2)
|
assert.Len(spec, 4)
|
||||||
assert.Equal(spec[0].Name, "go_gc_duration_seconds")
|
specNames := make(map[string]struct{}, 3)
|
||||||
assert.Equal(spec[1].Name, "go_goroutines")
|
for _, s := range spec {
|
||||||
|
specNames[s.Name] = struct{}{}
|
||||||
|
}
|
||||||
|
expectedSpecNames := map[string]struct{}{
|
||||||
|
"go_gc_duration_seconds": {},
|
||||||
|
"go_goroutines": {},
|
||||||
|
"metric_with_spaces_in_label": {},
|
||||||
|
"metric_with_multiple_labels": {},
|
||||||
|
}
|
||||||
|
assert.Equal(expectedSpecNames, specNames)
|
||||||
|
|
||||||
metrics := map[string][]v1.MetricVal{}
|
metrics := map[string][]v1.MetricVal{}
|
||||||
_, metrics, errMetric := collector.Collect(metrics)
|
_, metrics, errMetric := collector.Collect(metrics)
|
||||||
@ -70,11 +87,28 @@ func TestPrometheus(t *testing.T) {
|
|||||||
assert.NoError(errMetric)
|
assert.NoError(errMetric)
|
||||||
|
|
||||||
go_gc_duration := metrics["go_gc_duration_seconds"]
|
go_gc_duration := metrics["go_gc_duration_seconds"]
|
||||||
assert.Equal(go_gc_duration[0].FloatValue, 5.8348000000000004e-05)
|
assert.Equal(5.8348000000000004e-05, go_gc_duration[0].FloatValue)
|
||||||
assert.Equal(go_gc_duration[1].FloatValue, 0.000499764)
|
assert.Equal("__name__=go_gc_duration_seconds\xffquantile=0", go_gc_duration[0].Label)
|
||||||
|
assert.Equal(0.000499764, go_gc_duration[1].FloatValue)
|
||||||
|
assert.Equal("__name__=go_gc_duration_seconds\xffquantile=1", go_gc_duration[1].Label)
|
||||||
|
go_gc_duration_sum := metrics["go_gc_duration_seconds_sum"]
|
||||||
|
assert.Equal(1.7560473e+07, go_gc_duration_sum[0].FloatValue)
|
||||||
|
assert.Equal("__name__=go_gc_duration_seconds_sum", go_gc_duration_sum[0].Label)
|
||||||
|
go_gc_duration_count := metrics["go_gc_duration_seconds_count"]
|
||||||
|
assert.Equal(2693, go_gc_duration_count[0].FloatValue)
|
||||||
|
assert.Equal("__name__=go_gc_duration_seconds_count", go_gc_duration_count[0].Label)
|
||||||
|
|
||||||
goRoutines := metrics["go_goroutines"]
|
goRoutines := metrics["go_goroutines"]
|
||||||
assert.Equal(goRoutines[0].FloatValue, 16)
|
assert.Equal(16, goRoutines[0].FloatValue)
|
||||||
|
assert.Equal("__name__=go_goroutines", goRoutines[0].Label)
|
||||||
|
|
||||||
|
metricWithSpaces := metrics["metric_with_spaces_in_label"]
|
||||||
|
assert.Equal(72, metricWithSpaces[0].FloatValue)
|
||||||
|
assert.Equal("__name__=metric_with_spaces_in_label\xffname=Network Agent", metricWithSpaces[0].Label)
|
||||||
|
|
||||||
|
metricWithMultipleLabels := metrics["metric_with_multiple_labels"]
|
||||||
|
assert.Equal(81, metricWithMultipleLabels[0].FloatValue)
|
||||||
|
assert.Equal("__name__=metric_with_multiple_labels\xfflabel1=One\xfflabel2=Two\xfflabel3=Three", metricWithMultipleLabels[0].Label)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPrometheusEndpointConfig(t *testing.T) {
|
func TestPrometheusEndpointConfig(t *testing.T) {
|
||||||
@ -158,13 +192,16 @@ func TestPrometheusFiltersMetrics(t *testing.T) {
|
|||||||
|
|
||||||
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n"
|
text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
|
||||||
text += "# TYPE go_gc_duration_seconds summary\n"
|
# TYPE go_gc_duration_seconds summary
|
||||||
text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n"
|
go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05
|
||||||
text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n"
|
go_gc_duration_seconds{quantile="1"} 0.000499764
|
||||||
text += "# HELP go_goroutines Number of goroutines that currently exist.\n"
|
go_gc_duration_seconds_sum 1.7560473e+07
|
||||||
text += "# TYPE go_goroutines gauge\n"
|
go_gc_duration_seconds_count 2693
|
||||||
text += "go_goroutines 16"
|
# HELP go_goroutines Number of goroutines that currently exist.
|
||||||
|
# TYPE go_goroutines gauge
|
||||||
|
go_goroutines 16
|
||||||
|
`
|
||||||
fmt.Fprintln(w, text)
|
fmt.Fprintln(w, text)
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
@ -26,10 +26,10 @@ const (
|
|||||||
MetricGauge MetricType = "gauge"
|
MetricGauge MetricType = "gauge"
|
||||||
|
|
||||||
// A counter-like value that is only expected to increase.
|
// A counter-like value that is only expected to increase.
|
||||||
MetricCumulative = "cumulative"
|
MetricCumulative MetricType = "cumulative"
|
||||||
|
|
||||||
// Rate over a time period.
|
// Rate over a time period.
|
||||||
MetricDelta = "delta"
|
MetricDelta MetricType = "delta"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DataType for metric being exported.
|
// DataType for metric being exported.
|
||||||
@ -37,7 +37,7 @@ type DataType string
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
IntType DataType = "int"
|
IntType DataType = "int"
|
||||||
FloatType = "float"
|
FloatType DataType = "float"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Spec for custom metric.
|
// Spec for custom metric.
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/google/cadvisor/cache/memory"
|
"github.com/google/cadvisor/cache/memory"
|
||||||
"github.com/google/cadvisor/collector"
|
"github.com/google/cadvisor/collector"
|
||||||
"github.com/google/cadvisor/container"
|
"github.com/google/cadvisor/container"
|
||||||
@ -33,7 +35,6 @@ import (
|
|||||||
"github.com/google/cadvisor/info/v2"
|
"github.com/google/cadvisor/info/v2"
|
||||||
"github.com/google/cadvisor/utils/sysfs/fakesysfs"
|
"github.com/google/cadvisor/utils/sysfs/fakesysfs"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"net/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(vmarmol): Refactor these tests.
|
// TODO(vmarmol): Refactor these tests.
|
||||||
|
Loading…
Reference in New Issue
Block a user