Skip to content

Commit 5ed96c4

Browse files
committed
feat: Add Source#items
1 parent 06671fd commit 5ed96c4

File tree

5 files changed

+86
-1
lines changed

5 files changed

+86
-1
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Source.items
2+
3+
Create a `Source` from the given items.
4+
5+
@ref[Source operators](../index.md#source-operators)
6+
7+
## Signature
8+
9+
@apidoc[Source.items](Source$) { scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#items[T](T)" }
10+
11+
12+
## Description
13+
14+
Create a `Source` from the given items.
15+
16+
## Examples
17+
18+
## Reactive Streams semantics
19+
20+
@@@div { .callout }
21+
22+
**emits** the items one by one
23+
24+
**completes** when the last item has been emitted
25+
26+
@@@

docs/src/main/paradox/stream/operators/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad
2323
|Source|<a name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
2424
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
2525
|Source|<a name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.|
26+
|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source` from the given items.|
2627
|Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a sequential `Source` by iterating with the given predicate, function and seed.|
2728
|Source|<a name="lazycompletionstage"></a>@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.|
2829
|Source|<a name="lazycompletionstagesource"></a>@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers creation of a future source until there is demand.|
@@ -512,6 +513,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
512513
* [interleave](Source-or-Flow/interleave.md)
513514
* [interleaveAll](Source-or-Flow/interleaveAll.md)
514515
* [intersperse](Source-or-Flow/intersperse.md)
516+
* [items](Source/items.md)
515517
* [iterate](Source/iterate.md)
516518
* [javaCollector](StreamConverters/javaCollector.md)
517519
* [javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ public void mustBeAbleToEmitEveryArrayElementSequentially() {
126126
.expectComplete();
127127
}
128128

129+
@Test
130+
public void mustBeAbleToUseItems() {
131+
Source.items("a", "b", "c")
132+
.runWith(TestSink.probe(system), system)
133+
.ensureSubscription()
134+
.request(3)
135+
.expectNext("a")
136+
.expectNext("b")
137+
.expectNext("c")
138+
.expectComplete();
139+
}
140+
141+
@Test
142+
public void mustBeAbleToUseItemsWhenEmpty() {
143+
Source.<String>items()
144+
.runWith(TestSink.probe(system), system)
145+
.ensureSubscription()
146+
.request(1)
147+
.expectComplete();
148+
}
149+
129150
@Test
130151
public void mustBeAbleToUseVoidTypeInForeach() {
131152
final TestKit probe = new TestKit(system);

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,24 @@ object Source {
247247
def single[T](element: T): Source[T, NotUsed] =
248248
new Source(scaladsl.Source.single(element))
249249

250+
/**
251+
* Create a `Source` from the given elements.
252+
*
253+
* @since 1.3.0
254+
*/
255+
@varargs
256+
@SafeVarargs
257+
@SuppressWarnings(Array("varargs"))
258+
def items[T](items: T*): javadsl.Source[T, NotUsed] = {
259+
if (items.isEmpty) {
260+
empty()
261+
} else if (items.length == 1) {
262+
single(items.head)
263+
} else {
264+
new Source(scaladsl.Source(items))
265+
}
266+
}
267+
250268
/**
251269
* Create a `Source` that will continually emit the given element.
252270
*/

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
1515

1616
import java.util.concurrent.CompletionStage
1717

18-
import scala.annotation.tailrec
18+
import scala.annotation.{ tailrec, varargs }
1919
import scala.annotation.unchecked.uncheckedVariance
2020
import scala.collection.{ immutable, AbstractIterator }
2121
import scala.concurrent.{ Future, Promise }
@@ -437,6 +437,24 @@ object Source {
437437
def single[T](element: T): Source[T, NotUsed] =
438438
fromGraph(new GraphStages.SingleSource(element))
439439

440+
/**
441+
* Create a `Source` from the given elements.
442+
*
443+
* @since 1.3.0
444+
*/
445+
@varargs
446+
@SafeVarargs
447+
@SuppressWarnings(Array("varargs"))
448+
def items[T](items: T*): Source[T, NotUsed] = {
449+
if (items.isEmpty) {
450+
empty[T]
451+
} else if (items.length == 1) {
452+
single(items.head)
453+
} else {
454+
Source(items)
455+
}
456+
}
457+
440458
/**
441459
* Create a `Source` from an `Option` value, emitting the value if it is defined.
442460
*

0 commit comments

Comments
 (0)