Merge pull request #1369 from vishh/move-to-vendor
Switch to using vendor directory for deps.
This commit is contained in:
commit
2290707a3b
2
Godeps/Godeps.json
generated
2
Godeps/Godeps.json
generated
@ -1,6 +1,6 @@
|
||||
{
|
||||
"ImportPath": "github.com/google/cadvisor",
|
||||
"GoVersion": "go1.5",
|
||||
"GoVersion": "go1.6",
|
||||
"GodepVersion": "v74",
|
||||
"Packages": [
|
||||
"./..."
|
||||
|
2
Godeps/_workspace/.gitignore
generated
vendored
2
Godeps/_workspace/.gitignore
generated
vendored
@ -1,2 +0,0 @@
|
||||
/pkg
|
||||
/bin
|
14
Makefile
14
Makefile
@ -11,10 +11,10 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
|
||||
GO := godep go
|
||||
pkgs = $(shell $(GO) list ./...)
|
||||
GO := go
|
||||
pkgs = $(shell $(GO) list ./... | grep -v vendor)
|
||||
|
||||
all: format build test
|
||||
all: presubmit build test
|
||||
|
||||
test:
|
||||
@echo ">> running tests"
|
||||
@ -43,4 +43,10 @@ release:
|
||||
docker:
|
||||
@docker build -t cadvisor:$(shell git rev-parse --short HEAD) -f deploy/Dockerfile .
|
||||
|
||||
.PHONY: all build docker format release test test-integration vet
|
||||
presubmit: vet
|
||||
@echo ">> checking go formatting"
|
||||
@./build/check_gofmt.sh .
|
||||
@echo ">> checking file boilerplate"
|
||||
@./build/check_boilerplate.sh
|
||||
|
||||
.PHONY: all build docker format release test test-integration vet presubmit
|
||||
|
@ -95,7 +95,7 @@ def file_passes(filename, refs, regexs):
|
||||
def file_extension(filename):
|
||||
return os.path.splitext(filename)[1].split(".")[-1].lower()
|
||||
|
||||
skipped_dirs = ['Godeps', 'third_party', '_gopath', '_output', '.git']
|
||||
skipped_dirs = ['Godeps', 'vendor', 'third_party', '_gopath', '_output', '.git']
|
||||
def normalize_files(files):
|
||||
newfiles = []
|
||||
for pathname in files:
|
||||
|
@ -68,6 +68,6 @@ if [ "$RELEASE" == "true" ]; then
|
||||
echo "Building release candidate with -ldflags $ldflags"
|
||||
fi
|
||||
|
||||
GOBIN=$PWD godep go "$GO_CMD" ${GO_FLAGS} -ldflags "${ldflags}" "${repo_path}"
|
||||
GOBIN=$PWD go "$GO_CMD" ${GO_FLAGS} -ldflags "${ldflags}" "${repo_path}"
|
||||
|
||||
exit 0
|
||||
|
@ -14,7 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
GO_FILES=$(find . -not -wholename "*Godeps*" -name "*.go")
|
||||
GO_FILES=$(find . -not -wholename "*Godeps*" -not -wholename "*vendor*" -name "*.go")
|
||||
|
||||
for FILE in ${GO_FILES}; do
|
||||
ERRS=`grep 'fmt.Errorf("[A-Z]' ${FILE}`
|
||||
|
@ -21,7 +21,7 @@ if [ $# -ne 1 ]; then
|
||||
fi
|
||||
|
||||
# Check formatting on non Godep'd code.
|
||||
GOFMT_PATHS=$(find . -not -wholename "*.git*" -not -wholename "*Godeps*" -not -name "." -type d)
|
||||
GOFMT_PATHS=$(find . -not -wholename "*.git*" -not -wholename "*Godeps*" -not -wholename "*vendor*" -not -name "." -type d)
|
||||
|
||||
# Find any files with gofmt problems
|
||||
BAD_FILES=$(gofmt -s -l $GOFMT_PATHS)
|
||||
|
@ -57,4 +57,4 @@ while [ "$(curl -Gs http://localhost:8080/healthz)" != "ok" ]; do
|
||||
done
|
||||
|
||||
echo ">> running integration tests against local cAdvisor"
|
||||
godep go test github.com/google/cadvisor/integration/tests/... --vmodule=*=2
|
||||
go test github.com/google/cadvisor/integration/tests/... --vmodule=*=2
|
||||
|
@ -22,9 +22,8 @@ BUILDER=${BUILDER:-false} # Whether this is running a PR builder job.
|
||||
export GO_FLAGS="-race"
|
||||
export GORACE="halt_on_error=1"
|
||||
|
||||
go get -u github.com/tools/godep
|
||||
./build/presubmit.sh
|
||||
godep go build -tags test github.com/google/cadvisor/integration/runner
|
||||
make
|
||||
go build -tags test github.com/google/cadvisor/integration/runner
|
||||
|
||||
# Nodes that are currently stable. When tests fail on a specific node, and the failure is not remedied within a week, that node will be removed from this list.
|
||||
golden_nodes=(
|
||||
|
@ -1,24 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright 2015 Google Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
set -e
|
||||
set -x
|
||||
|
||||
./build/check_gofmt.sh .
|
||||
./build/check_boilerplate.sh
|
||||
go vet github.com/google/cadvisor/...
|
||||
godep go test -v -race -test.short github.com/google/cadvisor/...
|
||||
godep go build github.com/google/cadvisor
|
@ -17,6 +17,6 @@
|
||||
set -e
|
||||
set -x
|
||||
|
||||
godep go build -a github.com/google/cadvisor
|
||||
make build
|
||||
|
||||
docker build -t google/cadvisor:beta .
|
||||
|
@ -3,8 +3,7 @@ MAINTAINER vmarmol@google.com
|
||||
|
||||
RUN apt-get install -y git dmsetup
|
||||
RUN git clone https://github.com/google/cadvisor.git /go/src/github.com/google/cadvisor
|
||||
RUN go get github.com/tools/godep
|
||||
RUN cd /go/src/github.com/google/cadvisor && godep go build .
|
||||
RUN cd /go/src/github.com/google/cadvisor && make
|
||||
|
||||
ENTRYPOINT ["/go/src/github.com/google/cadvisor/cadvisor"]
|
||||
|
||||
|
@ -4,9 +4,9 @@
|
||||
|
||||
## Installing Dependencies
|
||||
|
||||
cAdvisor is written in the [Go](http://golang.org) programming language. If you haven't set up a Go development environment, please follow [these instructions](http://golang.org/doc/code.html) to install go tool and set up GOPATH. Ensure that your version of Go is at least 1.3. Note that the version of Go in package repositories of some operating systems is outdated, so please [download](https://golang.org/dl/) the latest version.
|
||||
cAdvisor is written in the [Go](http://golang.org) programming language. If you haven't set up a Go development environment, please follow [these instructions](http://golang.org/doc/code.html) to install go tool and set up GOPATH. Note that the version of Go in package repositories of some operating systems is outdated, so please [download](https://golang.org/dl/) the latest version.
|
||||
|
||||
**Note**: cAdvisor requires Go 1.5 to build.
|
||||
**Note**: cAdvisor requires Go 1.6 to build.
|
||||
|
||||
After setting up Go, you should be able to `go get` cAdvisor as expected (we use `-d` to only download):
|
||||
|
||||
@ -14,24 +14,18 @@ After setting up Go, you should be able to `go get` cAdvisor as expected (we use
|
||||
$ go get -d github.com/google/cadvisor
|
||||
```
|
||||
|
||||
We use `godep` so you will need to get that as well:
|
||||
|
||||
```
|
||||
$ go get github.com/tools/godep
|
||||
```
|
||||
|
||||
## Building from Source
|
||||
|
||||
At this point you can build cAdvisor from the source folder:
|
||||
|
||||
```
|
||||
$GOPATH/src/github.com/google/cadvisor $ godep go build .
|
||||
$GOPATH/src/github.com/google/cadvisor $ make build
|
||||
```
|
||||
|
||||
or run only unit tests:
|
||||
|
||||
```
|
||||
$GOPATH/src/github.com/google/cadvisor $ godep go test ./... -test.short
|
||||
$GOPATH/src/github.com/google/cadvisor $ make test
|
||||
```
|
||||
|
||||
For integration tests, see the [integration testing](integration_testing.md) page.
|
||||
@ -44,12 +38,3 @@ Now you can run the built binary:
|
||||
$GOPATH/src/github.com/google/cadvisor $ sudo ./cadvisor
|
||||
```
|
||||
|
||||
## Compiling Assets
|
||||
|
||||
If you modify files in the /assets folder, you will need to rebuild the assets:
|
||||
|
||||
```
|
||||
$GOPATH/src/github.com/google/cadvisor $ ./build/assets.sh
|
||||
$GOPATH/src/github.com/google/cadvisor $ godep go build .
|
||||
```
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
The cAdvisor integration tests can be found in `integration/tests`. These run queries on a running cAdvisor. To run these tests:
|
||||
|
||||
```
|
||||
$ godep go run integration/runner/runner.go -port=PORT <hosts to test>
|
||||
$ go run integration/runner/runner.go -port=PORT <hosts to test>
|
||||
```
|
||||
|
||||
This will build a cAdvisor from the current repository and start it on the target machine before running the tests.
|
||||
@ -11,7 +11,7 @@ This will build a cAdvisor from the current repository and start it on the targe
|
||||
To simply run the tests against an existing cAdvisor:
|
||||
|
||||
```
|
||||
$ godep go test github.com/google/cadvisor/integration/tests/... -host=HOST -port=PORT
|
||||
$ go test github.com/google/cadvisor/integration/tests/... -host=HOST -port=PORT
|
||||
```
|
||||
|
||||
Note that `HOST` and `PORT` default to `localhost` and `8080` respectively.
|
||||
|
@ -30,7 +30,7 @@ if ! git diff --name-only origin/master | grep -c -E "*.go|*.sh" &> /dev/null; t
|
||||
fi
|
||||
|
||||
# Build the runner.
|
||||
godep go build github.com/google/cadvisor/integration/runner
|
||||
go build github.com/google/cadvisor/integration/runner
|
||||
|
||||
# Run it.
|
||||
HOSTS=$@
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
)
|
||||
|
||||
// must be able to ssh into hosts without password
|
||||
// godep go run ./integration/runner/runner.go --logtostderr --v 2 --ssh-config <.ssh/config file> <list of hosts>
|
||||
// go run ./integration/runner/runner.go --logtostderr --v 2 --ssh-config <.ssh/config file> <list of hosts>
|
||||
|
||||
const (
|
||||
cadvisorBinary = "cadvisor"
|
||||
@ -180,7 +180,7 @@ func PushAndRunTests(host, testDir string) (result error) {
|
||||
}
|
||||
// Run the command
|
||||
|
||||
err = RunCommand("godep", "go", "test", "--timeout", testTimeout.String(), "github.com/google/cadvisor/integration/tests/...", "--host", host, "--port", portStr, "--ssh-options", *sshOptions)
|
||||
err = RunCommand("go", "test", "--timeout", testTimeout.String(), "github.com/google/cadvisor/integration/tests/...", "--host", host, "--port", portStr, "--ssh-options", *sshOptions)
|
||||
if err == nil {
|
||||
// On success, break out of retry loop
|
||||
break
|
||||
|
@ -1,7 +1,7 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.4.3
|
||||
- 1.5.2
|
||||
- 1.5.3
|
||||
|
||||
env:
|
||||
global:
|
||||
@ -11,7 +11,6 @@ env:
|
||||
- KAFKA_HOSTNAME=localhost
|
||||
- DEBUG=true
|
||||
matrix:
|
||||
- KAFKA_VERSION=0.8.1.1
|
||||
- KAFKA_VERSION=0.8.2.2
|
||||
- KAFKA_VERSION=0.9.0.0
|
||||
|
@ -1,5 +1,24 @@
|
||||
# Changelog
|
||||
|
||||
#### Version 1.8.0 (2016-02-01)
|
||||
|
||||
New Features:
|
||||
- Full support for Kafka 0.9:
|
||||
- All protocol messages and fields
|
||||
([#586](https://github.com/Shopify/sarama/pull/586),
|
||||
[#588](https://github.com/Shopify/sarama/pull/588),
|
||||
[#590](https://github.com/Shopify/sarama/pull/590)).
|
||||
- Verified that TLS support works
|
||||
([#581](https://github.com/Shopify/sarama/pull/581)).
|
||||
- Fixed the OffsetManager compatibility
|
||||
([#585](https://github.com/Shopify/sarama/pull/585)).
|
||||
|
||||
Improvements:
|
||||
- Optimize for fewer system calls when reading from the network
|
||||
([#584](https://github.com/Shopify/sarama/pull/584)).
|
||||
- Automatically retry `InvalidMessage` errors to match upstream behaviour
|
||||
([#589](https://github.com/Shopify/sarama/pull/589)).
|
||||
|
||||
#### Version 1.7.0 (2015-12-11)
|
||||
|
||||
New Features:
|
@ -7,7 +7,7 @@ vet:
|
||||
go vet ./...
|
||||
|
||||
errcheck:
|
||||
errcheck github.com/Shopify/sarama/...
|
||||
@if go version | grep -q go1.5; then errcheck github.com/Shopify/sarama/...; fi
|
||||
|
||||
fmt:
|
||||
@if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi
|
||||
@ -15,7 +15,7 @@ fmt:
|
||||
install_dependencies: install_errcheck install_go_vet get
|
||||
|
||||
install_errcheck:
|
||||
go get github.com/kisielk/errcheck
|
||||
@if go version | grep -q go1.5; then go get github.com/kisielk/errcheck; fi
|
||||
|
||||
install_go_vet:
|
||||
go get golang.org/x/tools/cmd/vet
|
@ -18,7 +18,7 @@ Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apa
|
||||
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
|
||||
the two latest stable releases of Kafka and Go, and we provide a two month
|
||||
grace period for older releases. This means we currently officially support
|
||||
Go 1.4 and 1.5, and Kafka 0.8.1 and 0.8.2, although older releases are still
|
||||
Go 1.5 and 1.4, and Kafka 0.9.0 and 0.8.2, although older releases are still
|
||||
likely to work.
|
||||
|
||||
Sarama follows semantic versioning and provides API stability via the gopkg.in service.
|
@ -581,7 +581,8 @@ func (bp *brokerProducer) run() {
|
||||
select {
|
||||
case msg := <-bp.input:
|
||||
if msg == nil {
|
||||
goto shutdown
|
||||
bp.shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.flags&syn == syn {
|
||||
@ -637,8 +638,9 @@ func (bp *brokerProducer) run() {
|
||||
output = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shutdown:
|
||||
func (bp *brokerProducer) shutdown() {
|
||||
for !bp.buffer.empty() {
|
||||
select {
|
||||
case response := <-bp.responses:
|
||||
@ -725,7 +727,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
|
||||
}
|
||||
bp.parent.returnSuccesses(msgs)
|
||||
// Retriable errors
|
||||
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
|
||||
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
|
||||
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
|
||||
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
|
||||
bp.broker.ID(), topic, partition, block.Err)
|
@ -85,6 +85,7 @@ func (b *Broker) Open(conf *Config) error {
|
||||
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
|
||||
return
|
||||
}
|
||||
b.conn = newBufConn(b.conn)
|
||||
|
||||
b.conf = conf
|
||||
b.done = make(chan bool)
|
||||
@ -239,6 +240,72 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
|
||||
response := new(JoinGroupResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
|
||||
response := new(SyncGroupResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
|
||||
response := new(LeaveGroupResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
|
||||
response := new(HeartbeatResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
|
||||
response := new(ListGroupsResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
|
||||
response := new(DescribeGroupsResponse)
|
||||
|
||||
err := b.sendAndReceive(request, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
@ -21,8 +21,6 @@ type Config struct {
|
||||
ReadTimeout time.Duration // How long to wait for a response.
|
||||
WriteTimeout time.Duration // How long to wait for a transmit.
|
||||
|
||||
// NOTE: these config values have no compatibility guarantees; they may
|
||||
// change when Kafka releases its official TLS support in version 0.9.
|
||||
TLS struct {
|
||||
// Whether or not to use TLS when connecting to the broker
|
||||
// (defaults to false).
|
94
vendor/github.com/Shopify/sarama/consumer_group_members.go
generated
vendored
Normal file
94
vendor/github.com/Shopify/sarama/consumer_group_members.go
generated
vendored
Normal file
@ -0,0 +1,94 @@
|
||||
package sarama
|
||||
|
||||
type ConsumerGroupMemberMetadata struct {
|
||||
Version int16
|
||||
Topics []string
|
||||
UserData []byte
|
||||
}
|
||||
|
||||
func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
|
||||
pe.putInt16(m.Version)
|
||||
|
||||
if err := pe.putStringArray(m.Topics); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pe.putBytes(m.UserData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
|
||||
if m.Version, err = pd.getInt16(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if m.Topics, err = pd.getStringArray(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if m.UserData, err = pd.getBytes(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ConsumerGroupMemberAssignment struct {
|
||||
Version int16
|
||||
Topics map[string][]int32
|
||||
UserData []byte
|
||||
}
|
||||
|
||||
func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
|
||||
pe.putInt16(m.Version)
|
||||
|
||||
if err := pe.putArrayLength(len(m.Topics)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for topic, partitions := range m.Topics {
|
||||
if err := pe.putString(topic); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := pe.putInt32Array(partitions); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := pe.putBytes(m.UserData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
|
||||
if m.Version, err = pd.getInt16(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var topicLen int
|
||||
if topicLen, err = pd.getArrayLength(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.Topics = make(map[string][]int32, topicLen)
|
||||
for i := 0; i < topicLen; i++ {
|
||||
var topic string
|
||||
if topic, err = pd.getString(); err != nil {
|
||||
return
|
||||
}
|
||||
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if m.UserData, err = pd.getBytes(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
|
||||
|
||||
r.GroupProtocols[name] = metadata
|
||||
}
|
||||
|
||||
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
|
||||
bin, err := encode(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.AddGroupProtocol(name, bin)
|
||||
return nil
|
||||
}
|
@ -9,6 +9,18 @@ type JoinGroupResponse struct {
|
||||
Members map[string][]byte
|
||||
}
|
||||
|
||||
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
|
||||
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
|
||||
for id, bin := range r.Members {
|
||||
meta := new(ConsumerGroupMemberMetadata)
|
||||
if err := decode(bin, meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
members[id] = *meta
|
||||
}
|
||||
return members, nil
|
||||
}
|
||||
|
||||
func (r *JoinGroupResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt16(int16(r.Err))
|
||||
pe.putInt32(r.GenerationId)
|
@ -5,6 +5,11 @@ package sarama
|
||||
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
|
||||
const ReceiveTime int64 = -1
|
||||
|
||||
// GroupGenerationUndefined is a special value for the group generation field of
|
||||
// Offset Commit Requests that should be used when a consumer group does not rely
|
||||
// on Kafka for partition management.
|
||||
const GroupGenerationUndefined = -1
|
||||
|
||||
type offsetCommitRequestBlock struct {
|
||||
offset int64
|
||||
timestamp int64
|
@ -476,8 +476,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
||||
|
||||
func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
|
||||
r := &OffsetCommitRequest{
|
||||
Version: 1,
|
||||
ConsumerGroup: bom.parent.group,
|
||||
Version: 1,
|
||||
ConsumerGroup: bom.parent.group,
|
||||
ConsumerGroupGeneration: GroupGenerationUndefined,
|
||||
}
|
||||
|
||||
for s := range bom.subscriptions {
|
@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment
|
||||
|
||||
r.GroupAssignments[memberId] = memberAssignment
|
||||
}
|
||||
|
||||
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
|
||||
bin, err := encode(memberAssignment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.AddGroupAssignment(memberId, bin)
|
||||
return nil
|
||||
}
|
@ -5,6 +5,12 @@ type SyncGroupResponse struct {
|
||||
MemberAssignment []byte
|
||||
}
|
||||
|
||||
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
|
||||
assignment := new(ConsumerGroupMemberAssignment)
|
||||
err := decode(r.MemberAssignment, assignment)
|
||||
return assignment, err
|
||||
}
|
||||
|
||||
func (r *SyncGroupResponse) encode(pe packetEncoder) error {
|
||||
pe.putInt16(int16(r.Err))
|
||||
return pe.putBytes(r.MemberAssignment)
|
@ -1,6 +1,10 @@
|
||||
package sarama
|
||||
|
||||
import "sort"
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"sort"
|
||||
)
|
||||
|
||||
type none struct{}
|
||||
|
||||
@ -87,3 +91,21 @@ func (b ByteEncoder) Encode() ([]byte, error) {
|
||||
func (b ByteEncoder) Length() int {
|
||||
return len(b)
|
||||
}
|
||||
|
||||
// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
|
||||
// reads that trigger syscalls.
|
||||
type bufConn struct {
|
||||
net.Conn
|
||||
buf *bufio.Reader
|
||||
}
|
||||
|
||||
func newBufConn(conn net.Conn) *bufConn {
|
||||
return &bufConn{
|
||||
Conn: conn,
|
||||
buf: bufio.NewReader(conn),
|
||||
}
|
||||
}
|
||||
|
||||
func (bc *bufConn) Read(b []byte) (n int, err error) {
|
||||
return bc.buf.Read(b)
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user