diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 3081a096..25267116 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -88,8 +88,8 @@ }, { "ImportPath": "github.com/influxdb/influxdb/client", - "Comment": "v0.8.0-rc.3-9-g3284662", - "Rev": "3284662b350688b651359f9124928856071bd3f5" + "Comment": "v0.9.5.1", + "Rev": "9eab56311373ee6f788ae5dfc87e2240038f0eb4" }, { "ImportPath": "github.com/kr/pretty", diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md index 2d849dfb..8a041128 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md @@ -1,2 +1,256 @@ -influxdb-go -=========== +# InfluxDB Client + +[![GoDoc](https://godoc.org/github.com/influxdb/influxdb?status.svg)](http://godoc.org/github.com/influxdb/influxdb/client/v2) + +## Description + +**NOTE:** The Go client library now has a "v2" version, with the old version +being deprecated. The new version can be imported at +`import "github.com/influxdb/influxdb/client/v2"`. It is not backwards-compatible. + +A Go client library written and maintained by the **InfluxDB** team. +This package provides convenience functions to read and write time series data. +It uses the HTTP protocol to communicate with your **InfluxDB** cluster. + + +## Getting Started + +### Connecting To Your Database + +Connecting to an **InfluxDB** database is straightforward. You will need a host +name, a port and the cluster user credentials if applicable. The default port is +8086. You can customize these settings to your specific installation via the +**InfluxDB** configuration file. + +Thought not necessary for experimentation, you may want to create a new user +and authenticate the connection to your database. + +For more information please check out the +[Cluster Admin Docs](http://influxdb.com/docs/v0.9/query_language/database_administration.html). + +For the impatient, you can create a new admin user _bubba_ by firing off the +[InfluxDB CLI](https://github.com/influxdb/influxdb/blob/master/cmd/influx/main.go). + +```shell +influx +> create user bubba with password 'bumblebeetuna' +> grant all privileges to bubba +``` + +And now for good measure set the credentials in you shell environment. +In the example below we will use $INFLUX_USER and $INFLUX_PWD + +Now with the administrivia out of the way, let's connect to our database. + +NOTE: If you've opted out of creating a user, you can omit Username and Password in +the configuration below. + +```go +package main + +import +import ( + "net/url" + "fmt" + "log" + "os" + + "github.com/influxdb/influxdb/client/v2" +) + +const ( + MyDB = "square_holes" + username = "bubba" + password = "bumblebeetuna" +) + +func main() { + // Make client + u, _ := url.Parse("http://localhost:8086") + c := client.NewClient(client.Config{ + URL: u, + Username: username, + Password: password, + }) + + // Create a new point batch + bp := client.NewBatchPoints(client.BatchPointsConfig{ + Database: MyDB, + Precision: "s", + }) + + // Create a point and add to batch + tags := map[string]string{"cpu": "cpu-total"} + fields := map[string]interface{}{ + "idle": 10.1, + "system": 53.3, + "user": 46.6, + } + pt := client.NewPoint("cpu_usage", tags, fields, time.Now()) + bp.AddPoint(pt) + + // Write the batch + c.Write(bp) +} + +``` + +### Inserting Data + +Time series data aka *points* are written to the database using batch inserts. +The mechanism is to create one or more points and then create a batch aka +*batch points* and write these to a given database and series. A series is a +combination of a measurement (time/values) and a set of tags. + +In this sample we will create a batch of a 1,000 points. Each point has a time and +a single value as well as 2 tags indicating a shape and color. We write these points +to a database called _square_holes_ using a measurement named _shapes_. + +NOTE: You can specify a RetentionPolicy as part of the batch points. If not +provided InfluxDB will use the database _default_ retention policy. + +```go +func writePoints(clnt client.Client) { + sampleSize := 1000 + rand.Seed(42) + + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: "systemstats", + Precision: "us", + }) + + for i := 0; i < sampleSize; i++ { + regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"} + tags := map[string]string{ + "cpu": "cpu-total", + "host": fmt.Sprintf("host%d", rand.Intn(1000)), + "region": regions[rand.Intn(len(regions))], + } + + idle := rand.Float64() * 100.0 + fields := map[string]interface{}{ + "idle": idle, + "busy": 100.0 - idle, + } + + bp.AddPoint(client.NewPoint( + "cpu_usage", + tags, + fields, + time.Now(), + )) + } + + err := clnt.Write(bp) + if err != nil { + log.Fatal(err) + } +} +``` + + +### Querying Data + +One nice advantage of using **InfluxDB** the ability to query your data using familiar +SQL constructs. In this example we can create a convenience function to query the database +as follows: + +```go +// queryDB convenience function to query the database +func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) { + q := client.Query{ + Command: cmd, + Database: MyDB, + } + if response, err := clnt.Query(q); err == nil { + if response.Error() != nil { + return res, response.Error() + } + res = response.Results + } + return res, nil +} +``` + +#### Creating a Database + +```go +_, err := queryDB(clnt, fmt.Sprintf("CREATE DATABASE %s", MyDB)) +if err != nil { + log.Fatal(err) +} +``` + +#### Count Records + +```go +q := fmt.Sprintf("SELECT count(%s) FROM %s", "value", MyMeasurement) +res, err := queryDB(clnt, q) +if err != nil { + log.Fatal(err) +} +count := res[0].Series[0].Values[0][1] +log.Printf("Found a total of %v records\n", count) +``` + +#### Find the last 10 _shapes_ records + +```go +q := fmt.Sprintf("SELECT * FROM %s LIMIT %d", MyMeasurement, 20) +res, err = queryDB(clnt, q) +if err != nil { + log.Fatal(err) +} + +for i, row := range res[0].Series[0].Values { + t, err := time.Parse(time.RFC3339, row[0].(string)) + if err != nil { + log.Fatal(err) + } + val := row[1].(string) + log.Printf("[%2d] %s: %s\n", i, t.Format(time.Stamp), val) +} +``` + +### Using the UDP Client + +The **InfluxDB** client also supports writing over UDP. + +```go +func WriteUDP() { + // Make client + c := client.NewUDPClient("localhost:8089") + + // Create a new point batch + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Precision: "s", + }) + + // Create a point and add to batch + tags := map[string]string{"cpu": "cpu-total"} + fields := map[string]interface{}{ + "idle": 10.1, + "system": 53.3, + "user": 46.6, + } + pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) + if err != nil { + panic(err.Error()) + } + bp.AddPoint(pt) + + // Write the batch + c.Write(bp) +} +``` + +## Go Docs + +Please refer to +[http://godoc.org/github.com/influxdb/influxdb/client/v2](http://godoc.org/github.com/influxdb/influxdb/client/v2) +for documentation. + +## See Also + +You can also examine how the client library is used by the +[InfluxDB CLI](https://github.com/influxdb/influxdb/blob/master/cmd/influx/main.go). diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go index 993076a2..9e0d7271 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go @@ -2,609 +2,687 @@ package client import ( "bytes" - "compress/gzip" "encoding/json" + "errors" "fmt" - "io" "io/ioutil" "net" "net/http" "net/url" + "strconv" "strings" + "time" + + "github.com/influxdb/influxdb/models" ) const ( - UDPMaxMessageSize = 2048 + // DefaultHost is the default host used to connect to an InfluxDB instance + DefaultHost = "localhost" + + // DefaultPort is the default port used to connect to an InfluxDB instance + DefaultPort = 8086 + + // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance + DefaultTimeout = 0 ) +// Query is used to send a command to the server. Both Command and Database are required. +type Query struct { + Command string + Database string +} + +// ParseConnectionString will parse a string to create a valid connection URL +func ParseConnectionString(path string, ssl bool) (url.URL, error) { + var host string + var port int + + h, p, err := net.SplitHostPort(path) + if err != nil { + if path == "" { + host = DefaultHost + } else { + host = path + } + // If they didn't specify a port, always use the default port + port = DefaultPort + } else { + host = h + port, err = strconv.Atoi(p) + if err != nil { + return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err) + } + } + + u := url.URL{ + Scheme: "http", + } + if ssl { + u.Scheme = "https" + } + + u.Host = net.JoinHostPort(host, strconv.Itoa(port)) + + return u, nil +} + +// Config is used to specify what server to connect to. +// URL: The URL of the server connecting to. +// Username/Password are optional. They will be passed via basic auth if provided. +// UserAgent: If not provided, will default "InfluxDBClient", +// Timeout: If not provided, will default to 0 (no timeout) +type Config struct { + URL url.URL + Username string + Password string + UserAgent string + Timeout time.Duration + Precision string +} + +// NewConfig will create a config to be used in connecting to the client +func NewConfig() Config { + return Config{ + Timeout: DefaultTimeout, + } +} + +// Client is used to make calls to the server. type Client struct { - host string - username string - password string - database string - httpClient *http.Client - udpConn *net.UDPConn - schema string - compression bool + url url.URL + username string + password string + httpClient *http.Client + userAgent string + precision string } -type ClientConfig struct { - Host string - Username string - Password string - Database string - HttpClient *http.Client - IsSecure bool - IsUDP bool -} - -var defaults *ClientConfig - -func init() { - defaults = &ClientConfig{ - Host: "localhost:8086", - Username: "root", - Password: "root", - Database: "", - HttpClient: http.DefaultClient, - } -} - -func getDefault(value, defaultValue string) string { - if value == "" { - return defaultValue - } - return value -} - -func New(config *ClientConfig) (*Client, error) { - return NewClient(config) -} - -func NewClient(config *ClientConfig) (*Client, error) { - host := getDefault(config.Host, defaults.Host) - username := getDefault(config.Username, defaults.Username) - password := getDefault(config.Password, defaults.Password) - database := getDefault(config.Database, defaults.Database) - if config.HttpClient == nil { - config.HttpClient = defaults.HttpClient - } - var udpConn *net.UDPConn - if config.IsUDP { - serverAddr, err := net.ResolveUDPAddr("udp", host) - if err != nil { - return nil, err - } - udpConn, err = net.DialUDP("udp", nil, serverAddr) - if err != nil { - return nil, err - } - } - - schema := "http" - if config.IsSecure { - schema = "https" - } - return &Client{host, username, password, database, config.HttpClient, udpConn, schema, false}, nil -} - -func (self *Client) DisableCompression() { - self.compression = false -} - -func (self *Client) getUrl(path string) string { - return self.getUrlWithUserAndPass(path, self.username, self.password) -} - -func (self *Client) getUrlWithUserAndPass(path, username, password string) string { - return fmt.Sprintf("%s://%s%s?u=%s&p=%s", self.schema, self.host, path, username, password) -} - -func responseToError(response *http.Response, err error, closeResponse bool) error { - if err != nil { - return err - } - if closeResponse { - defer response.Body.Close() - } - if response.StatusCode >= 200 && response.StatusCode < 300 { - return nil - } - defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return err - } - return fmt.Errorf("Server returned (%d): %s", response.StatusCode, string(body)) -} - -func (self *Client) CreateDatabase(name string) error { - url := self.getUrl("/db") - payload := map[string]string{"name": name} - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -func (self *Client) del(url string) (*http.Response, error) { - return self.delWithBody(url, nil) -} - -func (self *Client) delWithBody(url string, body io.Reader) (*http.Response, error) { - req, err := http.NewRequest("DELETE", url, body) - if err != nil { - return nil, err - } - return self.httpClient.Do(req) -} - -func (self *Client) DeleteDatabase(name string) error { - url := self.getUrl("/db/" + name) - resp, err := self.del(url) - return responseToError(resp, err, true) -} - -func (self *Client) get(url string) ([]byte, error) { - resp, err := self.httpClient.Get(url) - err = responseToError(resp, err, false) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - return body, err -} - -func (self *Client) getWithVersion(url string) ([]byte, string, error) { - resp, err := self.httpClient.Get(url) - err = responseToError(resp, err, false) - if err != nil { - return nil, "", err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - version := resp.Header.Get("X-Influxdb-Version") - fields := strings.Fields(version) - if len(fields) > 2 { - return body, fields[1], err - } - return body, "", err -} - -func (self *Client) listSomething(url string) ([]map[string]interface{}, error) { - body, err := self.get(url) - if err != nil { - return nil, err - } - somethings := []map[string]interface{}{} - err = json.Unmarshal(body, &somethings) - if err != nil { - return nil, err - } - return somethings, nil -} - -func (self *Client) GetDatabaseList() ([]map[string]interface{}, error) { - url := self.getUrl("/db") - return self.listSomething(url) -} - -func (self *Client) CreateClusterAdmin(name, password string) error { - url := self.getUrl("/cluster_admins") - payload := map[string]string{"name": name, "password": password} - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -func (self *Client) UpdateClusterAdmin(name, password string) error { - url := self.getUrl("/cluster_admins/" + name) - payload := map[string]string{"password": password} - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -func (self *Client) DeleteClusterAdmin(name string) error { - url := self.getUrl("/cluster_admins/" + name) - resp, err := self.del(url) - return responseToError(resp, err, true) -} - -func (self *Client) GetClusterAdminList() ([]map[string]interface{}, error) { - url := self.getUrl("/cluster_admins") - return self.listSomething(url) -} - -func (self *Client) Servers() ([]map[string]interface{}, error) { - url := self.getUrl("/cluster/servers") - return self.listSomething(url) -} - -func (self *Client) RemoveServer(id int) error { - resp, err := self.del(self.getUrl(fmt.Sprintf("/cluster/servers/%d", id))) - return responseToError(resp, err, true) -} - -// Creates a new database user for the given database. permissions can -// be omitted in which case the user will be able to read and write to -// all time series. If provided, there should be two strings, the -// first for read and the second for write. The strings are regexes -// that are used to match the time series name to determine whether -// the user has the ability to read/write to the given time series. -// -// client.CreateDatabaseUser("db", "user", "pass") -// // the following user cannot read from any series and can write -// // to the limited time series only -// client.CreateDatabaseUser("db", "limited", "pass", "^$", "limited") -func (self *Client) CreateDatabaseUser(database, name, password string, permissions ...string) error { - readMatcher, writeMatcher := ".*", ".*" - switch len(permissions) { - case 0: - case 2: - readMatcher, writeMatcher = permissions[0], permissions[1] - default: - return fmt.Errorf("You have to provide two ") - } - - url := self.getUrl("/db/" + database + "/users") - payload := map[string]string{"name": name, "password": password, "readFrom": readMatcher, "writeTo": writeMatcher} - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -// Change the cluster admin password -func (self *Client) ChangeClusterAdminPassword(name, newPassword string) error { - url := self.getUrl("/cluster_admins/" + name) - payload := map[string]interface{}{"password": newPassword} - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -// Change the user password, adming flag and optionally permissions -func (self *Client) ChangeDatabaseUser(database, name, newPassword string, isAdmin bool, newPermissions ...string) error { - switch len(newPermissions) { - case 0, 2: - default: - return fmt.Errorf("You have to provide two ") - } - - url := self.getUrl("/db/" + database + "/users/" + name) - payload := map[string]interface{}{"password": newPassword, "admin": isAdmin} - if len(newPermissions) == 2 { - payload["readFrom"] = newPermissions[0] - payload["writeTo"] = newPermissions[1] - } - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -// See Client.CreateDatabaseUser for more info on the permissions -// argument -func (self *Client) updateDatabaseUserCommon(database, name string, password *string, isAdmin *bool, permissions ...string) error { - url := self.getUrl("/db/" + database + "/users/" + name) - payload := map[string]interface{}{} - if password != nil { - payload["password"] = *password - } - if isAdmin != nil { - payload["admin"] = *isAdmin - } - switch len(permissions) { - case 0: - case 2: - payload["readFrom"] = permissions[0] - payload["writeTo"] = permissions[1] - default: - } - - data, err := json.Marshal(payload) - if err != nil { - return err - } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) -} - -func (self *Client) UpdateDatabaseUser(database, name, password string) error { - return self.updateDatabaseUserCommon(database, name, &password, nil) -} - -func (self *Client) UpdateDatabaseUserPermissions(database, name, readPermission, writePermissions string) error { - return self.updateDatabaseUserCommon(database, name, nil, nil, readPermission, writePermissions) -} - -func (self *Client) DeleteDatabaseUser(database, name string) error { - url := self.getUrl("/db/" + database + "/users/" + name) - resp, err := self.del(url) - return responseToError(resp, err, true) -} - -func (self *Client) GetDatabaseUserList(database string) ([]map[string]interface{}, error) { - url := self.getUrl("/db/" + database + "/users") - return self.listSomething(url) -} - -func (self *Client) AlterDatabasePrivilege(database, name string, isAdmin bool, permissions ...string) error { - return self.updateDatabaseUserCommon(database, name, nil, &isAdmin, permissions...) -} - -type TimePrecision string - const ( - Second TimePrecision = "s" - Millisecond TimePrecision = "m" - Microsecond TimePrecision = "u" + // ConsistencyOne requires at least one data node acknowledged a write. + ConsistencyOne = "one" + + // ConsistencyAll requires all data nodes to acknowledge a write. + ConsistencyAll = "all" + + // ConsistencyQuorum requires a quorum of data nodes to acknowledge a write. + ConsistencyQuorum = "quorum" + + // ConsistencyAny allows for hinted hand off, potentially no write happened yet. + ConsistencyAny = "any" ) -func (self *Client) WriteSeries(series []*Series) error { - return self.writeSeriesCommon(series, nil) +// NewClient will instantiate and return a connected client to issue commands to the server. +func NewClient(c Config) (*Client, error) { + client := Client{ + url: c.URL, + username: c.Username, + password: c.Password, + httpClient: &http.Client{Timeout: c.Timeout}, + userAgent: c.UserAgent, + precision: c.Precision, + } + if client.userAgent == "" { + client.userAgent = "InfluxDBClient" + } + return &client, nil } -func (self *Client) WriteSeriesOverUDP(series []*Series) error { - if self.udpConn == nil { - return fmt.Errorf("UDP isn't enabled. Make sure to set config.IsUDP to true") +// SetAuth will update the username and passwords +func (c *Client) SetAuth(u, p string) { + c.username = u + c.password = p +} + +// SetPrecision will update the precision +func (c *Client) SetPrecision(precision string) { + c.precision = precision +} + +// Query sends a command to the server and returns the Response +func (c *Client) Query(q Query) (*Response, error) { + u := c.url + + u.Path = "query" + values := u.Query() + values.Set("q", q.Command) + values.Set("db", q.Database) + if c.precision != "" { + values.Set("epoch", c.precision) + } + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) } - data, err := json.Marshal(series) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + decErr := dec.Decode(&response) + + // ignore this error if we got an invalid status code + if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { + decErr = nil + } + // If we got a valid decode error, send that back + if decErr != nil { + return nil, decErr + } + // If we don't have an error in our json response, and didn't get StatusOK, then send back an error + if resp.StatusCode != http.StatusOK && response.Error() == nil { + return &response, fmt.Errorf("received status code %d from server", resp.StatusCode) + } + return &response, nil +} + +// Write takes BatchPoints and allows for writing of multiple points with defaults +// If successful, error is nil and Response is nil +// If an error occurs, Response may contain additional information if populated. +func (c *Client) Write(bp BatchPoints) (*Response, error) { + u := c.url + u.Path = "write" + + var b bytes.Buffer + for _, p := range bp.Points { + if p.Raw != "" { + if _, err := b.WriteString(p.Raw); err != nil { + return nil, err + } + } else { + for k, v := range bp.Tags { + if p.Tags == nil { + p.Tags = make(map[string]string, len(bp.Tags)) + } + p.Tags[k] = v + } + + if _, err := b.WriteString(p.MarshalString()); err != nil { + return nil, err + } + } + + if err := b.WriteByte('\n'); err != nil { + return nil, err + } + } + + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + precision := bp.Precision + if precision == "" { + precision = c.precision + } + + params := req.URL.Query() + params.Set("db", bp.Database) + params.Set("rp", bp.RetentionPolicy) + params.Set("precision", precision) + params.Set("consistency", bp.WriteConsistency) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + response.Err = err + return &response, err + } + + return nil, nil +} + +// WriteLineProtocol takes a string with line returns to delimit each write +// If successful, error is nil and Response is nil +// If an error occurs, Response may contain additional information if populated. +func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) { + u := c.url + u.Path = "write" + + r := strings.NewReader(data) + + req, err := http.NewRequest("POST", u.String(), r) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + params := req.URL.Query() + params.Set("db", database) + params.Set("rp", retentionPolicy) + params.Set("precision", precision) + params.Set("consistency", writeConsistency) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + err := fmt.Errorf(string(body)) + response.Err = err + return &response, err + } + + return nil, nil +} + +// Ping will check to see if the server is up +// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. +func (c *Client) Ping() (time.Duration, string, error) { + now := time.Now() + u := c.url + u.Path = "ping" + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return 0, "", err + } + req.Header.Set("User-Agent", c.userAgent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return 0, "", err + } + defer resp.Body.Close() + + version := resp.Header.Get("X-Influxdb-Version") + return time.Since(now), version, nil +} + +// Structs + +// Result represents a resultset returned from a single statement. +type Result struct { + Series []models.Row + Err error +} + +// MarshalJSON encodes the result into JSON. +func (r *Result) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Series []models.Row `json:"series,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Series = r.Series + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Result struct +func (r *Result) UnmarshalJSON(b []byte) error { + var o struct { + Series []models.Row `json:"series,omitempty"` + Err string `json:"error,omitempty"` + } + + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + err := dec.Decode(&o) if err != nil { return err } - // because max of msg over upd is 2048 bytes - // https://github.com/influxdb/influxdb/blob/master/src/api/udp/api.go#L65 - if len(data) >= UDPMaxMessageSize { - err = fmt.Errorf("data size over limit %v limit is %v", len(data), UDPMaxMessageSize) - fmt.Println(err) - return err - } - _, err = self.udpConn.Write(data) - if err != nil { - return err + r.Series = o.Series + if o.Err != "" { + r.Err = errors.New(o.Err) } return nil } -func (self *Client) WriteSeriesWithTimePrecision(series []*Series, timePrecision TimePrecision) error { - return self.writeSeriesCommon(series, map[string]string{"time_precision": string(timePrecision)}) +// Response represents a list of statement results. +type Response struct { + Results []Result + Err error } -func (self *Client) writeSeriesCommon(series []*Series, options map[string]string) error { - data, err := json.Marshal(series) +// MarshalJSON encodes the response into JSON. +func (r *Response) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Results = r.Results + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Response struct +func (r *Response) UnmarshalJSON(b []byte) error { + var o struct { + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + err := dec.Decode(&o) if err != nil { return err } - url := self.getUrl("/db/" + self.database + "/series") - for name, value := range options { - url += fmt.Sprintf("&%s=%s", name, value) + r.Results = o.Results + if o.Err != "" { + r.Err = errors.New(o.Err) } - var b *bytes.Buffer - if self.compression { - b = bytes.NewBuffer(nil) - w := gzip.NewWriter(b) - if _, err := w.Write(data); err != nil { + return nil +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (r Response) Error() error { + if r.Err != nil { + return r.Err + } + for _, result := range r.Results { + if result.Err != nil { + return result.Err + } + } + return nil +} + +// Point defines the fields that will be written to the database +// Measurement, Time, and Fields are required +// Precision can be specified if the time is in epoch format (integer). +// Valid values for Precision are n, u, ms, s, m, and h +type Point struct { + Measurement string + Tags map[string]string + Time time.Time + Fields map[string]interface{} + Precision string + Raw string +} + +// MarshalJSON will format the time in RFC3339Nano +// Precision is also ignored as it is only used for writing, not reading +// Or another way to say it is we always send back in nanosecond precision +func (p *Point) MarshalJSON() ([]byte, error) { + point := struct { + Measurement string `json:"measurement,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Time string `json:"time,omitempty"` + Fields map[string]interface{} `json:"fields,omitempty"` + Precision string `json:"precision,omitempty"` + }{ + Measurement: p.Measurement, + Tags: p.Tags, + Fields: p.Fields, + Precision: p.Precision, + } + // Let it omit empty if it's really zero + if !p.Time.IsZero() { + point.Time = p.Time.UTC().Format(time.RFC3339Nano) + } + return json.Marshal(&point) +} + +// MarshalString renders string representation of a Point with specified +// precision. The default precision is nanoseconds. +func (p *Point) MarshalString() string { + pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time) + if err != nil { + return "# ERROR: " + err.Error() + " " + p.Measurement + } + if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" { + return pt.String() + } + return pt.PrecisionString(p.Precision) +} + +// UnmarshalJSON decodes the data into the Point struct +func (p *Point) UnmarshalJSON(b []byte) error { + var normal struct { + Measurement string `json:"measurement"` + Tags map[string]string `json:"tags"` + Time time.Time `json:"time"` + Precision string `json:"precision"` + Fields map[string]interface{} `json:"fields"` + } + var epoch struct { + Measurement string `json:"measurement"` + Tags map[string]string `json:"tags"` + Time *int64 `json:"time"` + Precision string `json:"precision"` + Fields map[string]interface{} `json:"fields"` + } + + if err := func() error { + var err error + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + if err = dec.Decode(&epoch); err != nil { return err } - w.Flush() - w.Close() - } else { - b = bytes.NewBuffer(data) + // Convert from epoch to time.Time, but only if Time + // was actually set. + var ts time.Time + if epoch.Time != nil { + ts, err = EpochToTime(*epoch.Time, epoch.Precision) + if err != nil { + return err + } + } + p.Measurement = epoch.Measurement + p.Tags = epoch.Tags + p.Time = ts + p.Precision = epoch.Precision + p.Fields = normalizeFields(epoch.Fields) + return nil + }(); err == nil { + return nil } - req, err := http.NewRequest("POST", url, b) - if err != nil { + + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + if err := dec.Decode(&normal); err != nil { return err } - if self.compression { - req.Header.Set("Content-Encoding", "gzip") + normal.Time = SetPrecision(normal.Time, normal.Precision) + p.Measurement = normal.Measurement + p.Tags = normal.Tags + p.Time = normal.Time + p.Precision = normal.Precision + p.Fields = normalizeFields(normal.Fields) + + return nil +} + +// Remove any notion of json.Number +func normalizeFields(fields map[string]interface{}) map[string]interface{} { + newFields := map[string]interface{}{} + + for k, v := range fields { + switch v := v.(type) { + case json.Number: + jv, e := v.Float64() + if e != nil { + panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e)) + } + newFields[k] = jv + default: + newFields[k] = v + } } - resp, err := self.httpClient.Do(req) - return responseToError(resp, err, true) + return newFields } -func (self *Client) Query(query string, precision ...TimePrecision) ([]*Series, error) { - return self.queryCommon(query, false, precision...) +// BatchPoints is used to send batched data in a single write. +// Database and Points are required +// If no retention policy is specified, it will use the databases default retention policy. +// If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored. +// If time is specified, it will be applied to any point with an empty time. +// Precision can be specified if the time is in epoch format (integer). +// Valid values for Precision are n, u, ms, s, m, and h +type BatchPoints struct { + Points []Point `json:"points,omitempty"` + Database string `json:"database,omitempty"` + RetentionPolicy string `json:"retentionPolicy,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Time time.Time `json:"time,omitempty"` + Precision string `json:"precision,omitempty"` + WriteConsistency string `json:"-"` } -func (self *Client) QueryWithNumbers(query string, precision ...TimePrecision) ([]*Series, error) { - return self.queryCommon(query, true, precision...) -} - -func (self *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) { - escapedQuery := url.QueryEscape(query) - url := self.getUrl("/db/" + self.database + "/series") - if len(precision) > 0 { - url += "&time_precision=" + string(precision[0]) +// UnmarshalJSON decodes the data into the BatchPoints struct +func (bp *BatchPoints) UnmarshalJSON(b []byte) error { + var normal struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Time time.Time `json:"time"` + Precision string `json:"precision"` } - url += "&q=" + escapedQuery - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - if !self.compression { - req.Header.Set("Accept-Encoding", "identity") - } - resp, err := self.httpClient.Do(req) - err = responseToError(resp, err, false) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - series := []*Series{} - decoder := json.NewDecoder(resp.Body) - if useNumber { - decoder.UseNumber() - } - err = decoder.Decode(&series) - if err != nil { - return nil, err - } - return series, nil -} - -func (self *Client) Ping() error { - url := self.getUrl("/ping") - resp, err := self.httpClient.Get(url) - return responseToError(resp, err, true) -} - -func (self *Client) AuthenticateDatabaseUser(database, username, password string) error { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/authenticate", database), username, password) - resp, err := self.httpClient.Get(url) - return responseToError(resp, err, true) -} - -func (self *Client) AuthenticateClusterAdmin(username, password string) error { - url := self.getUrlWithUserAndPass("/cluster_admins/authenticate", username, password) - resp, err := self.httpClient.Get(url) - return responseToError(resp, err, true) -} - -func (self *Client) GetContinuousQueries() ([]map[string]interface{}, error) { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries", self.database), self.username, self.password) - return self.listSomething(url) -} - -func (self *Client) DeleteContinuousQueries(id int) error { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries/%d", self.database, id), self.username, self.password) - resp, err := self.del(url) - return responseToError(resp, err, true) -} - -type LongTermShortTermShards struct { - // Long term shards, (doesn't get populated for version >= 0.8.0) - LongTerm []*Shard `json:"longTerm"` - // Short term shards, (doesn't get populated for version >= 0.8.0) - ShortTerm []*Shard `json:"shortTerm"` - // All shards in the system (Long + Short term shards for version < 0.8.0) - All []*Shard `json:"-"` -} - -type Shard struct { - Id uint32 `json:"id"` - EndTime int64 `json:"endTime"` - StartTime int64 `json:"startTime"` - ServerIds []uint32 `json:"serverIds"` - SpaceName string `json:"spaceName"` - Database string `json:"database"` -} - -type ShardSpaceCollection struct { - ShardSpaces []ShardSpace -} - -func (self *Client) GetShards() (*LongTermShortTermShards, error) { - url := self.getUrlWithUserAndPass("/cluster/shards", self.username, self.password) - body, version, err := self.getWithVersion(url) - if err != nil { - return nil, err - } - return parseShards(body, version) -} - -func isOrNewerThan(version, reference string) bool { - if version == "vdev" { - return true - } - majorMinor := strings.Split(version[1:], ".")[:2] - refMajorMinor := strings.Split(reference[1:], ".")[:2] - if majorMinor[0] > refMajorMinor[0] { - return true - } - if majorMinor[1] > refMajorMinor[1] { - return true - } - return majorMinor[1] == refMajorMinor[1] -} - -func parseShards(body []byte, version string) (*LongTermShortTermShards, error) { - // strip the initial v in `v0.8.0` and split on the dots - if version != "" && isOrNewerThan(version, "v0.8") { - return parseNewShards(body) - } - shards := &LongTermShortTermShards{} - err := json.Unmarshal(body, &shards) - if err != nil { - return nil, err + var epoch struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Time *int64 `json:"time"` + Precision string `json:"precision"` } - shards.All = make([]*Shard, len(shards.LongTerm)+len(shards.ShortTerm)) - copy(shards.All, shards.LongTerm) - copy(shards.All[len(shards.LongTerm):], shards.ShortTerm) - return shards, nil -} - -func parseNewShards(body []byte) (*LongTermShortTermShards, error) { - shards := []*Shard{} - err := json.Unmarshal(body, &shards) - if err != nil { - return nil, err + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + var ts time.Time + if epoch.Time != nil { + ts, err = EpochToTime(*epoch.Time, epoch.Precision) + if err != nil { + return err + } + } + bp.Points = epoch.Points + bp.Database = epoch.Database + bp.RetentionPolicy = epoch.RetentionPolicy + bp.Tags = epoch.Tags + bp.Time = ts + bp.Precision = epoch.Precision + return nil + }(); err == nil { + return nil } - return &LongTermShortTermShards{All: shards}, nil -} - -// Added to InfluxDB in 0.8.0 -func (self *Client) GetShardSpaces() ([]*ShardSpace, error) { - url := self.getUrlWithUserAndPass("/cluster/shard_spaces", self.username, self.password) - body, err := self.get(url) - if err != nil { - return nil, err - } - spaces := []*ShardSpace{} - err = json.Unmarshal(body, &spaces) - if err != nil { - return nil, err - } - - return spaces, nil -} - -// Added to InfluxDB in 0.8.0 -func (self *Client) DropShardSpace(database, name string) error { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name), self.username, self.password) - _, err := self.del(url) - return err -} - -// Added to InfluxDB in 0.8.0 -func (self *Client) CreateShardSpace(space *ShardSpace) error { - url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces")) - data, err := json.Marshal(space) - if err != nil { + if err := json.Unmarshal(b, &normal); err != nil { return err } - resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) - return responseToError(resp, err, true) + normal.Time = SetPrecision(normal.Time, normal.Precision) + bp.Points = normal.Points + bp.Database = normal.Database + bp.RetentionPolicy = normal.RetentionPolicy + bp.Tags = normal.Tags + bp.Time = normal.Time + bp.Precision = normal.Precision + + return nil } -func (self *Client) DropShard(id uint32, serverIds []uint32) error { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shards/%d", id), self.username, self.password) - ids := map[string][]uint32{"serverIds": serverIds} - body, err := json.Marshal(ids) - if err != nil { - return err - } - _, err = self.delWithBody(url, bytes.NewBuffer(body)) - return err +// utility functions + +// Addr provides the current url as a string of the server the client is connected to. +func (c *Client) Addr() string { + return c.url.String() +} + +// helper functions + +// EpochToTime takes a unix epoch time and uses precision to return back a time.Time +func EpochToTime(epoch int64, precision string) (time.Time, error) { + if precision == "" { + precision = "s" + } + var t time.Time + switch precision { + case "h": + t = time.Unix(0, epoch*int64(time.Hour)) + case "m": + t = time.Unix(0, epoch*int64(time.Minute)) + case "s": + t = time.Unix(0, epoch*int64(time.Second)) + case "ms": + t = time.Unix(0, epoch*int64(time.Millisecond)) + case "u": + t = time.Unix(0, epoch*int64(time.Microsecond)) + case "n": + t = time.Unix(0, epoch) + default: + return time.Time{}, fmt.Errorf("Unknown precision %q", precision) + } + return t, nil +} + +// SetPrecision will round a time to the specified precision +func SetPrecision(t time.Time, precision string) time.Time { + switch precision { + case "n": + case "u": + return t.Round(time.Microsecond) + case "ms": + return t.Round(time.Millisecond) + case "s": + return t.Round(time.Second) + case "m": + return t.Round(time.Minute) + case "h": + return t.Round(time.Hour) + } + return t } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go deleted file mode 100644 index f18b8bbb..00000000 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go +++ /dev/null @@ -1,19 +0,0 @@ -package client - -type Series struct { - Name string `json:"name"` - Columns []string `json:"columns"` - Points [][]interface{} `json:"points"` -} - -func (self *Series) GetName() string { - return self.Name -} - -func (self *Series) GetColumns() []string { - return self.Columns -} - -func (self *Series) GetPoints() [][]interface{} { - return self.Points -} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go deleted file mode 100644 index 87dea117..00000000 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go +++ /dev/null @@ -1,15 +0,0 @@ -package client - -type ShardSpace struct { - // required, must be unique within the database - Name string `json:"name"` - // required, a database has many shard spaces and a shard space belongs to a database - Database string `json:"database"` - // this is optional, if they don't set it, we'll set to /.*/ - Regex string `json:"regex"` - // this is optional, if they don't set it, it will default to the storage.dir in the config - RetentionPolicy string `json:"retentionPolicy"` - ShardDuration string `json:"shardDuration"` - ReplicationFactor uint32 `json:"replicationFactor"` - Split uint32 `json:"split"` -} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/v2/client.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/v2/client.go new file mode 100644 index 00000000..4dce0c28 --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/v2/client.go @@ -0,0 +1,498 @@ +package client + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + "github.com/influxdb/influxdb/models" +) + +// UDPPayloadSize is a reasonable default payload size for UDP packets that +// could be travelling over the internet. +const ( + UDPPayloadSize = 512 +) + +type HTTPConfig struct { + // Addr should be of the form "http://host:port" + // or "http://[ipv6-host%zone]:port". + Addr string + + // Username is the influxdb username, optional + Username string + + // Password is the influxdb password, optional + Password string + + // UserAgent is the http User Agent, defaults to "InfluxDBClient" + UserAgent string + + // Timeout for influxdb writes, defaults to no timeout + Timeout time.Duration + + // InsecureSkipVerify gets passed to the http client, if true, it will + // skip https certificate verification. Defaults to false + InsecureSkipVerify bool +} + +type UDPConfig struct { + // Addr should be of the form "host:port" + // or "[ipv6-host%zone]:port". + Addr string + + // PayloadSize is the maximum size of a UDP client message, optional + // Tune this based on your network. Defaults to UDPBufferSize. + PayloadSize int +} + +type BatchPointsConfig struct { + // Precision is the write precision of the points, defaults to "ns" + Precision string + + // Database is the database to write points to + Database string + + // RetentionPolicy is the retention policy of the points + RetentionPolicy string + + // Write consistency is the number of servers required to confirm write + WriteConsistency string +} + +// Client is a client interface for writing & querying the database +type Client interface { + // Write takes a BatchPoints object and writes all Points to InfluxDB. + Write(bp BatchPoints) error + + // Query makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + Query(q Query) (*Response, error) + + // Close releases any resources a Client may be using. + Close() error +} + +// NewClient creates a client interface from the given config. +func NewHTTPClient(conf HTTPConfig) (Client, error) { + if conf.UserAgent == "" { + conf.UserAgent = "InfluxDBClient" + } + + u, err := url.Parse(conf.Addr) + if err != nil { + return nil, err + } else if u.Scheme != "http" && u.Scheme != "https" { + m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+ + " must start with http:// or https://", u.Scheme) + return nil, errors.New(m) + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: conf.InsecureSkipVerify, + }, + } + return &client{ + url: u, + username: conf.Username, + password: conf.Password, + useragent: conf.UserAgent, + httpClient: &http.Client{ + Timeout: conf.Timeout, + Transport: tr, + }, + }, nil +} + +// Close releases the client's resources. +func (c *client) Close() error { + return nil +} + +// NewUDPClient returns a client interface for writing to an InfluxDB UDP +// service from the given config. +func NewUDPClient(conf UDPConfig) (Client, error) { + var udpAddr *net.UDPAddr + udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr) + if err != nil { + return nil, err + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + + payloadSize := conf.PayloadSize + if payloadSize == 0 { + payloadSize = UDPPayloadSize + } + + return &udpclient{ + conn: conn, + payloadSize: payloadSize, + }, nil +} + +// Close releases the udpclient's resources. +func (uc *udpclient) Close() error { + return uc.conn.Close() +} + +type client struct { + url *url.URL + username string + password string + useragent string + httpClient *http.Client +} + +type udpclient struct { + conn *net.UDPConn + payloadSize int +} + +// BatchPoints is an interface into a batched grouping of points to write into +// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate +// batch for each goroutine. +type BatchPoints interface { + // AddPoint adds the given point to the Batch of points + AddPoint(p *Point) + // Points lists the points in the Batch + Points() []*Point + + // Precision returns the currently set precision of this Batch + Precision() string + // SetPrecision sets the precision of this batch. + SetPrecision(s string) error + + // Database returns the currently set database of this Batch + Database() string + // SetDatabase sets the database of this Batch + SetDatabase(s string) + + // WriteConsistency returns the currently set write consistency of this Batch + WriteConsistency() string + // SetWriteConsistency sets the write consistency of this Batch + SetWriteConsistency(s string) + + // RetentionPolicy returns the currently set retention policy of this Batch + RetentionPolicy() string + // SetRetentionPolicy sets the retention policy of this Batch + SetRetentionPolicy(s string) +} + +// NewBatchPoints returns a BatchPoints interface based on the given config. +func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) { + if conf.Precision == "" { + conf.Precision = "ns" + } + if _, err := time.ParseDuration("1" + conf.Precision); err != nil { + return nil, err + } + bp := &batchpoints{ + database: conf.Database, + precision: conf.Precision, + retentionPolicy: conf.RetentionPolicy, + writeConsistency: conf.WriteConsistency, + } + return bp, nil +} + +type batchpoints struct { + points []*Point + database string + precision string + retentionPolicy string + writeConsistency string +} + +func (bp *batchpoints) AddPoint(p *Point) { + bp.points = append(bp.points, p) +} + +func (bp *batchpoints) Points() []*Point { + return bp.points +} + +func (bp *batchpoints) Precision() string { + return bp.precision +} + +func (bp *batchpoints) Database() string { + return bp.database +} + +func (bp *batchpoints) WriteConsistency() string { + return bp.writeConsistency +} + +func (bp *batchpoints) RetentionPolicy() string { + return bp.retentionPolicy +} + +func (bp *batchpoints) SetPrecision(p string) error { + if _, err := time.ParseDuration("1" + p); err != nil { + return err + } + bp.precision = p + return nil +} + +func (bp *batchpoints) SetDatabase(db string) { + bp.database = db +} + +func (bp *batchpoints) SetWriteConsistency(wc string) { + bp.writeConsistency = wc +} + +func (bp *batchpoints) SetRetentionPolicy(rp string) { + bp.retentionPolicy = rp +} + +type Point struct { + pt models.Point +} + +// NewPoint returns a point with the given timestamp. If a timestamp is not +// given, then data is sent to the database without a timestamp, in which case +// the server will assign local time upon reception. NOTE: it is recommended +// to send data without a timestamp. +func NewPoint( + name string, + tags map[string]string, + fields map[string]interface{}, + t ...time.Time, +) (*Point, error) { + var T time.Time + if len(t) > 0 { + T = t[0] + } + + pt, err := models.NewPoint(name, tags, fields, T) + if err != nil { + return nil, err + } + return &Point{ + pt: pt, + }, nil +} + +// String returns a line-protocol string of the Point +func (p *Point) String() string { + return p.pt.String() +} + +// PrecisionString returns a line-protocol string of the Point, at precision +func (p *Point) PrecisionString(precison string) string { + return p.pt.PrecisionString(precison) +} + +// Name returns the measurement name of the point +func (p *Point) Name() string { + return p.pt.Name() +} + +// Name returns the tags associated with the point +func (p *Point) Tags() map[string]string { + return p.pt.Tags() +} + +// Time return the timestamp for the point +func (p *Point) Time() time.Time { + return p.pt.Time() +} + +// UnixNano returns the unix nano time of the point +func (p *Point) UnixNano() int64 { + return p.pt.UnixNano() +} + +// Fields returns the fields for the point +func (p *Point) Fields() map[string]interface{} { + return p.pt.Fields() +} + +func (uc *udpclient) Write(bp BatchPoints) error { + var b bytes.Buffer + var d time.Duration + d, _ = time.ParseDuration("1" + bp.Precision()) + + for _, p := range bp.Points() { + pointstring := p.pt.RoundedString(d) + "\n" + + // Write and reset the buffer if we reach the max size + if b.Len()+len(pointstring) >= uc.payloadSize { + if _, err := uc.conn.Write(b.Bytes()); err != nil { + return err + } + b.Reset() + } + + if _, err := b.WriteString(pointstring); err != nil { + return err + } + } + + _, err := uc.conn.Write(b.Bytes()) + return err +} + +func (c *client) Write(bp BatchPoints) error { + var b bytes.Buffer + + for _, p := range bp.Points() { + if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { + return err + } + + if err := b.WriteByte('\n'); err != nil { + return err + } + } + + u := c.url + u.Path = "write" + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("db", bp.Database()) + params.Set("rp", bp.RetentionPolicy()) + params.Set("precision", bp.Precision()) + params.Set("consistency", bp.WriteConsistency()) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + return err + } + + return nil +} + +// Query defines a query to send to the server +type Query struct { + Command string + Database string + Precision string +} + +// NewQuery returns a query object +// database and precision strings can be empty strings if they are not needed +// for the query. +func NewQuery(command, database, precision string) Query { + return Query{ + Command: command, + Database: database, + Precision: precision, + } +} + +// Response represents a list of statement results. +type Response struct { + Results []Result + Err error +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (r *Response) Error() error { + if r.Err != nil { + return r.Err + } + for _, result := range r.Results { + if result.Err != nil { + return result.Err + } + } + return nil +} + +// Result represents a resultset returned from a single statement. +type Result struct { + Series []models.Row + Err error +} + +func (uc *udpclient) Query(q Query) (*Response, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + +// Query sends a command to the server and returns the Response +func (c *client) Query(q Query) (*Response, error) { + u := c.url + u.Path = "query" + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + decErr := dec.Decode(&response) + + // ignore this error if we got an invalid status code + if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { + decErr = nil + } + // If we got a valid decode error, send that back + if decErr != nil { + return nil, decErr + } + // If we don't have an error in our json response, and didn't get statusOK + // then send back an error + if resp.StatusCode != http.StatusOK && response.Error() == nil { + return &response, fmt.Errorf("received status code %d from server", + resp.StatusCode) + } + return &response, nil +}