moving deps to vendor subdirectory

Signed-off-by: Vishnu kannan <vishnuk@google.com>
This commit is contained in:
Vishnu kannan 2016-07-11 12:19:43 -07:00
parent 40fd72ef26
commit de4af1288b
982 changed files with 259 additions and 16 deletions

2
Godeps/Godeps.json generated
View File

@ -1,6 +1,6 @@
{ {
"ImportPath": "github.com/google/cadvisor", "ImportPath": "github.com/google/cadvisor",
"GoVersion": "go1.5", "GoVersion": "go1.6",
"GodepVersion": "v74", "GodepVersion": "v74",
"Packages": [ "Packages": [
"./..." "./..."

2
Godeps/_workspace/.gitignore generated vendored
View File

@ -1,2 +0,0 @@
/pkg
/bin

View File

@ -1,7 +1,7 @@
language: go language: go
go: go:
- 1.4.3 - 1.4.3
- 1.5.2 - 1.5.3
env: env:
global: global:
@ -11,7 +11,6 @@ env:
- KAFKA_HOSTNAME=localhost - KAFKA_HOSTNAME=localhost
- DEBUG=true - DEBUG=true
matrix: matrix:
- KAFKA_VERSION=0.8.1.1
- KAFKA_VERSION=0.8.2.2 - KAFKA_VERSION=0.8.2.2
- KAFKA_VERSION=0.9.0.0 - KAFKA_VERSION=0.9.0.0

View File

@ -1,5 +1,24 @@
# Changelog # Changelog
#### Version 1.8.0 (2016-02-01)
New Features:
- Full support for Kafka 0.9:
- All protocol messages and fields
([#586](https://github.com/Shopify/sarama/pull/586),
[#588](https://github.com/Shopify/sarama/pull/588),
[#590](https://github.com/Shopify/sarama/pull/590)).
- Verified that TLS support works
([#581](https://github.com/Shopify/sarama/pull/581)).
- Fixed the OffsetManager compatibility
([#585](https://github.com/Shopify/sarama/pull/585)).
Improvements:
- Optimize for fewer system calls when reading from the network
([#584](https://github.com/Shopify/sarama/pull/584)).
- Automatically retry `InvalidMessage` errors to match upstream behaviour
([#589](https://github.com/Shopify/sarama/pull/589)).
#### Version 1.7.0 (2015-12-11) #### Version 1.7.0 (2015-12-11)
New Features: New Features:

View File

@ -7,7 +7,7 @@ vet:
go vet ./... go vet ./...
errcheck: errcheck:
errcheck github.com/Shopify/sarama/... @if go version | grep -q go1.5; then errcheck github.com/Shopify/sarama/...; fi
fmt: fmt:
@if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi @if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi
@ -15,7 +15,7 @@ fmt:
install_dependencies: install_errcheck install_go_vet get install_dependencies: install_errcheck install_go_vet get
install_errcheck: install_errcheck:
go get github.com/kisielk/errcheck @if go version | grep -q go1.5; then go get github.com/kisielk/errcheck; fi
install_go_vet: install_go_vet:
go get golang.org/x/tools/cmd/vet go get golang.org/x/tools/cmd/vet

View File

@ -18,7 +18,7 @@ Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apa
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support grace period for older releases. This means we currently officially support
Go 1.4 and 1.5, and Kafka 0.8.1 and 0.8.2, although older releases are still Go 1.5 and 1.4, and Kafka 0.9.0 and 0.8.2, although older releases are still
likely to work. likely to work.
Sarama follows semantic versioning and provides API stability via the gopkg.in service. Sarama follows semantic versioning and provides API stability via the gopkg.in service.

View File

@ -581,7 +581,8 @@ func (bp *brokerProducer) run() {
select { select {
case msg := <-bp.input: case msg := <-bp.input:
if msg == nil { if msg == nil {
goto shutdown bp.shutdown()
return
} }
if msg.flags&syn == syn { if msg.flags&syn == syn {
@ -637,8 +638,9 @@ func (bp *brokerProducer) run() {
output = nil output = nil
} }
} }
}
shutdown: func (bp *brokerProducer) shutdown() {
for !bp.buffer.empty() { for !bp.buffer.empty() {
select { select {
case response := <-bp.responses: case response := <-bp.responses:
@ -725,7 +727,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
} }
bp.parent.returnSuccesses(msgs) bp.parent.returnSuccesses(msgs)
// Retriable errors // Retriable errors
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err) bp.broker.ID(), topic, partition, block.Err)

View File

@ -85,6 +85,7 @@ func (b *Broker) Open(conf *Config) error {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
return return
} }
b.conn = newBufConn(b.conn)
b.conf = conf b.conf = conf
b.done = make(chan bool) b.done = make(chan bool)
@ -239,6 +240,72 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil return response, nil
} }
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
response := new(ListGroupsResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
response := new(DescribeGroupsResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) { func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()

View File

@ -21,8 +21,6 @@ type Config struct {
ReadTimeout time.Duration // How long to wait for a response. ReadTimeout time.Duration // How long to wait for a response.
WriteTimeout time.Duration // How long to wait for a transmit. WriteTimeout time.Duration // How long to wait for a transmit.
// NOTE: these config values have no compatibility guarantees; they may
// change when Kafka releases its official TLS support in version 0.9.
TLS struct { TLS struct {
// Whether or not to use TLS when connecting to the broker // Whether or not to use TLS when connecting to the broker
// (defaults to false). // (defaults to false).

View File

@ -0,0 +1,94 @@
package sarama
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}
func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)
if err := pe.putStringArray(m.Topics); err != nil {
return err
}
if err := pe.putBytes(m.UserData); err != nil {
return err
}
return nil
}
func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}
if m.Topics, err = pd.getStringArray(); err != nil {
return
}
if m.UserData, err = pd.getBytes(); err != nil {
return
}
return nil
}
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
UserData []byte
}
func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)
if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}
for topic, partitions := range m.Topics {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}
if err := pe.putBytes(m.UserData); err != nil {
return err
}
return nil
}
func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}
var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}
m.Topics = make(map[string][]int32, topicLen)
for i := 0; i < topicLen; i++ {
var topic string
if topic, err = pd.getString(); err != nil {
return
}
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
return
}
}
if m.UserData, err = pd.getBytes(); err != nil {
return
}
return nil
}

View File

@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
r.GroupProtocols[name] = metadata r.GroupProtocols[name] = metadata
} }
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
bin, err := encode(metadata)
if err != nil {
return err
}
r.AddGroupProtocol(name, bin)
return nil
}

View File

@ -9,6 +9,18 @@ type JoinGroupResponse struct {
Members map[string][]byte Members map[string][]byte
} }
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
for id, bin := range r.Members {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(bin, meta); err != nil {
return nil, err
}
members[id] = *meta
}
return members, nil
}
func (r *JoinGroupResponse) encode(pe packetEncoder) error { func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err)) pe.putInt16(int16(r.Err))
pe.putInt32(r.GenerationId) pe.putInt32(r.GenerationId)

View File

@ -5,6 +5,11 @@ package sarama
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2. // The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
const ReceiveTime int64 = -1 const ReceiveTime int64 = -1
// GroupGenerationUndefined is a special value for the group generation field of
// Offset Commit Requests that should be used when a consumer group does not rely
// on Kafka for partition management.
const GroupGenerationUndefined = -1
type offsetCommitRequestBlock struct { type offsetCommitRequestBlock struct {
offset int64 offset int64
timestamp int64 timestamp int64

View File

@ -476,8 +476,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest { func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{ r := &OffsetCommitRequest{
Version: 1, Version: 1,
ConsumerGroup: bom.parent.group, ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
} }
for s := range bom.subscriptions { for s := range bom.subscriptions {

View File

@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment
r.GroupAssignments[memberId] = memberAssignment r.GroupAssignments[memberId] = memberAssignment
} }
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
bin, err := encode(memberAssignment)
if err != nil {
return err
}
r.AddGroupAssignment(memberId, bin)
return nil
}

View File

@ -5,6 +5,12 @@ type SyncGroupResponse struct {
MemberAssignment []byte MemberAssignment []byte
} }
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
assignment := new(ConsumerGroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
return assignment, err
}
func (r *SyncGroupResponse) encode(pe packetEncoder) error { func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err)) pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment) return pe.putBytes(r.MemberAssignment)

View File

@ -1,6 +1,10 @@
package sarama package sarama
import "sort" import (
"bufio"
"net"
"sort"
)
type none struct{} type none struct{}
@ -87,3 +91,21 @@ func (b ByteEncoder) Encode() ([]byte, error) {
func (b ByteEncoder) Length() int { func (b ByteEncoder) Length() int {
return len(b) return len(b)
} }
// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
// reads that trigger syscalls.
type bufConn struct {
net.Conn
buf *bufio.Reader
}
func newBufConn(conn net.Conn) *bufConn {
return &bufConn{
Conn: conn,
buf: bufio.NewReader(conn),
}
}
func (bc *bufConn) Read(b []byte) (n int, err error) {
return bc.buf.Read(b)
}

Some files were not shown because too many files have changed in this diff Show More