Ensure that labels for mesos tasks launched via MesosCommandExecutor

are collected as well
This commit is contained in:
Sashank Appireddy 2018-07-27 03:30:46 +00:00
parent a390d2ef0c
commit 2c96ceb478
6 changed files with 99 additions and 12 deletions

View File

@ -30,6 +30,7 @@ import (
const ( const (
maxRetryAttempts = 3 maxRetryAttempts = 3
invalidPID = -1
) )
var ( var (
@ -43,6 +44,7 @@ type client struct {
type mesosAgentClient interface { type mesosAgentClient interface {
ContainerInfo(id string) (*containerInfo, error) ContainerInfo(id string) (*containerInfo, error)
ContainerPid(id string) (int, error)
} }
type containerInfo struct { type containerInfo struct {
@ -90,6 +92,32 @@ func (self *client) ContainerInfo(id string) (*containerInfo, error) {
}, nil }, nil
} }
// Get the Pid of the container
func (self *client) ContainerPid(id string) (int, error) {
var pid int
var err error
err = retry.Retry(
func(attempt uint) error {
c, err := self.ContainerInfo(id)
if err != nil {
return err
}
if c.cntr.ContainerStatus != nil {
pid = int(*c.cntr.ContainerStatus.ExecutorPID)
} else {
err = fmt.Errorf("error fetching Pid")
}
return err
},
strategy.Limit(maxRetryAttempts),
)
if err != nil {
return invalidPID, fmt.Errorf("failed to fetch pid")
}
return pid, err
}
func (self *client) getContainer(id string) (*mContainer, error) { func (self *client) getContainer(id string) (*mContainer, error) {
// Get all containers // Get all containers
cntrs, err := self.getContainers() cntrs, err := self.getContainers()

View File

@ -32,6 +32,23 @@ func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error)
return cInfo, nil return cInfo, nil
} }
func (c *FakeMesosAgentClient) ContainerPid(id string) (int, error) {
if c.err != nil {
return invalidPID, c.err
}
cInfo, ok := c.cntrInfo[id]
if !ok {
return invalidPID, fmt.Errorf("can't locate container %s", id)
}
if cInfo.cntr.ContainerStatus == nil {
return invalidPID, fmt.Errorf("error fetching Pid")
}
pid := int(*cInfo.cntr.ContainerStatus.ExecutorPID)
return pid, nil
}
func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient { func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient {
return &FakeMesosAgentClient{ return &FakeMesosAgentClient{
err: err, err: err,

View File

@ -107,16 +107,11 @@ func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept b
// Check if the container is known to mesos and it is active. // Check if the container is known to mesos and it is active.
id := ContainerNameToMesosId(name) id := ContainerNameToMesosId(name)
c, err := self.client.ContainerInfo(id) _, err = self.client.ContainerInfo(id)
if err != nil { if err != nil {
return false, true, fmt.Errorf("error getting running container: %v", err) return false, true, fmt.Errorf("error getting running container: %v", err)
} }
pid := int(*c.cntr.ContainerStatus.ExecutorPID)
if pid <= 0 {
return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid)
}
return true, true, nil return true, true, nil
} }

View File

@ -94,7 +94,10 @@ func newMesosContainerHandler(
} }
labels := cinfo.labels labels := cinfo.labels
pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID) pid, err := client.ContainerPid(id)
if err != nil {
return nil, err
}
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, includedMetrics) libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, includedMetrics)

View File

@ -62,14 +62,39 @@ func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) {
// GetTask returns a task launched by given executor. // GetTask returns a task launched by given executor.
func (s *state) GetTask(exID string) (*mesos.Task, error) { func (s *state) GetTask(exID string) (*mesos.Task, error) {
// Check if task is in Launched Tasks list
for _, t := range s.st.GetTasks.LaunchedTasks { for _, t := range s.st.GetTasks.LaunchedTasks {
if t.ExecutorID.Value == exID { if s.isMatchingTask(&t, exID) {
return &t, nil
}
}
// Check if task is in Queued Tasks list
for _, t := range s.st.GetTasks.QueuedTasks {
if s.isMatchingTask(&t, exID) {
return &t, nil return &t, nil
} }
} }
return nil, fmt.Errorf("unable to find task matching executor id %s", exID) return nil, fmt.Errorf("unable to find task matching executor id %s", exID)
} }
func (s *state) isMatchingTask(t *mesos.Task, exID string) bool {
// MESOS-9111: For tasks launched through mesos command/default executor, the
// executorID(which is same as the taskID) field is not filled in the TaskInfo object.
// The workaround is compare with taskID field if executorID is empty
if t.ExecutorID != nil {
if t.ExecutorID.Value == exID {
return true
}
} else {
if t.TaskID.Value == exID {
return true
}
}
return false
}
func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error { func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error {
t, err := s.GetTask(exID) t, err := s.GetTask(exID)
if err != nil { if err != nil {

View File

@ -47,12 +47,15 @@ func PopulateExecutors(exID string) *agent.Response_GetExecutors {
return execs return execs
} }
func PopulateTasks() *agent.Response_GetTasks { func PopulateTasks(taskID string, exID string) *agent.Response_GetTasks {
tasks := &agent.Response_GetTasks{} tasks := &agent.Response_GetTasks{}
tasks.LaunchedTasks = make([]mesos.Task, 1) tasks.LaunchedTasks = make([]mesos.Task, 1)
task := mesos.Task{ task := mesos.Task{
ExecutorID: &mesos.ExecutorID{Value: "exec-id1"}, TaskID: mesos.TaskID{Value: taskID},
}
if len(exID) > 0 {
task.ExecutorID = &mesos.ExecutorID{Value: exID}
} }
task.Resources = make([]mesos.Resource, 1) task.Resources = make([]mesos.Resource, 1)
@ -92,7 +95,23 @@ func TestFetchLabels(t *testing.T) {
agentState: &agent.Response_GetState{ agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"), GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"), GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(), GetTasks: PopulateTasks("task-id1", "exec-id1"),
},
expectedError: nil,
expectedLabels: map[string]string{
framework: "TestFramework",
source: "source1",
schedulerSLA: nonRevocable,
"key1": "value1",
},
},
{
frameworkID: "fw-id1",
executorID: "task-id1",
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("task-id1"),
GetTasks: PopulateTasks("task-id1", ""),
}, },
expectedError: nil, expectedError: nil,
expectedLabels: map[string]string{ expectedLabels: map[string]string{
@ -108,7 +127,7 @@ func TestFetchLabels(t *testing.T) {
agentState: &agent.Response_GetState{ agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"), GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"), GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(), GetTasks: PopulateTasks("task-id1", "exec-id1"),
}, },
expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"), expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"),
expectedLabels: map[string]string{}, expectedLabels: map[string]string{},