diff --git a/cadvisor.go b/cadvisor.go index c145c74e..aabaa270 100644 --- a/cadvisor.go +++ b/cadvisor.go @@ -44,7 +44,9 @@ func main() { NumSamples: *argSampleSize, ResetPeriod: *argResetPeriod, }) - containerManager, err := manager.New() + + // TODO(monnand): Add stats writer for manager + containerManager, err := manager.New(nil) if err != nil { log.Fatalf("Failed to create a Container Manager: %s", err) } diff --git a/manager/container.go b/manager/container.go index 9431483c..ac7e03a2 100644 --- a/manager/container.go +++ b/manager/container.go @@ -43,9 +43,10 @@ type containerInfo struct { } type containerData struct { - handler container.ContainerHandler - info containerInfo - lock sync.Mutex + handler container.ContainerHandler + info containerInfo + statsChan chan<- *ContainerStats + lock sync.Mutex // Tells the container to stop. stop chan bool @@ -84,7 +85,7 @@ func (c *containerData) GetInfo() (*containerInfo, error) { return &ret, nil } -func NewContainerData(containerName string) (*containerData, error) { +func NewContainerData(containerName string, statsChan chan<- *ContainerStats) (*containerData, error) { cont := &containerData{} handler, err := container.NewContainerHandler(containerName) if err != nil { @@ -98,6 +99,7 @@ func NewContainerData(containerName string) (*containerData, error) { cont.info.Name = ref.Name cont.info.Aliases = ref.Aliases cont.info.Stats = list.New() + cont.statsChan = statsChan cont.stop = make(chan bool, 1) return cont, nil @@ -151,6 +153,15 @@ func (c *containerData) updateStats() error { if stats == nil { return nil } + if c.statsChan != nil { + ref, err := c.handler.ContainerReference() + if err == nil { + c.statsChan <- &ContainerStats{ + ContainerReference: ref, + Stats: stats, + } + } + } summary, err := c.handler.StatsSummary() if err != nil { return err diff --git a/manager/manager.go b/manager/manager.go index c223d5fd..c86bd830 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -22,6 +22,7 @@ import ( "github.com/google/cadvisor/container" "github.com/google/cadvisor/info" + "github.com/google/cadvisor/storage" ) type Manager interface { @@ -38,7 +39,7 @@ type Manager interface { GetVersionInfo() (*info.VersionInfo, error) } -func New() (Manager, error) { +func New(statsWriter storage.ContainerStatsWriter) (Manager, error) { newManager := &manager{} newManager.containers = make(map[string]*containerData) @@ -55,6 +56,15 @@ func New() (Manager, error) { } newManager.versionInfo = *versionInfo log.Printf("Version: %+v", newManager.versionInfo) + if statsWriter != nil { + // XXX(monnand): about numWorkers and queueLength, should we make it + // configurable? + ch, err := StartContainerStatsWriters(16, 64, statsWriter) + if err != nil { + return nil, err + } + newManager.statsChan = ch + } return newManager, nil } @@ -62,6 +72,7 @@ func New() (Manager, error) { type manager struct { containers map[string]*containerData containersLock sync.RWMutex + statsChan chan<- *ContainerStats machineInfo info.MachineInfo versionInfo info.VersionInfo } @@ -160,7 +171,7 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) { // Create a container. This expects to only be called from the global manager thread. func (m *manager) createContainer(containerName string) (*containerData, error) { - cont, err := NewContainerData(containerName) + cont, err := NewContainerData(containerName, m.statsChan) if err != nil { return nil, err } diff --git a/manager/statsproc.go b/manager/statsproc.go index 17f38ff3..6011f3ec 100644 --- a/manager/statsproc.go +++ b/manager/statsproc.go @@ -15,6 +15,7 @@ package manager import ( + "fmt" "log" "github.com/google/cadvisor/info" @@ -24,31 +25,35 @@ import ( type ContainerStats struct { ContainerReference info.ContainerReference Stats *info.ContainerStats - ResChan chan<- error + + // ResChan is nil if the sender wants to ignore the result + ResChan chan<- error } -type ContainerStatsProcessor interface { - StartStatsProcessors(numProcs int) (chan<- *ContainerStats, error) - StopAllProcessors() -} - -type containerStatsWriter struct { - config *storage.Config -} - -func (self *containerStatsWriter) StartStatsProcessors(numProcs int) (chan<- *ContainerStats, error) { - ch := make(chan *ContainerStats) - statsWriter, err := storage.NewContainerStatsWriter(self.config) - if err != nil { - return nil, err +// Create numWorkers goroutines to write container stats info into the +// specified storage. Returns a write-only channel for the caller to write +// container stats. Closing this channel will stop all workers. +func StartContainerStatsWriters( + numWorkers int, + queueLength int, + statsWriter storage.ContainerStatsWriter, +) (chan<- *ContainerStats, error) { + if statsWriter == nil { + return nil, fmt.Errorf("invalid stats writer") } - for i := 0; i < numProcs; i++ { - go self.process(statsWriter, ch) + var ch chan *ContainerStats + if queueLength > 0 { + ch = make(chan *ContainerStats, queueLength) + } else { + ch = make(chan *ContainerStats) + } + for i := 0; i < numWorkers; i++ { + go writeContainerStats(statsWriter, ch) } return ch, nil } -func (self *containerStatsWriter) process(statsWriter storage.ContainerStatsWriter, ch <-chan *ContainerStats) { +func writeContainerStats(statsWriter storage.ContainerStatsWriter, ch <-chan *ContainerStats) { for stats := range ch { err := statsWriter.Write(stats.ContainerReference, stats.Stats) if err != nil {