diff --git a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java index 5ca293d51..78a599400 100644 --- a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java +++ b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java @@ -30,6 +30,12 @@ public final class SystemParameters { public static final String JOB_AUTOSCALE_V2_ENABLED_PARAM = "mantis.job.autoscale.v2.enabled"; public static final String JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM = "mantis.job.autoscale.v2.loader.config"; + public static final String SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC = "mantis.scalarw2w.maxChunkTimeMSec"; + public static final String SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS = "mantis.scalarw2w.numConsumerThreads"; + public static final String SCALAR_WORKER_TO_WORKER_SPSC = "mantis.scalarw2w.spsc"; + public static final String SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY = "mantis.scalarw2w.bufferCapacity"; + public static final String SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE = "mantis.scalarw2w.maxChunkSize"; + @Deprecated public static final String MANTIS_WORKER_JVM_OPTS_STAGE_PREFIX = "MANTIS_WORKER_JVM_OPTS_STAGE"; diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 73092c926..a4db80220 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -39,6 +39,8 @@ import rx.Observable; import rx.functions.Func1; +import static io.mantisrx.common.SystemParameters.*; + /** * Execution of WorkerPublisher that publishes the stream to the next stage. * @@ -91,6 +93,11 @@ public void start(final StageConfig stage, Observable> toSer .name(name) .port(serverPort) .metricsRegistry(MetricsRegistry.getInstance()) + .numQueueConsumers(scalarNumConsumerThreads()) + .maxChunkSize(scalarMaxChunkSize()) + .maxChunkTimeMSec(scalarMaxChunkTimeMSec()) + .bufferCapacity(scalarBufferCapacity()) + .useSpscQueue(scalarUseSpsc()) .router(router) .build(); final LegacyTcpPushServer modernServer = @@ -185,6 +192,33 @@ private int numConsumerThreads() { return Integer.parseInt(stringValue); } + private boolean scalarUseSpsc() { + String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_SPSC, "false"); + return Boolean.parseBoolean(stringValue); + + } + + private int scalarBufferCapacity() { + String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY, "50000"); + return Integer.parseInt(stringValue); + } + + private int scalarMaxChunkTimeMSec() { + String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC, "250"); + return Integer.parseInt(stringValue); + } + + private int scalarMaxChunkSize() { + String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE, "1000"); + return Integer.parseInt(stringValue); + } + + private int scalarNumConsumerThreads() { + // num threads to read/process from consumer queue + String stringValue = propService.getStringValue(SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS, "1"); + return Integer.parseInt(stringValue); + } + private boolean runNewW2Wserver(String jobName) { String legacyServerString = propService.getStringValue("mantis.w2w.newServerImplScalar", "true"); String legacyServerStringPerJob = propService.getStringValue(jobName + ".mantis.w2w.newServerImplScalar", "false"); diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java index 23d357edf..927c51c4f 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java @@ -253,6 +253,46 @@ public class ParameterUtils { "built in to allow for network delays and/or miss a few worker heartbeats before being killed.") .build(); systemParams.put(workerTimeout.getName(), workerTimeout); + + // Scalar worker to worker params + ParameterDefinition scalarW2WMaxChunkTimeMSec = new IntParameter() + .name(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_TIME_MSEC) + .validator(Validators.range(1, 100000)) + .defaultValue(100) + .description("max time to read from queue, for a single chunk") + .build(); + systemParams.put(scalarW2WMaxChunkTimeMSec.getName(), scalarW2WMaxChunkTimeMSec); + + ParameterDefinition scalarW2WConsumerThreads = new IntParameter() + .name(SCALAR_WORKER_TO_WORKER_CONSUMER_THREADS) + .validator(Validators.range(1, 64)) + .description("number of consumer threads draining the queue to write to worker") + .defaultValue(1) + .build(); + systemParams.put(scalarW2WConsumerThreads.getName(), scalarW2WConsumerThreads); + + ParameterDefinition scalarW2WSpsc = new BooleanParameter() + .name(SCALAR_WORKER_TO_WORKER_SPSC) + .description("Whether to use spsc or blocking queue for scalar worker to worker stages") + .defaultValue(false) + .build(); + systemParams.put(scalarW2WSpsc.getName(), scalarW2WSpsc); + + ParameterDefinition scalarW2WBufferCapacity = new IntParameter() + .name(SCALAR_WORKER_TO_WORKER_BUFFER_CAPACITY) + .validator(Validators.range(1, 100000)) + .description("buffer capacity for scalar worker to worker stages") + .defaultValue(100) + .build(); + systemParams.put(scalarW2WBufferCapacity.getName(), scalarW2WBufferCapacity); + + ParameterDefinition scalarW2WMaxChunkSize = new IntParameter() + .name(SCALAR_WORKER_TO_WORKER_MAX_CHUNK_SIZE) + .validator(Validators.range(1, 100000)) + .description("max chunk size for scalar worker to worker stages") + .defaultValue(25) + .build(); + systemParams.put(scalarW2WMaxChunkSize.getName(), scalarW2WMaxChunkSize); } private ParameterUtils() {