Update InfluxDB dependency to v0.9.5.1

This commit is contained in:
Matthias Nüßler 2015-12-04 01:29:22 +00:00 committed by Jimmi Dyson
parent ada6e3d354
commit 071e910ba2
6 changed files with 1385 additions and 589 deletions

4
Godeps/Godeps.json generated
View File

@ -88,8 +88,8 @@
}, },
{ {
"ImportPath": "github.com/influxdb/influxdb/client", "ImportPath": "github.com/influxdb/influxdb/client",
"Comment": "v0.8.0-rc.3-9-g3284662", "Comment": "v0.9.5.1",
"Rev": "3284662b350688b651359f9124928856071bd3f5" "Rev": "9eab56311373ee6f788ae5dfc87e2240038f0eb4"
}, },
{ {
"ImportPath": "github.com/kr/pretty", "ImportPath": "github.com/kr/pretty",

View File

@ -1,2 +1,256 @@
influxdb-go # InfluxDB Client
===========
[![GoDoc](https://godoc.org/github.com/influxdb/influxdb?status.svg)](http://godoc.org/github.com/influxdb/influxdb/client/v2)
## Description
**NOTE:** The Go client library now has a "v2" version, with the old version
being deprecated. The new version can be imported at
`import "github.com/influxdb/influxdb/client/v2"`. It is not backwards-compatible.
A Go client library written and maintained by the **InfluxDB** team.
This package provides convenience functions to read and write time series data.
It uses the HTTP protocol to communicate with your **InfluxDB** cluster.
## Getting Started
### Connecting To Your Database
Connecting to an **InfluxDB** database is straightforward. You will need a host
name, a port and the cluster user credentials if applicable. The default port is
8086. You can customize these settings to your specific installation via the
**InfluxDB** configuration file.
Thought not necessary for experimentation, you may want to create a new user
and authenticate the connection to your database.
For more information please check out the
[Cluster Admin Docs](http://influxdb.com/docs/v0.9/query_language/database_administration.html).
For the impatient, you can create a new admin user _bubba_ by firing off the
[InfluxDB CLI](https://github.com/influxdb/influxdb/blob/master/cmd/influx/main.go).
```shell
influx
> create user bubba with password 'bumblebeetuna'
> grant all privileges to bubba
```
And now for good measure set the credentials in you shell environment.
In the example below we will use $INFLUX_USER and $INFLUX_PWD
Now with the administrivia out of the way, let's connect to our database.
NOTE: If you've opted out of creating a user, you can omit Username and Password in
the configuration below.
```go
package main
import
import (
"net/url"
"fmt"
"log"
"os"
"github.com/influxdb/influxdb/client/v2"
)
const (
MyDB = "square_holes"
username = "bubba"
password = "bumblebeetuna"
)
func main() {
// Make client
u, _ := url.Parse("http://localhost:8086")
c := client.NewClient(client.Config{
URL: u,
Username: username,
Password: password,
})
// Create a new point batch
bp := client.NewBatchPoints(client.BatchPointsConfig{
Database: MyDB,
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt := client.NewPoint("cpu_usage", tags, fields, time.Now())
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
```
### Inserting Data
Time series data aka *points* are written to the database using batch inserts.
The mechanism is to create one or more points and then create a batch aka
*batch points* and write these to a given database and series. A series is a
combination of a measurement (time/values) and a set of tags.
In this sample we will create a batch of a 1,000 points. Each point has a time and
a single value as well as 2 tags indicating a shape and color. We write these points
to a database called _square_holes_ using a measurement named _shapes_.
NOTE: You can specify a RetentionPolicy as part of the batch points. If not
provided InfluxDB will use the database _default_ retention policy.
```go
func writePoints(clnt client.Client) {
sampleSize := 1000
rand.Seed(42)
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "systemstats",
Precision: "us",
})
for i := 0; i < sampleSize; i++ {
regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
tags := map[string]string{
"cpu": "cpu-total",
"host": fmt.Sprintf("host%d", rand.Intn(1000)),
"region": regions[rand.Intn(len(regions))],
}
idle := rand.Float64() * 100.0
fields := map[string]interface{}{
"idle": idle,
"busy": 100.0 - idle,
}
bp.AddPoint(client.NewPoint(
"cpu_usage",
tags,
fields,
time.Now(),
))
}
err := clnt.Write(bp)
if err != nil {
log.Fatal(err)
}
}
```
### Querying Data
One nice advantage of using **InfluxDB** the ability to query your data using familiar
SQL constructs. In this example we can create a convenience function to query the database
as follows:
```go
// queryDB convenience function to query the database
func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
q := client.Query{
Command: cmd,
Database: MyDB,
}
if response, err := clnt.Query(q); err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
}
return res, nil
}
```
#### Creating a Database
```go
_, err := queryDB(clnt, fmt.Sprintf("CREATE DATABASE %s", MyDB))
if err != nil {
log.Fatal(err)
}
```
#### Count Records
```go
q := fmt.Sprintf("SELECT count(%s) FROM %s", "value", MyMeasurement)
res, err := queryDB(clnt, q)
if err != nil {
log.Fatal(err)
}
count := res[0].Series[0].Values[0][1]
log.Printf("Found a total of %v records\n", count)
```
#### Find the last 10 _shapes_ records
```go
q := fmt.Sprintf("SELECT * FROM %s LIMIT %d", MyMeasurement, 20)
res, err = queryDB(clnt, q)
if err != nil {
log.Fatal(err)
}
for i, row := range res[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
val := row[1].(string)
log.Printf("[%2d] %s: %s\n", i, t.Format(time.Stamp), val)
}
```
### Using the UDP Client
The **InfluxDB** client also supports writing over UDP.
```go
func WriteUDP() {
// Make client
c := client.NewUDPClient("localhost:8089")
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
panic(err.Error())
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
```
## Go Docs
Please refer to
[http://godoc.org/github.com/influxdb/influxdb/client/v2](http://godoc.org/github.com/influxdb/influxdb/client/v2)
for documentation.
## See Also
You can also examine how the client library is used by the
[InfluxDB CLI](https://github.com/influxdb/influxdb/blob/master/cmd/influx/main.go).

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +0,0 @@
package client
type Series struct {
Name string `json:"name"`
Columns []string `json:"columns"`
Points [][]interface{} `json:"points"`
}
func (self *Series) GetName() string {
return self.Name
}
func (self *Series) GetColumns() []string {
return self.Columns
}
func (self *Series) GetPoints() [][]interface{} {
return self.Points
}

View File

@ -1,15 +0,0 @@
package client
type ShardSpace struct {
// required, must be unique within the database
Name string `json:"name"`
// required, a database has many shard spaces and a shard space belongs to a database
Database string `json:"database"`
// this is optional, if they don't set it, we'll set to /.*/
Regex string `json:"regex"`
// this is optional, if they don't set it, it will default to the storage.dir in the config
RetentionPolicy string `json:"retentionPolicy"`
ShardDuration string `json:"shardDuration"`
ReplicationFactor uint32 `json:"replicationFactor"`
Split uint32 `json:"split"`
}

View File

@ -0,0 +1,498 @@
package client
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/models"
)
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
const (
UDPPayloadSize = 512
)
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
// or "http://[ipv6-host%zone]:port".
Addr string
// Username is the influxdb username, optional
Username string
// Password is the influxdb password, optional
Password string
// UserAgent is the http User Agent, defaults to "InfluxDBClient"
UserAgent string
// Timeout for influxdb writes, defaults to no timeout
Timeout time.Duration
// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false
InsecureSkipVerify bool
}
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPBufferSize.
PayloadSize int
}
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns"
Precision string
// Database is the database to write points to
Database string
// RetentionPolicy is the retention policy of the points
RetentionPolicy string
// Write consistency is the number of servers required to confirm write
WriteConsistency string
}
// Client is a client interface for writing & querying the database
type Client interface {
// Write takes a BatchPoints object and writes all Points to InfluxDB.
Write(bp BatchPoints) error
// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
Query(q Query) (*Response, error)
// Close releases any resources a Client may be using.
Close() error
}
// NewClient creates a client interface from the given config.
func NewHTTPClient(conf HTTPConfig) (Client, error) {
if conf.UserAgent == "" {
conf.UserAgent = "InfluxDBClient"
}
u, err := url.Parse(conf.Addr)
if err != nil {
return nil, err
} else if u.Scheme != "http" && u.Scheme != "https" {
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
" must start with http:// or https://", u.Scheme)
return nil, errors.New(m)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
return &client{
url: u,
username: conf.Username,
password: conf.Password,
useragent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
},
}, nil
}
// Close releases the client's resources.
func (c *client) Close() error {
return nil
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type client struct {
url *url.URL
username string
password string
useragent string
httpClient *http.Client
}
type udpclient struct {
conn *net.UDPConn
payloadSize int
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
type BatchPoints interface {
// AddPoint adds the given point to the Batch of points
AddPoint(p *Point)
// Points lists the points in the Batch
Points() []*Point
// Precision returns the currently set precision of this Batch
Precision() string
// SetPrecision sets the precision of this batch.
SetPrecision(s string) error
// Database returns the currently set database of this Batch
Database() string
// SetDatabase sets the database of this Batch
SetDatabase(s string)
// WriteConsistency returns the currently set write consistency of this Batch
WriteConsistency() string
// SetWriteConsistency sets the write consistency of this Batch
SetWriteConsistency(s string)
// RetentionPolicy returns the currently set retention policy of this Batch
RetentionPolicy() string
// SetRetentionPolicy sets the retention policy of this Batch
SetRetentionPolicy(s string)
}
// NewBatchPoints returns a BatchPoints interface based on the given config.
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
if conf.Precision == "" {
conf.Precision = "ns"
}
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
return nil, err
}
bp := &batchpoints{
database: conf.Database,
precision: conf.Precision,
retentionPolicy: conf.RetentionPolicy,
writeConsistency: conf.WriteConsistency,
}
return bp, nil
}
type batchpoints struct {
points []*Point
database string
precision string
retentionPolicy string
writeConsistency string
}
func (bp *batchpoints) AddPoint(p *Point) {
bp.points = append(bp.points, p)
}
func (bp *batchpoints) Points() []*Point {
return bp.points
}
func (bp *batchpoints) Precision() string {
return bp.precision
}
func (bp *batchpoints) Database() string {
return bp.database
}
func (bp *batchpoints) WriteConsistency() string {
return bp.writeConsistency
}
func (bp *batchpoints) RetentionPolicy() string {
return bp.retentionPolicy
}
func (bp *batchpoints) SetPrecision(p string) error {
if _, err := time.ParseDuration("1" + p); err != nil {
return err
}
bp.precision = p
return nil
}
func (bp *batchpoints) SetDatabase(db string) {
bp.database = db
}
func (bp *batchpoints) SetWriteConsistency(wc string) {
bp.writeConsistency = wc
}
func (bp *batchpoints) SetRetentionPolicy(rp string) {
bp.retentionPolicy = rp
}
type Point struct {
pt models.Point
}
// NewPoint returns a point with the given timestamp. If a timestamp is not
// given, then data is sent to the database without a timestamp, in which case
// the server will assign local time upon reception. NOTE: it is recommended
// to send data without a timestamp.
func NewPoint(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
) (*Point, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}
pt, err := models.NewPoint(name, tags, fields, T)
if err != nil {
return nil, err
}
return &Point{
pt: pt,
}, nil
}
// String returns a line-protocol string of the Point
func (p *Point) String() string {
return p.pt.String()
}
// PrecisionString returns a line-protocol string of the Point, at precision
func (p *Point) PrecisionString(precison string) string {
return p.pt.PrecisionString(precison)
}
// Name returns the measurement name of the point
func (p *Point) Name() string {
return p.pt.Name()
}
// Name returns the tags associated with the point
func (p *Point) Tags() map[string]string {
return p.pt.Tags()
}
// Time return the timestamp for the point
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns the unix nano time of the point
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point
func (p *Point) Fields() map[string]interface{} {
return p.pt.Fields()
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b bytes.Buffer
var d time.Duration
d, _ = time.ParseDuration("1" + bp.Precision())
for _, p := range bp.Points() {
pointstring := p.pt.RoundedString(d) + "\n"
// Write and reset the buffer if we reach the max size
if b.Len()+len(pointstring) >= uc.payloadSize {
if _, err := uc.conn.Write(b.Bytes()); err != nil {
return err
}
b.Reset()
}
if _, err := b.WriteString(pointstring); err != nil {
return err
}
}
_, err := uc.conn.Write(b.Bytes())
return err
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
for _, p := range bp.Points() {
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
return err
}
if err := b.WriteByte('\n'); err != nil {
return err
}
}
u := c.url
u.Path = "write"
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database())
params.Set("rp", bp.RetentionPolicy())
params.Set("precision", bp.Precision())
params.Set("consistency", bp.WriteConsistency())
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
return err
}
return nil
}
// Query defines a query to send to the server
type Query struct {
Command string
Database string
Precision string
}
// NewQuery returns a query object
// database and precision strings can be empty strings if they are not needed
// for the query.
func NewQuery(command, database, precision string) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
}
}
// Response represents a list of statement results.
type Response struct {
Results []Result
Err error
}
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != nil {
return r.Err
}
for _, result := range r.Results {
if result.Err != nil {
return result.Err
}
}
return nil
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Err error
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
// Query sends a command to the server and returns the Response
func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = "query"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, decErr
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server",
resp.StatusCode)
}
return &response, nil
}