percentiles

This commit is contained in:
Nan Deng 2014-07-02 16:06:27 -07:00
parent 41dcf6d42d
commit fceb2707dd
3 changed files with 127 additions and 21 deletions

View File

@ -289,7 +289,7 @@ func TestContainerStatsCopy(t *testing.T) {
stats.Cpu.Load = shadowStats.Cpu.Load + 1 stats.Cpu.Load = shadowStats.Cpu.Load + 1
stats.Memory.Usage = shadowStats.Memory.Usage + 1 stats.Memory.Usage = shadowStats.Memory.Usage + 1
if reflect.DeepEqual(stats, shadowStats) { if reflect.DeepEqual(stats, shadowStats) {
t.Errorf("Copy() did not deeply copied the object") t.Errorf("Copy() did not deeply copy the object")
} }
stats = shadowStats.Copy(stats) stats = shadowStats.Copy(stats)
if !reflect.DeepEqual(stats, shadowStats) { if !reflect.DeepEqual(stats, shadowStats) {

View File

@ -33,6 +33,37 @@ type influxdbStorage struct {
windowLen time.Duration windowLen time.Duration
} }
// Names of the columns used by the influxdb storage driver.
const (
	// Timestamp of the record.
	colTimestamp = "timestamp"

	// Name of the machine the record belongs to.
	colMachineName = "machine"

	// Name of the container the record belongs to.
	colContainerName = "container_name"

	// Cumulative CPU usage.
	colCpuCumulativeUsage = "cpu_cumulative_usage"

	// Cumulative CPU usage spent in system mode.
	colCpuCumulativeUsageSystem = "cpu_cumulative_usage_system"

	// Cumulative CPU usage spent in user mode.
	colCpuCumulativeUsageUser = "cpu_cumulative_usage_user"

	// Memory usage.
	colMemoryUsage = "memory_usage"

	// Size of the working set.
	colMemoryWorkingSet = "memory_working_set"

	// Page faults of the container.
	colMemoryContainerPgfault = "memory_container_pgfault"

	// Major page faults of the container.
	colMemoryContainerPgmajfault = "memory_container_pgmajfault"

	// Hierarchical page faults.
	colMemoryHierarchicalPgfault = "memory_hierarchical_pgfault"

	// Hierarchical major page faults.
	colMemoryHierarchicalPgmajfault = "memory_hierarchical_pgmajfault"

	// Prefix shared by the per-core cumulative usage columns.
	colPerCoreCumulativeUsagePrefix = "per_core_cumulative_usage_core_"

	// Optional column: duration of the sample, in nanoseconds.
	colSampleDuration = "sample_duration"

	// Optional column: instant CPU usage.
	colCpuInstantUsage = "cpu_instant_usage"

	// Optional columns: prefix shared by the per-core instant usage columns.
	colPerCoreInstantUsagePrefix = "per_core_instant_usage_core_"
)
func (self *influxdbStorage) containerStatsToValues( func (self *influxdbStorage) containerStatsToValues(
ref info.ContainerReference, ref info.ContainerReference,
stats *info.ContainerStats, stats *info.ContainerStats,
@ -46,8 +77,8 @@ func (self *influxdbStorage) containerStatsToValues(
columns = append(columns, "machine") columns = append(columns, "machine")
values = append(values, self.machineName) values = append(values, self.machineName)
// Container path // Container name
columns = append(columns, "container_path") columns = append(columns, "container_name")
values = append(values, ref.Name) values = append(values, ref.Name)
// Cumulative Cpu Usage // Cumulative Cpu Usage
@ -220,33 +251,29 @@ func (self *influxdbStorage) valuesToContainerSample(columns []string, values []
var err error var err error
for i, col := range columns { for i, col := range columns {
v := values[i] v := values[i]
switch col { switch {
case "timestamp": case col == "timestamp":
if str, ok := v.(string); ok { if str, ok := v.(string); ok {
sample.Timestamp, err = time.Parse(time.RFC3339Nano, str) sample.Timestamp, err = time.Parse(time.RFC3339Nano, str)
} }
case "machine": case col == "machine":
if v.(string) != self.machineName { if v.(string) != self.machineName {
return nil, fmt.Errorf("different machine") return nil, fmt.Errorf("different machine")
} }
// Memory Usage // Memory Usage
case "memory_usage": case col == "memory_usage":
sample.Memory.Usage, err = convertToUint64(v) sample.Memory.Usage, err = convertToUint64(v)
// sample duration. Unit: Nanosecond. // sample duration. Unit: Nanosecond.
case "sample_duration": case col == "sample_duration":
if v == nil { if v == nil {
// this record does not have sample_duration, so it's the first stats. // this record does not have sample_duration, so it's the first stats.
return nil, nil return nil, nil
} }
sample.Duration, err = time.ParseDuration(v.(string)) sample.Duration, err = time.ParseDuration(v.(string))
// Instant cpu usage // Instant cpu usage
case "cpu_instant_usage": case col == "cpu_instant_usage":
sample.Cpu.Usage, err = convertToUint64(v) sample.Cpu.Usage, err = convertToUint64(v)
case strings.HasPrefix(col, "per_core_instant_usage_core_"):
default:
if !strings.HasPrefix(col, "per_core_instant_usage_core_") {
continue
}
idxStr := col[len("per_core_instant_usage_core_"):] idxStr := col[len("per_core_instant_usage_core_"):]
idx, err := strconv.Atoi(idxStr) idx, err := strconv.Atoi(idxStr)
if err != nil { if err != nil {
@ -290,7 +317,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) { func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
// TODO(dengnan): select only columns that we need // TODO(dengnan): select only columns that we need
// TODO(dengnan): escape containerName // TODO(dengnan): escape containerName
query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName)
if numStats > 0 { if numStats > 0 {
query = fmt.Sprintf("%v limit %v", query, numStats) query = fmt.Sprintf("%v limit %v", query, numStats)
} }
@ -317,7 +344,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) { func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
// TODO(dengnan): select only columns that we need // TODO(dengnan): select only columns that we need
// TODO(dengnan): escape containerName // TODO(dengnan): escape containerName
query := fmt.Sprintf("select * from %v where container_path='%v' and machine='%v'", self.tableName, containerName, self.machineName) query := fmt.Sprintf("select * from %v where container_name='%v' and machine='%v'", self.tableName, containerName, self.machineName)
if numSamples > 0 { if numSamples > 0 {
query = fmt.Sprintf("%v limit %v", query, numSamples) query = fmt.Sprintf("%v limit %v", query, numSamples)
} }
@ -352,24 +379,87 @@ func (self *influxdbStorage) Percentiles(
memUsagePercentiles []int, memUsagePercentiles []int,
) (*info.ContainerStatsPercentiles, error) { ) (*info.ContainerStatsPercentiles, error) {
// TODO(dengnan): Implement it // TODO(dengnan): Implement it
return nil, nil selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1)
selectedCol = append(selectedCol, "max(memory_usage)")
for _, p := range cpuUsagePercentiles {
selectedCol = append(selectedCol, fmt.Sprintf("percentile(cpu_instant_usage, %v)", p))
}
for _, p := range memUsagePercentiles {
selectedCol = append(selectedCol, fmt.Sprintf("percentile(memory_usage, %v)", p))
}
query := fmt.Sprintf("select %v from %v where container_name='%v' and machine='%v' and time > now() - %v",
strings.Join(selectedCol, ","),
self.tableName,
containerName,
self.machineName,
fmt.Sprintf("%vs", self.windowLen.Seconds()),
)
series, err := self.client.Query(query)
if err != nil {
return nil, err
}
if len(series) != 1 {
return nil, nil
}
if len(series[0].Points) == 0 {
return nil, nil
}
point := series[0].Points[0]
ret := new(info.ContainerStatsPercentiles)
ret.MaxMemoryUsage, err = convertToUint64(point[1])
if err != nil {
return nil, fmt.Errorf("invalid max memory usage: %v", err)
}
retrievedCpuPercentiles := point[2:]
for i, p := range cpuUsagePercentiles {
v, err := convertToUint64(retrievedCpuPercentiles[i])
if err != nil {
return nil, fmt.Errorf("invalid cpu usage: %v", err)
}
ret.CpuUsagePercentiles = append(
ret.CpuUsagePercentiles,
info.Percentile{
Percentage: p,
Value: v,
},
)
}
retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):]
for i, p := range memUsagePercentiles {
v, err := convertToUint64(retrievedMemoryPercentiles[i])
if err != nil {
return nil, fmt.Errorf("invalid memory usage: %v", err)
}
ret.MemoryUsagePercentiles = append(
ret.MemoryUsagePercentiles,
info.Percentile{
Percentage: p,
Value: v,
},
)
}
return ret, nil
} }
// machineName: A unique identifier to identify the host that current cAdvisor // machineName: A unique identifier to identify the host that current cAdvisor
// instance is running on. // instance is running on.
// hostname: The host which runs influxdb. // influxdbHost: The host which runs influxdb.
// percentilesDuration: Time window which will be considered when calls Percentiles() // percentilesDuration: Time window which will be considered when calls Percentiles()
func New(machineName, func New(machineName,
tablename, tablename,
database, database,
username, username,
password, password,
hostname string, influxdbHost string,
isSecure bool, isSecure bool,
percentilesDuration time.Duration, percentilesDuration time.Duration,
) (storage.StorageDriver, error) { ) (storage.StorageDriver, error) {
config := &influxdb.ClientConfig{ config := &influxdb.ClientConfig{
Host: hostname, Host: influxdbHost,
Username: username, Username: username,
Password: password, Password: password,
Database: database, Database: database,

View File

@ -106,3 +106,19 @@ func TestNoRecentStats(t *testing.T) {
func TestNoSamples(t *testing.T) { func TestNoSamples(t *testing.T) {
runStorageTest(test.StorageDriverTestNoSamples, t) runStorageTest(test.StorageDriverTestNoSamples, t)
} }
// TestPercentiles passes the shared test.StorageDriverTestPercentiles case to
// runStorageTest.
// NOTE(review): runStorageTest is defined outside this view; presumably it
// builds the storage driver under test and hands it to the case — confirm.
func TestPercentiles(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentiles, t)
}
// TestMaxMemoryUsage passes the shared test.StorageDriverTestMaxMemoryUsage
// case to runStorageTest.
// NOTE(review): the case body lives in the test package, outside this view;
// judging by its name it checks the reported maximum memory usage — confirm.
func TestMaxMemoryUsage(t *testing.T) {
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t)
}
// TestPercentilesWithoutSample passes the shared
// test.StorageDriverTestPercentilesWithoutSample case to runStorageTest.
// NOTE(review): judging by its name, the case queries percentiles when no
// sample is available — confirm against the test package.
func TestPercentilesWithoutSample(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t)
}
// TestPercentilesWithoutStats passes the shared
// test.StorageDriverTestPercentilesWithoutStats case to runStorageTest.
// NOTE(review): judging by its name, the case queries percentiles when no
// stats have been stored — confirm against the test package.
func TestPercentilesWithoutStats(t *testing.T) {
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t)
}