From 2c96ceb4788bfc5354db66ee149b6cefd9f018e8 Mon Sep 17 00:00:00 2001 From: Sashank Appireddy Date: Fri, 27 Jul 2018 03:30:46 +0000 Subject: [PATCH] Ensure that labels for mesos tasks launched via MesosCommandExecutor are collected as well --- container/mesos/client.go | 28 ++++++++++++++++++++++++++++ container/mesos/client_test.go | 17 +++++++++++++++++ container/mesos/factory.go | 7 +------ container/mesos/handler.go | 5 ++++- container/mesos/mesos_agent.go | 27 ++++++++++++++++++++++++++- container/mesos/mesos_agent_test.go | 27 +++++++++++++++++++++++---- 6 files changed, 99 insertions(+), 12 deletions(-) diff --git a/container/mesos/client.go b/container/mesos/client.go index 8b73a38d..ce0c8ef1 100644 --- a/container/mesos/client.go +++ b/container/mesos/client.go @@ -30,6 +30,7 @@ import ( const ( maxRetryAttempts = 3 + invalidPID = -1 ) var ( @@ -43,6 +44,7 @@ type client struct { type mesosAgentClient interface { ContainerInfo(id string) (*containerInfo, error) + ContainerPid(id string) (int, error) } type containerInfo struct { @@ -90,6 +92,32 @@ func (self *client) ContainerInfo(id string) (*containerInfo, error) { }, nil } +// Get the Pid of the container +func (self *client) ContainerPid(id string) (int, error) { + var pid int + var err error + err = retry.Retry( + func(attempt uint) error { + c, err := self.ContainerInfo(id) + if err != nil { + return err + } + + if c.cntr.ContainerStatus != nil { + pid = int(*c.cntr.ContainerStatus.ExecutorPID) + } else { + err = fmt.Errorf("error fetching Pid") + } + return err + }, + strategy.Limit(maxRetryAttempts), + ) + if err != nil { + return invalidPID, fmt.Errorf("failed to fetch pid") + } + return pid, err +} + func (self *client) getContainer(id string) (*mContainer, error) { // Get all containers cntrs, err := self.getContainers() diff --git a/container/mesos/client_test.go b/container/mesos/client_test.go index e9edf0aa..70f9e787 100644 --- a/container/mesos/client_test.go +++ b/container/mesos/client_test.go @@ -32,6 +32,23 @@ func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error) return cInfo, nil } +func (c *FakeMesosAgentClient) ContainerPid(id string) (int, error) { + if c.err != nil { + return invalidPID, c.err + } + cInfo, ok := c.cntrInfo[id] + if !ok { + return invalidPID, fmt.Errorf("can't locate container %s", id) + } + + if cInfo.cntr.ContainerStatus == nil { + return invalidPID, fmt.Errorf("error fetching Pid") + } + + pid := int(*cInfo.cntr.ContainerStatus.ExecutorPID) + return pid, nil +} + func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient { return &FakeMesosAgentClient{ err: err, diff --git a/container/mesos/factory.go b/container/mesos/factory.go index 3d5bf84b..66cdfe83 100644 --- a/container/mesos/factory.go +++ b/container/mesos/factory.go @@ -107,16 +107,11 @@ func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept b // Check if the container is known to mesos and it is active. id := ContainerNameToMesosId(name) - c, err := self.client.ContainerInfo(id) + _, err = self.client.ContainerInfo(id) if err != nil { return false, true, fmt.Errorf("error getting running container: %v", err) } - pid := int(*c.cntr.ContainerStatus.ExecutorPID) - if pid <= 0 { - return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid) - } - return true, true, nil } diff --git a/container/mesos/handler.go b/container/mesos/handler.go index e51df92d..65d0b987 100644 --- a/container/mesos/handler.go +++ b/container/mesos/handler.go @@ -94,7 +94,10 @@ func newMesosContainerHandler( } labels := cinfo.labels - pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID) + pid, err := client.ContainerPid(id) + if err != nil { + return nil, err + } libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, includedMetrics) diff --git a/container/mesos/mesos_agent.go b/container/mesos/mesos_agent.go index 48ed5f8c..20f921b9 100644 --- a/container/mesos/mesos_agent.go +++ b/container/mesos/mesos_agent.go @@ -62,14 +62,39 @@ func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) { // GetTask returns a task launched by given executor. func (s *state) GetTask(exID string) (*mesos.Task, error) { + // Check if task is in Launched Tasks list for _, t := range s.st.GetTasks.LaunchedTasks { - if t.ExecutorID.Value == exID { + if s.isMatchingTask(&t, exID) { + return &t, nil + } + } + + // Check if task is in Queued Tasks list + for _, t := range s.st.GetTasks.QueuedTasks { + if s.isMatchingTask(&t, exID) { return &t, nil } } return nil, fmt.Errorf("unable to find task matching executor id %s", exID) } +func (s *state) isMatchingTask(t *mesos.Task, exID string) bool { + // MESOS-9111: For tasks launched through mesos command/default executor, the + // executorID(which is same as the taskID) field is not filled in the TaskInfo object. + // The workaround is compare with taskID field if executorID is empty + if t.ExecutorID != nil { + if t.ExecutorID.Value == exID { + return true + } + } else { + if t.TaskID.Value == exID { + return true + } + } + + return false +} + func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error { t, err := s.GetTask(exID) if err != nil { diff --git a/container/mesos/mesos_agent_test.go b/container/mesos/mesos_agent_test.go index 8de4b7bf..ce085111 100644 --- a/container/mesos/mesos_agent_test.go +++ b/container/mesos/mesos_agent_test.go @@ -47,12 +47,15 @@ func PopulateExecutors(exID string) *agent.Response_GetExecutors { return execs } -func PopulateTasks() *agent.Response_GetTasks { +func PopulateTasks(taskID string, exID string) *agent.Response_GetTasks { tasks := &agent.Response_GetTasks{} tasks.LaunchedTasks = make([]mesos.Task, 1) task := mesos.Task{ - ExecutorID: &mesos.ExecutorID{Value: "exec-id1"}, + TaskID: mesos.TaskID{Value: taskID}, + } + if len(exID) > 0 { + task.ExecutorID = &mesos.ExecutorID{Value: exID} } task.Resources = make([]mesos.Resource, 1) @@ -92,7 +95,23 @@ func TestFetchLabels(t *testing.T) { agentState: &agent.Response_GetState{ GetFrameworks: PopulateFrameworks("fw-id1"), GetExecutors: PopulateExecutors("exec-id1"), - GetTasks: PopulateTasks(), + GetTasks: PopulateTasks("task-id1", "exec-id1"), + }, + expectedError: nil, + expectedLabels: map[string]string{ + framework: "TestFramework", + source: "source1", + schedulerSLA: nonRevocable, + "key1": "value1", + }, + }, + { + frameworkID: "fw-id1", + executorID: "task-id1", + agentState: &agent.Response_GetState{ + GetFrameworks: PopulateFrameworks("fw-id1"), + GetExecutors: PopulateExecutors("task-id1"), + GetTasks: PopulateTasks("task-id1", ""), }, expectedError: nil, expectedLabels: map[string]string{ @@ -108,7 +127,7 @@ func TestFetchLabels(t *testing.T) { agentState: &agent.Response_GetState{ GetFrameworks: PopulateFrameworks("fw-id1"), GetExecutors: PopulateExecutors("exec-id1"), - GetTasks: PopulateTasks(), + GetTasks: PopulateTasks("task-id1", "exec-id1"), }, expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"), expectedLabels: map[string]string{},