diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index b7f0ec11..96c62f26 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -17,6 +17,7 @@ package elasticsearch import ( "fmt" "sync" + "time" info "github.com/google/cadvisor/info/v1" storage "github.com/google/cadvisor/storage" @@ -67,7 +68,7 @@ func (self *elasticStorage) AddStats(ref info.ContainerReference, stats *info.Co // Add some default params based on ContainerStats detail := self.containerStatsAndDefaultValues(ref, stats) // Index a cadvisor (using JSON serialization) - put, err := self.client.Index(). + _, err := self.client.Index(). Index(self.indexName). Type(self.typeName). BodyJson(detail). @@ -76,7 +77,6 @@ func (self *elasticStorage) AddStats(ref info.ContainerReference, stats *info.Co // Handle error panic(fmt.Errorf("failed to write stats to ElasticSearch- %s", err)) } - fmt.Printf("Indexed tweet %s to index %s, type %s\n", put.Id, put.Index, put.Type) }() return nil } @@ -93,19 +93,24 @@ func New(machineName, indexName, typeName, elasticHost string, + enableSniffer bool, ) (storage.StorageDriver, error) { // Obtain a client and connect to the default Elasticsearch installation // on 127.0.0.1:9200. Of course you can configure your client to connect // to other hosts and configure it in various other ways. client, err := elastic.NewClient( - elastic.SetURL(elasticHost)) + elastic.SetHealthcheck(true), + elastic.SetSniff(enableSniffer), + elastic.SetHealthcheckInterval(30*time.Second), + elastic.SetURL(elasticHost), + ) if err != nil { // Handle error panic(err) } // Ping the Elasticsearch server to get e.g. the version number - info, code, err := client.Ping().Do() + info, code, err := client.Ping().URL(elasticHost).Do() if err != nil { // Handle error panic(err) diff --git a/storagedriver.go b/storagedriver.go index b177131c..5193a8ff 100644 --- a/storagedriver.go +++ b/storagedriver.go @@ -39,9 +39,10 @@ var argDbTable = flag.String("storage_driver_table", "stats", "table name") var argDbIsSecure = flag.Bool("storage_driver_secure", false, "use secure connection with database") var argDbBufferDuration = flag.Duration("storage_driver_buffer_duration", 60*time.Second, "Writes in the storage driver will be buffered for this duration, and committed to the non memory backends as a single transaction") var storageDuration = flag.Duration("storage_duration", 2*time.Minute, "How long to keep data stored (Default: 2min).") -var argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "database host:port") -var argIndexName = flag.String("storage_driver_index", "cadvisor", "index name") -var argTypeName = flag.String("storage_driver_type", "stats", "type name") +var argElasticHost = flag.String("storage_driver_es_host", "http://localhost:9200", "ElasticSearch host:port") +var argIndexName = flag.String("storage_driver_es_index", "cadvisor", "ElasticSearch index name") +var argTypeName = flag.String("storage_driver_es_type", "stats", "ElasticSearch type name") +var argEnableSniffer = flag.Bool("storage_driver_es_enable_sniffer", false, "ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically") // Creates a memory storage with an optional backend storage option. func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) { @@ -108,6 +109,7 @@ func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error) *argIndexName, *argTypeName, *argElasticHost, + *argEnableSniffer, ) case "statsd": backendStorage, err = statsd.New(