Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ public Observable<NamedJobInfo> namedJobInfo(final String jobName) {
return response.getContent()
.map(event -> {
try {
logger.info("[fdc-91] namedJobInfo - {}", event);
return objectMapper.readValue(event.contentAsString(), NamedJobInfo.class);
} catch (IOException e) {
throw new RuntimeException("Invalid namedJobInfo json: " + e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public interface IJobClustersManager {
// worker related messages

void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request);
void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request);

void onWorkerEvent(WorkerEvent r);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterLabelsRequest;
Expand All @@ -110,6 +112,7 @@
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.scheduler.MantisSchedulerFactory;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -140,23 +143,31 @@ public class JobClustersManagerActor extends AbstractActorWithTimers implements
private final Counter numJobClusterInitFailures;
private final Counter numJobClusterInitSuccesses;
private Receive initializedBehavior;

@VisibleForTesting
public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) {
return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator)
return props(jobStore, eventPublisher, costsCalculator, Collections.emptyList());
}

public static Props props(final MantisJobStore jobStore, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, final List<String> namedJobsReferToLaunched) {
return Props.create(JobClustersManagerActor.class, jobStore, eventPublisher, costsCalculator, namedJobsReferToLaunched)
.withMailbox("akka.actor.metered-mailbox");
}

private final MantisJobStore jobStore;
private final LifecycleEventPublisher eventPublisher;
private final CostsCalculator costsCalculator;
private final List<String> namedJobsReferToLaunched;
private MantisSchedulerFactory mantisSchedulerFactory = null;

JobClusterInfoManager jobClusterInfoManager;

private ActorRef jobListHelperActor;
public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator) {
public JobClustersManagerActor(final MantisJobStore store, final LifecycleEventPublisher eventPublisher, final CostsCalculator costsCalculator, final List<String> namedJobsReferToLaunched) {
this.jobStore = store;
this.eventPublisher = eventPublisher;
this.costsCalculator = costsCalculator;
this.namedJobsReferToLaunched = namedJobsReferToLaunched;

MetricGroupId metricGroupId = getMetricGroupId();
Metrics m = new Metrics.Builder()
Expand Down Expand Up @@ -234,6 +245,7 @@ private Receive getInitializedBehavior() {
.match(GetJobClusterRequest.class, this::onJobClusterGet)
.match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
.match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
.match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject)
.match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
// List Job Cluster related messages
.match(ListJobClustersRequest.class, this::onJobClustersList)
Expand Down Expand Up @@ -291,6 +303,7 @@ private Receive getInitializingBehavior() {
.match(GetJobClusterRequest.class, (x) -> getSender().tell(new GetJobClusterResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(ListCompletedJobsInClusterRequest.class, (x) -> logger.warn(genUnexpectedMsg(x.toString(), state)))
.match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(ListArchivedWorkersRequest.class, (x) -> getSender().tell(new ListArchivedWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
.match(ListJobClustersRequest.class, (x) -> getSender().tell(new ListJobClustersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
.match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
Expand Down Expand Up @@ -323,7 +336,7 @@ private void initialize(JobClustersManagerInitialize initMsg) {
mantisSchedulerFactory = initMsg.getScheduler();
Map<String, IJobClusterMetadata> jobClusterMap = new HashMap<>();

this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator);
this.jobClusterInfoManager = new JobClusterInfoManager(jobStore, mantisSchedulerFactory, eventPublisher, costsCalculator, namedJobsReferToLaunched);

if (!initMsg.isLoadJobsFromStore()) {
getContext().become(initializedBehavior);
Expand Down Expand Up @@ -522,6 +535,18 @@ public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r)
sender.tell(new GetLastSubmittedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf());
}
}

@Override
public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest r) {
Optional<JobClusterInfo> jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName());
ActorRef sender = getSender();
if (jobClusterInfo.isPresent()) {
jobClusterInfo.get().jobClusterActor.forward(r, getContext());
} else {
sender.tell(new GetLastLaunchedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf());
}
}

@Override
public void onWorkerEvent(WorkerEvent workerEvent) {
if(logger.isDebugEnabled()) { logger.debug("Entering JobClusterManagerActor:onWorkerEvent {}", workerEvent); }
Expand Down Expand Up @@ -826,12 +851,14 @@ class JobClusterInfoManager {
private final MantisJobStore jobStore;
private final Metrics metrics;
private final CostsCalculator costsCalculator;
private final List<String> namedJobsReferToLaunched;

JobClusterInfoManager(MantisJobStore jobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator) {
JobClusterInfoManager(MantisJobStore jobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher eventPublisher, CostsCalculator costsCalculator, List<String> namedJobsReferToLaunched) {
this.eventPublisher = eventPublisher;
this.mantisSchedulerFactory = mantisSchedulerFactory;
this.jobStore = jobStore;
this.costsCalculator = costsCalculator;
this.namedJobsReferToLaunched = namedJobsReferToLaunched;


MetricGroupId metricGroupId = new MetricGroupId("JobClusterInfoManager");
Expand Down Expand Up @@ -859,7 +886,7 @@ Optional<JobClusterInfo> createClusterActorAndRegister(IJobClusterDefinition job
}
ActorRef jobClusterActor =
getContext().actorOf(
JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator),
JobClusterActor.props(clusterName, this.jobStore, this.mantisSchedulerFactory, this.eventPublisher, this.costsCalculator, this.namedJobsReferToLaunched),
"JobClusterActor-" + clusterName);
getContext().watch(jobClusterActor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.master.ILeadershipManager;
import io.mantisrx.server.master.LeaderRedirectionFilter;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.persistence.IMantisPersistenceProvider;
import io.mantisrx.server.master.resourcecluster.ResourceClusters;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class MasterApiAkkaService extends BaseService {
private final Materializer materializer;
private final ExecutorService executorService;
private final CountDownLatch serviceLatch = new CountDownLatch(1);
private final MasterConfiguration masterConfig;

public MasterApiAkkaService(final MasterMonitor masterMonitor,
final MasterDescription masterDescription,
Expand All @@ -101,7 +103,8 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
final int serverPort,
final IMantisPersistenceProvider mantisStorageProvider,
final LifecycleEventPublisher lifecycleEventPublisher,
final ILeadershipManager leadershipManager) {
final ILeadershipManager leadershipManager,
final MasterConfiguration masterConfig) {
super(true);
Preconditions.checkNotNull(masterMonitor, "MasterMonitor");
Preconditions.checkNotNull(masterDescription, "masterDescription");
Expand All @@ -110,6 +113,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
Preconditions.checkNotNull(mantisStorageProvider, "mantisStorageProvider");
Preconditions.checkNotNull(lifecycleEventPublisher, "lifecycleEventPublisher");
Preconditions.checkNotNull(leadershipManager, "leadershipManager");
this.masterConfig = masterConfig;
this.masterMonitor = masterMonitor;
this.masterDescription = masterDescription;
this.jobClustersManagerActor = jobClustersManagerActor;
Expand Down Expand Up @@ -151,7 +155,7 @@ private MantisMasterRoute configureApiRoutes(final ActorSystem actorSystem) {
final JobStatusRouteHandler jobStatusRouteHandler = new JobStatusRouteHandlerAkkaImpl(actorSystem, statusEventBrokerActor);
final JobDiscoveryRouteHandler jobDiscoveryRouteHandler = new JobDiscoveryRouteHandlerAkkaImpl(jobClustersManagerActor, idleTimeout);

final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(jobDiscoveryRouteHandler);
final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(masterConfig.getNamedJobsReferToLaunched(), jobDiscoveryRouteHandler);
final JobClusterRoute v0JobClusterRoute = new JobClusterRoute(jobClusterRouteHandler, jobRouteHandler, actorSystem);
final JobStatusRoute v0JobStatusRoute = new JobStatusRoute(jobStatusRouteHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ CompletionStage<SchedInfoResponse> schedulingInfoStream(final JobClusterManagerP
final boolean sendHeartbeats);
CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastSubmittedJobIdStream(final JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request,
final boolean sendHeartbeats);
CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastLaunchedJobIdStream(final JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request,
final boolean sendHeartbeats);
}
Loading