Skip to content

Commit 8b48db6

Browse files
committed
feat: Add Source#elements
1 parent 2dc8960 commit 8b48db6

File tree

5 files changed

+102
-1
lines changed

5 files changed

+102
-1
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Source.elements
2+
3+
Create a `Source` from the given elements.
4+
5+
@ref[Source operators](../index.md#source-operators)
6+
7+
## Signature
8+
9+
@apidoc[Source.elements](Source$) { scala="#elements[T](elements:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#elements[T](T)" }
10+
11+
12+
## Description
13+
14+
Create a `Source` from the given elements.
15+
16+
## Examples
17+
18+
## Reactive Streams semantics
19+
20+
@@@div { .callout }
21+
22+
**emits** the elements one by one
23+
24+
**completes** when the last element has been emitted
25+
26+
@@@

docs/src/main/paradox/stream/operators/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad
1313
|Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.|
1414
|Source|<a name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.|
1515
|Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.|
16+
|Source|<a name="elements"></a>@ref[elements](Source/elements.md)|Create a `Source` from the given elements.|
1617
|Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.|
1718
|Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with a user specified exception.|
1819
|Source|<a name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].|
@@ -454,6 +455,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
454455
* [dropRepeated](Source-or-Flow/dropRepeated.md)
455456
* [dropWhile](Source-or-Flow/dropWhile.md)
456457
* [dropWithin](Source-or-Flow/dropWithin.md)
458+
* [elements](Source/elements.md)
457459
* [empty](Source/empty.md)
458460
* [exists](Sink/exists.md)
459461
* [expand](Source-or-Flow/expand.md)

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ public void mustBeAbleToEmitEveryArrayElementSequentially() {
126126
.expectComplete();
127127
}
128128

129+
@Test
130+
public void mustBeAbleToUseElements() {
131+
Source.elements("a", "b", "c")
132+
.runWith(TestSink.probe(system), system)
133+
.ensureSubscription()
134+
.request(3)
135+
.expectNext("a")
136+
.expectNext("b")
137+
.expectNext("c")
138+
.expectComplete();
139+
}
140+
141+
@Test
142+
public void mustBeAbleToUseElementsEmpty() {
143+
Source.<String>elements()
144+
.runWith(TestSink.probe(system), system)
145+
.ensureSubscription()
146+
.request(1)
147+
.expectComplete();
148+
}
149+
129150
@Test
130151
public void mustBeAbleToUseVoidTypeInForeach() {
131152
final TestKit probe = new TestKit(system);

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,24 @@ object Source {
247247
def single[T](element: T): Source[T, NotUsed] =
248248
new Source(scaladsl.Source.single(element))
249249

250+
/**
251+
* Create a `Source` from the given elements.
252+
*
253+
* @since 1.3.0
254+
*/
255+
@varargs
256+
@SafeVarargs
257+
@SuppressWarnings(Array("varargs"))
258+
def elements[T](elements: T*): javadsl.Source[T, NotUsed] = {
259+
if (elements.isEmpty) {
260+
empty()
261+
} else if (elements.length == 1) {
262+
single(elements.head)
263+
} else {
264+
new Source(scaladsl.Source(elements))
265+
}
266+
}
267+
250268
/**
251269
* Create a `Source` that will continually emit the given element.
252270
*/

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
1515

1616
import java.util.concurrent.CompletionStage
1717

18-
import scala.annotation.tailrec
18+
import scala.annotation.{ tailrec, varargs }
1919
import scala.annotation.unchecked.uncheckedVariance
2020
import scala.collection.{ immutable, AbstractIterator }
2121
import scala.concurrent.{ Future, Promise }
@@ -437,6 +437,40 @@ object Source {
437437
def single[T](element: T): Source[T, NotUsed] =
438438
fromGraph(new GraphStages.SingleSource(element))
439439

440+
/**
441+
* Create a `Source` from the given elements.
442+
*
443+
* @since 1.3.0
444+
*/
445+
@varargs
446+
@SafeVarargs
447+
@SuppressWarnings(Array("varargs"))
448+
def elements[T](elements: T*): Source[T, NotUsed] = {
449+
if (elements.isEmpty) {
450+
empty[T]
451+
} else if (elements.length == 1) {
452+
single(elements.head)
453+
} else {
454+
Source(elements)
455+
}
456+
}
457+
458+
/**
459+
* Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
460+
* otherwise, every element of the array will be emitted sequentially.
461+
*
462+
* @since 1.3.0
463+
*/
464+
def fromArray[T](array: Array[T]): Source[T, NotUsed] = {
465+
if (array.length == 0) {
466+
empty
467+
} else if (array.length == 1) {
468+
single(array(0))
469+
} else {
470+
Source.fromGraph(new ArraySource[T](array))
471+
}
472+
}
473+
440474
/**
441475
* Create a `Source` from an `Option` value, emitting the value if it is defined.
442476
*

0 commit comments

Comments
 (0)