This commit is contained in:
Nan Deng 2014-06-14 22:27:26 -07:00
parent a8401c422e
commit dc9f4422b4
5 changed files with 41 additions and 47 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/google/cadvisor/container" "github.com/google/cadvisor/container"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage"
) )
var historyDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep") var historyDuration = flag.Int("history_duration", 60, "number of seconds of container history to keep")
@ -45,7 +46,7 @@ type containerInfo struct {
type containerData struct { type containerData struct {
handler container.ContainerHandler handler container.ContainerHandler
info containerInfo info containerInfo
statsChan chan<- *ContainerStats storageDriver storage.StorageDriver
lock sync.Mutex lock sync.Mutex
// Tells the container to stop. // Tells the container to stop.
@ -85,7 +86,7 @@ func (c *containerData) GetInfo() (*containerInfo, error) {
return &ret, nil return &ret, nil
} }
func NewContainerData(containerName string, statsChan chan<- *ContainerStats) (*containerData, error) { func NewContainerData(containerName string, driver storage.StorageDriver) (*containerData, error) {
cont := &containerData{} cont := &containerData{}
handler, err := container.NewContainerHandler(containerName) handler, err := container.NewContainerHandler(containerName)
if err != nil { if err != nil {
@ -99,7 +100,7 @@ func NewContainerData(containerName string, statsChan chan<- *ContainerStats) (*
cont.info.Name = ref.Name cont.info.Name = ref.Name
cont.info.Aliases = ref.Aliases cont.info.Aliases = ref.Aliases
cont.info.Stats = list.New() cont.info.Stats = list.New()
cont.statsChan = statsChan cont.storageDriver = driver
cont.stop = make(chan bool, 1) cont.stop = make(chan bool, 1)
return cont, nil return cont, nil
@ -153,13 +154,14 @@ func (c *containerData) updateStats() error {
if stats == nil { if stats == nil {
return nil return nil
} }
if c.statsChan != nil { if c.storageDriver != nil {
ref, err := c.handler.ContainerReference() ref, err := c.handler.ContainerReference()
if err == nil { if err != nil {
c.statsChan <- &ContainerStats{ return err
ContainerReference: ref,
Stats: stats,
} }
err = c.storageDriver.WriteStats(ref, stats)
if err != nil {
return err
} }
} }
summary, err := c.handler.StatsSummary() summary, err := c.handler.StatsSummary()

View File

@ -39,7 +39,7 @@ type Manager interface {
GetVersionInfo() (*info.VersionInfo, error) GetVersionInfo() (*info.VersionInfo, error)
} }
func New(statsWriter storage.ContainerStatsWriter) (Manager, error) { func New(driver storage.StorageDriver) (Manager, error) {
newManager := &manager{} newManager := &manager{}
newManager.containers = make(map[string]*containerData) newManager.containers = make(map[string]*containerData)
@ -56,15 +56,7 @@ func New(statsWriter storage.ContainerStatsWriter) (Manager, error) {
} }
newManager.versionInfo = *versionInfo newManager.versionInfo = *versionInfo
log.Printf("Version: %+v", newManager.versionInfo) log.Printf("Version: %+v", newManager.versionInfo)
if statsWriter != nil { newManager.storageDriver = driver
// XXX(monnand): about numWorkers and queueLength, should we make it
// configurable?
ch, err := StartContainerStatsWriters(16, 64, statsWriter)
if err != nil {
return nil, err
}
newManager.statsChan = ch
}
return newManager, nil return newManager, nil
} }
@ -72,7 +64,7 @@ func New(statsWriter storage.ContainerStatsWriter) (Manager, error) {
type manager struct { type manager struct {
containers map[string]*containerData containers map[string]*containerData
containersLock sync.RWMutex containersLock sync.RWMutex
statsChan chan<- *ContainerStats storageDriver storage.StorageDriver
machineInfo info.MachineInfo machineInfo info.MachineInfo
versionInfo info.VersionInfo versionInfo info.VersionInfo
} }
@ -171,7 +163,7 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
// Create a container. This expects to only be called from the global manager thread. // Create a container. This expects to only be called from the global manager thread.
func (m *manager) createContainer(containerName string) (*containerData, error) { func (m *manager) createContainer(containerName string) (*containerData, error) {
cont, err := NewContainerData(containerName, m.statsChan) cont, err := NewContainerData(containerName, m.storageDriver)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -36,7 +36,7 @@ type ContainerStats struct {
func StartContainerStatsWriters( func StartContainerStatsWriters(
numWorkers int, numWorkers int,
queueLength int, queueLength int,
statsWriter storage.ContainerStatsWriter, statsWriter storage.StorageDriver,
) (chan<- *ContainerStats, error) { ) (chan<- *ContainerStats, error) {
if statsWriter == nil { if statsWriter == nil {
return nil, fmt.Errorf("invalid stats writer") return nil, fmt.Errorf("invalid stats writer")
@ -53,7 +53,7 @@ func StartContainerStatsWriters(
return ch, nil return ch, nil
} }
func writeContainerStats(statsWriter storage.ContainerStatsWriter, ch <-chan *ContainerStats) { func writeContainerStats(statsWriter storage.StorageDriver, ch <-chan *ContainerStats) {
for stats := range ch { for stats := range ch {
err := statsWriter.Write(stats.ContainerReference, stats.Stats) err := statsWriter.Write(stats.ContainerReference, stats.Stats)
if err != nil { if err != nil {

View File

@ -21,8 +21,8 @@ import (
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
) )
type ContainerStatsWriter interface { type StorageDriver interface {
Write(ref info.ContainerReference, stats *info.ContainerStats) error WriteStats(ref info.ContainerReference, stats *info.ContainerStats) error
} }
// Database config which should contain all information used to connect to // Database config which should contain all information used to connect to
@ -37,28 +37,28 @@ type Config struct {
Params map[string]string `json:"parameters,omitempty"` Params map[string]string `json:"parameters,omitempty"`
} }
type ContainerStatsWriterFactory interface { type StorageFactory interface {
String() string String() string
New(config *Config) (ContainerStatsWriter, error) New(config *Config) (StorageDriver, error)
} }
type containerStatsWriterFactoryManager struct { type containerStatsWriterFactoryManager struct {
lock sync.RWMutex lock sync.RWMutex
factories map[string]ContainerStatsWriterFactory factories map[string]StorageFactory
} }
func (self *containerStatsWriterFactoryManager) Register(factory ContainerStatsWriterFactory) { func (self *containerStatsWriterFactoryManager) Register(factory StorageFactory) {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
if self.factories == nil { if self.factories == nil {
self.factories = make(map[string]ContainerStatsWriterFactory, 8) self.factories = make(map[string]StorageFactory, 8)
} }
self.factories[factory.String()] = factory self.factories[factory.String()] = factory
} }
func (self *containerStatsWriterFactoryManager) New(config *Config) (ContainerStatsWriter, error) { func (self *containerStatsWriterFactoryManager) New(config *Config) (StorageDriver, error) {
self.lock.RLock() self.lock.RLock()
defer self.lock.RUnlock() defer self.lock.RUnlock()
@ -70,10 +70,10 @@ func (self *containerStatsWriterFactoryManager) New(config *Config) (ContainerSt
var globalContainerStatsWriterFactoryManager containerStatsWriterFactoryManager var globalContainerStatsWriterFactoryManager containerStatsWriterFactoryManager
func RegisterContainerStatsWriterFactory(factory ContainerStatsWriterFactory) { func RegisterStorage(factory StorageFactory) {
globalContainerStatsWriterFactoryManager.Register(factory) globalContainerStatsWriterFactoryManager.Register(factory)
} }
func NewContainerStatsWriter(config *Config) (ContainerStatsWriter, error) { func NewStorage(config *Config) (StorageDriver, error) {
return globalContainerStatsWriterFactoryManager.New(config) return globalContainerStatsWriterFactoryManager.New(config)
} }

View File

@ -8,12 +8,12 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
type mockContainerStatsWriter struct { type mockStorageDriver struct {
storageName string storageName string
mock.Mock mock.Mock
} }
func (self *mockContainerStatsWriter) Write( func (self *mockStorageDriver) WriteStats(
ref info.ContainerReference, ref info.ContainerReference,
stats *info.ContainerStats, stats *info.ContainerStats,
) error { ) error {
@ -21,18 +21,18 @@ func (self *mockContainerStatsWriter) Write(
return args.Error(0) return args.Error(0)
} }
type mockContainerStatsWriterFactory struct { type mockStorageFactory struct {
name string name string
} }
func (self *mockContainerStatsWriterFactory) String() string { func (self *mockStorageFactory) String() string {
return self.name return self.name
} }
func (self *mockContainerStatsWriterFactory) New( func (self *mockStorageFactory) New(
config *Config, config *Config,
) (ContainerStatsWriter, error) { ) (StorageDriver, error) {
mockWriter := &mockContainerStatsWriter{ mockWriter := &mockStorageDriver{
storageName: self.name, storageName: self.name,
} }
return mockWriter, nil return mockWriter, nil
@ -50,10 +50,10 @@ func TestContainerStatsWriterFactoryManager(t *testing.T) {
wg.Add(1) wg.Add(1)
go func(n string) { go func(n string) {
defer wg.Done() defer wg.Done()
factory := &mockContainerStatsWriterFactory{ factory := &mockStorageFactory{
name: n, name: n,
} }
RegisterContainerStatsWriterFactory(factory) RegisterStorage(factory)
}(name) }(name)
} }
wg.Wait() wg.Wait()
@ -64,11 +64,11 @@ func TestContainerStatsWriterFactoryManager(t *testing.T) {
} }
go func(n string) { go func(n string) {
defer wg.Done() defer wg.Done()
writer, err := NewContainerStatsWriter(config) writer, err := NewStorage(config)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
if mw, ok := writer.(*mockContainerStatsWriter); ok { if mw, ok := writer.(*mockStorageDriver); ok {
if mw.storageName != n { if mw.storageName != n {
t.Errorf("wrong writer. should be %v, got %v", n, mw.storageName) t.Errorf("wrong writer. should be %v, got %v", n, mw.storageName)
} }