cadvisor/storage/influxdb/influxdb.go
2014-07-02 14:05:06 -07:00

393 lines
11 KiB
Go

// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage"
"github.com/influxdb/influxdb-go"
)
type influxdbStorage struct {
client *influxdb.Client
prevStats *info.ContainerStats
machineName string
tableName string
windowLen time.Duration
}
func (self *influxdbStorage) containerStatsToValues(
ref info.ContainerReference,
stats *info.ContainerStats,
) (columns []string, values []interface{}) {
// Timestamp
columns = append(columns, "timestamp")
values = append(values, stats.Timestamp.Format(time.RFC3339Nano))
// Machine name
columns = append(columns, "machine")
values = append(values, self.machineName)
// Container path
columns = append(columns, "container_path")
values = append(values, ref.Name)
// Cumulative Cpu Usage
columns = append(columns, "cpu_cumulative_usage")
values = append(values, stats.Cpu.Usage.Total)
// Cumulative Cpu Usage in kernel mode
columns = append(columns, "cpu_cumulative_usage_kernel")
values = append(values, stats.Cpu.Usage.System)
// Cumulative Cpu Usage in user mode
columns = append(columns, "cpu_cumulative_usage_user")
values = append(values, stats.Cpu.Usage.User)
// Memory Usage
columns = append(columns, "memory_usage")
values = append(values, stats.Memory.Usage)
// Working set size
columns = append(columns, "memory_working_set")
values = append(values, stats.Memory.WorkingSet)
// container page fault
columns = append(columns, "memory_container_pgfault")
values = append(values, stats.Memory.ContainerData.Pgfault)
// container major page fault
columns = append(columns, "memory_container_pgmajfault")
values = append(values, stats.Memory.ContainerData.Pgmajfault)
// hierarchical page fault
columns = append(columns, "memory_hierarchical_pgfault")
values = append(values, stats.Memory.HierarchicalData.Pgfault)
// hierarchical major page fault
columns = append(columns, "memory_hierarchical_pgmajfault")
values = append(values, stats.Memory.HierarchicalData.Pgmajfault)
// per cpu cumulative usage
for i, u := range stats.Cpu.Usage.PerCpu {
columns = append(columns, fmt.Sprintf("per_core_cumulative_usage_core_%v", i))
values = append(values, u)
}
sample, err := info.NewSample(self.prevStats, stats)
if err != nil || sample == nil {
return columns, values
}
// Optional: sample duration. Unit: Nanosecond.
columns = append(columns, "sample_duration")
values = append(values, sample.Duration.String())
// Optional: Instant cpu usage
columns = append(columns, "cpu_instant_usage")
values = append(values, sample.Cpu.Usage)
// Optional: Instant per core usage
for i, u := range sample.Cpu.PerCpuUsage {
columns = append(columns, fmt.Sprintf("per_core_instant_usage_core_%v", i))
values = append(values, u)
}
return columns, values
}
func convertToUint64(v interface{}) (uint64, error) {
if v == nil {
return 0, nil
}
switch x := v.(type) {
case uint64:
return x, nil
case int:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int32:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case int64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case float64:
if x < 0 {
return 0, fmt.Errorf("negative value: %v", x)
}
return uint64(x), nil
case uint32:
return uint64(x), nil
}
return 0, fmt.Errorf("Unknown type")
}
func (self *influxdbStorage) valuesToContainerStats(columns []string, values []interface{}) (*info.ContainerStats, error) {
stats := &info.ContainerStats{
Cpu: &info.CpuStats{},
Memory: &info.MemoryStats{},
}
perCoreUsage := make(map[int]uint64, 32)
var err error
for i, col := range columns {
v := values[i]
switch col {
case "timestamp":
if str, ok := v.(string); ok {
stats.Timestamp, err = time.Parse(time.RFC3339Nano, str)
}
case "machine":
if v.(string) != self.machineName {
return nil, fmt.Errorf("different machine")
}
// Cumulative Cpu Usage
case "cpu_cumulative_usage":
stats.Cpu.Usage.Total, err = convertToUint64(v)
// Cumulative Cpu Usage in kernel mode
case "cpu_cumulative_usage_kernel":
stats.Cpu.Usage.System, err = convertToUint64(v)
// Cumulative Cpu Usage in user mode
case "cpu_cumulative_usage_user":
stats.Cpu.Usage.User, err = convertToUint64(v)
// Memory Usage
case "memory_usage":
stats.Memory.Usage, err = convertToUint64(v)
// Working set size
case "memory_working_set":
stats.Memory.WorkingSet, err = convertToUint64(v)
// container page fault
case "memory_container_pgfault":
stats.Memory.ContainerData.Pgfault, err = convertToUint64(v)
// container major page fault
case "memory_container_pgmajfault":
stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v)
// hierarchical page fault
case "memory_hierarchical_pgfault":
stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v)
// hierarchical major page fault
case "memory_hierarchical_pgmajfault":
stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v)
default:
if !strings.HasPrefix(col, "per_core_cumulative_usage_core_") {
continue
}
idxStr := col[len("per_core_cumulative_usage_core_"):]
idx, err := strconv.Atoi(idxStr)
if err != nil {
continue
}
perCoreUsage[idx], err = convertToUint64(v)
}
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
}
}
stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage))
for idx, usage := range perCoreUsage {
stats.Cpu.Usage.PerCpu[idx] = usage
}
return stats, nil
}
func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) {
sample := &info.ContainerStatsSample{}
perCoreUsage := make(map[int]uint64, 32)
var err error
for i, col := range columns {
v := values[i]
switch col {
case "timestamp":
if str, ok := v.(string); ok {
sample.Timestamp, err = time.Parse(time.RFC3339Nano, str)
}
case "machine":
if v.(string) != self.machineName {
return nil, fmt.Errorf("different machine")
}
// Memory Usage
case "memory_usage":
sample.Memory.Usage, err = convertToUint64(v)
// sample duration. Unit: Nanosecond.
case "sample_duration":
if v == nil {
// this record does not have sample_duration, so it's the first stats.
return nil, nil
}
sample.Duration, err = time.ParseDuration(v.(string))
// Instant cpu usage
case "cpu_instant_usage":
sample.Cpu.Usage, err = convertToUint64(v)
default:
if !strings.HasPrefix(col, "per_core_instant_usage_core_") {
continue
}
idxStr := col[len("per_core_instant_usage_core_"):]
idx, err := strconv.Atoi(idxStr)
if err != nil {
continue
}
perCoreUsage[idx], err = convertToUint64(v)
}
if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
}
}
sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage))
for idx, usage := range perCoreUsage {
sample.Cpu.PerCpuUsage[idx] = usage
}
if sample.Duration.Nanoseconds() == 0 {
return nil, nil
}
return sample, nil
}
func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
series := &influxdb.Series{
Name: self.tableName,
// There's only one point for each stats
Points: make([][]interface{}, 1),
}
series.Columns, series.Points[0] = self.containerStatsToValues(ref, stats)
self.prevStats = stats.Copy(self.prevStats)
err := self.client.WriteSeries([]*influxdb.Series{series})
if err != nil {
return err
}
return nil
}
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
// TODO(dengnan): select only columns that we need
// TODO(dengnan): escape containerName
query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName)
if numStats > 0 {
query = fmt.Sprintf("%v limit %v", query, numStats)
}
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
statsList := make([]*info.ContainerStats, 0, len(series))
for _, s := range series {
for _, values := range s.Points {
stats, err := self.valuesToContainerStats(s.Columns, values)
if err != nil {
return nil, err
}
if stats == nil {
continue
}
statsList = append(statsList, stats)
}
}
return statsList, nil
}
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
// TODO(dengnan): select only columns that we need
// TODO(dengnan): escape containerName
query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName)
if numSamples > 0 {
query = fmt.Sprintf("%v limit %v", query, numSamples)
}
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
sampleList := make([]*info.ContainerStatsSample, 0, len(series))
for _, s := range series {
for _, values := range s.Points {
sample, err := self.valuesToContainerSample(s.Columns, values)
if err != nil {
return nil, err
}
if sample == nil {
continue
}
sampleList = append(sampleList, sample)
}
}
return sampleList, nil
}
func (self *influxdbStorage) Close() error {
self.client = nil
return nil
}
func (self *influxdbStorage) Percentiles(
containerName string,
cpuUsagePercentiles []int,
memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) {
// TODO(dengnan): Implement it
return nil, nil
}
// machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on.
// hostname: The host which runs influxdb.
// percentilesDuration: Time window which will be considered when calls Percentiles()
func New(machineName,
tablename,
database,
username,
password,
hostname string,
isSecure bool,
percentilesDuration time.Duration,
) (storage.StorageDriver, error) {
config := &influxdb.ClientConfig{
Host: hostname,
Username: username,
Password: password,
Database: database,
IsSecure: isSecure,
}
client, err := influxdb.NewClient(config)
if err != nil {
return nil, err
}
// TODO(monnand): With go 1.3, we cannot compress data now.
client.DisableCompression()
if percentilesDuration.Seconds() < 1.0 {
percentilesDuration = 5 * time.Minute
}
ret := &influxdbStorage{
client: client,
windowLen: percentilesDuration,
machineName: machineName,
tableName: tablename,
}
return ret, nil
}