diff --git a/collector/collector_manager.go b/collector/collector_manager.go new file mode 100644 index 00000000..0641d199 --- /dev/null +++ b/collector/collector_manager.go @@ -0,0 +1,83 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "fmt" + "strings" + "time" +) + +type collectorManager struct { + collectors []*collectorData +} + +var _ CollectorManager = &collectorManager{} + +type collectorData struct { + collector Collector + nextCollectionTime time.Time +} + +// Returns a new CollectorManager that is thread-compatible. +func NewCollectorManager() (CollectorManager, error) { + return &collectorManager{ + collectors: []*collectorData{}, + }, nil +} + +func (cm *collectorManager) RegisterCollector(collector Collector) error { + cm.collectors = append(cm.collectors, &collectorData{ + collector: collector, + nextCollectionTime: time.Now(), + }) + return nil +} + +func (cm *collectorManager) Collect() (time.Time, error) { + var errors []error + + // Collect from all collectors that are ready. + var next time.Time + for _, c := range cm.collectors { + if c.nextCollectionTime.Before(time.Now()) { + nextCollection, err := c.collector.Collect() + if err != nil { + errors = append(errors, err) + } + c.nextCollectionTime = nextCollection + } + + // Keep track of the next collector that will be ready. + if next.IsZero() || next.After(c.nextCollectionTime) { + next = c.nextCollectionTime + } + } + + return next, compileErrors(errors) +} + +// Make an error slice into a single error. +func compileErrors(errors []error) error { + if len(errors) == 0 { + return nil + } + + res := make([]string, len(errors)) + for i := range errors { + res[i] = fmt.Sprintf("Error %d: %v", i, errors[i].Error()) + } + return fmt.Errorf("%s", strings.Join(res, ",")) +} diff --git a/collector/collector_manager_test.go b/collector/collector_manager_test.go new file mode 100644 index 00000000..85d6bbc0 --- /dev/null +++ b/collector/collector_manager_test.go @@ -0,0 +1,70 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type fakeCollector struct { + nextCollectionTime time.Time + err error + collectedFrom int +} + +func (fc *fakeCollector) Collect() (time.Time, error) { + fc.collectedFrom++ + return fc.nextCollectionTime, fc.err +} + +func (fc *fakeCollector) Name() string { + return "fake-collector" +} + +func TestCollect(t *testing.T) { + cm := &collectorManager{} + + firstTime := time.Now().Add(-time.Hour) + secondTime := time.Now().Add(time.Hour) + f1 := &fakeCollector{ + nextCollectionTime: firstTime, + } + f2 := &fakeCollector{ + nextCollectionTime: secondTime, + } + + assert := assert.New(t) + assert.NoError(cm.RegisterCollector(f1)) + assert.NoError(cm.RegisterCollector(f2)) + + // First collection, everyone gets collected from. + nextTime, err := cm.Collect() + assert.Equal(firstTime, nextTime) + assert.NoError(err) + assert.Equal(1, f1.collectedFrom) + assert.Equal(1, f2.collectedFrom) + + f1.nextCollectionTime = time.Now().Add(2 * time.Hour) + + // Second collection, only the one that is ready gets collected from. + nextTime, err = cm.Collect() + assert.Equal(secondTime, nextTime) + assert.NoError(err) + assert.Equal(2, f1.collectedFrom) + assert.Equal(1, f2.collectedFrom) +} diff --git a/collector/fakes.go b/collector/fakes.go new file mode 100644 index 00000000..d36f1136 --- /dev/null +++ b/collector/fakes.go @@ -0,0 +1,31 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "time" +) + +type FakeCollectorManager struct { +} + +func (fkm *FakeCollectorManager) RegisterCollector(collector Collector) error { + return nil +} + +func (fkm *FakeCollectorManager) Collect() (time.Time, error) { + var zero time.Time + return zero, nil +} diff --git a/collector/types.go b/collector/types.go new file mode 100644 index 00000000..4967a6ce --- /dev/null +++ b/collector/types.go @@ -0,0 +1,45 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "time" +) + +// TODO(vmarmol): Export to a custom metrics type when that is available. + +// Metric collector. +type Collector interface { + // Collect metrics from this collector. + // Returns the next time this collector should be collected from. + // Next collection time is always returned, even when an error occurs. + // A collection time of zero means no more collection. + Collect() (time.Time, error) + + // Name of this collector. + Name() string +} + +// Manages and runs collectors. +type CollectorManager interface { + // Register a collector. + RegisterCollector(collector Collector) error + + // Collect from collectors that are ready and return the next time + // at which a collector will be ready to collect from. + // Next collection time is always returned, even when an error occurs. + // A collection time of zero means no more collection. + Collect() (time.Time, error) +}