Skip to content

Commit 611ce6f

Browse files
authored
feat: Add Source#items (#2429) (#2472)
(cherry picked from commit f0db8f0)
1 parent 4c1c337 commit 611ce6f

File tree

5 files changed

+86
-1
lines changed

5 files changed

+86
-1
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Source.items
2+
3+
Create a `Source` from the given items.
4+
5+
@ref[Source operators](../index.md#source-operators)
6+
7+
## Signature
8+
9+
@apidoc[Source.items](Source$) { scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#items[T](T)" }
10+
11+
12+
## Description
13+
14+
Create a `Source` from the given items.
15+
16+
## Examples
17+
18+
## Reactive Streams semantics
19+
20+
@@@div { .callout }
21+
22+
**emits** the items one by one
23+
24+
**completes** when the last item has been emitted
25+
26+
@@@

docs/src/main/paradox/stream/operators/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad
2727
|Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
2828
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
2929
|Source|<a name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.|
30+
|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source` from the given items.|
3031
|Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a sequential `Source` by iterating with the given predicate, function and seed.|
3132
|Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by @ref[`Source.lazySource`](Source/lazySource.md).|
3233
|Source|<a name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).|
@@ -533,6 +534,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
533534
* [interleave](Source-or-Flow/interleave.md)
534535
* [interleaveAll](Source-or-Flow/interleaveAll.md)
535536
* [intersperse](Source-or-Flow/intersperse.md)
537+
* [items](Source/items.md)
536538
* [iterate](Source/iterate.md)
537539
* [javaCollector](StreamConverters/javaCollector.md)
538540
* [javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,27 @@ public void mustBeAbleToEmitEveryArrayElementSequentially() {
125125
.expectComplete();
126126
}
127127

128+
@Test
129+
public void mustBeAbleToUseItems() {
130+
Source.items("a", "b", "c")
131+
.runWith(TestSink.create(system), system)
132+
.ensureSubscription()
133+
.request(3)
134+
.expectNext("a")
135+
.expectNext("b")
136+
.expectNext("c")
137+
.expectComplete();
138+
}
139+
140+
@Test
141+
public void mustBeAbleToUseItemsWhenEmpty() {
142+
Source.<String>items()
143+
.runWith(TestSink.create(system), system)
144+
.ensureSubscription()
145+
.request(1)
146+
.expectComplete();
147+
}
148+
128149
@Test
129150
public void mustBeAbleToUseVoidTypeInForeach() {
130151
final TestKit probe = new TestKit(system);

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,24 @@ object Source {
302302
def single[T](element: T): Source[T, NotUsed] =
303303
new Source(scaladsl.Source.single(element))
304304

305+
/**
306+
* Create a `Source` from the given elements.
307+
*
308+
* @since 1.3.0
309+
*/
310+
@varargs
311+
@SafeVarargs
312+
@SuppressWarnings(Array("varargs"))
313+
def items[T](items: T*): javadsl.Source[T, NotUsed] = {
314+
if (items.isEmpty) {
315+
empty()
316+
} else if (items.length == 1) {
317+
single(items.head)
318+
} else {
319+
new Source(scaladsl.Source(items.toIndexedSeq))
320+
}
321+
}
322+
305323
/**
306324
* Create a `Source` that will continually emit the given element.
307325
*/

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
1515

1616
import java.util.concurrent.CompletionStage
1717

18-
import scala.annotation.{ nowarn, tailrec }
18+
import scala.annotation.{ nowarn, tailrec, varargs }
1919
import scala.annotation.unchecked.uncheckedVariance
2020
import scala.collection.{ immutable, AbstractIterator }
2121
import scala.concurrent.{ Future, Promise }
@@ -480,6 +480,24 @@ object Source {
480480
def single[T](element: T): Source[T, NotUsed] =
481481
fromGraph(new GraphStages.SingleSource(element))
482482

483+
/**
484+
* Create a `Source` from the given elements.
485+
*
486+
* @since 1.3.0
487+
*/
488+
@varargs
489+
@SafeVarargs
490+
@SuppressWarnings(Array("varargs"))
491+
def items[T](items: T*): Source[T, NotUsed] = {
492+
if (items.isEmpty) {
493+
empty[T]
494+
} else if (items.length == 1) {
495+
single(items.head)
496+
} else {
497+
Source(items.toIndexedSeq)
498+
}
499+
}
500+
483501
/**
484502
* Create a `Source` from an `Option` value, emitting the value if it is defined.
485503
*

0 commit comments

Comments
 (0)