stats writers

This commit is contained in:
Nan Deng 2014-06-14 17:22:06 -07:00
parent 103e83fd86
commit a8401c422e
4 changed files with 54 additions and 25 deletions

View File

@ -44,7 +44,9 @@ func main() {
NumSamples: *argSampleSize, NumSamples: *argSampleSize,
ResetPeriod: *argResetPeriod, ResetPeriod: *argResetPeriod,
}) })
containerManager, err := manager.New()
// TODO(monnand): Add stats writer for manager
containerManager, err := manager.New(nil)
if err != nil { if err != nil {
log.Fatalf("Failed to create a Container Manager: %s", err) log.Fatalf("Failed to create a Container Manager: %s", err)
} }

View File

@ -43,9 +43,10 @@ type containerInfo struct {
} }
type containerData struct { type containerData struct {
handler container.ContainerHandler handler container.ContainerHandler
info containerInfo info containerInfo
lock sync.Mutex statsChan chan<- *ContainerStats
lock sync.Mutex
// Tells the container to stop. // Tells the container to stop.
stop chan bool stop chan bool
@ -84,7 +85,7 @@ func (c *containerData) GetInfo() (*containerInfo, error) {
return &ret, nil return &ret, nil
} }
func NewContainerData(containerName string) (*containerData, error) { func NewContainerData(containerName string, statsChan chan<- *ContainerStats) (*containerData, error) {
cont := &containerData{} cont := &containerData{}
handler, err := container.NewContainerHandler(containerName) handler, err := container.NewContainerHandler(containerName)
if err != nil { if err != nil {
@ -98,6 +99,7 @@ func NewContainerData(containerName string) (*containerData, error) {
cont.info.Name = ref.Name cont.info.Name = ref.Name
cont.info.Aliases = ref.Aliases cont.info.Aliases = ref.Aliases
cont.info.Stats = list.New() cont.info.Stats = list.New()
cont.statsChan = statsChan
cont.stop = make(chan bool, 1) cont.stop = make(chan bool, 1)
return cont, nil return cont, nil
@ -151,6 +153,15 @@ func (c *containerData) updateStats() error {
if stats == nil { if stats == nil {
return nil return nil
} }
if c.statsChan != nil {
ref, err := c.handler.ContainerReference()
if err == nil {
c.statsChan <- &ContainerStats{
ContainerReference: ref,
Stats: stats,
}
}
}
summary, err := c.handler.StatsSummary() summary, err := c.handler.StatsSummary()
if err != nil { if err != nil {
return err return err

View File

@ -22,6 +22,7 @@ import (
"github.com/google/cadvisor/container" "github.com/google/cadvisor/container"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
"github.com/google/cadvisor/storage"
) )
type Manager interface { type Manager interface {
@ -38,7 +39,7 @@ type Manager interface {
GetVersionInfo() (*info.VersionInfo, error) GetVersionInfo() (*info.VersionInfo, error)
} }
func New() (Manager, error) { func New(statsWriter storage.ContainerStatsWriter) (Manager, error) {
newManager := &manager{} newManager := &manager{}
newManager.containers = make(map[string]*containerData) newManager.containers = make(map[string]*containerData)
@ -55,6 +56,15 @@ func New() (Manager, error) {
} }
newManager.versionInfo = *versionInfo newManager.versionInfo = *versionInfo
log.Printf("Version: %+v", newManager.versionInfo) log.Printf("Version: %+v", newManager.versionInfo)
if statsWriter != nil {
// XXX(monnand): about numWorkers and queueLength, should we make it
// configurable?
ch, err := StartContainerStatsWriters(16, 64, statsWriter)
if err != nil {
return nil, err
}
newManager.statsChan = ch
}
return newManager, nil return newManager, nil
} }
@ -62,6 +72,7 @@ func New() (Manager, error) {
type manager struct { type manager struct {
containers map[string]*containerData containers map[string]*containerData
containersLock sync.RWMutex containersLock sync.RWMutex
statsChan chan<- *ContainerStats
machineInfo info.MachineInfo machineInfo info.MachineInfo
versionInfo info.VersionInfo versionInfo info.VersionInfo
} }
@ -160,7 +171,7 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
// Create a container. This expects to only be called from the global manager thread. // Create a container. This expects to only be called from the global manager thread.
func (m *manager) createContainer(containerName string) (*containerData, error) { func (m *manager) createContainer(containerName string) (*containerData, error) {
cont, err := NewContainerData(containerName) cont, err := NewContainerData(containerName, m.statsChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -15,6 +15,7 @@
package manager package manager
import ( import (
"fmt"
"log" "log"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
@ -24,31 +25,35 @@ import (
type ContainerStats struct { type ContainerStats struct {
ContainerReference info.ContainerReference ContainerReference info.ContainerReference
Stats *info.ContainerStats Stats *info.ContainerStats
ResChan chan<- error
// ResChan is nil if the sender wants to ignore the result
ResChan chan<- error
} }
type ContainerStatsProcessor interface { // Create numWorkers goroutines to write container stats info into the
StartStatsProcessors(numProcs int) (chan<- *ContainerStats, error) // specified storage. Returns a write-only channel for the caller to write
StopAllProcessors() // container stats. Closing this channel will stop all workers.
} func StartContainerStatsWriters(
numWorkers int,
type containerStatsWriter struct { queueLength int,
config *storage.Config statsWriter storage.ContainerStatsWriter,
} ) (chan<- *ContainerStats, error) {
if statsWriter == nil {
func (self *containerStatsWriter) StartStatsProcessors(numProcs int) (chan<- *ContainerStats, error) { return nil, fmt.Errorf("invalid stats writer")
ch := make(chan *ContainerStats)
statsWriter, err := storage.NewContainerStatsWriter(self.config)
if err != nil {
return nil, err
} }
for i := 0; i < numProcs; i++ { var ch chan *ContainerStats
go self.process(statsWriter, ch) if queueLength > 0 {
ch = make(chan *ContainerStats, queueLength)
} else {
ch = make(chan *ContainerStats)
}
for i := 0; i < numWorkers; i++ {
go writeContainerStats(statsWriter, ch)
} }
return ch, nil return ch, nil
} }
func (self *containerStatsWriter) process(statsWriter storage.ContainerStatsWriter, ch <-chan *ContainerStats) { func writeContainerStats(statsWriter storage.ContainerStatsWriter, ch <-chan *ContainerStats) {
for stats := range ch { for stats := range ch {
err := statsWriter.Write(stats.ContainerReference, stats.Stats) err := statsWriter.Write(stats.ContainerReference, stats.Stats)
if err != nil { if err != nil {