Merge pull request #1270 from devx/add-kafka-tls-client-auth
add optional TLS client auth to Kafka storage
This commit is contained in:
commit
85badb7422
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -22,8 +22,8 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/Shopify/sarama",
|
"ImportPath": "github.com/Shopify/sarama",
|
||||||
"Comment": "v1.7.0",
|
"Comment": "v1.8.0",
|
||||||
"Rev": "87ec8d76e89cc002ff5673ee7598ae1db2f32424"
|
"Rev": "4ba9bba6adb6697bcec3841e1ecdfecf5227c3b9"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/Sirupsen/logrus",
|
"ImportPath": "github.com/Sirupsen/logrus",
|
||||||
|
@ -7,7 +7,7 @@ cAdvisor supports exporting stats to various storage driver plugins. To enable a
|
|||||||
- [BigQuery](https://cloud.google.com/bigquery/). See the [documentation](../../storage/bigquery/README.md) for usage.
|
- [BigQuery](https://cloud.google.com/bigquery/). See the [documentation](../../storage/bigquery/README.md) for usage.
|
||||||
- [ElasticSearch](https://www.elastic.co/). See the [documentation](elasticsearch.md) for usage and examples.
|
- [ElasticSearch](https://www.elastic.co/). See the [documentation](elasticsearch.md) for usage and examples.
|
||||||
- [InfluxDB](https://influxdb.com/). See the [documentation](influxdb.md) for usage and examples.
|
- [InfluxDB](https://influxdb.com/). See the [documentation](influxdb.md) for usage and examples.
|
||||||
- [Kafka](http://kafka.apache.org/)
|
- [Kafka](http://kafka.apache.org/). See the [documentation](kafka.md) for usage.
|
||||||
- [Prometheus](http://prometheus.io). See the [documentation](prometheus.md) for usage and examples.
|
- [Prometheus](http://prometheus.io). See the [documentation](prometheus.md) for usage and examples.
|
||||||
- [Redis](http://redis.io/)
|
- [Redis](http://redis.io/)
|
||||||
- [StatsD](https://github.com/etsy/statsd)
|
- [StatsD](https://github.com/etsy/statsd)
|
||||||
|
43
docs/storage/kafka.md
Normal file
43
docs/storage/kafka.md
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# Exporting cAdvisor Stats to Kafka
|
||||||
|
|
||||||
|
cAdvisor supports exporting stats to [Kafka](http://kafka.apache.org/). To use Kafka, you need to provide the additional flags to cAdvisor:
|
||||||
|
|
||||||
|
Set the storage driver as Kafka:
|
||||||
|
|
||||||
|
```
|
||||||
|
-storage_driver=kafka
|
||||||
|
```
|
||||||
|
|
||||||
|
If no broker are provided it will default to a broker listening at localhost:9092, with 'stats' as the default topic.
|
||||||
|
|
||||||
|
|
||||||
|
Specify a Kafka broker address:
|
||||||
|
|
||||||
|
```
|
||||||
|
-storage_driver_kafka_broker_list=localhost:9092
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
Specify a Kafka topic:
|
||||||
|
|
||||||
|
```
|
||||||
|
-storage_driver_kafka_topic=myTopic
|
||||||
|
```
|
||||||
|
|
||||||
|
As of version 9.0. Kafka supports TLS client auth:
|
||||||
|
|
||||||
|
```
|
||||||
|
# To enable TLS client auth support you need to provide the following:
|
||||||
|
|
||||||
|
# Location to Certificate Authority certificate
|
||||||
|
-storage_driver_kafka_ssl_ca=/path/to/ca.pem
|
||||||
|
|
||||||
|
# Location to client certificate certificate
|
||||||
|
-storage_driver_kafka_ssl_cert=/path/to/client_cert.pem
|
||||||
|
|
||||||
|
# Location to client certificate key
|
||||||
|
-storage_driver_kafka_ssl_key=/path/to/client_key.pem
|
||||||
|
|
||||||
|
# Verify SSL certificate chain (default: true)
|
||||||
|
-storage_driver_kafka_ssl_verify=false
|
||||||
|
```
|
@ -15,8 +15,11 @@
|
|||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -36,6 +39,10 @@ func init() {
|
|||||||
var (
|
var (
|
||||||
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
|
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
|
||||||
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
|
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
|
||||||
|
certFile = flag.String("storage_driver_kafka_ssl_cert", "", "optional certificate file for TLS client authentication")
|
||||||
|
keyFile = flag.String("storage_driver_kafka_ssl_key", "", "optional key file for TLS client authentication")
|
||||||
|
caFile = flag.String("storage_driver_kafka_ssl_ca", "", "optional certificate authority file for TLS client authentication")
|
||||||
|
verifySSL = flag.Bool("storage_driver_kafka_ssl_verify", true, "verify ssl certificate chain")
|
||||||
)
|
)
|
||||||
|
|
||||||
type kafkaStorage struct {
|
type kafkaStorage struct {
|
||||||
@ -94,8 +101,43 @@ func new() (storage.StorageDriver, error) {
|
|||||||
return newStorage(machineName)
|
return newStorage(machineName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func generateTLSConfig() (*tls.Config, error) {
|
||||||
|
if *certFile != "" && *keyFile != "" && *caFile != "" {
|
||||||
|
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
caCert, err := ioutil.ReadFile(*caFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
caCertPool.AppendCertsFromPEM(caCert)
|
||||||
|
|
||||||
|
return &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
RootCAs: caCertPool,
|
||||||
|
InsecureSkipVerify: *verifySSL,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newStorage(machineName string) (storage.StorageDriver, error) {
|
func newStorage(machineName string) (storage.StorageDriver, error) {
|
||||||
config := kafka.NewConfig()
|
config := kafka.NewConfig()
|
||||||
|
|
||||||
|
tlsConfig, err := generateTLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tlsConfig != nil {
|
||||||
|
config.Net.TLS.Enable = true
|
||||||
|
config.Net.TLS.Config = tlsConfig
|
||||||
|
}
|
||||||
|
|
||||||
config.Producer.RequiredAcks = kafka.WaitForAll
|
config.Producer.RequiredAcks = kafka.WaitForAll
|
||||||
|
|
||||||
brokerList := strings.Split(*brokers, ",")
|
brokerList := strings.Split(*brokers, ",")
|
||||||
|
Loading…
Reference in New Issue
Block a user