Merge pull request #172 from rjnagal/influxdb

Reduce influxdb schema to only the values we currently plan to use.
This commit is contained in:
Victor Marmol 2014-08-11 13:27:57 -04:00
commit 1cac435b86
3 changed files with 30 additions and 100 deletions

View File

@ -16,7 +16,6 @@ package influxdb
import ( import (
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -38,30 +37,14 @@ const (
colMachineName string = "machine" colMachineName string = "machine"
colContainerName string = "container_name" colContainerName string = "container_name"
colCpuCumulativeUsage string = "cpu_cumulative_usage" colCpuCumulativeUsage string = "cpu_cumulative_usage"
// Cumulative Cpu Usage in system mode
colCpuCumulativeUsageSystem string = "cpu_cumulative_usage_system"
// Cumulative Cpu Usage in user mode
colCpuCumulativeUsageUser string = "cpu_cumulative_usage_user"
// Memory Usage // Memory Usage
colMemoryUsage string = "memory_usage" colMemoryUsage string = "memory_usage"
// Working set size // Working set size
colMemoryWorkingSet string = "memory_working_set" colMemoryWorkingSet string = "memory_working_set"
// container page fault
colMemoryContainerPgfault string = "memory_container_pgfault"
// container major page fault
colMemoryContainerPgmajfault string = "memory_container_pgmajfault"
// hierarchical page fault
colMemoryHierarchicalPgfault string = "memory_hierarchical_pgfault"
// hierarchical major page fault
colMemoryHierarchicalPgmajfault string = "memory_hierarchical_pgmajfault"
// Cumulative per core usage
colPerCoreCumulativeUsagePrefix string = "per_core_cumulative_usage_core_"
// Optional: sample duration. Unit: Nanosecond. // Optional: sample duration. Unit: Nanosecond.
colSampleDuration string = "sample_duration" colSampleDuration string = "sample_duration"
// Optional: Instant cpu usage // Optional: Instant cpu usage
colCpuInstantUsage string = "cpu_instant_usage" colCpuInstantUsage string = "cpu_instant_usage"
// Optional: Instant per core usage
colPerCoreInstantUsagePrefix string = "per_core_instant_usage_core_"
// Cumulative count of bytes received. // Cumulative count of bytes received.
colRxBytes string = "rx_bytes" colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered. // Cumulative count of receive errors encountered.
@ -97,14 +80,6 @@ func (self *influxdbStorage) containerStatsToValues(
columns = append(columns, colCpuCumulativeUsage) columns = append(columns, colCpuCumulativeUsage)
values = append(values, stats.Cpu.Usage.Total) values = append(values, stats.Cpu.Usage.Total)
// Cumulative Cpu Usage in system mode
columns = append(columns, colCpuCumulativeUsageSystem)
values = append(values, stats.Cpu.Usage.System)
// Cumulative Cpu Usage in user mode
columns = append(columns, colCpuCumulativeUsageUser)
values = append(values, stats.Cpu.Usage.User)
// Memory Usage // Memory Usage
columns = append(columns, colMemoryUsage) columns = append(columns, colMemoryUsage)
values = append(values, stats.Memory.Usage) values = append(values, stats.Memory.Usage)
@ -113,22 +88,6 @@ func (self *influxdbStorage) containerStatsToValues(
columns = append(columns, colMemoryWorkingSet) columns = append(columns, colMemoryWorkingSet)
values = append(values, stats.Memory.WorkingSet) values = append(values, stats.Memory.WorkingSet)
// container page fault
columns = append(columns, colMemoryContainerPgfault)
values = append(values, stats.Memory.ContainerData.Pgfault)
// container major page fault
columns = append(columns, colMemoryContainerPgmajfault)
values = append(values, stats.Memory.ContainerData.Pgmajfault)
// hierarchical page fault
columns = append(columns, colMemoryHierarchicalPgfault)
values = append(values, stats.Memory.HierarchicalData.Pgfault)
// hierarchical major page fault
columns = append(columns, colMemoryHierarchicalPgmajfault)
values = append(values, stats.Memory.HierarchicalData.Pgmajfault)
// Optional: Network stats. // Optional: Network stats.
if stats.Network != nil { if stats.Network != nil {
columns = append(columns, colRxBytes) columns = append(columns, colRxBytes)
@ -144,12 +103,6 @@ func (self *influxdbStorage) containerStatsToValues(
values = append(values, stats.Network.TxErrors) values = append(values, stats.Network.TxErrors)
} }
// per cpu cumulative usage
for i, u := range stats.Cpu.Usage.PerCpu {
columns = append(columns, fmt.Sprintf("%v%v", colPerCoreCumulativeUsagePrefix, i))
values = append(values, u)
}
sample, err := info.NewSample(self.prevStats, stats) sample, err := info.NewSample(self.prevStats, stats)
if err != nil || sample == nil { if err != nil || sample == nil {
return columns, values return columns, values
@ -164,12 +117,6 @@ func (self *influxdbStorage) containerStatsToValues(
columns = append(columns, colCpuInstantUsage) columns = append(columns, colCpuInstantUsage)
values = append(values, sample.Cpu.Usage) values = append(values, sample.Cpu.Usage)
// Optional: Instant per core usage
for i, u := range sample.Cpu.PerCpuUsage {
columns = append(columns, fmt.Sprintf("%v%v", colPerCoreInstantUsagePrefix, i))
values = append(values, u)
}
return columns, values return columns, values
} }
@ -212,7 +159,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
Memory: &info.MemoryStats{}, Memory: &info.MemoryStats{},
Network: &info.NetworkStats{}, Network: &info.NetworkStats{},
} }
perCoreUsage := make(map[int]uint64, 32)
var err error var err error
for i, col := range columns { for i, col := range columns {
v := values[i] v := values[i]
@ -232,30 +178,12 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
// Cumulative Cpu Usage // Cumulative Cpu Usage
case col == colCpuCumulativeUsage: case col == colCpuCumulativeUsage:
stats.Cpu.Usage.Total, err = convertToUint64(v) stats.Cpu.Usage.Total, err = convertToUint64(v)
// Cumulative Cpu used by the system
case col == colCpuCumulativeUsageSystem:
stats.Cpu.Usage.System, err = convertToUint64(v)
// Cumulative Cpu Usage in user mode
case col == colCpuCumulativeUsageUser:
stats.Cpu.Usage.User, err = convertToUint64(v)
// Memory Usage // Memory Usage
case col == colMemoryUsage: case col == colMemoryUsage:
stats.Memory.Usage, err = convertToUint64(v) stats.Memory.Usage, err = convertToUint64(v)
// Working set size // Working set size
case col == colMemoryWorkingSet: case col == colMemoryWorkingSet:
stats.Memory.WorkingSet, err = convertToUint64(v) stats.Memory.WorkingSet, err = convertToUint64(v)
// container page fault
case col == colMemoryContainerPgfault:
stats.Memory.ContainerData.Pgfault, err = convertToUint64(v)
// container major page fault
case col == colMemoryContainerPgmajfault:
stats.Memory.ContainerData.Pgmajfault, err = convertToUint64(v)
// hierarchical page fault
case col == colMemoryHierarchicalPgfault:
stats.Memory.HierarchicalData.Pgfault, err = convertToUint64(v)
// hierarchical major page fault
case col == colMemoryHierarchicalPgmajfault:
stats.Memory.HierarchicalData.Pgmajfault, err = convertToUint64(v)
case col == colRxBytes: case col == colRxBytes:
stats.Network.RxBytes, err = convertToUint64(v) stats.Network.RxBytes, err = convertToUint64(v)
case col == colRxErrors: case col == colRxErrors:
@ -264,28 +192,16 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
stats.Network.TxBytes, err = convertToUint64(v) stats.Network.TxBytes, err = convertToUint64(v)
case col == colTxErrors: case col == colTxErrors:
stats.Network.TxErrors, err = convertToUint64(v) stats.Network.TxErrors, err = convertToUint64(v)
case strings.HasPrefix(col, colPerCoreCumulativeUsagePrefix):
idxStr := col[len(colPerCoreCumulativeUsagePrefix):]
idx, err := strconv.Atoi(idxStr)
if err != nil {
continue
}
perCoreUsage[idx], err = convertToUint64(v)
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
} }
} }
stats.Cpu.Usage.PerCpu = make([]uint64, len(perCoreUsage))
for idx, usage := range perCoreUsage {
stats.Cpu.Usage.PerCpu[idx] = usage
}
return stats, nil return stats, nil
} }
func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) { func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) {
sample := &info.ContainerStatsSample{} sample := &info.ContainerStatsSample{}
perCoreUsage := make(map[int]uint64, 32)
var err error var err error
for i, col := range columns { for i, col := range columns {
v := values[i] v := values[i]
@ -315,22 +231,11 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values []
// Instant cpu usage // Instant cpu usage
case col == colCpuInstantUsage: case col == colCpuInstantUsage:
sample.Cpu.Usage, err = convertToUint64(v) sample.Cpu.Usage, err = convertToUint64(v)
case strings.HasPrefix(col, colPerCoreInstantUsagePrefix):
idxStr := col[len(colPerCoreInstantUsagePrefix):]
idx, err := strconv.Atoi(idxStr)
if err != nil {
continue
}
perCoreUsage[idx], err = convertToUint64(v)
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err) return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
} }
} }
sample.Cpu.PerCpuUsage = make([]uint64, len(perCoreUsage))
for idx, usage := range perCoreUsage {
sample.Cpu.PerCpuUsage[idx] = usage
}
if sample.Duration.Nanoseconds() == 0 { if sample.Duration.Nanoseconds() == 0 {
return nil, nil return nil, nil
} }

View File

@ -16,13 +16,38 @@ package influxdb
import ( import (
"fmt" "fmt"
"reflect"
"testing" "testing"
"time" "time"
"github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage/test" "github.com/google/cadvisor/storage/test"
influxdb "github.com/influxdb/influxdb/client" influxdb "github.com/influxdb/influxdb/client"
) )
func StatsEq(a, b *info.ContainerStats) bool {
if !test.TimeEq(a.Timestamp, b.Timestamp, 10*time.Millisecond) {
return false
}
// Check only the stats populated in influxdb.
if a.Cpu.Usage.Total != b.Cpu.Usage.Total {
return false
}
if a.Memory.Usage != b.Memory.Usage {
return false
}
if a.Memory.WorkingSet != b.Memory.WorkingSet {
return false
}
if !reflect.DeepEqual(a.Network, b.Network) {
return false
}
return true
}
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) { func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
machineName := "machineA" machineName := "machineA"
tablename := "t" tablename := "t"
@ -75,7 +100,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
testDriver := test.TestStorageDriver{Driver: driver, StatsEq: test.DefaultStatsEq} testDriver := test.TestStorageDriver{Driver: driver, StatsEq: StatsEq}
// generate another container's data on same machine. // generate another container's data on same machine.
test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t) test.StorageDriverFillRandomStatsFunc("containerOnSameMachine", 100, testDriver, t)
@ -92,7 +117,7 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
defer driverForAnotherMachine.Close() defer driverForAnotherMachine.Close()
testDriverOtherMachine := test.TestStorageDriver{Driver: driverForAnotherMachine, StatsEq: test.DefaultStatsEq} testDriverOtherMachine := test.TestStorageDriver{Driver: driverForAnotherMachine, StatsEq: StatsEq}
test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, testDriverOtherMachine, t) test.StorageDriverFillRandomStatsFunc("containerOnAnotherMachine", 100, testDriverOtherMachine, t)
f(testDriver, t) f(testDriver, t)
} }

View File

@ -63,7 +63,7 @@ func buildTrace(cpu, mem []uint64, duration time.Duration) []*info.ContainerStat
return ret return ret
} }
func timeEq(t1, t2 time.Time, tolerance time.Duration) bool { func TimeEq(t1, t2 time.Time, tolerance time.Duration) bool {
// t1 should not be later than t2 // t1 should not be later than t2
if t1.After(t2) { if t1.After(t2) {
t1, t2 = t2, t1 t1, t2 = t2, t1
@ -94,7 +94,7 @@ const (
// This function is useful because we do not require precise time // This function is useful because we do not require precise time
// representation. // representation.
func DefaultStatsEq(a, b *info.ContainerStats) bool { func DefaultStatsEq(a, b *info.ContainerStats) bool {
if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { if !TimeEq(a.Timestamp, b.Timestamp, timePrecision) {
return false return false
} }
if !reflect.DeepEqual(a.Cpu, b.Cpu) { if !reflect.DeepEqual(a.Cpu, b.Cpu) {
@ -112,7 +112,7 @@ func DefaultStatsEq(a, b *info.ContainerStats) bool {
// This function is useful because we do not require precise time // This function is useful because we do not require precise time
// representation. // representation.
func sampleEq(a, b *info.ContainerStatsSample) bool { func sampleEq(a, b *info.ContainerStatsSample) bool {
if !timeEq(a.Timestamp, b.Timestamp, timePrecision) { if !TimeEq(a.Timestamp, b.Timestamp, timePrecision) {
return false return false
} }
if !durationEq(a.Duration, b.Duration, timePrecision) { if !durationEq(a.Duration, b.Duration, timePrecision) {