Merge pull request #1286 from timstclair/subcontainers2

Make manager multi-container functions robust to partial failures
This commit is contained in:
Vish Kannan 2016-05-17 13:22:17 -07:00
commit 05809d5936
3 changed files with 139 additions and 24 deletions

View File

@ -361,7 +361,10 @@ func (self *version2_0) HandleRequest(requestType string, request []string, m ma
glog.V(4).Infof("Api - Stats: Looking for stats for container %q, options %+v", name, opt) glog.V(4).Infof("Api - Stats: Looking for stats for container %q, options %+v", name, opt)
infos, err := m.GetRequestedContainersInfo(name, opt) infos, err := m.GetRequestedContainersInfo(name, opt)
if err != nil { if err != nil {
return err if len(infos) == 0 {
return err
}
glog.Errorf("Error calling GetRequestedContainersInfo: %v", err)
} }
contStats := make(map[string][]v2.DeprecatedContainerStats, 0) contStats := make(map[string][]v2.DeprecatedContainerStats, 0)
for name, cinfo := range infos { for name, cinfo := range infos {
@ -482,7 +485,10 @@ func (self *version2_1) HandleRequest(requestType string, request []string, m ma
glog.V(4).Infof("Api - MachineStats(%v)", request) glog.V(4).Infof("Api - MachineStats(%v)", request)
cont, err := m.GetRequestedContainersInfo("/", opt) cont, err := m.GetRequestedContainersInfo("/", opt)
if err != nil { if err != nil {
return err if len(cont) == 0 {
return err
}
glog.Errorf("Error calling GetRequestedContainersInfo: %v", err)
} }
return writeResult(v2.MachineStatsFromV1(cont["/"]), w) return writeResult(v2.MachineStatsFromV1(cont["/"]), w)
case statsApi: case statsApi:
@ -490,7 +496,10 @@ func (self *version2_1) HandleRequest(requestType string, request []string, m ma
glog.V(4).Infof("Api - Stats: Looking for stats for container %q, options %+v", name, opt) glog.V(4).Infof("Api - Stats: Looking for stats for container %q, options %+v", name, opt)
conts, err := m.GetRequestedContainersInfo(name, opt) conts, err := m.GetRequestedContainersInfo(name, opt)
if err != nil { if err != nil {
return err if len(conts) == 0 {
return err
}
glog.Errorf("Error calling GetRequestedContainersInfo: %v", err)
} }
contStats := make(map[string]v2.ContainerInfo, len(conts)) contStats := make(map[string]v2.ContainerInfo, len(conts))
for name, cont := range conts { for name, cont := range conts {

View File

@ -70,6 +70,8 @@ type Manager interface {
GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
// Get V2 information about a container. // Get V2 information about a container.
// Recursive (subcontainer) requests are best-effort, and may return a partial result alongside an
// error in the partial failure case.
GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error)
// Get information about all subcontainers of the specified container (includes self). // Get information about all subcontainers of the specified container (includes self).
@ -394,15 +396,16 @@ func (self *manager) GetDerivedStats(containerName string, options v2.RequestOpt
if err != nil { if err != nil {
return nil, err return nil, err
} }
var errs partialFailure
stats := make(map[string]v2.DerivedStats) stats := make(map[string]v2.DerivedStats)
for name, cont := range conts { for name, cont := range conts {
d, err := cont.DerivedStats() d, err := cont.DerivedStats()
if err != nil { if err != nil {
return nil, err errs.append(name, "DerivedStats", err)
} }
stats[name] = d stats[name] = d
} }
return stats, nil return stats, errs
} }
func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) { func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) {
@ -410,16 +413,17 @@ func (self *manager) GetContainerSpec(containerName string, options v2.RequestOp
if err != nil { if err != nil {
return nil, err return nil, err
} }
var errs partialFailure
specs := make(map[string]v2.ContainerSpec) specs := make(map[string]v2.ContainerSpec)
for name, cont := range conts { for name, cont := range conts {
cinfo, err := cont.GetInfo() cinfo, err := cont.GetInfo()
if err != nil { if err != nil {
return nil, err errs.append(name, "GetInfo", err)
} }
spec := self.getV2Spec(cinfo) spec := self.getV2Spec(cinfo)
specs[name] = spec specs[name] = spec
} }
return specs, nil return specs, errs
} }
// Get V2 container spec from v1 container info. // Get V2 container spec from v1 container info.
@ -455,26 +459,32 @@ func (self *manager) GetContainerInfoV2(containerName string, options v2.Request
return nil, err return nil, err
} }
var errs partialFailure
var nilTime time.Time // Ignored.
infos := make(map[string]v2.ContainerInfo, len(containers)) infos := make(map[string]v2.ContainerInfo, len(containers))
for name, container := range containers { for name, container := range containers {
result := v2.ContainerInfo{}
cinfo, err := container.GetInfo() cinfo, err := container.GetInfo()
if err != nil { if err != nil {
return nil, err errs.append(name, "GetInfo", err)
infos[name] = result
continue
} }
result.Spec = self.getV2Spec(cinfo)
var nilTime time.Time // Ignored.
stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count) stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
if err != nil { if err != nil {
return nil, err errs.append(name, "RecentStats", err)
infos[name] = result
continue
} }
infos[name] = v2.ContainerInfo{ result.Stats = v2.ContainerStatsFromV1(&cinfo.Spec, stats)
Spec: self.getV2Spec(cinfo), infos[name] = result
Stats: v2.ContainerStatsFromV1(&cinfo.Spec, stats),
}
} }
return infos, nil return infos, errs
} }
func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) { func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
@ -615,6 +625,7 @@ func (self *manager) GetRequestedContainersInfo(containerName string, options v2
if err != nil { if err != nil {
return nil, err return nil, err
} }
var errs partialFailure
containersMap := make(map[string]*info.ContainerInfo) containersMap := make(map[string]*info.ContainerInfo)
query := info.ContainerInfoRequest{ query := info.ContainerInfoRequest{
NumStats: options.Count, NumStats: options.Count,
@ -622,12 +633,11 @@ func (self *manager) GetRequestedContainersInfo(containerName string, options v2
for name, data := range containers { for name, data := range containers {
info, err := self.containerDataToContainerInfo(data, &query) info, err := self.containerDataToContainerInfo(data, &query)
if err != nil { if err != nil {
// Skip containers with errors, we try to degrade gracefully. errs.append(name, "containerDataToContainerInfo", err)
continue
} }
containersMap[name] = info containersMap[name] = info
} }
return containersMap, nil return containersMap, errs
} }
func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) { func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) {
@ -1058,19 +1068,18 @@ func (self *manager) watchForNewContainers(quit chan error) error {
glog.Warningf("Failed to process watch event %+v: %v", event, err) glog.Warningf("Failed to process watch event %+v: %v", event, err)
} }
case <-quit: case <-quit:
errors := []string{} var errs partialFailure
// Stop processing events if asked to quit. // Stop processing events if asked to quit.
for _, watcher := range self.containerWatchers { for i, watcher := range self.containerWatchers {
err := watcher.Stop() err := watcher.Stop()
if err != nil { if err != nil {
errors = append(errors, err.Error()) errs.append(fmt.Sprintf("watcher %d", i), "Stop", err)
} }
} }
if len(errors) > 0 { if len(errs) > 0 {
err_str := strings.Join(errors, ", ") quit <- errs
quit <- fmt.Errorf("Error quiting watchers: %v", err_str)
} else { } else {
quit <- nil quit <- nil
glog.Infof("Exiting thread watching subcontainers") glog.Infof("Exiting thread watching subcontainers")
@ -1244,3 +1253,14 @@ func getVersionInfo() (*info.VersionInfo, error) {
CadvisorRevision: version.Info["revision"], CadvisorRevision: version.Info["revision"],
}, nil }, nil
} }
// Helper for accumulating partial failures.
type partialFailure []string
func (f *partialFailure) append(id, operation string, err error) {
*f = append(*f, fmt.Sprintf("[%q: %s: %s]", id, operation, err))
}
func (f partialFailure) Error() string {
return fmt.Sprintf("partial failures: %s", strings.Join(f, ", "))
}

View File

@ -17,6 +17,7 @@
package manager package manager
import ( import (
"fmt"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
@ -28,7 +29,10 @@ import (
"github.com/google/cadvisor/container/docker" "github.com/google/cadvisor/container/docker"
info "github.com/google/cadvisor/info/v1" info "github.com/google/cadvisor/info/v1"
itest "github.com/google/cadvisor/info/v1/test" itest "github.com/google/cadvisor/info/v1/test"
"github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/utils/sysfs/fakesysfs" "github.com/google/cadvisor/utils/sysfs/fakesysfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
// TODO(vmarmol): Refactor these tests. // TODO(vmarmol): Refactor these tests.
@ -151,6 +155,88 @@ func TestGetContainerInfo(t *testing.T) {
} }
func TestGetContainerInfoV2(t *testing.T) {
containers := []string{
"/",
"/c1",
"/c2",
}
options := v2.RequestOptions{
IdType: v2.TypeName,
Count: 1,
Recursive: true,
}
query := &info.ContainerInfoRequest{
NumStats: 2,
}
m, _, handlerMap := expectManagerWithContainers(containers, query, t)
infos, err := m.GetContainerInfoV2("/", options)
require.NoError(t, err, "Error calling GetContainerInfoV2")
for container, handler := range handlerMap {
handler.AssertExpectations(t)
info, ok := infos[container]
assert.True(t, ok, "Missing info for container %q", container)
assert.NotEqual(t, v2.ContainerSpec{}, info.Spec, "Empty spec for container %q", container)
assert.NotEmpty(t, info.Stats, "Missing stats for container %q", container)
}
}
func TestGetContainerInfoV2Failure(t *testing.T) {
successful := "/"
statless := "/c1"
failing := "/c2"
containers := []string{
successful, statless, failing,
}
options := v2.RequestOptions{
IdType: v2.TypeName,
Count: 1,
Recursive: true,
}
query := &info.ContainerInfoRequest{
NumStats: 2,
}
m, _, handlerMap := expectManagerWithContainers(containers, query, t)
// Remove /c1 stats
require.NoError(t, m.memoryCache.RemoveContainer(statless))
// Make GetSpec fail on /c2
mockErr := fmt.Errorf("intentional GetSpec failure")
failingHandler := container.NewMockContainerHandler(failing)
failingHandler.On("GetSpec").Return(info.ContainerSpec{}, mockErr)
failingHandler.On("Exists").Return(true)
*handlerMap[failing] = *failingHandler
m.containers[namespacedContainerName{Name: failing}].lastUpdatedTime = time.Time{} // Force GetSpec.
infos, err := m.GetContainerInfoV2("/", options)
assert.Error(t, err, "Expected error calling GetContainerInfoV2")
// Successful containers still successful.
info, ok := infos[successful]
assert.True(t, ok, "Missing info for container %q", successful)
assert.NotEqual(t, v2.ContainerSpec{}, info.Spec, "Empty spec for container %q", successful)
assert.NotEmpty(t, info.Stats, "Missing stats for container %q", successful)
// "/c1" present with spec.
info, ok = infos[statless]
assert.True(t, ok, "Missing info for container %q", statless)
assert.NotEqual(t, v2.ContainerSpec{}, info.Spec, "Empty spec for container %q", statless)
assert.Empty(t, info.Stats, "Missing stats for container %q", successful)
// "/c2" should be present but empty.
info, ok = infos[failing]
assert.True(t, ok, "Missing info for failed container")
assert.Equal(t, v2.ContainerInfo{}, info, "Empty spec for failed container")
assert.Empty(t, info.Stats, "Missing stats for failed container")
}
func TestSubcontainersInfo(t *testing.T) { func TestSubcontainersInfo(t *testing.T) {
containers := []string{ containers := []string{
"/c1", "/c1",