Skip to content

Commit 020816e

Browse files
committed
feat: Add Source#items (#2429)
(cherry picked from commit f0db8f0)
1 parent 4c1c337 commit 020816e

File tree

4 files changed

+30
-3
lines changed

4 files changed

+30
-3
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
5555
("apply" -> "fromGraph") ::
5656
("apply" -> "fromIterator") ::
5757
("apply" -> "fromFunctions") ::
58+
("apply" -> "fromArray") ::
5859
Nil
5960

6061
// format: OFF

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
6262
c.expectComplete()
6363
}
6464

65+
"product elements with Array" in {
66+
val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false))
67+
val c = TestSubscriber.manualProbe[Int]()
68+
p.subscribe(c)
69+
val sub = c.expectSubscription()
70+
sub.request(3)
71+
c.expectNext(1)
72+
c.expectNext(2)
73+
c.expectNext(3)
74+
c.expectComplete()
75+
}
76+
6577
"reject later subscriber" in {
6678
val p = Source.single(1).runWith(Sink.asPublisher(false))
6779
val c1 = TestSubscriber.manualProbe[Int]()

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import pekko.japi.function.Creator
3636
import pekko.stream._
3737
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
3838
import pekko.stream.impl.Stages.DefaultAttributes
39-
import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava }
39+
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
4040
import pekko.util.{ unused, _ }
4141
import pekko.util.FutureConverters._
4242
import pekko.util.JavaDurationConverters._
@@ -201,8 +201,7 @@ object Source {
201201
*
202202
* @since 1.1.0
203203
*/
204-
def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph(
205-
new ArraySource[T](array)))
204+
def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source(array))
206205

207206
/**
208207
* Create a `Source` from an `Optional` value, emitting the value if it is present.

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,21 @@ object Source {
423423
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
424424
fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
425425

426+
/**
427+
* Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
428+
* otherwise, every element of the array will be emitted sequentially.
429+
*
430+
* @since 1.3.0
431+
*/
432+
def apply[T](array: Array[T]): Source[T, NotUsed] = {
433+
if (array.length == 0)
434+
empty
435+
else if (array.length == 1)
436+
single(array(0))
437+
else
438+
Source.fromGraph(new ArraySource[T](array))
439+
}
440+
426441
/**
427442
* Starts a new `Source` from the given `Future`. The stream will consist of
428443
* one element when the `Future` is completed with a successful value, which

0 commit comments

Comments
 (0)