From ed0e3f0f432780b2324e1b349cdd4f97e53961ea Mon Sep 17 00:00:00 2001 From: Sashank Appireddy Date: Tue, 29 May 2018 17:08:07 -0700 Subject: [PATCH] Add mesos containerizer support This commit includes support for collecting container stats launched by mesos containerizer. --- container/container.go | 1 + container/mesos/client.go | 162 +++++++++++++++++++++ container/mesos/client_test.go | 40 ++++++ container/mesos/factory.go | 153 ++++++++++++++++++++ container/mesos/factory_test.go | 85 +++++++++++ container/mesos/handler.go | 210 ++++++++++++++++++++++++++++ container/mesos/handler_test.go | 111 +++++++++++++++ container/mesos/mesos_agent.go | 122 ++++++++++++++++ container/mesos/mesos_agent_test.go | 129 +++++++++++++++++ manager/manager.go | 6 + 10 files changed, 1019 insertions(+) create mode 100644 container/mesos/client.go create mode 100644 container/mesos/client_test.go create mode 100644 container/mesos/factory.go create mode 100644 container/mesos/factory_test.go create mode 100644 container/mesos/handler.go create mode 100644 container/mesos/handler_test.go create mode 100644 container/mesos/mesos_agent.go create mode 100644 container/mesos/mesos_agent_test.go diff --git a/container/container.go b/container/container.go index 45504524..37c0a7d2 100644 --- a/container/container.go +++ b/container/container.go @@ -36,6 +36,7 @@ const ( ContainerTypeSystemd ContainerTypeCrio ContainerTypeContainerd + ContainerTypeMesos ) // Interface for container operation handlers. diff --git a/container/mesos/client.go b/container/mesos/client.go new file mode 100644 index 00000000..8b73a38d --- /dev/null +++ b/container/mesos/client.go @@ -0,0 +1,162 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + "fmt" + "github.com/Rican7/retry" + "github.com/Rican7/retry/strategy" + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/agent" + "github.com/mesos/mesos-go/api/v1/lib/agent/calls" + mclient "github.com/mesos/mesos-go/api/v1/lib/client" + "github.com/mesos/mesos-go/api/v1/lib/encoding/codecs" + "github.com/mesos/mesos-go/api/v1/lib/httpcli" + "net/url" + "sync" +) + +const ( + maxRetryAttempts = 3 +) + +var ( + mesosClientOnce sync.Once + mesosClient *client +) + +type client struct { + hc *httpcli.Client +} + +type mesosAgentClient interface { + ContainerInfo(id string) (*containerInfo, error) +} + +type containerInfo struct { + cntr *mContainer + labels map[string]string +} + +// Client is an interface to query mesos agent http endpoints +func Client() (mesosAgentClient, error) { + mesosClientOnce.Do(func() { + // Start Client + apiURL := url.URL{ + Scheme: "http", + Host: *MesosAgentAddress, + Path: "/api/v1", + } + + mesosClient = &client{ + hc: httpcli.New( + httpcli.Endpoint(apiURL.String()), + httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]), + httpcli.Do(httpcli.With(httpcli.Timeout(*MesosAgentTimeout))), + ), + } + }) + return mesosClient, nil +} + +// ContainerInfo returns the container information of the given container id +func (self *client) ContainerInfo(id string) (*containerInfo, error) { + c, err := self.getContainer(id) + if err != nil { + return nil, err + } + + // Get labels of the container + l, err := self.getLabels(c) + if err != nil { + return nil, err + } + + return &containerInfo{ + cntr: c, + labels: l, + }, nil +} + +func (self *client) getContainer(id string) (*mContainer, error) { + // Get all containers + cntrs, err := self.getContainers() + if err != nil { + return nil, err + } + + // Check if there is a container with given id and return the container + for _, c := range cntrs.Containers { + if c.ContainerID.Value == id { + return &c, nil + } + } + return nil, fmt.Errorf("can't locate container %s", id) +} + +func (self *client) getContainers() (mContainers, error) { + req := calls.NonStreaming(calls.GetContainers()) + result, err := self.fetchAndDecode(req) + if err != nil { + return nil, fmt.Errorf("failed to get mesos containers: %v", err) + } + cntrs := result.GetContainers + return cntrs, nil +} + +func (self *client) getLabels(c *mContainer) (map[string]string, error) { + // Get mesos agent state which contains all containers labels + var s state + req := calls.NonStreaming(calls.GetState()) + result, err := self.fetchAndDecode(req) + if err != nil { + return map[string]string{}, fmt.Errorf("failed to get mesos agent state: %v", err) + } + s.st = result.GetState + + // Fetch labels from state object + labels, err := s.FetchLabels(c.FrameworkID.Value, c.ExecutorID.Value) + if err != nil { + return labels, fmt.Errorf("error while fetching labels from executor: %v", err) + } + + return labels, nil +} + +func (self *client) fetchAndDecode(req calls.RequestFunc) (*agent.Response, error) { + var res mesos.Response + var err error + + // Send request + err = retry.Retry( + func(attempt uint) error { + res, err = mesosClient.hc.Send(req, mclient.ResponseClassSingleton, nil) + return err + }, + strategy.Limit(maxRetryAttempts), + ) + if err != nil { + return nil, fmt.Errorf("error fetching %s: %s", req.Call(), err) + } + + // Decode the result + var target agent.Response + err = res.Decode(&target) + if err != nil { + return nil, fmt.Errorf("error while decoding response body from %s: %s", res, err) + } + + return &target, nil +} diff --git a/container/mesos/client_test.go b/container/mesos/client_test.go new file mode 100644 index 00000000..e9edf0aa --- /dev/null +++ b/container/mesos/client_test.go @@ -0,0 +1,40 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import "fmt" + +type FakeMesosAgentClient struct { + cntrInfo map[string]*containerInfo + err error +} + +func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error) { + if c.err != nil { + return nil, c.err + } + cInfo, ok := c.cntrInfo[id] + if !ok { + return nil, fmt.Errorf("can't locate container %s", id) + } + return cInfo, nil +} + +func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient { + return &FakeMesosAgentClient{ + err: err, + cntrInfo: cntrInfo, + } +} diff --git a/container/mesos/factory.go b/container/mesos/factory.go new file mode 100644 index 00000000..2a882b4d --- /dev/null +++ b/container/mesos/factory.go @@ -0,0 +1,153 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + "flag" + "fmt" + "path" + "regexp" + "strings" + "time" + + "github.com/golang/glog" + "github.com/google/cadvisor/container" + "github.com/google/cadvisor/container/libcontainer" + "github.com/google/cadvisor/fs" + info "github.com/google/cadvisor/info/v1" + "github.com/google/cadvisor/manager/watcher" +) + +var MesosAgentAddress = flag.String("mesos_agent", "127.0.0.1:5051", "Mesos agent address") +var MesosAgentTimeout = flag.Duration("mesos_agent_timeout", 10*time.Second, "Mesos agent timeout") + +// The namespace under which mesos aliases are unique. +const MesosNamespace = "mesos" + +// Regexp that identifies mesos cgroups, containers started with +// --cgroup-parent have another prefix than 'mesos' +var mesosCgroupRegexp = regexp.MustCompile(`([a-z-0-9]{36})`) + +// mesosFactory implements the interface ContainerHandlerFactory +type mesosFactory struct { + machineInfoFactory info.MachineInfoFactory + + // Information about the cgroup subsystems. + cgroupSubsystems libcontainer.CgroupSubsystems + + // Information about mounted filesystems. + fsInfo fs.FsInfo + + ignoreMetrics map[container.MetricKind]struct{} + + client mesosAgentClient +} + +func (self *mesosFactory) String() string { + return MesosNamespace +} + +func (self *mesosFactory) NewContainerHandler(name string, inHostNamespace bool) (container.ContainerHandler, error) { + client, err := Client() + if err != nil { + return nil, err + } + + return newMesosContainerHandler( + name, + &self.cgroupSubsystems, + self.machineInfoFactory, + self.fsInfo, + self.ignoreMetrics, + inHostNamespace, + client, + ) +} + +// ContainerNameToMesosId returns the Mesos ID from the full container name. +func ContainerNameToMesosId(name string) string { + id := path.Base(name) + + if matches := mesosCgroupRegexp.FindStringSubmatch(id); matches != nil { + return matches[1] + } + + return id +} + +// isContainerName returns true if the cgroup with associated name +// corresponds to a mesos container. +func isContainerName(name string) bool { + // always ignore .mount cgroup even if associated with mesos and delegate to systemd + if strings.HasSuffix(name, ".mount") { + return false + } + return mesosCgroupRegexp.MatchString(path.Base(name)) +} + +// The mesos factory can handle any container. +func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept bool, err error) { + // if the container is not associated with mesos, we can't handle it or accept it. + if !isContainerName(name) { + return false, false, nil + } + + // Check if the container is known to mesos and it is active. + id := ContainerNameToMesosId(name) + + c, err := self.client.ContainerInfo(id) + if err != nil { + return false, true, fmt.Errorf("error getting running container: %v", err) + } + + pid := int(*c.cntr.ContainerStatus.ExecutorPID) + if pid <= 0 { + return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid) + } + + return true, true, nil +} + +func (self *mesosFactory) DebugInfo() map[string][]string { + return map[string][]string{} +} + +func Register( + machineInfoFactory info.MachineInfoFactory, + fsInfo fs.FsInfo, + ignoreMetrics container.MetricSet, +) error { + client, err := Client() + + if err != nil { + return fmt.Errorf("unable to create mesos agent client: %v", err) + } + + cgroupSubsystems, err := libcontainer.GetCgroupSubsystems() + if err != nil { + return fmt.Errorf("failed to get cgroup subsystems: %v", err) + } + + glog.V(1).Infof("Registering mesos factory") + factory := &mesosFactory{ + machineInfoFactory: machineInfoFactory, + cgroupSubsystems: cgroupSubsystems, + fsInfo: fsInfo, + ignoreMetrics: ignoreMetrics, + client: client, + } + container.RegisterContainerHandlerFactory(factory, []watcher.ContainerWatchSource{watcher.Raw}) + return nil +} diff --git a/container/mesos/factory_test.go b/container/mesos/factory_test.go new file mode 100644 index 00000000..f9953f51 --- /dev/null +++ b/container/mesos/factory_test.go @@ -0,0 +1,85 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + containerlibcontainer "github.com/google/cadvisor/container/libcontainer" + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestIsContainerName(t *testing.T) { + tests := []struct { + name string + expected bool + }{ + { + name: "/system.slice/var-lib-mesos-provisioner-containers-04e20821-d67d3-4bf7-96b4-7d4495f50b28-backends-overlay-rootfses-6d97be39-7359-4bb7-a46b-e55c6771da81.mount", + expected: false, + }, + { + name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + expected: true, + }, + } + for _, test := range tests { + if actual := isContainerName(test.name); actual != test.expected { + t.Errorf("%s: expected: %v, actual: %v", test.name, test.expected, actual) + } + } +} +func TestCanHandleAndAccept(t *testing.T) { + as := assert.New(t) + testContainers := make(map[string]*containerInfo) + var pid uint32 = 123 + testContainer := &containerInfo{ + cntr: &mContainer{ + ContainerStatus: &mesos.ContainerStatus{ + ExecutorPID: &pid, + }, + }, + } + + testContainers["04e20821-67d3-4bf7-96b4-7d4495f50b28"] = testContainer + + f := &mesosFactory{ + machineInfoFactory: nil, + cgroupSubsystems: containerlibcontainer.CgroupSubsystems{}, + fsInfo: nil, + ignoreMetrics: nil, + client: fakeMesosAgentClient(testContainers, nil), + } + tests := []struct { + name string + expected bool + }{ + { + name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + expected: true, + }, + { + name: "/system.slice/var-lib-mesos-provisioner-containers-04e20821-d67d3-4bf7-96b4-7d4495f50b28-backends-overlay-rootfses-6d97be39-7359-4bb7-a46b-e55c6771da81.mount", + expected: false, + }, + } + + for _, test := range tests { + b1, b2, err := f.CanHandleAndAccept(test.name) + as.Nil(err) + as.Equal(b1, test.expected) + as.Equal(b2, test.expected) + } +} diff --git a/container/mesos/handler.go b/container/mesos/handler.go new file mode 100644 index 00000000..1ede4bf4 --- /dev/null +++ b/container/mesos/handler.go @@ -0,0 +1,210 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Handler for "mesos" containers. +package mesos + +import ( + "fmt" + "path" + + "github.com/google/cadvisor/container" + "github.com/google/cadvisor/container/common" + containerlibcontainer "github.com/google/cadvisor/container/libcontainer" + "github.com/google/cadvisor/fs" + info "github.com/google/cadvisor/info/v1" + + cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs" + libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs" +) + +type mesosContainerHandler struct { + // Name of the container for this handler. + name string + + // machineInfoFactory provides info.MachineInfo + machineInfoFactory info.MachineInfoFactory + + // Absolute path to the cgroup hierarchies of this container. + // (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test") + cgroupPaths map[string]string + + // File System Info + fsInfo fs.FsInfo + + // Metrics to be ignored. + ignoreMetrics container.MetricSet + + labels map[string]string + + // Reference to the container + reference info.ContainerReference + + libcontainerHandler *containerlibcontainer.Handler +} + +func isRootCgroup(name string) bool { + return name == "/" +} + +func newMesosContainerHandler( + name string, + cgroupSubsystems *containerlibcontainer.CgroupSubsystems, + machineInfoFactory info.MachineInfoFactory, + fsInfo fs.FsInfo, + ignoreMetrics container.MetricSet, + inHostNamespace bool, + client mesosAgentClient, +) (container.ContainerHandler, error) { + cgroupPaths := common.MakeCgroupPaths(cgroupSubsystems.MountPoints, name) + for key, val := range cgroupSubsystems.MountPoints { + cgroupPaths[key] = path.Join(val, name) + } + + // Generate the equivalent cgroup manager for this container. + cgroupManager := &cgroupfs.Manager{ + Cgroups: &libcontainerconfigs.Cgroup{ + Name: name, + }, + Paths: cgroupPaths, + } + + rootFs := "/" + if !inHostNamespace { + rootFs = "/rootfs" + } + + id := ContainerNameToMesosId(name) + + cinfo, err := client.ContainerInfo(id) + + if err != nil { + return nil, err + } + + labels := cinfo.labels + pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID) + + libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, ignoreMetrics) + + reference := info.ContainerReference{ + Id: id, + Name: name, + Namespace: MesosNamespace, + Aliases: []string{id, name}, + } + + handler := &mesosContainerHandler{ + name: name, + machineInfoFactory: machineInfoFactory, + cgroupPaths: cgroupPaths, + fsInfo: fsInfo, + ignoreMetrics: ignoreMetrics, + labels: labels, + reference: reference, + libcontainerHandler: libcontainerHandler, + } + + return handler, nil +} + +func (self *mesosContainerHandler) ContainerReference() (info.ContainerReference, error) { + // We only know the container by its one name. + return self.reference, nil +} + +// Nothing to start up. +func (self *mesosContainerHandler) Start() {} + +// Nothing to clean up. +func (self *mesosContainerHandler) Cleanup() {} + +func (self *mesosContainerHandler) GetSpec() (info.ContainerSpec, error) { + // TODO: Since we dont collect disk usage and network stats for mesos containers, we set + // hasFilesystem and hasNetwork to false. Revisit when we support disk usage, network + // stats for mesos containers. + hasNetwork := false + hasFilesystem := false + + spec, err := common.GetSpec(self.cgroupPaths, self.machineInfoFactory, hasNetwork, hasFilesystem) + if err != nil { + return spec, err + } + + spec.Labels = self.labels + + return spec, nil +} + +func (self *mesosContainerHandler) getFsStats(stats *info.ContainerStats) error { + + mi, err := self.machineInfoFactory.GetMachineInfo() + if err != nil { + return err + } + + if !self.ignoreMetrics.Has(container.DiskIOMetrics) { + common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo) + } + + return nil +} + +func (self *mesosContainerHandler) GetStats() (*info.ContainerStats, error) { + stats, err := self.libcontainerHandler.GetStats() + if err != nil { + return stats, err + } + + // Get filesystem stats. + err = self.getFsStats(stats) + if err != nil { + return stats, err + } + + return stats, nil +} + +func (self *mesosContainerHandler) GetCgroupPath(resource string) (string, error) { + path, ok := self.cgroupPaths[resource] + if !ok { + return "", fmt.Errorf("could not find path for resource %q for container %q\n", resource, self.name) + } + return path, nil +} + +func (self *mesosContainerHandler) GetContainerLabels() map[string]string { + return self.labels +} + +func (self *mesosContainerHandler) GetContainerIPAddress() string { + // the IP address for the mesos container corresponds to the system ip address. + return "127.0.0.1" +} + +func (self *mesosContainerHandler) ListContainers(listType container.ListType) ([]info.ContainerReference, error) { + return common.ListContainers(self.name, self.cgroupPaths, listType) +} + +func (self *mesosContainerHandler) ListProcesses(listType container.ListType) ([]int, error) { + return self.libcontainerHandler.GetProcesses() +} + +func (self *mesosContainerHandler) Exists() bool { + return common.CgroupExists(self.cgroupPaths) +} + +func (self *mesosContainerHandler) Type() container.ContainerType { + return container.ContainerTypeMesos +} diff --git a/container/mesos/handler_test.go b/container/mesos/handler_test.go new file mode 100644 index 00000000..d79d6186 --- /dev/null +++ b/container/mesos/handler_test.go @@ -0,0 +1,111 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + "fmt" + "testing" + + "github.com/google/cadvisor/container" + containerlibcontainer "github.com/google/cadvisor/container/libcontainer" + "github.com/google/cadvisor/fs" + info "github.com/google/cadvisor/info/v1" + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/stretchr/testify/assert" +) + +func PopulateContainer() *mContainer { + var pid uint32 = 123 + cntr := &mContainer{ + ContainerStatus: &mesos.ContainerStatus{ExecutorPID: &pid}, + } + return cntr +} + +func TestContainerReference(t *testing.T) { + as := assert.New(t) + type testCase struct { + client mesosAgentClient + name string + machineInfoFactory info.MachineInfoFactory + fsInfo fs.FsInfo + cgroupSubsystems *containerlibcontainer.CgroupSubsystems + inHostNamespace bool + ignoreMetrics container.MetricSet + + hasErr bool + errContains string + checkReference *info.ContainerReference + } + for _, ts := range []testCase{ + { + fakeMesosAgentClient(nil, fmt.Errorf("no client returned")), + "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + nil, + nil, + &containerlibcontainer.CgroupSubsystems{}, + false, + nil, + + true, + "no client returned", + nil, + }, + { + fakeMesosAgentClient(nil, nil), + "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + nil, + nil, + &containerlibcontainer.CgroupSubsystems{}, + false, + nil, + + true, + "can't locate container 04e20821-67d3-4bf7-96b4-7d4495f50b28", + nil, + }, + { + fakeMesosAgentClient(map[string]*containerInfo{"04e20821-67d3-4bf7-96b4-7d4495f50b28": {cntr: PopulateContainer()}}, nil), + "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + nil, + nil, + &containerlibcontainer.CgroupSubsystems{}, + false, + nil, + + false, + "", + &info.ContainerReference{ + Id: "04e20821-67d3-4bf7-96b4-7d4495f50b28", + Name: "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28", + Aliases: []string{"04e20821-67d3-4bf7-96b4-7d4495f50b28", "/mesos/04e20821-67d3-4bf7-96b4-7d4495f50b28"}, + Namespace: MesosNamespace, + }, + }, + } { + handler, err := newMesosContainerHandler(ts.name, ts.cgroupSubsystems, ts.machineInfoFactory, ts.fsInfo, ts.ignoreMetrics, ts.inHostNamespace, ts.client) + if ts.hasErr { + as.NotNil(err) + if ts.errContains != "" { + as.Contains(err.Error(), ts.errContains) + } + } + if ts.checkReference != nil { + cr, err := handler.ContainerReference() + as.Nil(err) + as.Equal(*ts.checkReference, cr) + } + } +} diff --git a/container/mesos/mesos_agent.go b/container/mesos/mesos_agent.go new file mode 100644 index 00000000..48ed5f8c --- /dev/null +++ b/container/mesos/mesos_agent.go @@ -0,0 +1,122 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + "fmt" + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/agent" +) + +const ( + cpus = "cpus" + schedulerSLA = "scheduler_sla" + framework = "framework" + source = "source" + revocable = "revocable" + nonRevocable = "non_revocable" +) + +type mContainers *agent.Response_GetContainers +type mContainer = agent.Response_GetContainers_Container + +type ( + state struct { + st *agent.Response_GetState + } +) + +// GetFramework finds a framework with the given id and returns nil if not found. Note that +// this is different from the framework name. +func (s *state) GetFramework(id string) (*mesos.FrameworkInfo, error) { + for _, fw := range s.st.GetFrameworks.Frameworks { + if fw.FrameworkInfo.ID.Value == id { + return &fw.FrameworkInfo, nil + } + } + return nil, fmt.Errorf("unable to find framework id %s", id) +} + +// GetExecutor finds an executor with the given ID and returns nil if not found. Note that +// this is different from the executor name. +func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) { + for _, exec := range s.st.GetExecutors.Executors { + if exec.ExecutorInfo.ExecutorID.Value == id { + return &exec.ExecutorInfo, nil + } + } + return nil, fmt.Errorf("unable to find executor with id %s", id) +} + +// GetTask returns a task launched by given executor. +func (s *state) GetTask(exID string) (*mesos.Task, error) { + for _, t := range s.st.GetTasks.LaunchedTasks { + if t.ExecutorID.Value == exID { + return &t, nil + } + } + return nil, fmt.Errorf("unable to find task matching executor id %s", exID) +} + +func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error { + t, err := s.GetTask(exID) + if err != nil { + return err + } + + // Identify revocability. Can be removed once we have a proper label + for _, resource := range t.Resources { + if resource.Name == cpus { + if resource.Revocable != nil { + labels[schedulerSLA] = revocable + } else { + labels[schedulerSLA] = nonRevocable + } + break + } + } + + for _, l := range t.Labels.Labels { + labels[l.Key] = *l.Value + } + + return nil +} + +func (s *state) FetchLabels(fwID string, exID string) (map[string]string, error) { + labels := make(map[string]string) + + // Look for the framework which launched the container. + fw, err := s.GetFramework(fwID) + if err != nil { + return labels, fmt.Errorf("framework ID %q not found: %v", fwID, err) + } + labels[framework] = fw.Name + + // Get the executor info of the container which contains all the task info. + exec, err := s.GetExecutor(exID) + if err != nil { + return labels, fmt.Errorf("executor ID %q not found: %v", exID, err) + } + + labels[source] = *exec.Source + + err = s.fetchLabelsFromTask(exID, labels) + if err != nil { + return labels, fmt.Errorf("failed to fetch labels from task with executor ID %s", exID) + } + + return labels, nil +} diff --git a/container/mesos/mesos_agent_test.go b/container/mesos/mesos_agent_test.go new file mode 100644 index 00000000..8de4b7bf --- /dev/null +++ b/container/mesos/mesos_agent_test.go @@ -0,0 +1,129 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mesos + +import ( + "fmt" + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/agent" + "github.com/stretchr/testify/assert" + "testing" +) + +func PopulateFrameworks(fwID string) *agent.Response_GetFrameworks { + fws := &agent.Response_GetFrameworks{} + fws.Frameworks = make([]agent.Response_GetFrameworks_Framework, 1) + fw := &agent.Response_GetFrameworks_Framework{} + fw.FrameworkInfo = mesos.FrameworkInfo{ + ID: &mesos.FrameworkID{Value: fwID}, + Name: "TestFramework", + } + fws.Frameworks[0] = *fw + return fws +} + +func PopulateExecutors(exID string) *agent.Response_GetExecutors { + execs := &agent.Response_GetExecutors{} + execs.Executors = make([]agent.Response_GetExecutors_Executor, 1) + exec := &agent.Response_GetExecutors_Executor{} + source := "source1" + exec.ExecutorInfo = mesos.ExecutorInfo{ + ExecutorID: mesos.ExecutorID{Value: exID}, + Source: &source, + } + execs.Executors[0] = *exec + return execs +} + +func PopulateTasks() *agent.Response_GetTasks { + tasks := &agent.Response_GetTasks{} + tasks.LaunchedTasks = make([]mesos.Task, 1) + + task := mesos.Task{ + ExecutorID: &mesos.ExecutorID{Value: "exec-id1"}, + } + + task.Resources = make([]mesos.Resource, 1) + resource := mesos.Resource{ + Name: cpus, + Revocable: nil, + } + task.Resources[0] = resource + + task.Labels = &mesos.Labels{ + Labels: make([]mesos.Label, 1), + } + labelValue := "value1" + label := mesos.Label{ + Key: "key1", + Value: &labelValue, + } + task.Labels.Labels[0] = label + + tasks.LaunchedTasks[0] = task + return tasks +} + +func TestFetchLabels(t *testing.T) { + type testCase struct { + frameworkID string + executorID string + agentState *agent.Response_GetState + expectedError error + expectedLabels map[string]string + } + + for _, ts := range []testCase{ + { + frameworkID: "fw-id1", + executorID: "exec-id1", + agentState: &agent.Response_GetState{ + GetFrameworks: PopulateFrameworks("fw-id1"), + GetExecutors: PopulateExecutors("exec-id1"), + GetTasks: PopulateTasks(), + }, + expectedError: nil, + expectedLabels: map[string]string{ + framework: "TestFramework", + source: "source1", + schedulerSLA: nonRevocable, + "key1": "value1", + }, + }, + { + frameworkID: "fw-id2", + executorID: "exec-id1", + agentState: &agent.Response_GetState{ + GetFrameworks: PopulateFrameworks("fw-id1"), + GetExecutors: PopulateExecutors("exec-id1"), + GetTasks: PopulateTasks(), + }, + expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"), + expectedLabels: map[string]string{}, + }, + } { + + var s state + s.st = ts.agentState + + actualLabels, err := s.FetchLabels(ts.frameworkID, ts.executorID) + if ts.expectedError == nil { + assert.Nil(t, err) + } else { + assert.Equal(t, ts.expectedError.Error(), err.Error()) + } + assert.Equal(t, ts.expectedLabels, actualLabels) + } +} diff --git a/manager/manager.go b/manager/manager.go index 68c339cb..7e665be9 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -33,6 +33,7 @@ import ( "github.com/google/cadvisor/container/containerd" "github.com/google/cadvisor/container/crio" "github.com/google/cadvisor/container/docker" + "github.com/google/cadvisor/container/mesos" "github.com/google/cadvisor/container/raw" "github.com/google/cadvisor/container/rkt" "github.com/google/cadvisor/container/systemd" @@ -321,6 +322,11 @@ func (self *manager) Start() error { glog.V(5).Infof("Registration of the crio container factory failed: %v", err) } + err = mesos.Register(self, self.fsInfo, self.ignoreMetrics) + if err != nil { + glog.V(5).Infof("Registration of the mesos container factory failed: %v", err) + } + err = systemd.Register(self, self.fsInfo, self.ignoreMetrics) if err != nil { glog.V(5).Infof("Registration of the systemd container factory failed: %v", err)