Merge pull request #2010 from sashankreddya/test_mesos_branch
Ensure that labels for mesos tasks launched via MesosCommandExecutor are collected
This commit is contained in:
commit
18afaa921b
@ -30,6 +30,7 @@ import (
|
||||
|
||||
const (
|
||||
maxRetryAttempts = 3
|
||||
invalidPID = -1
|
||||
)
|
||||
|
||||
var (
|
||||
@ -43,6 +44,7 @@ type client struct {
|
||||
|
||||
type mesosAgentClient interface {
|
||||
ContainerInfo(id string) (*containerInfo, error)
|
||||
ContainerPid(id string) (int, error)
|
||||
}
|
||||
|
||||
type containerInfo struct {
|
||||
@ -90,6 +92,32 @@ func (self *client) ContainerInfo(id string) (*containerInfo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get the Pid of the container
|
||||
func (self *client) ContainerPid(id string) (int, error) {
|
||||
var pid int
|
||||
var err error
|
||||
err = retry.Retry(
|
||||
func(attempt uint) error {
|
||||
c, err := self.ContainerInfo(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.cntr.ContainerStatus != nil {
|
||||
pid = int(*c.cntr.ContainerStatus.ExecutorPID)
|
||||
} else {
|
||||
err = fmt.Errorf("error fetching Pid")
|
||||
}
|
||||
return err
|
||||
},
|
||||
strategy.Limit(maxRetryAttempts),
|
||||
)
|
||||
if err != nil {
|
||||
return invalidPID, fmt.Errorf("failed to fetch pid")
|
||||
}
|
||||
return pid, err
|
||||
}
|
||||
|
||||
func (self *client) getContainer(id string) (*mContainer, error) {
|
||||
// Get all containers
|
||||
cntrs, err := self.getContainers()
|
||||
|
@ -32,6 +32,23 @@ func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error)
|
||||
return cInfo, nil
|
||||
}
|
||||
|
||||
func (c *FakeMesosAgentClient) ContainerPid(id string) (int, error) {
|
||||
if c.err != nil {
|
||||
return invalidPID, c.err
|
||||
}
|
||||
cInfo, ok := c.cntrInfo[id]
|
||||
if !ok {
|
||||
return invalidPID, fmt.Errorf("can't locate container %s", id)
|
||||
}
|
||||
|
||||
if cInfo.cntr.ContainerStatus == nil {
|
||||
return invalidPID, fmt.Errorf("error fetching Pid")
|
||||
}
|
||||
|
||||
pid := int(*cInfo.cntr.ContainerStatus.ExecutorPID)
|
||||
return pid, nil
|
||||
}
|
||||
|
||||
func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient {
|
||||
return &FakeMesosAgentClient{
|
||||
err: err,
|
||||
|
@ -107,16 +107,11 @@ func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept b
|
||||
// Check if the container is known to mesos and it is active.
|
||||
id := ContainerNameToMesosId(name)
|
||||
|
||||
c, err := self.client.ContainerInfo(id)
|
||||
_, err = self.client.ContainerInfo(id)
|
||||
if err != nil {
|
||||
return false, true, fmt.Errorf("error getting running container: %v", err)
|
||||
}
|
||||
|
||||
pid := int(*c.cntr.ContainerStatus.ExecutorPID)
|
||||
if pid <= 0 {
|
||||
return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid)
|
||||
}
|
||||
|
||||
return true, true, nil
|
||||
}
|
||||
|
||||
|
@ -94,7 +94,10 @@ func newMesosContainerHandler(
|
||||
}
|
||||
|
||||
labels := cinfo.labels
|
||||
pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID)
|
||||
pid, err := client.ContainerPid(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, includedMetrics)
|
||||
|
||||
|
@ -62,14 +62,39 @@ func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) {
|
||||
|
||||
// GetTask returns a task launched by given executor.
|
||||
func (s *state) GetTask(exID string) (*mesos.Task, error) {
|
||||
// Check if task is in Launched Tasks list
|
||||
for _, t := range s.st.GetTasks.LaunchedTasks {
|
||||
if t.ExecutorID.Value == exID {
|
||||
if s.isMatchingTask(&t, exID) {
|
||||
return &t, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check if task is in Queued Tasks list
|
||||
for _, t := range s.st.GetTasks.QueuedTasks {
|
||||
if s.isMatchingTask(&t, exID) {
|
||||
return &t, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("unable to find task matching executor id %s", exID)
|
||||
}
|
||||
|
||||
func (s *state) isMatchingTask(t *mesos.Task, exID string) bool {
|
||||
// MESOS-9111: For tasks launched through mesos command/default executor, the
|
||||
// executorID(which is same as the taskID) field is not filled in the TaskInfo object.
|
||||
// The workaround is compare with taskID field if executorID is empty
|
||||
if t.ExecutorID != nil {
|
||||
if t.ExecutorID.Value == exID {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if t.TaskID.Value == exID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error {
|
||||
t, err := s.GetTask(exID)
|
||||
if err != nil {
|
||||
|
@ -47,12 +47,15 @@ func PopulateExecutors(exID string) *agent.Response_GetExecutors {
|
||||
return execs
|
||||
}
|
||||
|
||||
func PopulateTasks() *agent.Response_GetTasks {
|
||||
func PopulateTasks(taskID string, exID string) *agent.Response_GetTasks {
|
||||
tasks := &agent.Response_GetTasks{}
|
||||
tasks.LaunchedTasks = make([]mesos.Task, 1)
|
||||
|
||||
task := mesos.Task{
|
||||
ExecutorID: &mesos.ExecutorID{Value: "exec-id1"},
|
||||
TaskID: mesos.TaskID{Value: taskID},
|
||||
}
|
||||
if len(exID) > 0 {
|
||||
task.ExecutorID = &mesos.ExecutorID{Value: exID}
|
||||
}
|
||||
|
||||
task.Resources = make([]mesos.Resource, 1)
|
||||
@ -92,7 +95,23 @@ func TestFetchLabels(t *testing.T) {
|
||||
agentState: &agent.Response_GetState{
|
||||
GetFrameworks: PopulateFrameworks("fw-id1"),
|
||||
GetExecutors: PopulateExecutors("exec-id1"),
|
||||
GetTasks: PopulateTasks(),
|
||||
GetTasks: PopulateTasks("task-id1", "exec-id1"),
|
||||
},
|
||||
expectedError: nil,
|
||||
expectedLabels: map[string]string{
|
||||
framework: "TestFramework",
|
||||
source: "source1",
|
||||
schedulerSLA: nonRevocable,
|
||||
"key1": "value1",
|
||||
},
|
||||
},
|
||||
{
|
||||
frameworkID: "fw-id1",
|
||||
executorID: "task-id1",
|
||||
agentState: &agent.Response_GetState{
|
||||
GetFrameworks: PopulateFrameworks("fw-id1"),
|
||||
GetExecutors: PopulateExecutors("task-id1"),
|
||||
GetTasks: PopulateTasks("task-id1", ""),
|
||||
},
|
||||
expectedError: nil,
|
||||
expectedLabels: map[string]string{
|
||||
@ -108,7 +127,7 @@ func TestFetchLabels(t *testing.T) {
|
||||
agentState: &agent.Response_GetState{
|
||||
GetFrameworks: PopulateFrameworks("fw-id1"),
|
||||
GetExecutors: PopulateExecutors("exec-id1"),
|
||||
GetTasks: PopulateTasks(),
|
||||
GetTasks: PopulateTasks("task-id1", "exec-id1"),
|
||||
},
|
||||
expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"),
|
||||
expectedLabels: map[string]string{},
|
||||
|
Loading…
Reference in New Issue
Block a user