diff --git a/cadvisor.go b/cadvisor.go index 7503be3e..9a5102ca 100644 --- a/cadvisor.go +++ b/cadvisor.go @@ -37,7 +37,7 @@ var argIp = flag.String("listen_ip", "", "IP to listen on, defaults to all IPs") var argPort = flag.Int("port", 8080, "port to listen") var maxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used simultaneously. Less than 1 for default (number of cores).") -var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Data is always cached shortly in memory, this controls where data is pushed besides the local cache. Empty means none. Options are: (default), bigquery, and influxdb") +var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Data is always cached shortly in memory, this controls where data is pushed besides the local cache. Empty means none. Options are: (default), bigquery, influxdb, and kafka") var versionFlag = flag.Bool("version", false, "print cAdvisor version and exit") var httpAuthFile = flag.String("http_auth_file", "", "HTTP auth file for the web UI") diff --git a/container/docker/handler.go b/container/docker/handler.go index ebcd46ac..34be520c 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -179,9 +179,11 @@ func (self *dockerContainerHandler) Cleanup() { func (self *dockerContainerHandler) ContainerReference() (info.ContainerReference, error) { return info.ContainerReference{ + Id: self.id, Name: self.name, Aliases: self.aliases, Namespace: DockerNamespace, + Labels: self.labels, }, nil } diff --git a/info/v1/container.go b/info/v1/container.go index 6b7f40de..bb4fc322 100644 --- a/info/v1/container.go +++ b/info/v1/container.go @@ -70,6 +70,9 @@ type ContainerSpec struct { // Container reference contains enough information to uniquely identify a container type ContainerReference struct { + // The container id + Id string `json:"id,omitempty"` + // The absolute name of the container. This is unique on the machine. Name string `json:"name"` @@ -80,6 +83,8 @@ type ContainerReference struct { // Namespace under which the aliases of a container are unique. // An example of a namespace is "docker" for Docker containers. Namespace string `json:"namespace,omitempty"` + + Labels map[string]string `json:"labels,omitempty"` } // Sorts by container name. diff --git a/storage/kafka/kafka.go b/storage/kafka/kafka.go new file mode 100644 index 00000000..e43a374c --- /dev/null +++ b/storage/kafka/kafka.go @@ -0,0 +1,114 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "encoding/json" + "flag" + "os" + "strings" + "time" + + info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/storage" + "github.com/google/cadvisor/utils/container" + + kafka "github.com/Shopify/sarama" + "github.com/golang/glog" +) + +func init() { + storage.RegisterStorageDriver("kafka", new) +} + +var ( + brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv") + topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic") +) + +type kafkaStorage struct { + producer kafka.AsyncProducer + topic string + machineName string +} + +type detailSpec struct { + Timestamp time.Time `json:"timestamp"` + MachineName string `json:"machine_name,omitempty"` + ContainerName string `json:"container_Name,omitempty"` + ContainerID string `json:"container_Id,omitempty"` + ContainerLabels map[string]string `json:"container_labels,omitempty"` + ContainerStats *info.ContainerStats `json:"container_stats,omitempty"` +} + +func (driver *kafkaStorage) infoToDetailSpec(ref info.ContainerReference, stats *info.ContainerStats) *detailSpec { + timestamp := time.Now() + containerID := ref.Id + containerLabels := ref.Labels + containerName := container.GetPreferredName(ref) + + detail := &detailSpec{ + Timestamp: timestamp, + MachineName: driver.machineName, + ContainerName: containerName, + ContainerID: containerID, + ContainerLabels: containerLabels, + ContainerStats: stats, + } + return detail +} + +func (driver *kafkaStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error { + detail := driver.infoToDetailSpec(ref, stats) + b, err := json.Marshal(detail) + + driver.producer.Input() <- &kafka.ProducerMessage{ + Topic: driver.topic, + Value: kafka.StringEncoder(b), + } + + return err +} + +func (self *kafkaStorage) Close() error { + return self.producer.Close() +} + +func new() (storage.StorageDriver, error) { + machineName, err := os.Hostname() + if err != nil { + return nil, err + } + return newStorage(machineName) +} + +func newStorage(machineName string) (storage.StorageDriver, error) { + config := kafka.NewConfig() + config.Producer.RequiredAcks = kafka.WaitForAll + + brokerList := strings.Split(*brokers, ",") + glog.V(4).Infof("Kafka brokers:%q", brokers) + + producer, err := kafka.NewAsyncProducer(brokerList, config) + if err != nil { + return nil, err + } + ret := &kafkaStorage{ + producer: producer, + topic: *topic, + machineName: machineName, + } + return ret, nil +} diff --git a/storagedriver.go b/storagedriver.go index 43f72987..6da3b305 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -23,6 +23,7 @@ import ( _ "github.com/google/cadvisor/storage/bigquery" _ "github.com/google/cadvisor/storage/elasticsearch" _ "github.com/google/cadvisor/storage/influxdb" + _ "github.com/google/cadvisor/storage/kafka" _ "github.com/google/cadvisor/storage/redis" _ "github.com/google/cadvisor/storage/statsd" _ "github.com/google/cadvisor/storage/stdout" diff --git a/utils/container/container.go b/utils/container/container.go new file mode 100644 index 00000000..ea956a5c --- /dev/null +++ b/utils/container/container.go @@ -0,0 +1,31 @@ +// Copyright 2016 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + info "github.com/google/cadvisor/info/v1" +) + +// Returns the alias a container is known by within a certain namespace, +// if available. Otherwise returns the absolute name of the container. +func GetPreferredName(ref info.ContainerReference) string { + var containerName string + if len(ref.Aliases) > 0 { + containerName = ref.Aliases[0] + } else { + containerName = ref.Name + } + return containerName +}