Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.intuit.tank.vm.agent.messages.AgentWsEnvelope;
import com.intuit.tank.vm.agent.messages.AgentWsEnvelope.AckStatus;
import com.intuit.tank.vm.agent.messages.AgentWsEnvelope.Type;
import com.intuit.tank.vm.api.enumerated.JobStatus;
import com.intuit.tank.vm.vmManager.VMTracker;
import com.intuit.tank.vm.settings.AgentConfig;
import com.intuit.tank.vm.settings.TankConfig;
Expand Down Expand Up @@ -102,6 +103,7 @@ public enum AgentWsState {
private ServletContext servletContext;

private volatile JobManager cachedJobManager;
private volatile AgentStatusLifecycle cachedAgentStatusLifecycle;
private volatile VMTracker cachedVMTracker;
private volatile AgentConfig cachedAgentConfig;

Expand Down Expand Up @@ -299,16 +301,31 @@ private void handleStatusUpdate(WebSocketSession session, AgentWsEnvelope envelo

status.setInstanceId(boundId);

VMTracker tracker = resolveVMTracker();
if (tracker == null) {
LOG.warn(new ObjectMessage(Map.of("Message", "Unable to resolve VMTracker for WS status update from " + boundId)));
AgentStatusLifecycle statusLifecycle = resolveAgentStatusLifecycle();
if (statusLifecycle != null) {
try {
statusLifecycle.setVmStatus(boundId, status);
return;
} catch (Exception e) {
LOG.warn(new ObjectMessage(Map.of("Message", "Failed WS lifecycle status update from " + boundId + ": " + e.getMessage())));
}
}

if (isTerminalStatus(status)) {
LOG.error(new ObjectMessage(Map.of("Message", "Unable to process terminal WS status for " + boundId
+ " - lifecycle handler unavailable, not falling back to non-terminating tracker update")));
return;
}

try {
tracker.setStatus(status);
} catch (Exception e) {
LOG.warn(new ObjectMessage(Map.of("Message", "Failed WS status update from " + boundId + ": " + e.getMessage())));
VMTracker tracker = resolveVMTracker();
if (tracker != null) {
try {
tracker.setStatus(status);
} catch (Exception e) {
LOG.warn(new ObjectMessage(Map.of("Message", "Failed WS tracker status update from " + boundId + ": " + e.getMessage())));
}
} else {
LOG.warn(new ObjectMessage(Map.of("Message", "Unable to resolve status handler for WS status update from " + boundId)));
}
}

Expand Down Expand Up @@ -573,6 +590,24 @@ private JobManager resolveJobManager() {
}
}

private AgentStatusLifecycle resolveAgentStatusLifecycle() {
AgentStatusLifecycle statusLifecycle = cachedAgentStatusLifecycle;
if (statusLifecycle != null) {
return statusLifecycle;
}
if (servletContext == null) {
return null;
}
try {
statusLifecycle = new ServletInjector<AgentStatusLifecycle>().getManagedBean(servletContext, AgentStatusLifecycle.class);
cachedAgentStatusLifecycle = statusLifecycle;
return statusLifecycle;
} catch (Exception e) {
LOG.error(new ObjectMessage(Map.of("Message", "Error resolving AgentStatusLifecycle: " + e.getMessage())), e);
return null;
}
}

private VMTracker resolveVMTracker() {
VMTracker tracker = cachedVMTracker;
if (tracker != null) {
Expand Down Expand Up @@ -649,6 +684,12 @@ private boolean isTerminalVmStatus(VMStatus status) {
|| status == VMStatus.disconnected;
}

private boolean isTerminalStatus(CloudVmStatus status) {
return status.getJobStatus() == JobStatus.Completed
|| status.getVmStatus() == VMStatus.terminated
|| status.getVmStatus() == VMStatus.replaced;
}

private void handleJobTransferComplete(String instanceId) {
String jobId = agentJobs.get(instanceId);
if (jobId == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.intuit.tank.rest.mvc;

import com.intuit.tank.dao.JobInstanceDao;
import com.intuit.tank.project.JobInstance;
import com.intuit.tank.vm.api.enumerated.JobQueueStatus;
import com.intuit.tank.vm.api.enumerated.JobStatus;
import com.intuit.tank.vm.vmManager.VMTerminator;
import com.intuit.tank.vm.vmManager.VMTracker;
import com.intuit.tank.vm.vmManager.models.CloudVmStatus;
import com.intuit.tank.vm.vmManager.models.CloudVmStatusContainer;
import com.intuit.tank.vm.vmManager.models.VMStatus;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

@ApplicationScoped
public class AgentStatusLifecycle {

private static final Logger LOG = LogManager.getLogger(AgentStatusLifecycle.class);

@Inject
private VMTracker vmTracker;

@Inject
private VMTerminator terminator;

public void setVmStatus(final String instanceId, final CloudVmStatus status) {
status.setInstanceId(instanceId);
vmTracker.setStatus(status);
if (isTerminalStatus(status)) {
terminator.terminate(status.getInstanceId());
checkJobStatus(status.getJobId());
}
}

private boolean isTerminalStatus(CloudVmStatus status) {
return status.getJobStatus() == JobStatus.Completed
|| status.getVmStatus() == VMStatus.terminated
|| status.getVmStatus() == VMStatus.replaced;
}

public void checkJobStatus(String jobId) {
CloudVmStatusContainer container = vmTracker.getVmStatusForJob(jobId);
LOG.info("Checking Job Status to see if we can kill reporting instances. Container=" + container);
if (container != null) {
if (container.getEndTime() != null) {
JobInstanceDao dao = new JobInstanceDao();

JobInstance finishedJob = dao.findById(Integer.valueOf(jobId));
if (finishedJob != null && finishedJob.getEndTime() == null
&& finishedJob.getStatus() != JobQueueStatus.Deleted) {
finishedJob.setEndTime(new Date());
finishedJob.setStatus(JobQueueStatus.Completed);
dao.saveOrUpdate(finishedJob);
}
List<JobQueueStatus> statuses = Arrays.asList(JobQueueStatus.Running, JobQueueStatus.Starting);
List<JobInstance> instances = dao.getForStatus(statuses);
LOG.info("Checking Job Status to see if we can kill reporting instances. found running instances: "
+ instances.size());
boolean killModal = true;
boolean killNonRegional = true;

for (JobInstance job : instances) {
CloudVmStatusContainer statusForJob = vmTracker.getVmStatusForJob(Integer.toString(job.getId()));
if (!jobId.equals(Integer.toString(job.getId())) && statusForJob != null
&& statusForJob.getEndTime() == null) {
LOG.info("Found another job that is not finished: " + job);
}
}
if (killNonRegional || killModal) {
for (CloudVmStatusContainer statusForJob : vmTracker.getAllJobs()) {
if (statusForJob.getEndTime() == null && !NumberUtils.isCreatable(statusForJob.getJobId())) {
killNonRegional = false;
killModal = false;
LOG.info("Cannot kill Reporting instances because of automation job id: "
+ statusForJob.getJobId());
}
}
}
} else {
LOG.info("Container does not have end time set so cannot kill reporting instances.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,68 @@ void testStatusUpdateDelegatesToVmTrackerAndEnforcesBoundIdentity() throws Excep
assertEquals("job-1", statusCaptor.getValue().getJobId());
assertEquals(JobStatus.Running, statusCaptor.getValue().getJobStatus());
}

@Test
void testTerminalStatusUpdateDelegatesToAgentStatusLifecycleForTermination() throws Exception {
AgentWsEnvelope hello = AgentWsEnvelope.hello("i-123", "job-1", "sess-1", null);
handler.handleTextMessage(session, new TextMessage(hello.toJson()));

AgentStatusLifecycle statusLifecycle = mock(AgentStatusLifecycle.class);
Field statusLifecycleField = AgentCommandWebSocketHandler.class.getDeclaredField("cachedAgentStatusLifecycle");
statusLifecycleField.setAccessible(true);
statusLifecycleField.set(handler, statusLifecycle);

CloudVmStatus status = new CloudVmStatus(
"i-spoofed",
"job-1",
"sg-1",
JobStatus.Completed,
VMImageType.AGENT,
VMRegion.US_EAST,
VMStatus.terminated,
new ValidationStatus(),
5,
0,
new Date(),
new Date());

AgentWsEnvelope statusUpdate = AgentWsEnvelope.statusUpdate("i-123", "job-1", status);
handler.handleTextMessage(session, new TextMessage(statusUpdate.toJson()));

ArgumentCaptor<CloudVmStatus> statusCaptor = ArgumentCaptor.forClass(CloudVmStatus.class);
verify(statusLifecycle).setVmStatus(eq("i-123"), statusCaptor.capture());
assertEquals("i-123", statusCaptor.getValue().getInstanceId());
assertEquals(JobStatus.Completed, statusCaptor.getValue().getJobStatus());
assertEquals(VMStatus.terminated, statusCaptor.getValue().getVmStatus());
}

@Test
void testTerminalStatusDoesNotFallBackToTrackerWhenLifecycleUnavailable() throws Exception {
AgentWsEnvelope hello = AgentWsEnvelope.hello("i-123", "job-1", "sess-1", null);
handler.handleTextMessage(session, new TextMessage(hello.toJson()));

VMTracker vmTracker = mock(VMTracker.class);
Field vmTrackerField = AgentCommandWebSocketHandler.class.getDeclaredField("cachedVMTracker");
vmTrackerField.setAccessible(true);
vmTrackerField.set(handler, vmTracker);

CloudVmStatus status = new CloudVmStatus(
"i-spoofed",
"job-1",
"sg-1",
JobStatus.Completed,
VMImageType.AGENT,
VMRegion.US_EAST,
VMStatus.terminated,
new ValidationStatus(),
5,
0,
new Date(),
new Date());

AgentWsEnvelope statusUpdate = AgentWsEnvelope.statusUpdate("i-123", "job-1", status);
handler.handleTextMessage(session, new TextMessage(statusUpdate.toJson()));

verify(vmTracker, never()).setStatus(any(CloudVmStatus.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.intuit.tank.rest.mvc;

import com.intuit.tank.vm.api.enumerated.JobStatus;
import com.intuit.tank.vm.api.enumerated.VMImageType;
import com.intuit.tank.vm.api.enumerated.VMRegion;
import com.intuit.tank.vm.vmManager.VMTerminator;
import com.intuit.tank.vm.vmManager.VMTracker;
import com.intuit.tank.vm.vmManager.models.CloudVmStatus;
import com.intuit.tank.vm.vmManager.models.VMStatus;
import com.intuit.tank.vm.vmManager.models.ValidationStatus;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Date;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class AgentStatusLifecycleTest {

@Mock
private VMTracker vmTracker;

@Mock
private VMTerminator terminator;

@InjectMocks
private AgentStatusLifecycle lifecycle;

@Test
void testRunningStatusUpdatesTrackerWithoutTermination() {
CloudVmStatus status = createStatus("i-running", JobStatus.Running, VMStatus.running);

lifecycle.setVmStatus("i-running", status);

verify(vmTracker).setStatus(status);
verify(terminator, never()).terminate("i-running");
}

@Test
void testCompletedStatusTriggersTerminationAndEnforcesInstanceId() {
CloudVmStatus status = createStatus("i-spoofed", JobStatus.Completed, VMStatus.running);

lifecycle.setVmStatus("i-done", status);

verify(vmTracker).setStatus(argThat(updated -> "i-done".equals(updated.getInstanceId())
&& updated.getJobStatus() == JobStatus.Completed));
verify(terminator).terminate("i-done");
verify(vmTracker).getVmStatusForJob("123");
assertEquals("i-done", status.getInstanceId());
}

@Test
void testTerminatedVmStatusTriggersTermination() {
CloudVmStatus status = createStatus("i-term", JobStatus.Running, VMStatus.terminated);

lifecycle.setVmStatus("i-term", status);

verify(vmTracker).setStatus(status);
verify(terminator).terminate("i-term");
}

private CloudVmStatus createStatus(String instanceId, JobStatus jobStatus, VMStatus vmStatus) {
return new CloudVmStatus(
instanceId,
"123",
"sg-1",
jobStatus,
VMImageType.AGENT,
VMRegion.US_EAST,
vmStatus,
new ValidationStatus(),
5,
jobStatus == JobStatus.Completed ? 0 : 1,
new Date(),
jobStatus == JobStatus.Completed ? new Date() : null);
}
}
Loading