From b94c9936d076e5aa0a122bb06a3e9127f09192e5 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Tue, 1 Jul 2014 19:21:43 -0700 Subject: [PATCH] influxdb --- storage/influxdb/influxdb.go | 313 ++++++++++++++++++++++++++++++ storage/influxdb/influxdb_test.go | 83 ++++++++ storage/memory/memory.go | 22 +-- storage/storage.go | 2 +- 4 files changed, 398 insertions(+), 22 deletions(-) create mode 100644 storage/influxdb/influxdb.go create mode 100644 storage/influxdb/influxdb_test.go diff --git a/storage/influxdb/influxdb.go b/storage/influxdb/influxdb.go new file mode 100644 index 00000000..b6685688 --- /dev/null +++ b/storage/influxdb/influxdb.go @@ -0,0 +1,313 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/google/cadvisor/info" + "github.com/google/cadvisor/storage" + "github.com/influxdb/influxdb-go" + "github.com/kr/pretty" +) + +type influxdbStorage struct { + client *influxdb.Client + prevStats *info.ContainerStats + machineName string + tableName string + windowLen time.Duration +} + +func (self *influxdbStorage) containerStatsToValues( + ref info.ContainerReference, + stats *info.ContainerStats, +) (columns []string, values []interface{}) { + + // Machine name + columns = append(columns, "machine") + values = append(values, self.machineName) + + // Container path + columns = append(columns, "container_path") + values = append(values, ref.Name) + + // Cumulative Cpu Usage + columns = append(columns, "cpu_cumulative_usage") + values = append(values, stats.Cpu.Usage.Total) + + // Cumulative Cpu Usage in kernel mode + columns = append(columns, "cpu_cumulative_usage_kernel") + values = append(values, stats.Cpu.Usage.System) + + // Cumulative Cpu Usage in user mode + columns = append(columns, "cpu_cumulative_usage_user") + values = append(values, stats.Cpu.Usage.User) + + // Memory Usage + columns = append(columns, "memory_usage") + values = append(values, stats.Memory.Usage) + + // Working set size + columns = append(columns, "memory_working_set") + values = append(values, stats.Memory.WorkingSet) + + // container page fault + columns = append(columns, "memory_container_pgfault") + values = append(values, stats.Memory.ContainerData.Pgfault) + + // container major page fault + columns = append(columns, "memory_container_pgmajfault") + values = append(values, stats.Memory.ContainerData.Pgmajfault) + + // hierarchical page fault + columns = append(columns, "memory_hierarchical_pgfault") + values = append(values, stats.Memory.HierarchicalData.Pgfault) + + // hierarchical major page fault + columns = append(columns, "memory_hierarchical_pgmajfault") + values = append(values, stats.Memory.HierarchicalData.Pgmajfault) + + // per cpu cumulative usage + for i, u := range stats.Cpu.Usage.PerCpu { + columns = append(columns, fmt.Sprintf("per_core_cumulative_usage_core_%v", i)) + values = append(values, u) + } + + sample, err := info.NewSample(self.prevStats, stats) + if err != nil || sample == nil { + return columns, values + } + + // Optional: sample duration. Unit: Nanosecond. + columns = append(columns, "sample_duration") + values = append(values, sample.Duration.Nanoseconds()) + + // Optional: Instant cpu usage + columns = append(columns, "cpu_instant_usage") + values = append(values, sample.Cpu.Usage) + + // Optional: Instant per core usage + for i, u := range sample.Cpu.PerCpuUsage { + columns = append(columns, fmt.Sprintf("per_core_instant_usage_core_%v", i)) + values = append(values, u) + } + + return columns, values +} + +func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) *info.ContainerStats { + stats := &info.ContainerStats{ + Cpu: &info.CpuStats{}, + Memory: &info.MemoryStats{}, + } + perCoreUsage := make(map[int]uint64, 32) + for i, col := range columns { + v := values[i] + switch col { + case "machine": + if v.(string) != self.machineName { + return nil + } + // Cumulative Cpu Usage + case "cpu_cumulative_usage": + stats.Cpu.Usage.Total = v.(uint64) + // Cumulative Cpu Usage in kernel mode + case "cpu_cumulative_usage_kernel": + stats.Cpu.Usage.System = v.(uint64) + // Cumulative Cpu Usage in user mode + case "cpu_cumulative_usage_user": + stats.Cpu.Usage.User = v.(uint64) + // Memory Usage + case "memory_usage": + stats.Memory.Usage = v.(uint64) + // Working set size + case "memory_working_set": + stats.Memory.WorkingSet = v.(uint64) + // container page fault + case "memory_container_pgfault": + stats.Memory.ContainerData.Pgfault = v.(uint64) + // container major page fault + case "memory_container_pgmajfault": + stats.Memory.ContainerData.Pgmajfault = v.(uint64) + // hierarchical page fault + case "memory_hierarchical_pgfault": + stats.Memory.HierarchicalData.Pgfault = v.(uint64) + // hierarchical major page fault + case "memory_hierarchical_pgmajfault": + stats.Memory.HierarchicalData.Pgmajfault = v.(uint64) + default: + if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") { + continue + } + idxStr := col[len("per_core_cumulative_usage_core_"):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx] = v.(uint64) + } + } + stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + stats.Cpu.Usage.PerCpu[idx] = usage + } + return stats +} + +func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) *info.ContainerStatsSample { + sample := &info.ContainerStatsSample{} + perCoreUsage := make(map[int]uint64, 32) + for i, col := range columns { + v := values[i] + switch col { + case "machine": + if v.(string) != self.machineName { + return nil + } + // Memory Usage + case "memory_usage": + sample.Memory.Usage = v.(uint64) + // sample duration. Unit: Nanosecond. + case "sample_duration": + sample.Duration = time.Duration(v.(int64)) + // Instant cpu usage + case "cpu_instant_usage": + sample.Cpu.Usage = v.(uint64) + + default: + if !strings.HasPrefix(col, "per_core_instant_usage_core_") { + continue + } + idxStr := col[len("per_core_instant_usage_core_"):] + idx, err := strconv.Atoi(idxStr) + if err != nil { + continue + } + perCoreUsage[idx] = v.(uint64) + } + } + sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage)) + for idx, usage := range perCoreUsage { + sample.Cpu.PerCpuUsage[idx] = usage + } + if sample.Duration.Nanoseconds() == 0 { + return nil + } + return sample +} + +func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + series := &influxdb.Series{ + Name: self.tableName, + // There's only one point for each stats + Points: make([][]interface{}, 1), + } + series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats) + + self.prevStats = stats.Copy(self.prevStats) + pretty.Printf("% #v", series) + err := self.client.WriteSeries([]*influxdb.Series{series}) + if err != nil { + return err + } + return nil +} + +func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { + // TODO(dengnan): select only columns that we need + // TODO(dengnan): escape containerName + query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numStats, containerName) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + statsList := make([]*info.ContainerStats, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + stats := self.valuesToContainerStats(s.Columns, values) + statsList = append(statsList, stats) + } + } + return statsList, nil +} + +func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { + query := fmt.Sprintf("select * from %v limit %v where container_path=\"%v\"", self.tableName, numSamples, containerName) + series, err := self.client.Query(query) + if err != nil { + return nil, err + } + sampleList := make([]*info.ContainerStatsSample, 0, len(series)) + for _, s := range series { + for _, values := range s.Points { + sample := self.valuesToContainerSample(s.Columns, values) + sampleList = append(sampleList, sample) + } + } + return sampleList, nil +} + +func (self *influxdbStorage) Close() error { + self.client = nil + return nil +} + +func (self *influxdbStorage) Percentiles( + containerName string, + cpuUsagePercentiles []int, + memUsagePercentiles []int, +) (*info.ContainerStatsPercentiles, error) { + // TODO(dengnan): Implement it + return nil, nil +} + +// machineName: A unique identifier to identify the host that current cAdvisor +// instance is running on. +// hostname: The host which runs influxdb. +// percentilesDuration: Time window which will be considered when calls Percentiles() +func New(machineName, + tablename, + database, + username, + password, + hostname string, + percentilesDuration time.Duration, +) (storage.StorageDriver, error) { + config := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + Database: database, + // IsSecure: true, + } + client, err := influxdb.NewClient(config) + if err != nil { + return nil, err + } + if percentilesDuration.Seconds() < 1.0 { + percentilesDuration = 5 * time.Minute + } + + ret := &influxdbStorage{ + client: client, + windowLen: percentilesDuration, + machineName: machineName, + } + return ret, nil +} diff --git a/storage/influxdb/influxdb_test.go b/storage/influxdb/influxdb_test.go new file mode 100644 index 00000000..1fe15b6d --- /dev/null +++ b/storage/influxdb/influxdb_test.go @@ -0,0 +1,83 @@ +// Copyright 2014 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "fmt" + "testing" + "time" + + "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/storage/test" + "github.com/influxdb/influxdb-go" +) + +func runStorageTest(f func(storage.StorageDriver, *testing.T), t *testing.T) { + machineName := "mymachine" + tablename := "cadivsorTable" + database := "cadvisor" + username := "root" + password := "root" + hostname := "localhost:8086" + percentilesDuration := 10 * time.Minute + config := &influxdb.ClientConfig{ + Host: hostname, + Username: username, + Password: password, + Database: database, + // IsSecure: true, + } + client, err := influxdb.NewClient(config) + if err != nil { + t.Fatal(err) + } + deleteAll := fmt.Sprintf("drop series %v", tablename) + _, err = client.Query(deleteAll) + if err != nil { + t.Fatal(err) + } + // delete all data by the end of the call + defer client.Query(deleteAll) + + /* + series := &influxdb.Series{ + Name: tablename, + Columns: []string{"col1"}, + Points: [][]interface{}{ + []interface{}{1}, + }, + } + err = client.WriteSeries([]*influxdb.Series{series}) + if err != nil { + t.Fatal(err) + } + */ + + driver, err := New(machineName, + tablename, + database, + username, + password, + hostname, + percentilesDuration) + if err != nil { + t.Fatal(err) + } + f(driver, t) +} + +func TestMaxMemoryUsage(t *testing.T) { + runStorageTest(test.StorageDriverTestMaxMemoryUsage, t) +} diff --git a/storage/memory/memory.go b/storage/memory/memory.go index 63c6d552..d30a3df6 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -41,27 +41,7 @@ func (self *containerStorage) updatePrevStats(stats *info.ContainerStats) { self.prevStats = nil return } - if self.prevStats == nil { - self.prevStats = &info.ContainerStats{ - Cpu: &info.CpuStats{}, - Memory: &info.MemoryStats{}, - } - } - // make a deep copy. - self.prevStats.Timestamp = stats.Timestamp - // copy the slice first. - percpuSlice := self.prevStats.Cpu.Usage.PerCpu - *self.prevStats.Cpu = *stats.Cpu - // If the old slice is enough to hold the new data, then don't allocate - // a new slice. - if len(percpuSlice) != len(stats.Cpu.Usage.PerCpu) { - percpuSlice = make([]uint64, len(stats.Cpu.Usage.PerCpu)) - } - for i, perCpu := range stats.Cpu.Usage.PerCpu { - percpuSlice[i] = perCpu - } - self.prevStats.Cpu.Usage.PerCpu = percpuSlice - *self.prevStats.Memory = *stats.Memory + self.prevStats = stats.Copy(self.prevStats) } func (self *containerStorage) AddStats(stats *info.ContainerStats) error { diff --git a/storage/storage.go b/storage/storage.go index 51f0b9c0..8b264a93 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -31,7 +31,7 @@ type StorageDriver interface { // Returns samples of the container stats. If numSamples < 0, then // the number of returned samples is implementation defined. Otherwise, the driver // should return at most numSamples samples. - Samples(containername string, numSamples int) ([]*info.ContainerStatsSample, error) + Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) // Close will clear the state of the storage driver. The elements // stored in the underlying storage may or may not be deleted depending