Add mesos containerizer support

This commit includes support for collecting container stats
launched by mesos containerizer.
This commit is contained in:
Sashank Appireddy 2018-05-29 17:08:07 -07:00
parent 2df69b73de
commit ed0e3f0f43
10 changed files with 1019 additions and 0 deletions

View File

@ -36,6 +36,7 @@ const (
ContainerTypeSystemd
ContainerTypeCrio
ContainerTypeContainerd
ContainerTypeMesos
)
// Interface for container operation handlers.

162
container/mesos/client.go Normal file
View File

@ -0,0 +1,162 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"fmt"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/agent"
"github.com/mesos/mesos-go/api/v1/lib/agent/calls"
mclient "github.com/mesos/mesos-go/api/v1/lib/client"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"net/url"
"sync"
)
const (
maxRetryAttempts = 3
)
var (
mesosClientOnce sync.Once
mesosClient *client
)
type client struct {
hc *httpcli.Client
}
type mesosAgentClient interface {
ContainerInfo(id string) (*containerInfo, error)
}
type containerInfo struct {
cntr *mContainer
labels map[string]string
}
// Client is an interface to query mesos agent http endpoints
func Client() (mesosAgentClient, error) {
mesosClientOnce.Do(func() {
// Start Client
apiURL := url.URL{
Scheme: "http",
Host: *MesosAgentAddress,
Path: "/api/v1",
}
mesosClient = &client{
hc: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(*MesosAgentTimeout))),
),
}
})
return mesosClient, nil
}
// ContainerInfo returns the container information of the given container id
func (self *client) ContainerInfo(id string) (*containerInfo, error) {
c, err := self.getContainer(id)
if err != nil {
return nil, err
}
// Get labels of the container
l, err := self.getLabels(c)
if err != nil {
return nil, err
}
return &containerInfo{
cntr: c,
labels: l,
}, nil
}
func (self *client) getContainer(id string) (*mContainer, error) {
// Get all containers
cntrs, err := self.getContainers()
if err != nil {
return nil, err
}
// Check if there is a container with given id and return the container
for _, c := range cntrs.Containers {
if c.ContainerID.Value == id {
return &c, nil
}
}
return nil, fmt.Errorf("can't locate container %s", id)
}
func (self *client) getContainers() (mContainers, error) {
req := calls.NonStreaming(calls.GetContainers())
result, err := self.fetchAndDecode(req)
if err != nil {
return nil, fmt.Errorf("failed to get mesos containers: %v", err)
}
cntrs := result.GetContainers
return cntrs, nil
}
func (self *client) getLabels(c *mContainer) (map[string]string, error) {
// Get mesos agent state which contains all containers labels
var s state
req := calls.NonStreaming(calls.GetState())
result, err := self.fetchAndDecode(req)
if err != nil {
return map[string]string{}, fmt.Errorf("failed to get mesos agent state: %v", err)
}
s.st = result.GetState
// Fetch labels from state object
labels, err := s.FetchLabels(c.FrameworkID.Value, c.ExecutorID.Value)
if err != nil {
return labels, fmt.Errorf("error while fetching labels from executor: %v", err)
}
return labels, nil
}
func (self *client) fetchAndDecode(req calls.RequestFunc) (*agent.Response, error) {
var res mesos.Response
var err error
// Send request
err = retry.Retry(
func(attempt uint) error {
res, err = mesosClient.hc.Send(req, mclient.ResponseClassSingleton, nil)
return err
},
strategy.Limit(maxRetryAttempts),
)
if err != nil {
return nil, fmt.Errorf("error fetching %s: %s", req.Call(), err)
}
// Decode the result
var target agent.Response
err = res.Decode(&target)
if err != nil {
return nil, fmt.Errorf("error while decoding response body from %s: %s", res, err)
}
return &target, nil
}

View File

@ -0,0 +1,40 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import "fmt"
type FakeMesosAgentClient struct {
cntrInfo map[string]*containerInfo
err error
}
func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error) {
if c.err != nil {
return nil, c.err
}
cInfo, ok := c.cntrInfo[id]
if !ok {
return nil, fmt.Errorf("can't locate container %s", id)
}
return cInfo, nil
}
func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient {
return &FakeMesosAgentClient{
err: err,
cntrInfo: cntrInfo,
}
}

153
container/mesos/factory.go Normal file
View File

@ -0,0 +1,153 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"flag"
"fmt"
"path"
"regexp"
"strings"
"time"
"github.com/golang/glog"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager/watcher"
)
var MesosAgentAddress = flag.String("mesos_agent", "127.0.0.1:5051", "Mesos agent address")
var MesosAgentTimeout = flag.Duration("mesos_agent_timeout", 10*time.Second, "Mesos agent timeout")
// The namespace under which mesos aliases are unique.
const MesosNamespace = "mesos"
// Regexp that identifies mesos cgroups, containers started with
// --cgroup-parent have another prefix than 'mesos'
var mesosCgroupRegexp = regexp.MustCompile(`([a-z-0-9]{36})`)
// mesosFactory implements the interface ContainerHandlerFactory
type mesosFactory struct {
machineInfoFactory info.MachineInfoFactory
// Information about the cgroup subsystems.
cgroupSubsystems libcontainer.CgroupSubsystems
// Information about mounted filesystems.
fsInfo fs.FsInfo
ignoreMetrics map[container.MetricKind]struct{}
client mesosAgentClient
}
func (self *mesosFactory) String() string {
return MesosNamespace
}
func (self *mesosFactory) NewContainerHandler(name string, inHostNamespace bool) (container.ContainerHandler, error) {
client, err := Client()
if err != nil {
return nil, err
}
return newMesosContainerHandler(
name,
&self.cgroupSubsystems,
self.machineInfoFactory,
self.fsInfo,
self.ignoreMetrics,
inHostNamespace,
client,
)
}
// ContainerNameToMesosId returns the Mesos ID from the full container name.
func ContainerNameToMesosId(name string) string {
id := path.Base(name)
if matches := mesosCgroupRegexp.FindStringSubmatch(id); matches != nil {
return matches[1]
}
return id
}
// isContainerName returns true if the cgroup with associated name
// corresponds to a mesos container.
func isContainerName(name string) bool {
// always ignore .mount cgroup even if associated with mesos and delegate to systemd
if strings.HasSuffix(name, ".mount") {
return false
}
return mesosCgroupRegexp.MatchString(path.Base(name))
}
// The mesos factory can handle any container.
func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept bool, err error) {
// if the container is not associated with mesos, we can't handle it or accept it.
if !isContainerName(name) {
return false, false, nil
}
// Check if the container is known to mesos and it is active.
id := ContainerNameToMesosId(name)
c, err := self.client.ContainerInfo(id)
if err != nil {
return false, true, fmt.Errorf("error getting running container: %v", err)
}
pid := int(*c.cntr.ContainerStatus.ExecutorPID)
if pid <= 0 {
return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid)
}
return true, true, nil
}
func (self *mesosFactory) DebugInfo() map[string][]string {
return map[string][]string{}
}
func Register(
machineInfoFactory info.MachineInfoFactory,
fsInfo fs.FsInfo,
ignoreMetrics container.MetricSet,
) error {
client, err := Client()
if err != nil {
return fmt.Errorf("unable to create mesos agent client: %v", err)
}
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems()
if err != nil {
return fmt.Errorf("failed to get cgroup subsystems: %v", err)
}
glog.V(1).Infof("Registering mesos factory")
factory := &mesosFactory{
machineInfoFactory: machineInfoFactory,
cgroupSubsystems: cgroupSubsystems,
fsInfo: fsInfo,
ignoreMetrics: ignoreMetrics,
client: client,
}
container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw})
return nil
}

View File

@ -0,0 +1,85 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
containerlibcontainer "github.com/google/cadvisor/container/libcontainer"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/stretchr/testify/assert"
"testing"
)
func TestIsContainerName(t *testing.T) {
tests := []struct {
name string
expected bool
}{
{
name: "/system.slice/var-lib-mesos-provisioner-containers-04e20821-d67d3-4bf7-96b4-7d4495f50b28-backends-overlay-rootfses-6d97be39-7359-4bb7-a46b-e55c6771da81.mount",
expected: false,
},
{
name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
expected: true,
},
}
for _, test := range tests {
if actual := isContainerName(test.name); actual != test.expected {
t.Errorf("%s: expected: %v, actual: %v", test.name, test.expected, actual)
}
}
}
func TestCanHandleAndAccept(t *testing.T) {
as := assert.New(t)
testContainers := make(map[string]*containerInfo)
var pid uint32 = 123
testContainer := &containerInfo{
cntr: &mContainer{
ContainerStatus: &mesos.ContainerStatus{
ExecutorPID: &pid,
},
},
}
testContainers["04e20821-67d3-4bf7-96b4-7d4495f50b28"] = testContainer
f := &mesosFactory{
machineInfoFactory: nil,
cgroupSubsystems: containerlibcontainer.CgroupSubsystems{},
fsInfo: nil,
ignoreMetrics: nil,
client: fakeMesosAgentClient(testContainers, nil),
}
tests := []struct {
name string
expected bool
}{
{
name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
expected: true,
},
{
name: "/system.slice/var-lib-mesos-provisioner-containers-04e20821-d67d3-4bf7-96b4-7d4495f50b28-backends-overlay-rootfses-6d97be39-7359-4bb7-a46b-e55c6771da81.mount",
expected: false,
},
}
for _, test := range tests {
b1, b2, err := f.CanHandleAndAccept(test.name)
as.Nil(err)
as.Equal(b1, test.expected)
as.Equal(b2, test.expected)
}
}

210
container/mesos/handler.go Normal file
View File

@ -0,0 +1,210 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Handler for "mesos" containers.
package mesos
import (
"fmt"
"path"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/common"
containerlibcontainer "github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs"
)
type mesosContainerHandler struct {
// Name of the container for this handler.
name string
// machineInfoFactory provides info.MachineInfo
machineInfoFactory info.MachineInfoFactory
// Absolute path to the cgroup hierarchies of this container.
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
// File System Info
fsInfo fs.FsInfo
// Metrics to be ignored.
ignoreMetrics container.MetricSet
labels map[string]string
// Reference to the container
reference info.ContainerReference
libcontainerHandler *containerlibcontainer.Handler
}
func isRootCgroup(name string) bool {
return name == "/"
}
func newMesosContainerHandler(
name string,
cgroupSubsystems *containerlibcontainer.CgroupSubsystems,
machineInfoFactory info.MachineInfoFactory,
fsInfo fs.FsInfo,
ignoreMetrics container.MetricSet,
inHostNamespace bool,
client mesosAgentClient,
) (container.ContainerHandler, error) {
cgroupPaths := common.MakeCgroupPaths(cgroupSubsystems.MountPoints, name)
for key, val := range cgroupSubsystems.MountPoints {
cgroupPaths[key] = path.Join(val, name)
}
// Generate the equivalent cgroup manager for this container.
cgroupManager := &cgroupfs.Manager{
Cgroups: &libcontainerconfigs.Cgroup{
Name: name,
},
Paths: cgroupPaths,
}
rootFs := "/"
if !inHostNamespace {
rootFs = "/rootfs"
}
id := ContainerNameToMesosId(name)
cinfo, err := client.ContainerInfo(id)
if err != nil {
return nil, err
}
labels := cinfo.labels
pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID)
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, ignoreMetrics)
reference := info.ContainerReference{
Id: id,
Name: name,
Namespace: MesosNamespace,
Aliases: []string{id, name},
}
handler := &mesosContainerHandler{
name: name,
machineInfoFactory: machineInfoFactory,
cgroupPaths: cgroupPaths,
fsInfo: fsInfo,
ignoreMetrics: ignoreMetrics,
labels: labels,
reference: reference,
libcontainerHandler: libcontainerHandler,
}
return handler, nil
}
func (self *mesosContainerHandler) ContainerReference() (info.ContainerReference, error) {
// We only know the container by its one name.
return self.reference, nil
}
// Nothing to start up.
func (self *mesosContainerHandler) Start() {}
// Nothing to clean up.
func (self *mesosContainerHandler) Cleanup() {}
func (self *mesosContainerHandler) GetSpec() (info.ContainerSpec, error) {
// TODO: Since we dont collect disk usage and network stats for mesos containers, we set
// hasFilesystem and hasNetwork to false. Revisit when we support disk usage, network
// stats for mesos containers.
hasNetwork := false
hasFilesystem := false
spec, err := common.GetSpec(self.cgroupPaths, self.machineInfoFactory, hasNetwork, hasFilesystem)
if err != nil {
return spec, err
}
spec.Labels = self.labels
return spec, nil
}
func (self *mesosContainerHandler) getFsStats(stats *info.ContainerStats) error {
mi, err := self.machineInfoFactory.GetMachineInfo()
if err != nil {
return err
}
if !self.ignoreMetrics.Has(container.DiskIOMetrics) {
common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo)
}
return nil
}
func (self *mesosContainerHandler) GetStats() (*info.ContainerStats, error) {
stats, err := self.libcontainerHandler.GetStats()
if err != nil {
return stats, err
}
// Get filesystem stats.
err = self.getFsStats(stats)
if err != nil {
return stats, err
}
return stats, nil
}
func (self *mesosContainerHandler) GetCgroupPath(resource string) (string, error) {
path, ok := self.cgroupPaths[resource]
if !ok {
return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name)
}
return path, nil
}
func (self *mesosContainerHandler) GetContainerLabels() map[string]string {
return self.labels
}
func (self *mesosContainerHandler) GetContainerIPAddress() string {
// the IP address for the mesos container corresponds to the system ip address.
return "127.0.0.1"
}
func (self *mesosContainerHandler) ListContainers(listType container.ListType) ([]info.ContainerReference, error) {
return common.ListContainers(self.name, self.cgroupPaths, listType)
}
func (self *mesosContainerHandler) ListProcesses(listType container.ListType) ([]int, error) {
return self.libcontainerHandler.GetProcesses()
}
func (self *mesosContainerHandler) Exists() bool {
return common.CgroupExists(self.cgroupPaths)
}
func (self *mesosContainerHandler) Type() container.ContainerType {
return container.ContainerTypeMesos
}

View File

@ -0,0 +1,111 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"fmt"
"testing"
"github.com/google/cadvisor/container"
containerlibcontainer "github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/fs"
info "github.com/google/cadvisor/info/v1"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/stretchr/testify/assert"
)
func PopulateContainer() *mContainer {
var pid uint32 = 123
cntr := &mContainer{
ContainerStatus: &mesos.ContainerStatus{ExecutorPID: &pid},
}
return cntr
}
func TestContainerReference(t *testing.T) {
as := assert.New(t)
type testCase struct {
client mesosAgentClient
name string
machineInfoFactory info.MachineInfoFactory
fsInfo fs.FsInfo
cgroupSubsystems *containerlibcontainer.CgroupSubsystems
inHostNamespace bool
ignoreMetrics container.MetricSet
hasErr bool
errContains string
checkReference *info.ContainerReference
}
for _, ts := range []testCase{
{
fakeMesosAgentClient(nil, fmt.Errorf("no client returned")),
"/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
nil,
nil,
&containerlibcontainer.CgroupSubsystems{},
false,
nil,
true,
"no client returned",
nil,
},
{
fakeMesosAgentClient(nil, nil),
"/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
nil,
nil,
&containerlibcontainer.CgroupSubsystems{},
false,
nil,
true,
"can't locate container 04e20821-67d3-4bf7-96b4-7d4495f50b28",
nil,
},
{
fakeMesosAgentClient(map[string]*containerInfo{"04e20821-67d3-4bf7-96b4-7d4495f50b28": {cntr: PopulateContainer()}}, nil),
"/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
nil,
nil,
&containerlibcontainer.CgroupSubsystems{},
false,
nil,
false,
"",
&info.ContainerReference{
Id: "04e20821-67d3-4bf7-96b4-7d4495f50b28",
Name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28",
Aliases: []string{"04e20821-67d3-4bf7-96b4-7d4495f50b28", "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28"},
Namespace: MesosNamespace,
},
},
} {
handler, err := newMesosContainerHandler(ts.name, ts.cgroupSubsystems, ts.machineInfoFactory, ts.fsInfo, ts.ignoreMetrics, ts.inHostNamespace, ts.client)
if ts.hasErr {
as.NotNil(err)
if ts.errContains != "" {
as.Contains(err.Error(), ts.errContains)
}
}
if ts.checkReference != nil {
cr, err := handler.ContainerReference()
as.Nil(err)
as.Equal(*ts.checkReference, cr)
}
}
}

View File

@ -0,0 +1,122 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"fmt"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/agent"
)
const (
cpus = "cpus"
schedulerSLA = "scheduler_sla"
framework = "framework"
source = "source"
revocable = "revocable"
nonRevocable = "non_revocable"
)
type mContainers *agent.Response_GetContainers
type mContainer = agent.Response_GetContainers_Container
type (
state struct {
st *agent.Response_GetState
}
)
// GetFramework finds a framework with the given id and returns nil if not found. Note that
// this is different from the framework name.
func (s *state) GetFramework(id string) (*mesos.FrameworkInfo, error) {
for _, fw := range s.st.GetFrameworks.Frameworks {
if fw.FrameworkInfo.ID.Value == id {
return &fw.FrameworkInfo, nil
}
}
return nil, fmt.Errorf("unable to find framework id %s", id)
}
// GetExecutor finds an executor with the given ID and returns nil if not found. Note that
// this is different from the executor name.
func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) {
for _, exec := range s.st.GetExecutors.Executors {
if exec.ExecutorInfo.ExecutorID.Value == id {
return &exec.ExecutorInfo, nil
}
}
return nil, fmt.Errorf("unable to find executor with id %s", id)
}
// GetTask returns a task launched by given executor.
func (s *state) GetTask(exID string) (*mesos.Task, error) {
for _, t := range s.st.GetTasks.LaunchedTasks {
if t.ExecutorID.Value == exID {
return &t, nil
}
}
return nil, fmt.Errorf("unable to find task matching executor id %s", exID)
}
func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error {
t, err := s.GetTask(exID)
if err != nil {
return err
}
// Identify revocability. Can be removed once we have a proper label
for _, resource := range t.Resources {
if resource.Name == cpus {
if resource.Revocable != nil {
labels[schedulerSLA] = revocable
} else {
labels[schedulerSLA] = nonRevocable
}
break
}
}
for _, l := range t.Labels.Labels {
labels[l.Key] = *l.Value
}
return nil
}
func (s *state) FetchLabels(fwID string, exID string) (map[string]string, error) {
labels := make(map[string]string)
// Look for the framework which launched the container.
fw, err := s.GetFramework(fwID)
if err != nil {
return labels, fmt.Errorf("framework ID %q not found: %v", fwID, err)
}
labels[framework] = fw.Name
// Get the executor info of the container which contains all the task info.
exec, err := s.GetExecutor(exID)
if err != nil {
return labels, fmt.Errorf("executor ID %q not found: %v", exID, err)
}
labels[source] = *exec.Source
err = s.fetchLabelsFromTask(exID, labels)
if err != nil {
return labels, fmt.Errorf("failed to fetch labels from task with executor ID %s", exID)
}
return labels, nil
}

View File

@ -0,0 +1,129 @@
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"fmt"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/agent"
"github.com/stretchr/testify/assert"
"testing"
)
func PopulateFrameworks(fwID string) *agent.Response_GetFrameworks {
fws := &agent.Response_GetFrameworks{}
fws.Frameworks = make([]agent.Response_GetFrameworks_Framework, 1)
fw := &agent.Response_GetFrameworks_Framework{}
fw.FrameworkInfo = mesos.FrameworkInfo{
ID: &mesos.FrameworkID{Value: fwID},
Name: "TestFramework",
}
fws.Frameworks[0] = *fw
return fws
}
func PopulateExecutors(exID string) *agent.Response_GetExecutors {
execs := &agent.Response_GetExecutors{}
execs.Executors = make([]agent.Response_GetExecutors_Executor, 1)
exec := &agent.Response_GetExecutors_Executor{}
source := "source1"
exec.ExecutorInfo = mesos.ExecutorInfo{
ExecutorID: mesos.ExecutorID{Value: exID},
Source: &source,
}
execs.Executors[0] = *exec
return execs
}
func PopulateTasks() *agent.Response_GetTasks {
tasks := &agent.Response_GetTasks{}
tasks.LaunchedTasks = make([]mesos.Task, 1)
task := mesos.Task{
ExecutorID: &mesos.ExecutorID{Value: "exec-id1"},
}
task.Resources = make([]mesos.Resource, 1)
resource := mesos.Resource{
Name: cpus,
Revocable: nil,
}
task.Resources[0] = resource
task.Labels = &mesos.Labels{
Labels: make([]mesos.Label, 1),
}
labelValue := "value1"
label := mesos.Label{
Key: "key1",
Value: &labelValue,
}
task.Labels.Labels[0] = label
tasks.LaunchedTasks[0] = task
return tasks
}
func TestFetchLabels(t *testing.T) {
type testCase struct {
frameworkID string
executorID string
agentState *agent.Response_GetState
expectedError error
expectedLabels map[string]string
}
for _, ts := range []testCase{
{
frameworkID: "fw-id1",
executorID: "exec-id1",
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(),
},
expectedError: nil,
expectedLabels: map[string]string{
framework: "TestFramework",
source: "source1",
schedulerSLA: nonRevocable,
"key1": "value1",
},
},
{
frameworkID: "fw-id2",
executorID: "exec-id1",
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(),
},
expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"),
expectedLabels: map[string]string{},
},
} {
var s state
s.st = ts.agentState
actualLabels, err := s.FetchLabels(ts.frameworkID, ts.executorID)
if ts.expectedError == nil {
assert.Nil(t, err)
} else {
assert.Equal(t, ts.expectedError.Error(), err.Error())
}
assert.Equal(t, ts.expectedLabels, actualLabels)
}
}

View File

@ -33,6 +33,7 @@ import (
"github.com/google/cadvisor/container/containerd"
"github.com/google/cadvisor/container/crio"
"github.com/google/cadvisor/container/docker"
"github.com/google/cadvisor/container/mesos"
"github.com/google/cadvisor/container/raw"
"github.com/google/cadvisor/container/rkt"
"github.com/google/cadvisor/container/systemd"
@ -321,6 +322,11 @@ func (self *manager) Start() error {
glog.V(5).Infof("Registration of the crio container factory failed: %v", err)
}
err = mesos.Register(self, self.fsInfo, self.ignoreMetrics)
if err != nil {
glog.V(5).Infof("Registration of the mesos container factory failed: %v", err)
}
err = systemd.Register(self, self.fsInfo, self.ignoreMetrics)
if err != nil {
glog.V(5).Infof("Registration of the systemd container factory failed: %v", err)