This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch issue-29334-source-single-fast-path in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 7983ac1435a343e5638d4dadaa9d239f16071e2d Author: 虎鸣 <[email protected]> AuthorDate: Sun Apr 26 17:29:37 2026 +0800 perf(stream): reduce source construction overhead --- .../stream/GraphStageConstructionBenchmark.scala | 179 +++++++++++++++++++++ .../apache/pekko/stream/RangeSourceBenchmark.scala | 103 ++++++++++++ .../apache/pekko/stream/javadsl/SourceTest.java | 15 ++ .../pekko/stream/impl/TraversalBuilderSpec.scala | 39 ++++- .../apache/pekko/stream/scaladsl/SourceSpec.scala | 62 +++++++ .../apache/pekko/stream/impl/ActorRefSource.scala | 2 + .../org/apache/pekko/stream/impl/QueueSource.scala | 2 + .../pekko/stream/impl/TraversalBuilder.scala | 84 ++++++---- .../pekko/stream/impl/fusing/FlattenConcat.scala | 55 ++++++- .../pekko/stream/impl/fusing/GraphStages.scala | 14 ++ .../pekko/stream/impl/fusing/IteratorSource.scala | 75 +++++++++ .../org/apache/pekko/stream/impl/fusing/Ops.scala | 8 +- .../pekko/stream/impl/fusing/RangeSource.scala | 63 ++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 6 +- .../org/apache/pekko/stream/scaladsl/Flow.scala | 8 +- .../org/apache/pekko/stream/scaladsl/Sink.scala | 8 +- .../org/apache/pekko/stream/scaladsl/Source.scala | 149 +++++++++-------- 17 files changed, 755 insertions(+), 117 deletions(-) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala new file mode 100644 index 0000000000..5fd4e5b233 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.TimeUnit + +import scala.collection.immutable +import scala.concurrent.Promise + +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.impl.LinearTraversalBuilder +import org.apache.pekko.stream.impl.Stages.DefaultAttributes +import org.apache.pekko.stream.impl.fusing.GraphStages +import org.apache.pekko.stream.impl.fusing.IterableSource +import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.BenchmarkMode +import org.openjdk.jmh.annotations.Mode +import org.openjdk.jmh.annotations.OutputTimeUnit +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.infra.Blackhole + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +class GraphStageConstructionBenchmark { + private val element = "element" + private val elements: immutable.Iterable[String] = Vector(element, element) + private val range = 1 to 1000 + private val pendingFuture = Promise[String]().future + private val flowStage = GraphStages.identity[Any] + private val sinkStage = GraphStages.IgnoreSink + + private def oldSourceFromGraphStage[T, M]( + stage: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] = { + val attributes = stage.traversalBuilder.attributes + val noAttributeStage = stage.withAttributes(Attributes.none) + new Source( + LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right), + noAttributeStage.shape).withAttributes(attributes) + } + + private def oldSourceFromIterable[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = + (iterable.knownSize: @scala.annotation.switch) match { + case 0 => Source.empty + case 1 => oldSourceFromGraphStage(new GraphStages.SingleSource(iterable.head)) + case _ => + oldSourceFromGraphStage(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) + } + + @Benchmark + def sourceSingle(blackhole: Blackhole): Unit = + blackhole.consume(Source.single(element)) + + @Benchmark + def javadslSourceSingle(blackhole: Blackhole): Unit = + blackhole.consume(javadsl.Source.single(element)) + + @Benchmark + def sourceRepeat(blackhole: Blackhole): Unit = + blackhole.consume(Source.repeat(element)) + + @Benchmark + def oldSourceRepeatPath(blackhole: Blackhole): Unit = { + val iterable = new immutable.Iterable[String] { + override def iterator: Iterator[String] = Iterator.continually(element) + override def toString: String = "() => Iterator" + } + blackhole.consume(oldSourceFromGraphStage(new IterableSource[String](iterable)).withAttributes(DefaultAttributes.repeat)) + } + + @Benchmark + def sourceFromIterator(blackhole: Blackhole): Unit = + blackhole.consume(Source.fromIterator(() => Iterator.single(element))) + + @Benchmark + def oldSourceFromIteratorPath(blackhole: Blackhole): Unit = { + val iterable = new immutable.Iterable[String] { + override def iterator: Iterator[String] = Iterator.single(element) + override def toString: String = "() => Iterator" + } + blackhole.consume( + oldSourceFromGraphStage(new IterableSource[String](iterable)).withAttributes(DefaultAttributes.iterableSource)) + } + + @Benchmark + def sourceIterable(blackhole: Blackhole): Unit = + blackhole.consume(Source(elements)) + + @Benchmark + def oldSourceIterablePath(blackhole: Blackhole): Unit = + blackhole.consume(oldSourceFromIterable(elements)) + + @Benchmark + def sourceRange(blackhole: Blackhole): Unit = + blackhole.consume(Source(range)) + + @Benchmark + def oldSourceRangePath(blackhole: Blackhole): Unit = + blackhole.consume( + oldSourceFromGraphStage(new IterableSource[Int](range)).withAttributes(DefaultAttributes.iterableSource)) + + @Benchmark + def javadslSourceRange(blackhole: Blackhole): Unit = + blackhole.consume(javadsl.Source.range(1, 1000)) + + @Benchmark + def oldJavadslSourceRangePath(blackhole: Blackhole): Unit = + blackhole.consume( + new javadsl.Source( + oldSourceFromGraphStage(new IterableSource[Integer](range.asInstanceOf[immutable.Iterable[Integer]])) + .withAttributes(DefaultAttributes.iterableSource))) + + @Benchmark + def sourceFuturePending(blackhole: Blackhole): Unit = + blackhole.consume(Source.future(pendingFuture)) + + @Benchmark + def oldSourceFuturePendingPath(blackhole: Blackhole): Unit = + blackhole.consume(oldSourceFromGraphStage(new GraphStages.FutureSource[String](pendingFuture))) + + @Benchmark + def sourceFromGraphStage(blackhole: Blackhole): Unit = + blackhole.consume(Source.fromGraph(new GraphStages.SingleSource(element))) + + @Benchmark + def oldSourceFromGraphStagePath(blackhole: Blackhole): Unit = { + val stage = new GraphStages.SingleSource(element) + blackhole.consume(oldSourceFromGraphStage(stage)) + } + + @Benchmark + def sinkFromGraphStage(blackhole: Blackhole): Unit = + blackhole.consume(Sink.fromGraph(sinkStage)) + + @Benchmark + def oldSinkFromGraphStagePath(blackhole: Blackhole): Unit = { + val attributes = sinkStage.traversalBuilder.attributes + val noAttributeStage = sinkStage.withAttributes(Attributes.none) + blackhole.consume( + new Sink( + LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right), + noAttributeStage.shape).withAttributes(attributes)) + } + + @Benchmark + def flowFromGraphStage(blackhole: Blackhole): Unit = + blackhole.consume(Flow.fromGraph(flowStage)) + + @Benchmark + def oldFlowFromGraphStagePath(blackhole: Blackhole): Unit = { + val attributes = flowStage.traversalBuilder.attributes + val noAttributeStage = flowStage.withAttributes(Attributes.none) + blackhole.consume( + new Flow( + LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, noAttributeStage.shape, Keep.right), + noAttributeStage.shape).withAttributes(attributes)) + } +} diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala new file mode 100644 index 0000000000..59925f9612 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.impl.fusing.IterableSource +import pekko.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source } +import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler } + +object RangeSourceBenchmark { + @volatile var sinkSum: Int = 0 +} + +final class IntCompletionLatch extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] { + val in: Inlet[Int] = Inlet[Int]("IntCompletionLatch.in") + override val shape: SinkShape[Int] = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = { + val latch = new CountDownLatch(1) + val logic = new GraphStageLogic(shape) with InHandler { + private[this] var sum = 0 + + override def preStart(): Unit = pull(in) + override def onPush(): Unit = { + sum += grab(in) + pull(in) + } + + override def onUpstreamFinish(): Unit = { + RangeSourceBenchmark.sinkSum = sum + latch.countDown() + completeStage() + } + + setHandler(in, this) + } + (logic, latch) + } +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class RangeSourceBenchmark { + implicit val system: ActorSystem = ActorSystem("RangeSourceBenchmark") + + @Param(Array("1", "1000")) + var elements: Int = 0 + + var rangeToLatch: RunnableGraph[CountDownLatch] = _ + var oldRangeToLatch: RunnableGraph[CountDownLatch] = _ + + @Setup + def setup(): Unit = { + val range = 1 to elements + rangeToLatch = Source(range).toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right) + oldRangeToLatch = Source + .fromGraph(new IterableSource[Int](range)) + .withAttributes(DefaultAttributes.iterableSource) + .toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right) + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + def sourceRangeToLatch(): Unit = + await(rangeToLatch.run()) + + @Benchmark + def oldSourceRangeToLatch(): Unit = + await(oldRangeToLatch.run()) + + private def await(latch: CountDownLatch): Unit = + if (!latch.await(5, TimeUnit.SECONDS)) + throw new RuntimeException("Latch timed out") +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 3aec7545b0..db3341baea 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -747,6 +747,21 @@ public class SourceTest extends StreamTestJupiter { } } + @Test + public void mustWorkFromRangeWithNegativeStep() throws Exception { + CompletionStage<List<Integer>> f = + Source.range(5, 1, -2).grouped(20).runWith(Sink.head(), system); + final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(Arrays.asList(5, 3, 1), result); + } + + @Test + public void mustWorkFromEmptyRange() throws Exception { + CompletionStage<List<Integer>> f = Source.range(1, 5, -1).runWith(Sink.seq(), system); + final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(0, result.size()); + } + @Test public void mustRepeat() throws Exception { final CompletionStage<List<Integer>> f = diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala index 2af4b9b0a7..153339aeef 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala @@ -19,8 +19,9 @@ import org.apache.pekko import pekko.NotUsed import pekko.stream._ import pekko.stream.impl.TraversalTestUtils._ -import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource } -import pekko.stream.impl.fusing.IterableSource +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource } +import pekko.stream.impl.fusing.{ IterableSource, IteratorSource, RangeSource } import pekko.stream.scaladsl.{ Keep, Source } import pekko.testkit.PekkoSpec import pekko.util.OptionVal @@ -547,6 +548,40 @@ class TraversalBuilderSpec extends PekkoSpec { OptionVal.None) } + "find Source.fromIterator via TraversalBuilder with getValuePresentedSource" in { + val createIterator = () => Iterator("a", "b", "c") + TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator)).get.asInstanceOf[IteratorSource[ + String]].createIterator should ===( + createIterator) + val iteratorSource = new IteratorSource(createIterator, DefaultAttributes.iterableSource) + TraversalBuilder.getValuePresentedSource(iteratorSource) should be(OptionVal.Some(iteratorSource)) + + TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator).async) should be(OptionVal.None) + TraversalBuilder.getValuePresentedSource( + Source.fromIterator(createIterator).mapMaterializedValue(_ => "Mat")) should be(OptionVal.None) + } + + "find Source.range via TraversalBuilder with getValuePresentedSource" in { + val range = 1 to 4 + TraversalBuilder.getValuePresentedSource(Source(range)).get.asInstanceOf[RangeSource[Int]].range should ===(range) + val rangeSource = new RangeSource[Int](range, DefaultAttributes.iterableSource) + TraversalBuilder.getValuePresentedSource(rangeSource) should be(OptionVal.Some(rangeSource)) + + TraversalBuilder.getValuePresentedSource(Source(range).async) should be(OptionVal.None) + TraversalBuilder.getValuePresentedSource(Source(range).mapMaterializedValue(_ => "Mat")) should be(OptionVal.None) + } + + "find Source.repeat via TraversalBuilder with getValuePresentedSource" in { + TraversalBuilder.getValuePresentedSource(Source.repeat("a")).get.asInstanceOf[RepeatSource[String]].elem should ===( + "a") + val repeatSource = new RepeatSource("a") + TraversalBuilder.getValuePresentedSource(repeatSource) should be(OptionVal.Some(repeatSource)) + + TraversalBuilder.getValuePresentedSource(Source.repeat("c").async) should be(OptionVal.None) + TraversalBuilder.getValuePresentedSource(Source.repeat("d").mapMaterializedValue(_ => "Mat")) should be( + OptionVal.None) + } + "find Source.javaStreamSource via TraversalBuilder with getValuePresentedSource" in { val javaStream = java.util.stream.Stream.empty[String]() TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => javaStream)).get diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 22d3b55077..119b51338b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -369,6 +369,52 @@ class SourceSpec extends StreamSpec with DefaultTimeout { // #repeat f.futureValue shouldBe Done } + + "work when flattened through value-presented source fast path" in { + Source + .single("repeat") + .flatMapConcat(_ => Source.repeat(42)) + .take(3) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(42, 42, 42)) + } + + "work when recovered through value-presented source fast path" in { + Source + .failed[Int](TE("boom")) + .recoverWithRetries(1, { case _ => Source.repeat(42) }) + .take(3) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(42, 42, 42)) + } + } + + "Range Source" must { + "emit inclusive and exclusive ranges" in { + Source(1 to 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 2, 3, 4)) + Source(1 until 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "emit stepped and empty ranges" in { + Source(5 to 1 by -2).runWith(Sink.seq).futureValue should ===(immutable.Seq(5, 3, 1)) + Source(1 to 5 by -1).runWith(Sink.seq).futureValue should ===(immutable.Seq.empty) + } + + "work when flattened through value-presented source fast path" in { + Source + .single("range") + .flatMapConcat(_ => Source(1 to 3)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "work when recovered through value-presented source fast path" in { + Source + .failed[Int](TE("boom")) + .recoverWithRetries(1, { case _ => Source(1 to 3) }) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } } "Unfold Source" must { @@ -437,6 +483,22 @@ class SourceSpec extends StreamSpec with DefaultTimeout { immutable.Seq(false, true, false, true, false, true, false, true, false, true)) } + "work when flattened through value-presented source fast path" in { + Source + .single("iterator") + .flatMapConcat(_ => Source.fromIterator(() => Iterator(1, 2, 3))) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "work when recovered through value-presented source fast path" in { + Source + .failed[Int](TE("boom")) + .recoverWithRetries(1, { case _ => Source.fromIterator(() => Iterator(1, 2, 3)) }) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + "fail stream when iterator throws" in { Source .fromIterator(() => (1 to 1000).toIterator.map(k => if (k < 10) k else throw TE("a"))) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala index 18ebc52537..1abea0b5ce 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala @@ -18,6 +18,7 @@ import pekko.actor.ActorRef import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.OverflowStrategies._ +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage._ import pekko.util.OptionVal @@ -39,6 +40,7 @@ private object ActorRefSource { val out: Outlet[T] = Outlet[T]("actorRefSource.out") override val shape: SourceShape[T] = SourceShape.of(out) + override def initialAttributes: Attributes = DefaultAttributes.actorRefSource def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) = throw new IllegalStateException("Not supported") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala index 61e217202a..36d8014ac3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala @@ -20,6 +20,7 @@ import pekko.Done import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.OverflowStrategies._ +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.scaladsl.SourceQueueWithComplete import pekko.stream.stage._ @@ -49,6 +50,7 @@ import pekko.stream.stage._ val out = Outlet[T]("queueSource.out") override val shape: SourceShape[T] = SourceShape.of(out) + override def initialAttributes: Attributes = DefaultAttributes.queueSource @scala.annotation.nowarn("msg=inferred structural type") override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala index c6206a106a..9002c81938 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala @@ -22,9 +22,10 @@ import pekko.annotation.{ DoNotInherit, InternalApi } import pekko.stream._ import pekko.stream.impl.StreamLayout.AtomicModule import pekko.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 } -import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource } -import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource } +import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource, IteratorSource, RangeSource } +import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource } import pekko.stream.scaladsl.Keep +import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.OptionVal /** @@ -354,24 +355,29 @@ import pekko.util.OptionVal * performance optimization in FlattenMerge and possibly other places. */ def getSingleSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[SingleSource[A]] = { + @inline def fromModule(module: AtomicModule[_, _]): OptionVal[SingleSource[A]] = + module match { + case m: GraphStageModule[_, _] => + m.stage match { + case single: SingleSource[A] @unchecked => OptionVal.Some(single) + case _ => OptionVal.None + } + case _ => OptionVal.None + } + graph match { case single: SingleSource[A] @unchecked => OptionVal.Some(single) case _ => graph.traversalBuilder match { - case l: LinearTraversalBuilder => + case l: LinearTraversalBuilder if !l.attributes.isAsync => l.pendingBuilder match { case OptionVal.Some(a: AtomicTraversalBuilder) => - a.module match { - case m: GraphStageModule[_, _] => - m.stage match { - case single: SingleSource[A] @unchecked => - // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. - if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync) - OptionVal.Some(single) - else OptionVal.None - case _ => OptionVal.None - } - case _ => OptionVal.None + // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. + if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) else OptionVal.None + case OptionVal.None => + l.traversalSoFar match { + case MaterializeAtomic(module, _) => fromModule(module) + case _ => OptionVal.None } case _ => OptionVal.None } @@ -388,30 +394,36 @@ import pekko.util.OptionVal @InternalApi def getValuePresentedSource[A >: Null]( graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = { def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean = graph match { - case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: JavaStreamSource[_, _] | - _: FailedSource[_] => + case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: IteratorSource[_] | + _: RangeSource[_] | _: RepeatSource[_] | _: JavaStreamSource[_, _] | _: FailedSource[_] => true case maybeEmpty if isEmptySource(maybeEmpty) => true case _ => false } + @inline def fromModule(module: AtomicModule[_, _]): OptionVal[Graph[SourceShape[A], _]] = + module match { + case m: GraphStageModule[_, _] => + m.stage match { + case _ if isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) => + OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) + case _ => OptionVal.None + } + case _ => OptionVal.None + } + graph match { case _ if isValuePresentedSource(graph) => OptionVal.Some(graph) case _ => graph.traversalBuilder match { - case l: LinearTraversalBuilder => + case l: LinearTraversalBuilder if !l.attributes.isAsync => l.pendingBuilder match { case OptionVal.Some(a: AtomicTraversalBuilder) => - a.module match { - case m: GraphStageModule[_, _] => - m.stage match { - case _ if isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) => - // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. - if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync) - OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) - else OptionVal.None - case _ => OptionVal.None - } - case _ => OptionVal.None + // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. + if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) else OptionVal.None + case OptionVal.None => + l.traversalSoFar match { + case MaterializeAtomic(module, _) => fromModule(module) + case _ => OptionVal.None } case _ => OptionVal.None } @@ -779,6 +791,22 @@ import pekko.util.OptionVal } } + + @inline @InternalApi private[pekko] def fromGraphStage( + graphStage: GraphStageWithMaterializedValue[_ <: Shape, _]): LinearTraversalBuilder = { + val builder = graphStage.traversalBuilder + val attributes = builder.attributes + val linear = builder match { + case atomic: AtomicTraversalBuilder => + LinearTraversalBuilder.fromModule(atomic.module, Attributes.none) + case _ => + val builderWithoutAttributes = + if (attributes eq Attributes.none) builder else builder.setAttributes(Attributes.none) + fromBuilder(builderWithoutAttributes, graphStage.shape, Keep.right) + } + + if (attributes eq Attributes.none) linear else linear.setAttributes(attributes) + } } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala index 5cf2bdc827..e9d8b1e7a7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala @@ -17,6 +17,7 @@ package org.apache.pekko.stream.impl.fusing +import scala.collection.immutable import scala.concurrent.Future import scala.util.{ Failure, Try } @@ -25,7 +26,7 @@ import pekko.annotation.InternalApi import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, SourceShape, SubscriptionWithCancelException } import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, JavaStreamSource, TraversalBuilder } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource } +import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource } import pekko.stream.scaladsl.Source import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import pekko.util.OptionVal @@ -54,6 +55,35 @@ private[pekko] object FlattenConcat { override def isClosed: Boolean = !hasNext } + private final class InflightRangeSource[T](range: immutable.Range) extends InflightSource[T] { + private val isEmptyRange = range.isEmpty + private val rangeLast = if (isEmptyRange) 0 else range.last + private val rangeStep = range.step + private var nextElement = range.start + private var closed = isEmptyRange + + override def hasNext: Boolean = !closed + override def next(): T = + if (closed) throw new NoSuchElementException("next called after completion") + else { + val current = nextElement + if (current == rangeLast) closed = true + else nextElement = current + rangeStep + current.asInstanceOf[T] + } + override def tryPull(): Unit = () + override def cancel(cause: Throwable): Unit = () + override def isClosed: Boolean = closed + } + + private final class InflightRepeatSource[T](elem: T) extends InflightSource[T] { + override def hasNext: Boolean = true + override def next(): T = elem + override def tryPull(): Unit = () + override def cancel(cause: Throwable): Unit = () + override def isClosed: Boolean = false + } + private final class InflightCompletedFutureSource[T](result: Try[T]) extends InflightSource[T] { private var _hasNext = result.isSuccess override def hasNext: Boolean = _hasNext @@ -219,6 +249,26 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int) } } + private def addRangeSource(range: immutable.Range): Unit = { + val inflightSource = new InflightRangeSource[T](range) + if (isAvailable(out) && queue.isEmpty) { + if (inflightSource.hasNext) { + push(out, inflightSource.next()) + if (inflightSource.hasNext) + queue.enqueue(inflightSource) + } + } else if (inflightSource.hasNext) { + queue.enqueue(inflightSource) + } + } + + private def addRepeatSource(elem: T): Unit = { + val inflightSource = new InflightRepeatSource[T](elem) + if (isAvailable(out) && queue.isEmpty) + push(out, inflightSource.next()) + queue.enqueue(inflightSource) + } + private def addCompletedFutureElem(elem: Try[T]): Unit = { if (isAvailable(out) && queue.isEmpty) { elem match { @@ -287,6 +337,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int) case None => addPendingFutureElem(future) } case iterable: IterableSource[T] @unchecked => addSourceElements(iterable.elements.iterator) + case iterator: IteratorSource[T] @unchecked => addSourceElements(iterator.createIterator()) + case range: RangeSource[T] @unchecked => addRangeSource(range.range) + case repeat: RepeatSource[T] @unchecked => addRepeatSource(repeat.elem) case javaStream: JavaStreamSource[T, _] @unchecked => import scala.jdk.CollectionConverters._ addSourceElements(javaStream.open().iterator.asScala) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala index b95b29f980..2b475d7bb6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala @@ -294,6 +294,20 @@ import pekko.stream.stage._ override def toString: String = "SingleSource" } + final class RepeatSource[T](val elem: T) extends GraphStage[SourceShape[T]] { + override def initialAttributes: Attributes = DefaultAttributes.repeat + val out = Outlet[T]("repeat.out") + override val shape = SourceShape(out) + override def createLogic(attr: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = push(out, elem) + + setHandler(out, this) + } + + override def toString: String = "RepeatSource" + } + final class FutureFlattenSource[T, M](futureSource: Future[Graph[SourceShape[T], M]]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { ReactiveStreamsCompliance.requireNonNullElement(futureSource) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala new file mode 100644 index 0000000000..dbb35f324c --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.util.control.NonFatal + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } + +@InternalApi +private[pekko] final class IteratorSource[T]( + val createIterator: () => Iterator[T], + defaultAttributes: Attributes) + extends GraphStage[SourceShape[T]] { + + override protected def initialAttributes: Attributes = defaultAttributes + + private val out = Outlet[T]("IteratorSource.out") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private var currentIterator: Iterator[T] = _ + + override def onPull(): Unit = + try { + if (currentIterator eq null) + currentIterator = createIterator() + pushNextOrComplete() + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case Supervision.Resume => pushNextOrComplete() + case Supervision.Restart => + currentIterator = createIterator() + pushNextOrComplete() + } + } + + private def pushNextOrComplete(): Unit = + if (currentIterator.hasNext) { + if (isAvailable(out)) { + push(out, currentIterator.next()) + if (!currentIterator.hasNext) + completeStage() + } + } else { + completeStage() + } + + setHandler(out, this) + } + + override def toString: String = "IteratorSource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 6f7b1db1ae..a0747c2b99 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -45,7 +45,7 @@ import pekko.stream.impl.{ TraversalBuilder } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource } +import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SimpleLinearGraphStage, SingleSource } import pekko.stream.scaladsl.{ DelayStrategy, Source, @@ -2195,6 +2195,12 @@ private[pekko] object TakeWithin { } case iterableSource: IterableSource[T @unchecked] => emitMultiple(out, iterableSource.elements, () => completeStage()) + case iteratorSource: IteratorSource[T @unchecked] => + emitMultiple(out, iteratorSource.createIterator(), () => completeStage()) + case rangeSource: RangeSource[T @unchecked] => + emitMultiple(out, rangeSource.range.iterator.asInstanceOf[Iterator[T]], () => completeStage()) + case repeatSource: RepeatSource[T @unchecked] => + emitMultiple(out, Iterator.continually(repeatSource.elem), () => completeStage()) case javaStreamSource: JavaStreamSource[T @unchecked, _] => emitMultiple(out, javaStreamSource.open().spliterator(), () => completeStage()) case _ => diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala new file mode 100644 index 0000000000..7f9131a3cc --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.collection.immutable + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.{ Attributes, Outlet, SourceShape } +import pekko.stream.impl.ReactiveStreamsCompliance +import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } + +@InternalApi +private[pekko] final class RangeSource[T](val range: immutable.Range, defaultAttributes: Attributes) + extends GraphStage[SourceShape[T]] { + ReactiveStreamsCompliance.requireNonNullElement(range) + + override protected def initialAttributes: Attributes = defaultAttributes + + private val out = Outlet[T]("RangeSource.out") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private[this] val isEmptyRange = range.isEmpty + private[this] val rangeLast = if (isEmptyRange) 0 else range.last + private[this] val rangeStep = range.step + private[this] var nextElement = range.start + + override def preStart(): Unit = + if (isEmptyRange) completeStage() + + override def onPull(): Unit = { + val current = nextElement + val isLast = current == rangeLast + if (!isLast) + nextElement = current + rangeStep + + push(out, current.asInstanceOf[T]) + if (isLast) + completeStage() + } + + setHandler(out, this) + } + + override def toString: String = "RangeSource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 10a634f582..522be44914 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -38,7 +38,7 @@ import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } +import pekko.stream.impl.fusing.{ RangeSource, StatefulMapConcat, ZipWithIndexJava } import pekko.util._ import org.jspecify.annotations.Nullable @@ -234,7 +234,9 @@ object Source { * @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]] */ def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, NotUsed] = - new Source(scaladsl.Source(Range.inclusive(start, end, step).asInstanceOf[immutable.Iterable[Integer]])) + new Source( + scaladsl.Source.fromGraph( + new RangeSource[Integer](Range.inclusive(start, end, step), DefaultAttributes.iterableSource))) /** * Elements are emitted periodically with the specified interval. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4cb3070b21..1123b52a1b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -505,13 +505,7 @@ object Flow { case f: Flow[I, O, M] => f case f: javadsl.Flow[I, O, M] @unchecked => f.asScala case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] => - // move these from the operator itself to make the returned source - // behave as it is the operator with regards to attributes - val attrs = g.traversalBuilder.attributes - val noAttrStage = g.withAttributes(Attributes.none) - new Flow( - LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), - noAttrStage.shape).withAttributes(attrs) + new Flow(LinearTraversalBuilder.fromGraphStage(g), g.shape) case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 1a49530dfe..8407ba5a0e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -150,13 +150,7 @@ object Sink { case s: Sink[T, M] => s case s: javadsl.Sink[T, M] @unchecked => s.asScala case g: GraphStageWithMaterializedValue[SinkShape[T], M] => - // move these from the stage itself to make the returned source - // behave as it is the stage with regards to attributes - val attrs = g.traversalBuilder.attributes - val noAttrStage = g.withAttributes(Attributes.none) - new Sink( - LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), - noAttrStage.shape).withAttributes(attrs) + new Sink(LinearTraversalBuilder.fromGraphStage(g), g.shape) case other => new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 739382bf7e..035c7076e9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -30,7 +30,15 @@ import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, LazyFutureSource, LazySingleSource } +import pekko.stream.impl.fusing.{ + ArraySource, + GraphStages, + IterableSource, + IteratorSource, + LazyFutureSource, + LazySingleSource, + RangeSource +} import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -286,11 +294,8 @@ object Source { * Elements are pulled out of the iterator in accordance with the demand coming * from the downstream transformation steps. */ - def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] = - apply(new immutable.Iterable[T] { - override def iterator: Iterator[T] = f() - override def toString: String = "() => Iterator" - }) + @inline def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] = + fromGraphStage(new IteratorSource[T](f, DefaultAttributes.iterableSource)) /** * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its @@ -309,11 +314,11 @@ object Source { * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements * will continue infinitely by repeating the sequence of elements provided by function parameter. */ - def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = { + @inline def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = { val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten - fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource) + fromGraphStage(new IteratorSource[T](() => iterator, DefaultAttributes.cycledSource)) } /** @@ -372,26 +377,25 @@ object Source { def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match { case s: Source[T, M] => s case s: javadsl.Source[T, M] @unchecked => s.asScala - case g: GraphStageWithMaterializedValue[SourceShape[T], M] => - // move these from the stage itself to make the returned source - // behave as it is the stage with regards to attributes - val attrs = g.traversalBuilder.attributes - val noAttrStage = g.withAttributes(Attributes.none) - new Source( - LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), - noAttrStage.shape).withAttributes(attrs) - case other => + case g: GraphStageWithMaterializedValue[SourceShape[T], M] => fromGraphStage(g) + case other => // composite source shaped graph new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) } + @inline private def fromGraphStage[T, M](g: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] = + new Source(LinearTraversalBuilder.fromGraphStage(g), g.shape) + + @inline private def fromRange[T](range: immutable.Range): Source[T, NotUsed] = + fromGraphStage(new RangeSource[T](range, DefaultAttributes.iterableSource)) + /** * Defers the creation of a [[Source]] until materialization. The `factory` function * exposes [[Materializer]] which is going to be used during materialization and * [[Attributes]] of the [[Source]] returned by this method. */ def fromMaterializer[T, M](factory: (Materializer, Attributes) => Source[T, M]): Source[T, Future[M]] = - Source.fromGraph(new SetupSourceStage(factory)) + fromGraphStage(new SetupSourceStage(factory)) /** * Helper to create [[Source]] from `Iterable`. @@ -403,13 +407,16 @@ object Source { * beginning) regardless of when they subscribed. * @see [[apply(immutable.Seq)]] */ - def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = { + @inline def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = { // unknown size is -1 (iterable.knownSize: @switch) match { case 0 => empty case 1 => single(iterable.head) case _ => - fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) + iterable match { + case range: immutable.Range => fromRange[T](range) + case _ => fromGraphStage(new IterableSource[T](iterable)) + } } } @@ -424,12 +431,16 @@ object Source { * @see [[apply(immutable.Iterable)]] * @since 2.0.0 */ - def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = { - seq match { - case immutable.Seq() => empty[T] - case immutable.Seq(elem: T @unchecked) => single(elem) - case _ => - fromGraph(new IterableSource[T](seq)).withAttributes(DefaultAttributes.iterableSource) + @inline def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = { + // unknown size is -1 + (seq.knownSize: @switch) match { + case 0 => empty[T] + case 1 => single(seq.head) + case _ => + seq match { + case range: immutable.Range => fromRange[T](range) + case _ => fromGraphStage(new IterableSource[T](seq)) + } } } @@ -439,13 +450,13 @@ object Source { * * @since 1.3.0 */ - def apply[T](array: Array[T]): Source[T, NotUsed] = { + @inline def apply[T](array: Array[T]): Source[T, NotUsed] = { if (array.length == 0) empty else if (array.length == 1) single(array(0)) else - Source.fromGraph(new ArraySource[T](array)) + fromGraphStage(new ArraySource[T](array)) } /** @@ -456,14 +467,14 @@ object Source { * receive new tick elements as soon as it has requested more elements. */ def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = - fromGraph(new TickSource[T](initialDelay, interval, tick)) + fromGraphStage(new TickSource[T](initialDelay, interval, tick)) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ - def single[T](element: T): Source[T, NotUsed] = - fromGraph(new GraphStages.SingleSource(element)) + @inline def single[T](element: T): Source[T, NotUsed] = + fromGraphStage(new GraphStages.SingleSource(element)) /** * Create a `Source` from the given elements. @@ -496,8 +507,8 @@ object Source { /** * Create a `Source` that will continually emit the given element. */ - def repeat[T](element: T): Source[T, NotUsed] = { - fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat) + @inline def repeat[T](element: T): Source[T, NotUsed] = { + fromGraphStage(new GraphStages.RepeatSource(element)) } /** @@ -514,7 +525,7 @@ object Source { * }}} */ def unfold[S, E](s: S)(f: S => Option[(S, E)]): Source[E, NotUsed] = - Source.fromGraph(new Unfold(s, f)) + fromGraphStage(new Unfold(s, f)) /** * Same as [[unfold]], but uses an async function to generate the next state-element tuple. @@ -532,7 +543,7 @@ object Source { * }}} */ def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, NotUsed] = - Source.fromGraph(new UnfoldAsync(s, f)) + fromGraphStage(new UnfoldAsync(s, f)) /** * Creates a sequential `Source` by iterating with the given predicate and function, @@ -543,27 +554,29 @@ object Source { * @since 1.1.0 */ def iterate[T](seed: T)(p: T => Boolean, f: T => T): Source[T, NotUsed] = - fromIterator(() => - new AbstractIterator[T] { - private var first = true - private var acc = seed - override def hasNext: Boolean = p(acc) - override def next(): T = { - if (first) { - first = false - } else { - acc = f(acc) + fromGraphStage(new IteratorSource[T]( + () => + new AbstractIterator[T] { + private var first = true + private var acc = seed + override def hasNext: Boolean = p(acc) + override def next(): T = { + if (first) { + first = false + } else { + acc = f(acc) + } + acc } - acc - } - }).withAttributes(DefaultAttributes.iterateSource) + }, + DefaultAttributes.iterateSource)) /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ def empty[T]: Source[T, NotUsed] = _empty private[this] val _empty: Source[Nothing, NotUsed] = - Source.fromGraph(EmptySource) + fromGraphStage(EmptySource) /** * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element @@ -577,20 +590,21 @@ object Source { * with None. */ def maybe[T]: Source[T, Promise[Option[T]]] = - Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]]) + fromGraphStage( + MaybeSource.asInstanceOf[GraphStageWithMaterializedValue[SourceShape[T], Promise[Option[T]]]]) /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */ def failed[T](cause: Throwable): Source[T, NotUsed] = - Source.fromGraph(new FailedSource[T](cause)) + fromGraphStage(new FailedSource[T](cause)) /** * Emits a single value when the given `Future` is successfully completed and then completes the stream. * The stream fails if the `Future` is completed with a failure. */ - def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match { - case None => fromGraph(new FutureSource[T](futureElement)) + @inline def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match { + case None => fromGraphStage(new FutureSource[T](futureElement)) case Some(scala.util.Success(null)) => empty[T] case Some(scala.util.Success(elem)) => single(elem) case Some(scala.util.Failure(ex)) => failed[T](ex) @@ -601,7 +615,7 @@ object Source { * This stream could be useful in tests. */ def never[T]: Source[T, NotUsed] = _never - private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource) + private[this] val _never: Source[Nothing, NotUsed] = fromGraphStage(GraphStages.NeverSource) /** * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream. @@ -616,8 +630,8 @@ object Source { * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully. * If the `Future` is completed with a failure the stream is failed. */ - def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match { - case None => fromGraph(new FutureFlattenSource(futureSource)) + @inline def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match { + case None => fromGraphStage(new FutureFlattenSource(futureSource)) case Some(scala.util.Success(null)) => val exception = new NullPointerException("futureSource completed with null") Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception)) @@ -634,7 +648,7 @@ object Source { * the laziness and will trigger the factory immediately. */ def lazySingle[T](create: () => T): Source[T, NotUsed] = - fromGraph(new LazySingleSource(create)) + fromGraphStage(new LazySingleSource(create)) /** * Defers invoking the `create` function to create a future element until there is downstream demand. @@ -646,7 +660,7 @@ object Source { * the laziness and will trigger the factory immediately. */ def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] = - fromGraph(new LazyFutureSource(create)) + fromGraphStage(new LazyFutureSource(create)) /** * Defers invoking the `create` function to create a future source until there is downstream demand. @@ -665,7 +679,7 @@ object Source { * is failed with a [[pekko.stream.NeverMaterializedException]] */ def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]] = - fromGraph(new LazySource(create)) + fromGraphStage(new LazySource(create)) /** * Defers invoking the `create` function to create a future source until there is downstream demand. @@ -738,9 +752,7 @@ object Source { overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy not supported") - Source - .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher)) - .withAttributes(DefaultAttributes.actorRefSource) + fromGraphStage(new ActorRefSource(bufferSize, overflowStrategy, completionMatcher, failureMatcher)) } /** @@ -751,7 +763,7 @@ object Source { ackMessage: Any, completionMatcher: PartialFunction[Any, CompletionStrategy], failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = { - Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher)) + fromGraphStage(new ActorRefBackpressureSource(ackTo, ackMessage, completionMatcher, failureMatcher)) } /** @@ -772,7 +784,7 @@ object Source { ackMessage: Any, completionMatcher: PartialFunction[Any, CompletionStrategy], failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = { - Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher)) + fromGraphStage(new ActorRefBackpressureSource(None, ackMessage, completionMatcher, failureMatcher)) } /** @@ -910,7 +922,7 @@ object Source { * @param bufferSize size of the buffer in number of elements */ def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] = - Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize)) + fromGraphStage(new BoundedSourceQueueStage[T](bufferSize)) /** * Creates a Source that will immediately execute the provided function `producer` with a [[BoundedSourceQueue]] when materialized. @@ -1029,8 +1041,7 @@ object Source { bufferSize: Int, overflowStrategy: OverflowStrategy, maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] = - Source.fromGraph( - new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource)) + fromGraphStage(new QueueSource(bufferSize, overflowStrategy, maxConcurrentOffers)) /** * Start a new `Source` from some resource which can be opened, read and closed. @@ -1063,7 +1074,7 @@ object Source { * @tparam R - the resource type. */ def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) => Unit): Source[T, NotUsed] = - Source.fromGraph(new UnfoldResourceSource(create, read, close)) + fromGraphStage(new UnfoldResourceSource(create, read, close)) /** * Start a new `Source` from some resource which can be opened, read and closed. @@ -1091,7 +1102,7 @@ object Source { create: () => Future[R], read: (R) => Future[Option[T]], close: (R) => Future[Done]): Source[T, NotUsed] = - Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close)) + fromGraphStage(new UnfoldResourceSourceAsync(create, read, close)) /** * Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
