Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public final class SystemParameters {
public static final String JOB_AUTOSCALE_V2_ENABLED_PARAM = "mantis.job.autoscale.v2.enabled";
public static final String JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM = "mantis.job.autoscale.v2.loader.config";

public static final String SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC = "mantis.scalarw2w.maxChunkTimeMSec";
public static final String SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS = "mantis.scalarw2w.numConsumerThreads";
public static final String SCALAR_WORKER_TO_WORKER_SPSC = "mantis.scalarw2w.spsc";
public static final String SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY = "mantis.scalarw2w.bufferCapacity";
public static final String SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE = "mantis.scalarw2w.maxChunkSize";


@Deprecated
public static final String MANTIS_WORKER_JVM_OPTS_STAGE_PREFIX = "MANTIS_WORKER_JVM_OPTS_STAGE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import rx.Observable;
import rx.functions.Func1;

import static io.mantisrx.common.SystemParameters.*;

/**
* Execution of WorkerPublisher that publishes the stream to the next stage.
*
Expand Down Expand Up @@ -91,6 +93,11 @@ public void start(final StageConfig<?, T> stage, Observable<Observable<T>> toSer
.name(name)
.port(serverPort)
.metricsRegistry(MetricsRegistry.getInstance())
.numQueueConsumers(scalarNumConsumerThreads())
.maxChunkSize(scalarMaxChunkSize())
.maxChunkTimeMSec(scalarMaxChunkTimeMSec())
.bufferCapacity(scalarBufferCapacity())
.useSpscQueue(scalarUseSpsc())
.router(router)
.build();
final LegacyTcpPushServer<T> modernServer =
Expand Down Expand Up @@ -185,6 +192,33 @@ private int numConsumerThreads() {
return Integer.parseInt(stringValue);
}

private boolean scalarUseSpsc() {
String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_SPSC, "false");
return Boolean.parseBoolean(stringValue);

}

private int scalarBufferCapacity() {
String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY, "50000");
return Integer.parseInt(stringValue);
}

private int scalarMaxChunkTimeMSec() {
String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC, "250");
return Integer.parseInt(stringValue);
}

private int scalarMaxChunkSize() {
String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE, "1000");
return Integer.parseInt(stringValue);
}

private int scalarNumConsumerThreads() {
// num threads to read/process from consumer queue
String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS, "1");
return Integer.parseInt(stringValue);
}

private boolean runNewW2Wserver(String jobName) {
String legacyServerString = propService.getStringValue("mantis.w2w.newServerImplScalar", "true");
String legacyServerStringPerJob = propService.getStringValue(jobName + ".mantis.w2w.newServerImplScalar", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,46 @@ public class ParameterUtils {
"built in to allow for network delays and/or miss a few worker heartbeats before being killed.")
.build();
systemParams.put(workerTimeout.getName(), workerTimeout);

// Scalar worker to worker params
ParameterDefinition<Integer> scalarW2WMaxChunkTimeMSec = new IntParameter()
.name(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC)
.validator(Validators.range(1, 100000))
.defaultValue(100)
.description("max time to read from queue, for a single chunk")
.build();
systemParams.put(scalarW2WMaxChunkTimeMSec.getName(), scalarW2WMaxChunkTimeMSec);

ParameterDefinition<Integer> scalarW2WConsumerThreads = new IntParameter()
.name(SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS)
.validator(Validators.range(1, 64))
.description("number of consumer threads draining the queue to write to worker")
.defaultValue(1)
.build();
systemParams.put(scalarW2WConsumerThreads.getName(), scalarW2WConsumerThreads);

ParameterDefinition<Boolean> scalarW2WSpsc = new BooleanParameter()
.name(SCALAR_WORKER_TO_WORKER_SPSC)
.description("Whether to use spsc or blocking queue for scalar worker to worker stages")
.defaultValue(false)
.build();
systemParams.put(scalarW2WSpsc.getName(), scalarW2WSpsc);

ParameterDefinition<Integer> scalarW2WBufferCapacity = new IntParameter()
.name(SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY)
.validator(Validators.range(1, 100000))
.description("buffer capacity for scalar worker to worker stages")
.defaultValue(100)
.build();
systemParams.put(scalarW2WBufferCapacity.getName(), scalarW2WBufferCapacity);

ParameterDefinition<Integer> scalarW2WMaxChunkSize = new IntParameter()
.name(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE)
.validator(Validators.range(1, 100000))
.description("max chunk size for scalar worker to worker stages")
.defaultValue(25)
.build();
systemParams.put(scalarW2WMaxChunkSize.getName(), scalarW2WMaxChunkSize);
}

private ParameterUtils() {
Expand Down
Loading