diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java index dc414ca29..5ae88bd3f 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java @@ -53,7 +53,7 @@ public class GroupToGroup extends KeyValueStageConfig { GroupToGroup(GroupComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -76,6 +76,7 @@ public static class Config { // always assume a stateful calculation is being made // do not allow config to override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -135,6 +136,15 @@ public Config withParameters(List> params) return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java index cfb4cf35a..a99927f5a 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java @@ -56,7 +56,7 @@ public class GroupToScalar extends StageConfig { GroupToScalar(GroupToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -79,6 +79,7 @@ public static class Config { // do not allow config override private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private int concurrency = DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -149,6 +150,15 @@ public Config withParameters(List> params) { return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java index 5760934cd..0c05e06f3 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java @@ -54,7 +54,7 @@ public class KeyToKey extends KeyValueStageConfig { KeyToKey(KeyComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -78,6 +78,7 @@ public static class Config { // always assume a stateful calculation is being made // do not allow config to override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -136,6 +137,15 @@ public Config withParameters(List> params) this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java index d7cb9ed25..927914a76 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java @@ -45,7 +45,7 @@ public class KeyToScalar extends StageConfig { KeyToScalar(ToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -67,6 +67,7 @@ public static class Config { // 'stateful group calculation' use case // do not allow config override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -117,6 +118,15 @@ public Config withParameters(List> params) { return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java index 4e0cd6907..e48106d31 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java @@ -37,7 +37,11 @@ public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec } public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency); + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, concurrency, rx.internal.util.RxRingBuffer.SIZE); + } + + public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency, int bufferSize) { + super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, bufferSize); this.keyCodec = outputKeyCodec; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java index 7fb2a3206..c85f90887 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java @@ -54,7 +54,7 @@ public class ScalarToGroup extends KeyValueStageConfig { public ScalarToGroup(ToGroupComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; @@ -77,6 +77,7 @@ public static class Config { // default input type is concurrent for 'grouping' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT; private int concurrency = DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default private List> parameters = Collections.emptyList(); @@ -155,5 +156,14 @@ public Config withParameters(List> params) { this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java index 0a2e10425..c3af90a0e 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java @@ -51,7 +51,7 @@ public class ScalarToKey extends KeyValueStageConfig { ScalarToKey(ToKeyComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -72,6 +72,7 @@ public static class Config { // default input type is concurrent for 'grouping' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -140,5 +141,14 @@ public Config withParameters(List> params) { this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java index 7ba4309e2..42d5b2db4 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java @@ -42,7 +42,7 @@ public class ScalarToScalar extends StageConfig { public ScalarToScalar(ScalarComputation computation, Config config, Codec inputCodec) { - super(config.description, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, null, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.inputStrategy = config.inputStrategy; this.parameters = config.parameters; @@ -68,6 +68,7 @@ public static class Config { // default input type is serial for 'collecting' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private volatile int concurrency = StageConfig.DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); @@ -129,5 +130,14 @@ public INPUT_STRATEGY getInputStrategy() { public int getConcurrency() { return concurrency; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java index a920c1987..f7edc1051 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java @@ -21,6 +21,7 @@ import io.mantisrx.runtime.parameter.ParameterDefinition; import java.util.Collections; import java.util.List; +import rx.internal.util.RxRingBuffer; public abstract class StageConfig { @@ -40,6 +41,9 @@ public abstract class StageConfig { // number of inner observables processed private int concurrency = DEFAULT_STAGE_CONCURRENCY; + // buffer size for observeOn scheduler, defaults to RxRingBuffer.SIZE + private final int bufferSize; + public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy) { this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY); @@ -69,6 +73,12 @@ public StageConfig(String description, Codec inputCodec, public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { + this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, RxRingBuffer.SIZE); + } + + public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, + Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, + int concurrency, int bufferSize) { this.description = description; this.inputKeyCodec = inputKeyCodec; this.inputCodec = inputCodec; @@ -76,6 +86,7 @@ public StageConfig(String description, Codec inputKeyCodec, Codec inpu this.inputStrategy = inputStrategy; this.parameters = params; this.concurrency = concurrency; + this.bufferSize = bufferSize; } public String getDescription() { @@ -109,5 +120,9 @@ public int getConcurrency() { return concurrency; } + public int getBufferSize() { + return bufferSize; + } + public enum INPUT_STRATEGY {NONE_SPECIFIED, SERIAL, CONCURRENT} } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java index 96458dde9..bdc25ab25 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java @@ -204,7 +204,7 @@ private static Observable> executeMantisGroups(Observabl */ @SuppressWarnings("unchecked") private static Observable> executeMantisGroupsInParallel(Observable>> go, Computation computation, - final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) { + final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency, final int bufferSize) { logger.info("initializing {}", computation.getClass().getCanonicalName()); computation.init(context); @@ -218,7 +218,7 @@ private static Observable> executeMantisGroupsInParallel .lift(new MonitorOperator<>("worker_stage_outer")) .map(observable -> c .call(context, observable - .observeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), bufferSize) .lift(new MonitorOperator<>("worker_stage_inner_input"))) .lift(new MonitorOperator<>("worker_stage_inner_output"))); @@ -238,7 +238,7 @@ private static Observable> executeMantisGroupsInParallel .groupBy(e -> Math.abs(e.getKeyValue().hashCode()) % concurrency) .flatMap(gbo -> c .call(context, gbo - .observeOn(mantisRxSingleThreadSchedulers[gbo.getKey()]) + .observeOn(mantisRxSingleThreadSchedulers[gbo.getKey()], bufferSize) .lift(new MonitorOperator>("worker_stage_inner_input"))) .lift(new MonitorOperator("worker_stage_inner_output")))); } @@ -277,7 +277,7 @@ private static Observable> executeInners(Observable Observable> executeInnersInParallel(Observable> oo, Computation computation, - final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) { + final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency, final int bufferSize) { logger.info("initializing {}", computation.getClass().getCanonicalName()); computation.init(context); @@ -290,7 +290,7 @@ private static Observable> executeInnersInParallel(Observab .lift(new MonitorOperator<>("worker_stage_outer")) .map(observable -> c .call(context, observable - .observeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), bufferSize) .lift(new MonitorOperator("worker_stage_inner_input"))) .lift(new MonitorOperator("worker_stage_inner_output"))); } else { @@ -307,7 +307,7 @@ private static Observable> executeInnersInParallel(Observab .flatMap(go -> c .call(context, go - .observeOn(mantisRxSingleThreadSchedulers[go.getKey().intValue()]) + .observeOn(mantisRxSingleThreadSchedulers[go.getKey().intValue()], bufferSize) .lift(new MonitorOperator<>("worker_stage_inner_input"))) .lift(new MonitorOperator<>("worker_stage_inner_output")))); } @@ -352,7 +352,8 @@ private static Observable> setupScalarToScalarStage(ScalarT context, false, Integer.MAX_VALUE, - resolveStageConcurrency(context, stage.getConcurrency())); + resolveStageConcurrency(context, stage.getConcurrency()), + stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, false, Integer.MAX_VALUE); @@ -368,7 +369,7 @@ private static Observable>> se // check if job overrides the default input strategy if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { return executeInnersInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(), resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(), resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds()); @@ -385,7 +386,7 @@ private static Observable>> setupSca // check if job overrides the default input strategy if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { return executeInnersInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds()); @@ -446,7 +447,7 @@ private static Observable> setupGroupToScalarStage(Group if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { logger.info("Execute Groups in PARALLEL!!!!"); return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable>> merged = Observable.just(Observable.merge(source)); diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java index 39cf2abb3..8f3e49698 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java @@ -127,17 +127,6 @@ public Observable handle(HttpServerRequest request, // copy reference, then apply request specific filters, sampling Observable requestObservable = observableToServe; - // decouple the observable on a separate thread and add backpressure handling - String decoupleSSE = "false";//ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("sse.decouple", "false"); - //TODO Below condition would be always false during if condition. - // Since decoupleSSE would be false and matching with true as string - // would always ignore code inside if block - if ("true".equals(decoupleSSE)) { - final BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none")); - requestObservable = requestObservable - .lift(new DropOperator<>("outgoing_ServerSentEventRequestHandler", sockAddrTag)) - .observeOn(Schedulers.io()); - } response.getHeaders().set("Access-Control-Allow-Origin", "*"); response.getHeaders().set("content-type", "text/event-stream"); response.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");