Merge pull request #218 from monnand/write-only-storage
Write only storage: Part 1
This commit is contained in:
commit
5200138f3e
@ -17,7 +17,6 @@ package bigquery
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
|
bigquery "code.google.com/p/google-api-go-client/bigquery/v2"
|
||||||
@ -437,25 +436,7 @@ func (self *bigqueryStorage) RecentStats(containerName string, numStats int) ([]
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
func (self *bigqueryStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
||||||
if numSamples == 0 {
|
return nil, fmt.Errorf("will be removed")
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
header, rows, err := self.getRecentRows(containerName, numSamples)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sampleList := make([]*info.ContainerStatsSample, 0, len(rows))
|
|
||||||
for _, row := range rows {
|
|
||||||
sample, err := self.valuesToContainerSample(header, row)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if sample == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sampleList = append(sampleList, sample)
|
|
||||||
}
|
|
||||||
return sampleList, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *bigqueryStorage) Close() error {
|
func (self *bigqueryStorage) Close() error {
|
||||||
@ -469,74 +450,7 @@ func (self *bigqueryStorage) Percentiles(
|
|||||||
cpuUsagePercentiles []int,
|
cpuUsagePercentiles []int,
|
||||||
memUsagePercentiles []int,
|
memUsagePercentiles []int,
|
||||||
) (*info.ContainerStatsPercentiles, error) {
|
) (*info.ContainerStatsPercentiles, error) {
|
||||||
selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1)
|
return nil, fmt.Errorf("will be removed")
|
||||||
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage))
|
|
||||||
for _, p := range cpuUsagePercentiles {
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p))
|
|
||||||
}
|
|
||||||
for _, p := range memUsagePercentiles {
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p))
|
|
||||||
}
|
|
||||||
|
|
||||||
tableName, err := self.client.GetTableName()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
query := fmt.Sprintf("SELECT %v FROM %v WHERE %v='%v' AND %v='%v' AND timestamp > DATE_ADD(CURRENT_TIMESTAMP(), -%v, 'SECOND')",
|
|
||||||
strings.Join(selectedCol, ","),
|
|
||||||
tableName,
|
|
||||||
colContainerName,
|
|
||||||
containerName,
|
|
||||||
colMachineName,
|
|
||||||
self.machineName,
|
|
||||||
self.windowLen.Seconds(),
|
|
||||||
)
|
|
||||||
_, rows, err := self.client.Query(query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(rows) != 1 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
point := rows[0]
|
|
||||||
|
|
||||||
ret := new(info.ContainerStatsPercentiles)
|
|
||||||
ret.MaxMemoryUsage, err = convertToUint64(point[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid max memory usage: %v", err)
|
|
||||||
}
|
|
||||||
retrievedCpuPercentiles := point[1 : 1+len(cpuUsagePercentiles)]
|
|
||||||
for i, p := range cpuUsagePercentiles {
|
|
||||||
v, err := convertToUint64(retrievedCpuPercentiles[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid cpu usage: %v", err)
|
|
||||||
}
|
|
||||||
ret.CpuUsagePercentiles = append(
|
|
||||||
ret.CpuUsagePercentiles,
|
|
||||||
info.Percentile{
|
|
||||||
Percentage: p,
|
|
||||||
Value: v,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
retrievedMemoryPercentiles := point[1+len(cpuUsagePercentiles):]
|
|
||||||
for i, p := range memUsagePercentiles {
|
|
||||||
v, err := convertToUint64(retrievedMemoryPercentiles[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid memory usage: %v", err)
|
|
||||||
}
|
|
||||||
ret.MemoryUsagePercentiles = append(
|
|
||||||
ret.MemoryUsagePercentiles,
|
|
||||||
info.Percentile{
|
|
||||||
Percentage: p,
|
|
||||||
Value: v,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new bigquery storage driver.
|
// Create a new bigquery storage driver.
|
||||||
|
74
storage/cache/memcache.go
vendored
74
storage/cache/memcache.go
vendored
@ -1,74 +0,0 @@
|
|||||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package cache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/google/cadvisor/info"
|
|
||||||
"github.com/google/cadvisor/storage"
|
|
||||||
"github.com/google/cadvisor/storage/memory"
|
|
||||||
)
|
|
||||||
|
|
||||||
type cachedStorageDriver struct {
|
|
||||||
maxNumStatsInCache int
|
|
||||||
maxNumSamplesInCache int
|
|
||||||
cache storage.StorageDriver
|
|
||||||
backend storage.StorageDriver
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cachedStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
|
||||||
err := self.cache.AddStats(ref, stats)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = self.backend.AddStats(ref, stats)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cachedStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
|
||||||
if numStats <= self.maxNumStatsInCache {
|
|
||||||
return self.cache.RecentStats(containerName, numStats)
|
|
||||||
}
|
|
||||||
return self.backend.RecentStats(containerName, numStats)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(vishh): Calculate percentiles from cached stats instead of reaching the DB. This will make the UI truly independent of the backend storage.
|
|
||||||
func (self *cachedStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
|
||||||
return self.backend.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cachedStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
|
||||||
if numSamples <= self.maxNumSamplesInCache {
|
|
||||||
return self.cache.Samples(containerName, numSamples)
|
|
||||||
}
|
|
||||||
return self.backend.Samples(containerName, numSamples)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cachedStorageDriver) Close() error {
|
|
||||||
self.cache.Close()
|
|
||||||
return self.backend.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(vishh): Cache all samples for a given duration and do not cap the maximum number of samples. This is useful if we happen to change the housekeeping duration.
|
|
||||||
func MemoryCache(maxNumSamplesInCache, maxNumStatsInCache int, driver storage.StorageDriver) storage.StorageDriver {
|
|
||||||
return &cachedStorageDriver{
|
|
||||||
maxNumStatsInCache: maxNumStatsInCache,
|
|
||||||
maxNumSamplesInCache: maxNumSamplesInCache,
|
|
||||||
cache: memory.New(maxNumSamplesInCache, maxNumStatsInCache),
|
|
||||||
backend: driver,
|
|
||||||
}
|
|
||||||
}
|
|
108
storage/cache/memcache_test.go
vendored
108
storage/cache/memcache_test.go
vendored
@ -1,108 +0,0 @@
|
|||||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package cache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/google/cadvisor/info"
|
|
||||||
"github.com/google/cadvisor/storage"
|
|
||||||
"github.com/google/cadvisor/storage/memory"
|
|
||||||
"github.com/google/cadvisor/storage/test"
|
|
||||||
)
|
|
||||||
|
|
||||||
type cacheTestStorageDriver struct {
|
|
||||||
base storage.StorageDriver
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
|
||||||
return test.DefaultStatsEq(a, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
|
||||||
return self.base.AddStats(ref, stats)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
|
||||||
return self.base.RecentStats(containerName, numStats)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
|
||||||
return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
|
||||||
return self.base.Samples(containerName, numSamples)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *cacheTestStorageDriver) Close() error {
|
|
||||||
return self.base.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
|
||||||
maxSize := 200
|
|
||||||
|
|
||||||
for N := 10; N < maxSize; N += 10 {
|
|
||||||
testDriver := &cacheTestStorageDriver{}
|
|
||||||
backend := memory.New(N*2, N*2)
|
|
||||||
testDriver.base = MemoryCache(N, N, backend)
|
|
||||||
f(testDriver, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMaxMemoryUsage(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSampleCpuUsage(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestSampleCpuUsage, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSamplesWithoutSample(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentilesWithoutSample(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentiles(t *testing.T) {
|
|
||||||
N := 100
|
|
||||||
testDriver := &cacheTestStorageDriver{}
|
|
||||||
backend := memory.New(N*2, N*2)
|
|
||||||
testDriver.base = MemoryCache(N, N, backend)
|
|
||||||
test.StorageDriverTestPercentiles(testDriver, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrievePartialRecentStats(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrieveAllRecentStats(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNoRecentStats(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestNoRecentStats, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNoSamples(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestNoSamples, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentilesWithoutStats(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t)
|
|
||||||
}
|
|
@ -16,7 +16,6 @@ package influxdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -26,7 +25,6 @@ import (
|
|||||||
|
|
||||||
type influxdbStorage struct {
|
type influxdbStorage struct {
|
||||||
client *influxdb.Client
|
client *influxdb.Client
|
||||||
prevStats *info.ContainerStats
|
|
||||||
machineName string
|
machineName string
|
||||||
tableName string
|
tableName string
|
||||||
windowLen time.Duration
|
windowLen time.Duration
|
||||||
@ -46,10 +44,6 @@ const (
|
|||||||
colMemoryUsage string = "memory_usage"
|
colMemoryUsage string = "memory_usage"
|
||||||
// Working set size
|
// Working set size
|
||||||
colMemoryWorkingSet string = "memory_working_set"
|
colMemoryWorkingSet string = "memory_working_set"
|
||||||
// Optional: sample duration. Unit: Nanosecond.
|
|
||||||
colSampleDuration string = "sample_duration"
|
|
||||||
// Optional: Instant cpu usage
|
|
||||||
colCpuInstantUsage string = "cpu_instant_usage"
|
|
||||||
// Cumulative count of bytes received.
|
// Cumulative count of bytes received.
|
||||||
colRxBytes string = "rx_bytes"
|
colRxBytes string = "rx_bytes"
|
||||||
// Cumulative count of receive errors encountered.
|
// Cumulative count of receive errors encountered.
|
||||||
@ -67,7 +61,7 @@ func (self *influxdbStorage) containerStatsToValues(
|
|||||||
|
|
||||||
// Timestamp
|
// Timestamp
|
||||||
columns = append(columns, colTimestamp)
|
columns = append(columns, colTimestamp)
|
||||||
values = append(values, stats.Timestamp.Unix())
|
values = append(values, stats.Timestamp.UnixNano()/1E3)
|
||||||
|
|
||||||
// Machine name
|
// Machine name
|
||||||
columns = append(columns, colMachineName)
|
columns = append(columns, colMachineName)
|
||||||
@ -108,20 +102,6 @@ func (self *influxdbStorage) containerStatsToValues(
|
|||||||
values = append(values, stats.Network.TxErrors)
|
values = append(values, stats.Network.TxErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
sample, err := info.NewSample(self.prevStats, stats)
|
|
||||||
if err != nil || sample == nil {
|
|
||||||
return columns, values
|
|
||||||
}
|
|
||||||
// DO NOT ADD ANY STATS BELOW THAT ARE NOT PART OF SAMPLING
|
|
||||||
|
|
||||||
// Optional: sample duration. Unit: Nanosecond.
|
|
||||||
columns = append(columns, colSampleDuration)
|
|
||||||
values = append(values, sample.Duration.String())
|
|
||||||
|
|
||||||
// Optional: Instant cpu usage
|
|
||||||
columns = append(columns, colCpuInstantUsage)
|
|
||||||
values = append(values, sample.Cpu.Usage)
|
|
||||||
|
|
||||||
return columns, values
|
return columns, values
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,8 +149,8 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
|
|||||||
v := values[i]
|
v := values[i]
|
||||||
switch {
|
switch {
|
||||||
case col == colTimestamp:
|
case col == colTimestamp:
|
||||||
if str, ok := v.(string); ok {
|
if f64sec, ok := v.(float64); ok && stats.Timestamp.IsZero() {
|
||||||
stats.Timestamp, err = time.Parse(time.RFC3339Nano, str)
|
stats.Timestamp = time.Unix(int64(f64sec)/1E3, (int64(f64sec)%1E3)*1E6)
|
||||||
}
|
}
|
||||||
case col == colMachineName:
|
case col == colMachineName:
|
||||||
if m, ok := v.(string); ok {
|
if m, ok := v.(string); ok {
|
||||||
@ -205,48 +185,6 @@ func (self *influxdbStorage) valuesToContainerStats(columns []string, values []i
|
|||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *influxdbStorage) valuesToContainerSample(columns []string, values []interface{}) (*info.ContainerStatsSample, error) {
|
|
||||||
sample := &info.ContainerStatsSample{}
|
|
||||||
var err error
|
|
||||||
for i, col := range columns {
|
|
||||||
v := values[i]
|
|
||||||
switch {
|
|
||||||
case col == colTimestamp:
|
|
||||||
if str, ok := v.(string); ok {
|
|
||||||
sample.Timestamp, err = time.Parse(time.RFC3339Nano, str)
|
|
||||||
}
|
|
||||||
case col == colMachineName:
|
|
||||||
if m, ok := v.(string); ok {
|
|
||||||
if m != self.machineName {
|
|
||||||
return nil, fmt.Errorf("different machine")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("machine name field is not a string: %v", v)
|
|
||||||
}
|
|
||||||
// Memory Usage
|
|
||||||
case col == colMemoryUsage:
|
|
||||||
sample.Memory.Usage, err = convertToUint64(v)
|
|
||||||
// sample duration. Unit: Nanosecond.
|
|
||||||
case col == colSampleDuration:
|
|
||||||
if v == nil {
|
|
||||||
// this record does not have sample_duration, so it's the first stats.
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
sample.Duration, err = time.ParseDuration(v.(string))
|
|
||||||
// Instant cpu usage
|
|
||||||
case col == colCpuInstantUsage:
|
|
||||||
sample.Cpu.Usage, err = convertToUint64(v)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("column %v has invalid value %v: %v", col, v, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if sample.Duration.Nanoseconds() == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return sample, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
|
func (self *influxdbStorage) OverrideReadyToFlush(readyToFlush func() bool) {
|
||||||
self.readyToFlush = readyToFlush
|
self.readyToFlush = readyToFlush
|
||||||
}
|
}
|
||||||
@ -266,7 +204,6 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
|
|||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
series := self.newSeries(self.containerStatsToValues(ref, stats))
|
series := self.newSeries(self.containerStatsToValues(ref, stats))
|
||||||
self.series = append(self.series, series)
|
self.series = append(self.series, series)
|
||||||
self.prevStats = stats.Copy(self.prevStats)
|
|
||||||
if self.readyToFlush() {
|
if self.readyToFlush() {
|
||||||
seriesToFlush = self.series
|
seriesToFlush = self.series
|
||||||
self.series = make([]*influxdb.Series, 0)
|
self.series = make([]*influxdb.Series, 0)
|
||||||
@ -274,7 +211,7 @@ func (self *influxdbStorage) AddStats(ref info.ContainerReference, stats *info.C
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if len(seriesToFlush) > 0 {
|
if len(seriesToFlush) > 0 {
|
||||||
err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Second)
|
err := self.client.WriteSeriesWithTimePrecision(seriesToFlush, influxdb.Microsecond)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to write stats to influxDb - %s", err)
|
return fmt.Errorf("failed to write stats to influxDb - %s", err)
|
||||||
}
|
}
|
||||||
@ -303,6 +240,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
|
|||||||
// so we need to go through from the last one to the first one.
|
// so we need to go through from the last one to the first one.
|
||||||
for i := len(series) - 1; i >= 0; i-- {
|
for i := len(series) - 1; i >= 0; i-- {
|
||||||
s := series[i]
|
s := series[i]
|
||||||
|
|
||||||
for j := len(s.Points) - 1; j >= 0; j-- {
|
for j := len(s.Points) - 1; j >= 0; j-- {
|
||||||
values := s.Points[j]
|
values := s.Points[j]
|
||||||
stats, err := self.valuesToContainerStats(s.Columns, values)
|
stats, err := self.valuesToContainerStats(s.Columns, values)
|
||||||
@ -319,35 +257,7 @@ func (self *influxdbStorage) RecentStats(containerName string, numStats int) ([]
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
func (self *influxdbStorage) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
||||||
if numSamples == 0 {
|
return nil, fmt.Errorf("will be removed")
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
// TODO(dengnan): select only columns that we need
|
|
||||||
// TODO(dengnan): escape names
|
|
||||||
query := fmt.Sprintf("select * from %v where %v='%v' and %v='%v'", self.tableName, colContainerName, containerName, colMachineName, self.machineName)
|
|
||||||
if numSamples > 0 {
|
|
||||||
query = fmt.Sprintf("%v limit %v", query, numSamples)
|
|
||||||
}
|
|
||||||
series, err := self.client.Query(query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sampleList := make([]*info.ContainerStatsSample, 0, len(series))
|
|
||||||
for i := len(series) - 1; i >= 0; i-- {
|
|
||||||
s := series[i]
|
|
||||||
for j := len(s.Points) - 1; j >= 0; j-- {
|
|
||||||
values := s.Points[j]
|
|
||||||
sample, err := self.valuesToContainerSample(s.Columns, values)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if sample == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sampleList = append(sampleList, sample)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sampleList, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *influxdbStorage) Close() error {
|
func (self *influxdbStorage) Close() error {
|
||||||
@ -360,72 +270,7 @@ func (self *influxdbStorage) Percentiles(
|
|||||||
cpuUsagePercentiles []int,
|
cpuUsagePercentiles []int,
|
||||||
memUsagePercentiles []int,
|
memUsagePercentiles []int,
|
||||||
) (*info.ContainerStatsPercentiles, error) {
|
) (*info.ContainerStatsPercentiles, error) {
|
||||||
selectedCol := make([]string, 0, len(cpuUsagePercentiles)+len(memUsagePercentiles)+1)
|
return nil, fmt.Errorf("will be removed")
|
||||||
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("max(%v)", colMemoryUsage))
|
|
||||||
for _, p := range cpuUsagePercentiles {
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colCpuInstantUsage, p))
|
|
||||||
}
|
|
||||||
for _, p := range memUsagePercentiles {
|
|
||||||
selectedCol = append(selectedCol, fmt.Sprintf("percentile(%v, %v)", colMemoryUsage, p))
|
|
||||||
}
|
|
||||||
|
|
||||||
query := fmt.Sprintf("select %v from %v where %v='%v' and %v='%v' and time > now() - %v",
|
|
||||||
strings.Join(selectedCol, ","),
|
|
||||||
self.tableName,
|
|
||||||
colContainerName,
|
|
||||||
containerName,
|
|
||||||
colMachineName,
|
|
||||||
self.machineName,
|
|
||||||
fmt.Sprintf("%vs", self.windowLen.Seconds()),
|
|
||||||
)
|
|
||||||
series, err := self.client.Query(query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(series) != 1 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
if len(series[0].Points) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
point := series[0].Points[0]
|
|
||||||
|
|
||||||
ret := new(info.ContainerStatsPercentiles)
|
|
||||||
ret.MaxMemoryUsage, err = convertToUint64(point[1])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid max memory usage: %v", err)
|
|
||||||
}
|
|
||||||
retrievedCpuPercentiles := point[2 : 2+len(cpuUsagePercentiles)]
|
|
||||||
for i, p := range cpuUsagePercentiles {
|
|
||||||
v, err := convertToUint64(retrievedCpuPercentiles[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid cpu usage: %v", err)
|
|
||||||
}
|
|
||||||
ret.CpuUsagePercentiles = append(
|
|
||||||
ret.CpuUsagePercentiles,
|
|
||||||
info.Percentile{
|
|
||||||
Percentage: p,
|
|
||||||
Value: v,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
retrievedMemoryPercentiles := point[2+len(cpuUsagePercentiles):]
|
|
||||||
for i, p := range memUsagePercentiles {
|
|
||||||
v, err := convertToUint64(retrievedMemoryPercentiles[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid memory usage: %v", err)
|
|
||||||
}
|
|
||||||
ret.MemoryUsagePercentiles = append(
|
|
||||||
ret.MemoryUsagePercentiles,
|
|
||||||
info.Percentile{
|
|
||||||
Percentage: p,
|
|
||||||
Value: v,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a new influxdb series.
|
// Returns a new influxdb series.
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
//+build ignore
|
|
||||||
// Copyright 2014 Google Inc. All Rights Reserved.
|
// Copyright 2014 Google Inc. All Rights Reserved.
|
||||||
//
|
//
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
@ -13,6 +12,9 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build influxdb_test
|
||||||
|
// To run unit test: go test -tags influxdb_test
|
||||||
|
|
||||||
package influxdb
|
package influxdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -168,18 +170,10 @@ func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T, bu
|
|||||||
f(testDriver, t)
|
f(testDriver, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSampleCpuUsage(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestSampleCpuUsage, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrievePartialRecentStats(t *testing.T) {
|
func TestRetrievePartialRecentStats(t *testing.T) {
|
||||||
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
|
runStorageTest(test.StorageDriverTestRetrievePartialRecentStats, t, 20)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSamplesWithoutSample(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestSamplesWithoutSample, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrieveAllRecentStats(t *testing.T) {
|
func TestRetrieveAllRecentStats(t *testing.T) {
|
||||||
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
|
runStorageTest(test.StorageDriverTestRetrieveAllRecentStats, t, 10)
|
||||||
}
|
}
|
||||||
@ -187,33 +181,3 @@ func TestRetrieveAllRecentStats(t *testing.T) {
|
|||||||
func TestNoRecentStats(t *testing.T) {
|
func TestNoRecentStats(t *testing.T) {
|
||||||
runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
|
runStorageTest(test.StorageDriverTestNoRecentStats, t, kCacheDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoSamples(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestNoSamples, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentiles(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestPercentiles, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMaxMemoryUsage(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestMaxMemoryUsage, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentilesWithoutSample(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestPercentilesWithoutSample, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPercentilesWithoutStats(t *testing.T) {
|
|
||||||
runStorageTest(test.StorageDriverTestPercentilesWithoutStats, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrieveZeroStats(t *testing.T) {
|
|
||||||
t.SkipNow()
|
|
||||||
runStorageTest(test.StorageDriverTestRetrieveZeroRecentStats, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRetrieveZeroSamples(t *testing.T) {
|
|
||||||
t.SkipNow()
|
|
||||||
runStorageTest(test.StorageDriverTestRetrieveZeroSamples, t, kCacheDuration)
|
|
||||||
}
|
|
||||||
|
@ -151,6 +151,7 @@ type InMemoryStorage struct {
|
|||||||
containerStorageMap map[string]*containerStorage
|
containerStorageMap map[string]*containerStorage
|
||||||
maxNumSamples int
|
maxNumSamples int
|
||||||
maxNumStats int
|
maxNumStats int
|
||||||
|
backend storage.StorageDriver
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
||||||
@ -165,6 +166,13 @@ func (self *InMemoryStorage) AddStats(ref info.ContainerReference, stats *info.C
|
|||||||
self.containerStorageMap[ref.Name] = cstore
|
self.containerStorageMap[ref.Name] = cstore
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if self.backend != nil {
|
||||||
|
// TODO(monnand): To deal with long delay write operations, we
|
||||||
|
// may want to start a pool of goroutines to do write
|
||||||
|
// operations.
|
||||||
|
self.backend.AddStats(ref, stats)
|
||||||
|
}
|
||||||
return cstore.AddStats(stats)
|
return cstore.AddStats(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,11 +239,16 @@ func (self *InMemoryStorage) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(maxNumSamples, maxNumStats int) storage.StorageDriver {
|
func New(
|
||||||
|
maxNumSamples,
|
||||||
|
maxNumStats int,
|
||||||
|
backend storage.StorageDriver,
|
||||||
|
) *InMemoryStorage {
|
||||||
ret := &InMemoryStorage{
|
ret := &InMemoryStorage{
|
||||||
containerStorageMap: make(map[string]*containerStorage, 32),
|
containerStorageMap: make(map[string]*containerStorage, 32),
|
||||||
maxNumSamples: maxNumSamples,
|
maxNumSamples: maxNumSamples,
|
||||||
maxNumStats: maxNumStats,
|
maxNumStats: maxNumStats,
|
||||||
|
backend: backend,
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
@ -23,39 +23,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type memoryTestStorageDriver struct {
|
type memoryTestStorageDriver struct {
|
||||||
base storage.StorageDriver
|
storage.StorageDriver
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
func (self *memoryTestStorageDriver) StatsEq(a, b *info.ContainerStats) bool {
|
||||||
return test.DefaultStatsEq(a, b)
|
return test.DefaultStatsEq(a, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
|
|
||||||
return self.base.AddStats(ref, stats)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) RecentStats(containerName string, numStats int) ([]*info.ContainerStats, error) {
|
|
||||||
return self.base.RecentStats(containerName, numStats)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) Percentiles(containerName string, cpuUsagePercentiles []int, memUsagePercentiles []int) (*info.ContainerStatsPercentiles, error) {
|
|
||||||
return self.base.Percentiles(containerName, cpuUsagePercentiles, memUsagePercentiles)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) Samples(containerName string, numSamples int) ([]*info.ContainerStatsSample, error) {
|
|
||||||
return self.base.Samples(containerName, numSamples)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *memoryTestStorageDriver) Close() error {
|
|
||||||
return self.base.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
func runStorageTest(f func(test.TestStorageDriver, *testing.T), t *testing.T) {
|
||||||
maxSize := 200
|
maxSize := 200
|
||||||
|
|
||||||
for N := 10; N < maxSize; N += 10 {
|
for N := 10; N < maxSize; N += 10 {
|
||||||
testDriver := &memoryTestStorageDriver{}
|
testDriver := &memoryTestStorageDriver{}
|
||||||
testDriver.base = New(N, N)
|
testDriver.StorageDriver = New(N, N, nil)
|
||||||
f(testDriver, t)
|
f(testDriver, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -79,7 +59,7 @@ func TestPercentilesWithoutSample(t *testing.T) {
|
|||||||
func TestPercentiles(t *testing.T) {
|
func TestPercentiles(t *testing.T) {
|
||||||
N := 100
|
N := 100
|
||||||
testDriver := &memoryTestStorageDriver{}
|
testDriver := &memoryTestStorageDriver{}
|
||||||
testDriver.base = New(N, N)
|
testDriver.StorageDriver = New(N, N, nil)
|
||||||
test.StorageDriverTestPercentiles(testDriver, t)
|
test.StorageDriverTestPercentiles(testDriver, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"github.com/google/cadvisor/manager"
|
"github.com/google/cadvisor/manager"
|
||||||
"github.com/google/cadvisor/storage"
|
"github.com/google/cadvisor/storage"
|
||||||
"github.com/google/cadvisor/storage/bigquery"
|
"github.com/google/cadvisor/storage/bigquery"
|
||||||
"github.com/google/cadvisor/storage/cache"
|
|
||||||
"github.com/google/cadvisor/storage/influxdb"
|
"github.com/google/cadvisor/storage/influxdb"
|
||||||
"github.com/google/cadvisor/storage/memory"
|
"github.com/google/cadvisor/storage/memory"
|
||||||
)
|
)
|
||||||
@ -39,8 +38,9 @@ var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*tim
|
|||||||
|
|
||||||
const statsRequestedByUI = 60
|
const statsRequestedByUI = 60
|
||||||
|
|
||||||
func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
func NewStorageDriver(driverName string) (*memory.InMemoryStorage, error) {
|
||||||
var storageDriver storage.StorageDriver
|
var storageDriver *memory.InMemoryStorage
|
||||||
|
var backendStorage storage.StorageDriver
|
||||||
var err error
|
var err error
|
||||||
// TODO(vmarmol): We shouldn't need the housekeeping interval here and it shouldn't be public.
|
// TODO(vmarmol): We shouldn't need the housekeeping interval here and it shouldn't be public.
|
||||||
samplesToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval)
|
samplesToCache := int(*argDbBufferDuration / *manager.HousekeepingInterval)
|
||||||
@ -49,12 +49,6 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
|||||||
samplesToCache = statsRequestedByUI
|
samplesToCache = statsRequestedByUI
|
||||||
}
|
}
|
||||||
switch driverName {
|
switch driverName {
|
||||||
case "":
|
|
||||||
// empty string by default is the in memory store
|
|
||||||
fallthrough
|
|
||||||
case "memory":
|
|
||||||
storageDriver = memory.New(*argSampleSize, int(*argDbBufferDuration))
|
|
||||||
return storageDriver, nil
|
|
||||||
case "influxdb":
|
case "influxdb":
|
||||||
var hostname string
|
var hostname string
|
||||||
hostname, err = os.Hostname()
|
hostname, err = os.Hostname()
|
||||||
@ -62,7 +56,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
storageDriver, err = influxdb.New(
|
backendStorage, err = influxdb.New(
|
||||||
hostname,
|
hostname,
|
||||||
"stats",
|
"stats",
|
||||||
*argDbName,
|
*argDbName,
|
||||||
@ -74,22 +68,18 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
|||||||
// TODO(monnand): One hour? Or user-defined?
|
// TODO(monnand): One hour? Or user-defined?
|
||||||
1*time.Hour,
|
1*time.Hour,
|
||||||
)
|
)
|
||||||
glog.V(2).Infof("Caching %d recent stats in memory\n", samplesToCache)
|
|
||||||
storageDriver = cache.MemoryCache(samplesToCache, samplesToCache, storageDriver)
|
|
||||||
case "bigquery":
|
case "bigquery":
|
||||||
var hostname string
|
var hostname string
|
||||||
hostname, err = os.Hostname()
|
hostname, err = os.Hostname()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
storageDriver, err = bigquery.New(
|
backendStorage, err = bigquery.New(
|
||||||
hostname,
|
hostname,
|
||||||
"cadvisor",
|
"cadvisor",
|
||||||
*argDbName,
|
*argDbName,
|
||||||
1*time.Hour,
|
1*time.Hour,
|
||||||
)
|
)
|
||||||
glog.V(2).Infof("Caching %d recent stats in memory\n", samplesToCache)
|
|
||||||
storageDriver = cache.MemoryCache(samplesToCache, samplesToCache, storageDriver)
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("Unknown database driver: %v", *argDbDriver)
|
err = fmt.Errorf("Unknown database driver: %v", *argDbDriver)
|
||||||
@ -97,5 +87,7 @@ func NewStorageDriver(driverName string) (storage.StorageDriver, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
glog.Infof("Caching %d recent stats in memory; using %v storage driver\n", samplesToCache, driverName)
|
||||||
|
storageDriver = memory.New(samplesToCache, samplesToCache, backendStorage)
|
||||||
return storageDriver, nil
|
return storageDriver, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user