This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch issue-29334-source-single-fast-path
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 7983ac1435a343e5638d4dadaa9d239f16071e2d
Author: 虎鸣 <[email protected]>
AuthorDate: Sun Apr 26 17:29:37 2026 +0800

    perf(stream): reduce source construction overhead
---
 .../stream/GraphStageConstructionBenchmark.scala   | 179 +++++++++++++++++++++
 .../apache/pekko/stream/RangeSourceBenchmark.scala | 103 ++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    |  15 ++
 .../pekko/stream/impl/TraversalBuilderSpec.scala   |  39 ++++-
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  |  62 +++++++
 .../apache/pekko/stream/impl/ActorRefSource.scala  |   2 +
 .../org/apache/pekko/stream/impl/QueueSource.scala |   2 +
 .../pekko/stream/impl/TraversalBuilder.scala       |  84 ++++++----
 .../pekko/stream/impl/fusing/FlattenConcat.scala   |  55 ++++++-
 .../pekko/stream/impl/fusing/GraphStages.scala     |  14 ++
 .../pekko/stream/impl/fusing/IteratorSource.scala  |  75 +++++++++
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  |   8 +-
 .../pekko/stream/impl/fusing/RangeSource.scala     |  63 ++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |   6 +-
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |   8 +-
 .../org/apache/pekko/stream/scaladsl/Sink.scala    |   8 +-
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 149 +++++++++--------
 17 files changed, 755 insertions(+), 117 deletions(-)

diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
new file mode 100644
index 0000000000..5fd4e5b233
--- /dev/null
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/GraphStageConstructionBenchmark.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.immutable
+import scala.concurrent.Promise
+
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.impl.LinearTraversalBuilder
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import org.apache.pekko.stream.impl.fusing.GraphStages
+import org.apache.pekko.stream.impl.fusing.IterableSource
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue
+import org.openjdk.jmh.annotations.Benchmark
+import org.openjdk.jmh.annotations.BenchmarkMode
+import org.openjdk.jmh.annotations.Mode
+import org.openjdk.jmh.annotations.OutputTimeUnit
+import org.openjdk.jmh.annotations.Scope
+import org.openjdk.jmh.annotations.State
+import org.openjdk.jmh.infra.Blackhole
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Array(Mode.Throughput))
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+class GraphStageConstructionBenchmark {
+  private val element = "element"
+  private val elements: immutable.Iterable[String] = Vector(element, element)
+  private val range = 1 to 1000
+  private val pendingFuture = Promise[String]().future
+  private val flowStage = GraphStages.identity[Any]
+  private val sinkStage = GraphStages.IgnoreSink
+
+  private def oldSourceFromGraphStage[T, M](
+      stage: GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] 
= {
+    val attributes = stage.traversalBuilder.attributes
+    val noAttributeStage = stage.withAttributes(Attributes.none)
+    new Source(
+      LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+      noAttributeStage.shape).withAttributes(attributes)
+  }
+
+  private def oldSourceFromIterable[T](iterable: immutable.Iterable[T]): 
Source[T, NotUsed] =
+    (iterable.knownSize: @scala.annotation.switch) match {
+      case 0 => Source.empty
+      case 1 => oldSourceFromGraphStage(new 
GraphStages.SingleSource(iterable.head))
+      case _ =>
+        oldSourceFromGraphStage(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+    }
+
+  @Benchmark
+  def sourceSingle(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.single(element))
+
+  @Benchmark
+  def javadslSourceSingle(blackhole: Blackhole): Unit =
+    blackhole.consume(javadsl.Source.single(element))
+
+  @Benchmark
+  def sourceRepeat(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.repeat(element))
+
+  @Benchmark
+  def oldSourceRepeatPath(blackhole: Blackhole): Unit = {
+    val iterable = new immutable.Iterable[String] {
+      override def iterator: Iterator[String] = Iterator.continually(element)
+      override def toString: String = "() => Iterator"
+    }
+    blackhole.consume(oldSourceFromGraphStage(new 
IterableSource[String](iterable)).withAttributes(DefaultAttributes.repeat))
+  }
+
+  @Benchmark
+  def sourceFromIterator(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.fromIterator(() => Iterator.single(element)))
+
+  @Benchmark
+  def oldSourceFromIteratorPath(blackhole: Blackhole): Unit = {
+    val iterable = new immutable.Iterable[String] {
+      override def iterator: Iterator[String] = Iterator.single(element)
+      override def toString: String = "() => Iterator"
+    }
+    blackhole.consume(
+      oldSourceFromGraphStage(new 
IterableSource[String](iterable)).withAttributes(DefaultAttributes.iterableSource))
+  }
+
+  @Benchmark
+  def sourceIterable(blackhole: Blackhole): Unit =
+    blackhole.consume(Source(elements))
+
+  @Benchmark
+  def oldSourceIterablePath(blackhole: Blackhole): Unit =
+    blackhole.consume(oldSourceFromIterable(elements))
+
+  @Benchmark
+  def sourceRange(blackhole: Blackhole): Unit =
+    blackhole.consume(Source(range))
+
+  @Benchmark
+  def oldSourceRangePath(blackhole: Blackhole): Unit =
+    blackhole.consume(
+      oldSourceFromGraphStage(new 
IterableSource[Int](range)).withAttributes(DefaultAttributes.iterableSource))
+
+  @Benchmark
+  def javadslSourceRange(blackhole: Blackhole): Unit =
+    blackhole.consume(javadsl.Source.range(1, 1000))
+
+  @Benchmark
+  def oldJavadslSourceRangePath(blackhole: Blackhole): Unit =
+    blackhole.consume(
+      new javadsl.Source(
+        oldSourceFromGraphStage(new 
IterableSource[Integer](range.asInstanceOf[immutable.Iterable[Integer]]))
+          .withAttributes(DefaultAttributes.iterableSource)))
+
+  @Benchmark
+  def sourceFuturePending(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.future(pendingFuture))
+
+  @Benchmark
+  def oldSourceFuturePendingPath(blackhole: Blackhole): Unit =
+    blackhole.consume(oldSourceFromGraphStage(new 
GraphStages.FutureSource[String](pendingFuture)))
+
+  @Benchmark
+  def sourceFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Source.fromGraph(new GraphStages.SingleSource(element)))
+
+  @Benchmark
+  def oldSourceFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val stage = new GraphStages.SingleSource(element)
+    blackhole.consume(oldSourceFromGraphStage(stage))
+  }
+
+  @Benchmark
+  def sinkFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Sink.fromGraph(sinkStage))
+
+  @Benchmark
+  def oldSinkFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val attributes = sinkStage.traversalBuilder.attributes
+    val noAttributeStage = sinkStage.withAttributes(Attributes.none)
+    blackhole.consume(
+      new Sink(
+        LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+        noAttributeStage.shape).withAttributes(attributes))
+  }
+
+  @Benchmark
+  def flowFromGraphStage(blackhole: Blackhole): Unit =
+    blackhole.consume(Flow.fromGraph(flowStage))
+
+  @Benchmark
+  def oldFlowFromGraphStagePath(blackhole: Blackhole): Unit = {
+    val attributes = flowStage.traversalBuilder.attributes
+    val noAttributeStage = flowStage.withAttributes(Attributes.none)
+    blackhole.consume(
+      new Flow(
+        LinearTraversalBuilder.fromBuilder(noAttributeStage.traversalBuilder, 
noAttributeStage.shape, Keep.right),
+        noAttributeStage.shape).withAttributes(attributes))
+  }
+}
diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
new file mode 100644
index 0000000000..59925f9612
--- /dev/null
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/RangeSourceBenchmark.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }
+import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, 
InHandler }
+
+object RangeSourceBenchmark {
+  @volatile var sinkSum: Int = 0
+}
+
+final class IntCompletionLatch extends 
GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] {
+  val in: Inlet[Int] = Inlet[Int]("IntCompletionLatch.in")
+  override val shape: SinkShape[Int] = SinkShape(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, CountDownLatch) = {
+    val latch = new CountDownLatch(1)
+    val logic = new GraphStageLogic(shape) with InHandler {
+      private[this] var sum = 0
+
+      override def preStart(): Unit = pull(in)
+      override def onPush(): Unit = {
+        sum += grab(in)
+        pull(in)
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        RangeSourceBenchmark.sinkSum = sum
+        latch.countDown()
+        completeStage()
+      }
+
+      setHandler(in, this)
+    }
+    (logic, latch)
+  }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class RangeSourceBenchmark {
+  implicit val system: ActorSystem = ActorSystem("RangeSourceBenchmark")
+
+  @Param(Array("1", "1000"))
+  var elements: Int = 0
+
+  var rangeToLatch: RunnableGraph[CountDownLatch] = _
+  var oldRangeToLatch: RunnableGraph[CountDownLatch] = _
+
+  @Setup
+  def setup(): Unit = {
+    val range = 1 to elements
+    rangeToLatch = Source(range).toMat(Sink.fromGraph(new 
IntCompletionLatch))(Keep.right)
+    oldRangeToLatch = Source
+      .fromGraph(new IterableSource[Int](range))
+      .withAttributes(DefaultAttributes.iterableSource)
+      .toMat(Sink.fromGraph(new IntCompletionLatch))(Keep.right)
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  def sourceRangeToLatch(): Unit =
+    await(rangeToLatch.run())
+
+  @Benchmark
+  def oldSourceRangeToLatch(): Unit =
+    await(oldRangeToLatch.run())
+
+  private def await(latch: CountDownLatch): Unit =
+    if (!latch.await(5, TimeUnit.SECONDS))
+      throw new RuntimeException("Latch timed out")
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 3aec7545b0..db3341baea 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -747,6 +747,21 @@ public class SourceTest extends StreamTestJupiter {
     }
   }
 
+  @Test
+  public void mustWorkFromRangeWithNegativeStep() throws Exception {
+    CompletionStage<List<Integer>> f =
+        Source.range(5, 1, -2).grouped(20).runWith(Sink.head(), system);
+    final List<Integer> result = f.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    assertEquals(Arrays.asList(5, 3, 1), result);
+  }
+
+  @Test
+  public void mustWorkFromEmptyRange() throws Exception {
+    CompletionStage<List<Integer>> f = Source.range(1, 5, 
-1).runWith(Sink.seq(), system);
+    final List<Integer> result = f.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    assertEquals(0, result.size());
+  }
+
   @Test
   public void mustRepeat() throws Exception {
     final CompletionStage<List<Integer>> f =
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index 2af4b9b0a7..153339aeef 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -19,8 +19,9 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream._
 import pekko.stream.impl.TraversalTestUtils._
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
-import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
+import pekko.stream.impl.fusing.{ IterableSource, IteratorSource, RangeSource }
 import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.testkit.PekkoSpec
 import pekko.util.OptionVal
@@ -547,6 +548,40 @@ class TraversalBuilderSpec extends PekkoSpec {
       OptionVal.None)
   }
 
+  "find Source.fromIterator via TraversalBuilder with getValuePresentedSource" 
in {
+    val createIterator = () => Iterator("a", "b", "c")
+    
TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator)).get.asInstanceOf[IteratorSource[
+      String]].createIterator should ===(
+      createIterator)
+    val iteratorSource = new IteratorSource(createIterator, 
DefaultAttributes.iterableSource)
+    TraversalBuilder.getValuePresentedSource(iteratorSource) should 
be(OptionVal.Some(iteratorSource))
+
+    
TraversalBuilder.getValuePresentedSource(Source.fromIterator(createIterator).async)
 should be(OptionVal.None)
+    TraversalBuilder.getValuePresentedSource(
+      Source.fromIterator(createIterator).mapMaterializedValue(_ => "Mat")) 
should be(OptionVal.None)
+  }
+
+  "find Source.range via TraversalBuilder with getValuePresentedSource" in {
+    val range = 1 to 4
+    
TraversalBuilder.getValuePresentedSource(Source(range)).get.asInstanceOf[RangeSource[Int]].range
 should ===(range)
+    val rangeSource = new RangeSource[Int](range, 
DefaultAttributes.iterableSource)
+    TraversalBuilder.getValuePresentedSource(rangeSource) should 
be(OptionVal.Some(rangeSource))
+
+    TraversalBuilder.getValuePresentedSource(Source(range).async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source(range).mapMaterializedValue(_ 
=> "Mat")) should be(OptionVal.None)
+  }
+
+  "find Source.repeat via TraversalBuilder with getValuePresentedSource" in {
+    
TraversalBuilder.getValuePresentedSource(Source.repeat("a")).get.asInstanceOf[RepeatSource[String]].elem
 should ===(
+      "a")
+    val repeatSource = new RepeatSource("a")
+    TraversalBuilder.getValuePresentedSource(repeatSource) should 
be(OptionVal.Some(repeatSource))
+
+    TraversalBuilder.getValuePresentedSource(Source.repeat("c").async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.repeat("d").mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
   "find Source.javaStreamSource via TraversalBuilder with 
getValuePresentedSource" in {
     val javaStream = java.util.stream.Stream.empty[String]()
     TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => 
javaStream)).get
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 22d3b55077..119b51338b 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -369,6 +369,52 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       // #repeat
       f.futureValue shouldBe Done
     }
+
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("repeat")
+        .flatMapConcat(_ => Source.repeat(42))
+        .take(3)
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(42, 42, 42))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source.repeat(42) })
+        .take(3)
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(42, 42, 42))
+    }
+  }
+
+  "Range Source" must {
+    "emit inclusive and exclusive ranges" in {
+      Source(1 to 4).runWith(Sink.seq).futureValue should ===(immutable.Seq(1, 
2, 3, 4))
+      Source(1 until 4).runWith(Sink.seq).futureValue should 
===(immutable.Seq(1, 2, 3))
+    }
+
+    "emit stepped and empty ranges" in {
+      Source(5 to 1 by -2).runWith(Sink.seq).futureValue should 
===(immutable.Seq(5, 3, 1))
+      Source(1 to 5 by -1).runWith(Sink.seq).futureValue should 
===(immutable.Seq.empty)
+    }
+
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("range")
+        .flatMapConcat(_ => Source(1 to 3))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source(1 to 3) })
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
   }
 
   "Unfold Source" must {
@@ -437,6 +483,22 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         immutable.Seq(false, true, false, true, false, true, false, true, 
false, true))
     }
 
+    "work when flattened through value-presented source fast path" in {
+      Source
+        .single("iterator")
+        .flatMapConcat(_ => Source.fromIterator(() => Iterator(1, 2, 3)))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "work when recovered through value-presented source fast path" in {
+      Source
+        .failed[Int](TE("boom"))
+        .recoverWithRetries(1, { case _ => Source.fromIterator(() => 
Iterator(1, 2, 3)) })
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
     "fail stream when iterator throws" in {
       Source
         .fromIterator(() => (1 to 1000).toIterator.map(k => if (k < 10) k else 
throw TE("a")))
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
index 18ebc52537..1abea0b5ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala
@@ -18,6 +18,7 @@ import pekko.actor.ActorRef
 import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.stage._
 import pekko.util.OptionVal
 
@@ -39,6 +40,7 @@ private object ActorRefSource {
   val out: Outlet[T] = Outlet[T]("actorRefSource.out")
 
   override val shape: SourceShape[T] = SourceShape.of(out)
+  override def initialAttributes: Attributes = DefaultAttributes.actorRefSource
 
   def createLogicAndMaterializedValue(inheritedAttributes: Attributes): 
(GraphStageLogic, ActorRef) =
     throw new IllegalStateException("Not supported")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
index 61e217202a..36d8014ac3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala
@@ -20,6 +20,7 @@ import pekko.Done
 import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.OverflowStrategies._
+import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.scaladsl.SourceQueueWithComplete
 import pekko.stream.stage._
 
@@ -49,6 +50,7 @@ import pekko.stream.stage._
 
   val out = Outlet[T]("queueSource.out")
   override val shape: SourceShape[T] = SourceShape.of(out)
+  override def initialAttributes: Attributes = DefaultAttributes.queueSource
 
   @scala.annotation.nowarn("msg=inferred structural type")
   override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes) = {
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index c6206a106a..9002c81938 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -22,9 +22,10 @@ import pekko.annotation.{ DoNotInherit, InternalApi }
 import pekko.stream._
 import pekko.stream.impl.StreamLayout.AtomicModule
 import pekko.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
-import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource }
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource, 
IteratorSource, RangeSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
 import pekko.stream.scaladsl.Keep
+import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.OptionVal
 
 /**
@@ -354,24 +355,29 @@ import pekko.util.OptionVal
    * performance optimization in FlattenMerge and possibly other places.
    */
   def getSingleSource[A >: Null](graph: Graph[SourceShape[A], _]): 
OptionVal[SingleSource[A]] = {
+    @inline def fromModule(module: AtomicModule[_, _]): 
OptionVal[SingleSource[A]] =
+      module match {
+        case m: GraphStageModule[_, _] =>
+          m.stage match {
+            case single: SingleSource[A] @unchecked => OptionVal.Some(single)
+            case _                                  => OptionVal.None
+          }
+        case _ => OptionVal.None
+      }
+
     graph match {
       case single: SingleSource[A] @unchecked => OptionVal.Some(single)
       case _                                  =>
         graph.traversalBuilder match {
-          case l: LinearTraversalBuilder =>
+          case l: LinearTraversalBuilder if !l.attributes.isAsync =>
             l.pendingBuilder match {
               case OptionVal.Some(a: AtomicTraversalBuilder) =>
-                a.module match {
-                  case m: GraphStageModule[_, _] =>
-                    m.stage match {
-                      case single: SingleSource[A] @unchecked =>
-                        // It would be != EmptyTraversal if 
mapMaterializedValue was used and then we can't optimize.
-                        if ((l.traversalSoFar eq EmptyTraversal) && 
!l.attributes.isAsync)
-                          OptionVal.Some(single)
-                        else OptionVal.None
-                      case _ => OptionVal.None
-                    }
-                  case _ => OptionVal.None
+                // It would be != EmptyTraversal if mapMaterializedValue was 
used and then we can't optimize.
+                if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) 
else OptionVal.None
+              case OptionVal.None =>
+                l.traversalSoFar match {
+                  case MaterializeAtomic(module, _) => fromModule(module)
+                  case _                            => OptionVal.None
                 }
               case _ => OptionVal.None
             }
@@ -388,30 +394,36 @@ import pekko.util.OptionVal
   @InternalApi def getValuePresentedSource[A >: Null](
       graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
     def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean 
= graph match {
-      case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: 
JavaStreamSource[_, _] |
-          _: FailedSource[_] =>
+      case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: 
IteratorSource[_] |
+          _: RangeSource[_] | _: RepeatSource[_] | _: JavaStreamSource[_, _] | 
_: FailedSource[_] =>
         true
       case maybeEmpty if isEmptySource(maybeEmpty) => true
       case _                                       => false
     }
+    @inline def fromModule(module: AtomicModule[_, _]): 
OptionVal[Graph[SourceShape[A], _]] =
+      module match {
+        case m: GraphStageModule[_, _] =>
+          m.stage match {
+            case _ if 
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
+              OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
+            case _ => OptionVal.None
+          }
+        case _ => OptionVal.None
+      }
+
     graph match {
       case _ if isValuePresentedSource(graph) => OptionVal.Some(graph)
       case _                                  =>
         graph.traversalBuilder match {
-          case l: LinearTraversalBuilder =>
+          case l: LinearTraversalBuilder if !l.attributes.isAsync =>
             l.pendingBuilder match {
               case OptionVal.Some(a: AtomicTraversalBuilder) =>
-                a.module match {
-                  case m: GraphStageModule[_, _] =>
-                    m.stage match {
-                      case _ if 
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
-                        // It would be != EmptyTraversal if 
mapMaterializedValue was used and then we can't optimize.
-                        if ((l.traversalSoFar eq EmptyTraversal) && 
!l.attributes.isAsync)
-                          
OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
-                        else OptionVal.None
-                      case _ => OptionVal.None
-                    }
-                  case _ => OptionVal.None
+                // It would be != EmptyTraversal if mapMaterializedValue was 
used and then we can't optimize.
+                if (l.traversalSoFar eq EmptyTraversal) fromModule(a.module) 
else OptionVal.None
+              case OptionVal.None =>
+                l.traversalSoFar match {
+                  case MaterializeAtomic(module, _) => fromModule(module)
+                  case _                            => OptionVal.None
                 }
               case _ => OptionVal.None
             }
@@ -779,6 +791,22 @@ import pekko.util.OptionVal
 
     }
   }
+
+  @inline @InternalApi private[pekko] def fromGraphStage(
+      graphStage: GraphStageWithMaterializedValue[_ <: Shape, _]): 
LinearTraversalBuilder = {
+    val builder = graphStage.traversalBuilder
+    val attributes = builder.attributes
+    val linear = builder match {
+      case atomic: AtomicTraversalBuilder =>
+        LinearTraversalBuilder.fromModule(atomic.module, Attributes.none)
+      case _ =>
+        val builderWithoutAttributes =
+          if (attributes eq Attributes.none) builder else 
builder.setAttributes(Attributes.none)
+        fromBuilder(builderWithoutAttributes, graphStage.shape, Keep.right)
+    }
+
+    if (attributes eq Attributes.none) linear else 
linear.setAttributes(attributes)
+  }
 }
 
 /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
index 5cf2bdc827..e9d8b1e7a7 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
@@ -17,6 +17,7 @@
 
 package org.apache.pekko.stream.impl.fusing
 
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.util.{ Failure, Try }
 
@@ -25,7 +26,7 @@ import pekko.annotation.InternalApi
 import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, 
SourceShape, SubscriptionWithCancelException }
 import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, 
JavaStreamSource, TraversalBuilder }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
 import pekko.util.OptionVal
@@ -54,6 +55,35 @@ private[pekko] object FlattenConcat {
     override def isClosed: Boolean = !hasNext
   }
 
+  private final class InflightRangeSource[T](range: immutable.Range) extends 
InflightSource[T] {
+    private val isEmptyRange = range.isEmpty
+    private val rangeLast = if (isEmptyRange) 0 else range.last
+    private val rangeStep = range.step
+    private var nextElement = range.start
+    private var closed = isEmptyRange
+
+    override def hasNext: Boolean = !closed
+    override def next(): T =
+      if (closed) throw new NoSuchElementException("next called after 
completion")
+      else {
+        val current = nextElement
+        if (current == rangeLast) closed = true
+        else nextElement = current + rangeStep
+        current.asInstanceOf[T]
+      }
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = closed
+  }
+
+  private final class InflightRepeatSource[T](elem: T) extends 
InflightSource[T] {
+    override def hasNext: Boolean = true
+    override def next(): T = elem
+    override def tryPull(): Unit = ()
+    override def cancel(cause: Throwable): Unit = ()
+    override def isClosed: Boolean = false
+  }
+
   private final class InflightCompletedFutureSource[T](result: Try[T]) extends 
InflightSource[T] {
     private var _hasNext = result.isSuccess
     override def hasNext: Boolean = _hasNext
@@ -219,6 +249,26 @@ private[pekko] final class FlattenConcat[T, 
M](parallelism: Int)
         }
       }
 
+      private def addRangeSource(range: immutable.Range): Unit = {
+        val inflightSource = new InflightRangeSource[T](range)
+        if (isAvailable(out) && queue.isEmpty) {
+          if (inflightSource.hasNext) {
+            push(out, inflightSource.next())
+            if (inflightSource.hasNext)
+              queue.enqueue(inflightSource)
+          }
+        } else if (inflightSource.hasNext) {
+          queue.enqueue(inflightSource)
+        }
+      }
+
+      private def addRepeatSource(elem: T): Unit = {
+        val inflightSource = new InflightRepeatSource[T](elem)
+        if (isAvailable(out) && queue.isEmpty)
+          push(out, inflightSource.next())
+        queue.enqueue(inflightSource)
+      }
+
       private def addCompletedFutureElem(elem: Try[T]): Unit = {
         if (isAvailable(out) && queue.isEmpty) {
           elem match {
@@ -287,6 +337,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism: 
Int)
                   case None       => addPendingFutureElem(future)
                 }
               case iterable: IterableSource[T] @unchecked        => 
addSourceElements(iterable.elements.iterator)
+              case iterator: IteratorSource[T] @unchecked        => 
addSourceElements(iterator.createIterator())
+              case range: RangeSource[T] @unchecked              => 
addRangeSource(range.range)
+              case repeat: RepeatSource[T] @unchecked            => 
addRepeatSource(repeat.elem)
               case javaStream: JavaStreamSource[T, _] @unchecked =>
                 import scala.jdk.CollectionConverters._
                 addSourceElements(javaStream.open().iterator.asScala)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index b95b29f980..2b475d7bb6 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -294,6 +294,20 @@ import pekko.stream.stage._
     override def toString: String = "SingleSource"
   }
 
+  final class RepeatSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
+    override def initialAttributes: Attributes = DefaultAttributes.repeat
+    val out = Outlet[T]("repeat.out")
+    override val shape = SourceShape(out)
+    override def createLogic(attr: Attributes): GraphStageLogic =
+      new GraphStageLogic(shape) with OutHandler {
+        override def onPull(): Unit = push(out, elem)
+
+        setHandler(out, this)
+      }
+
+    override def toString: String = "RepeatSource"
+  }
+
   final class FutureFlattenSource[T, M](futureSource: 
Future[Graph[SourceShape[T], M]])
       extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
     ReactiveStreamsCompliance.requireNonNullElement(futureSource)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
new file mode 100644
index 0000000000..dbb35f324c
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IteratorSource.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+@InternalApi
+private[pekko] final class IteratorSource[T](
+    val createIterator: () => Iterator[T],
+    defaultAttributes: Attributes)
+    extends GraphStage[SourceShape[T]] {
+
+  override protected def initialAttributes: Attributes = defaultAttributes
+
+  private val out = Outlet[T]("IteratorSource.out")
+  override val shape: SourceShape[T] = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = 
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var currentIterator: Iterator[T] = _
+
+      override def onPull(): Unit =
+        try {
+          if (currentIterator eq null)
+            currentIterator = createIterator()
+          pushNextOrComplete()
+        } catch {
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Stop    => failStage(ex)
+              case Supervision.Resume  => pushNextOrComplete()
+              case Supervision.Restart =>
+                currentIterator = createIterator()
+                pushNextOrComplete()
+            }
+        }
+
+      private def pushNextOrComplete(): Unit =
+        if (currentIterator.hasNext) {
+          if (isAvailable(out)) {
+            push(out, currentIterator.next())
+            if (!currentIterator.hasNext)
+              completeStage()
+          }
+        } else {
+          completeStage()
+        }
+
+      setHandler(out, this)
+    }
+
+  override def toString: String = "IteratorSource"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 6f7b1db1ae..a0747c2b99 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -45,7 +45,7 @@ import pekko.stream.impl.{
   TraversalBuilder
 }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, 
SimpleLinearGraphStage, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SimpleLinearGraphStage, SingleSource }
 import pekko.stream.scaladsl.{
   DelayStrategy,
   Source,
@@ -2195,6 +2195,12 @@ private[pekko] object TakeWithin {
                       }
                     case iterableSource: IterableSource[T @unchecked] =>
                       emitMultiple(out, iterableSource.elements, () => 
completeStage())
+                    case iteratorSource: IteratorSource[T @unchecked] =>
+                      emitMultiple(out, iteratorSource.createIterator(), () => 
completeStage())
+                    case rangeSource: RangeSource[T @unchecked] =>
+                      emitMultiple(out, 
rangeSource.range.iterator.asInstanceOf[Iterator[T]], () => completeStage())
+                    case repeatSource: RepeatSource[T @unchecked] =>
+                      emitMultiple(out, 
Iterator.continually(repeatSource.elem), () => completeStage())
                     case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
                       emitMultiple(out, javaStreamSource.open().spliterator(), 
() => completeStage())
                     case _ =>
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
new file mode 100644
index 0000000000..7f9131a3cc
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/RangeSource.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.impl.ReactiveStreamsCompliance
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+@InternalApi
+private[pekko] final class RangeSource[T](val range: immutable.Range, 
defaultAttributes: Attributes)
+    extends GraphStage[SourceShape[T]] {
+  ReactiveStreamsCompliance.requireNonNullElement(range)
+
+  override protected def initialAttributes: Attributes = defaultAttributes
+
+  private val out = Outlet[T]("RangeSource.out")
+  override val shape: SourceShape[T] = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
+      private[this] val isEmptyRange = range.isEmpty
+      private[this] val rangeLast = if (isEmptyRange) 0 else range.last
+      private[this] val rangeStep = range.step
+      private[this] var nextElement = range.start
+
+      override def preStart(): Unit =
+        if (isEmptyRange) completeStage()
+
+      override def onPull(): Unit = {
+        val current = nextElement
+        val isLast = current == rangeLast
+        if (!isLast)
+          nextElement = current + rangeStep
+
+        push(out, current.asInstanceOf[T])
+        if (isLast)
+          completeStage()
+      }
+
+      setHandler(out, this)
+    }
+
+  override def toString: String = "RangeSource"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 10a634f582..522be44914 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -38,7 +38,7 @@ import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava 
}
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ RangeSource, StatefulMapConcat, 
ZipWithIndexJava }
 import pekko.util._
 
 import org.jspecify.annotations.Nullable
@@ -234,7 +234,9 @@ object Source {
    * @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]]
    */
   def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, NotUsed] 
=
-    new Source(scaladsl.Source(Range.inclusive(start, end, 
step).asInstanceOf[immutable.Iterable[Integer]]))
+    new Source(
+      scaladsl.Source.fromGraph(
+        new RangeSource[Integer](Range.inclusive(start, end, step), 
DefaultAttributes.iterableSource)))
 
   /**
    * Elements are emitted periodically with the specified interval.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 4cb3070b21..1123b52a1b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -505,13 +505,7 @@ object Flow {
       case f: Flow[I, O, M]                                       => f
       case f: javadsl.Flow[I, O, M] @unchecked                    => f.asScala
       case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] =>
-        // move these from the operator itself to make the returned source
-        // behave as it is the operator with regards to attributes
-        val attrs = g.traversalBuilder.attributes
-        val noAttrStage = g.withAttributes(Attributes.none)
-        new Flow(
-          LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-          noAttrStage.shape).withAttributes(attrs)
+        new Flow(LinearTraversalBuilder.fromGraphStage(g), g.shape)
 
       case _ => new 
Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, 
Keep.right), g.shape)
     }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 1a49530dfe..8407ba5a0e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -150,13 +150,7 @@ object Sink {
       case s: Sink[T, M]                                       => s
       case s: javadsl.Sink[T, M] @unchecked                    => s.asScala
       case g: GraphStageWithMaterializedValue[SinkShape[T], M] =>
-        // move these from the stage itself to make the returned source
-        // behave as it is the stage with regards to attributes
-        val attrs = g.traversalBuilder.attributes
-        val noAttrStage = g.withAttributes(Attributes.none)
-        new Sink(
-          LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-          noAttrStage.shape).withAttributes(attrs)
+        new Sink(LinearTraversalBuilder.fromGraphStage(g), g.shape)
 
       case other =>
         new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, 
other.shape, Keep.right), other.shape)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 739382bf7e..035c7076e9 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -30,7 +30,15 @@ import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, 
LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{
+  ArraySource,
+  GraphStages,
+  IterableSource,
+  IteratorSource,
+  LazyFutureSource,
+  LazySingleSource,
+  RangeSource
+}
 import pekko.stream.impl.fusing.GraphStages._
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.ConstantFun
@@ -286,11 +294,8 @@ object Source {
    * Elements are pulled out of the iterator in accordance with the demand 
coming
    * from the downstream transformation steps.
    */
-  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
-    apply(new immutable.Iterable[T] {
-      override def iterator: Iterator[T] = f()
-      override def toString: String = "() => Iterator"
-    })
+  @inline def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed] =
+    fromGraphStage(new IteratorSource[T](f, DefaultAttributes.iterableSource))
 
   /**
    * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream 
iterator to get all its
@@ -309,11 +314,11 @@ object Source {
    * Starts a new 'cycled' `Source` from the given elements. The producer 
stream of elements
    * will continue infinitely by repeating the sequence of elements provided 
by function parameter.
    */
-  def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
+  @inline def cycle[T](f: () => Iterator[T]): Source[T, NotUsed] = {
     val iterator = Iterator.continually {
       val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty 
iterator") else i
     }.flatten
-    fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
+    fromGraphStage(new IteratorSource[T](() => iterator, 
DefaultAttributes.cycledSource))
   }
 
   /**
@@ -372,26 +377,25 @@ object Source {
   def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
     case s: Source[T, M]                                       => s
     case s: javadsl.Source[T, M] @unchecked                    => s.asScala
-    case g: GraphStageWithMaterializedValue[SourceShape[T], M] =>
-      // move these from the stage itself to make the returned source
-      // behave as it is the stage with regards to attributes
-      val attrs = g.traversalBuilder.attributes
-      val noAttrStage = g.withAttributes(Attributes.none)
-      new Source(
-        LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, 
noAttrStage.shape, Keep.right),
-        noAttrStage.shape).withAttributes(attrs)
-    case other =>
+    case g: GraphStageWithMaterializedValue[SourceShape[T], M] => 
fromGraphStage(g)
+    case other                                                 =>
       // composite source shaped graph
       new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, 
other.shape, Keep.right), other.shape)
   }
 
+  @inline private def fromGraphStage[T, M](g: 
GraphStageWithMaterializedValue[SourceShape[T], M]): Source[T, M] =
+    new Source(LinearTraversalBuilder.fromGraphStage(g), g.shape)
+
+  @inline private def fromRange[T](range: immutable.Range): Source[T, NotUsed] 
=
+    fromGraphStage(new RangeSource[T](range, DefaultAttributes.iterableSource))
+
   /**
    * Defers the creation of a [[Source]] until materialization. The `factory` 
function
    * exposes [[Materializer]] which is going to be used during materialization 
and
    * [[Attributes]] of the [[Source]] returned by this method.
    */
   def fromMaterializer[T, M](factory: (Materializer, Attributes) => Source[T, 
M]): Source[T, Future[M]] =
-    Source.fromGraph(new SetupSourceStage(factory))
+    fromGraphStage(new SetupSourceStage(factory))
 
   /**
    * Helper to create [[Source]] from `Iterable`.
@@ -403,13 +407,16 @@ object Source {
    * beginning) regardless of when they subscribed.
    * @see [[apply(immutable.Seq)]]
    */
-  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+  @inline def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
     // unknown size is -1
     (iterable.knownSize: @switch) match {
       case 0 => empty
       case 1 => single(iterable.head)
       case _ =>
-        fromGraph(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+        iterable match {
+          case range: immutable.Range => fromRange[T](range)
+          case _                      => fromGraphStage(new 
IterableSource[T](iterable))
+        }
     }
   }
 
@@ -424,12 +431,16 @@ object Source {
    * @see [[apply(immutable.Iterable)]]
    * @since 2.0.0
    */
-  def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
-    seq match {
-      case immutable.Seq()                   => empty[T]
-      case immutable.Seq(elem: T @unchecked) => single(elem)
-      case _                                 =>
-        fromGraph(new 
IterableSource[T](seq)).withAttributes(DefaultAttributes.iterableSource)
+  @inline def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
+    // unknown size is -1
+    (seq.knownSize: @switch) match {
+      case 0 => empty[T]
+      case 1 => single(seq.head)
+      case _ =>
+        seq match {
+          case range: immutable.Range => fromRange[T](range)
+          case _                      => fromGraphStage(new 
IterableSource[T](seq))
+        }
     }
   }
 
@@ -439,13 +450,13 @@ object Source {
    *
    * @since 1.3.0
    */
-  def apply[T](array: Array[T]): Source[T, NotUsed] = {
+  @inline def apply[T](array: Array[T]): Source[T, NotUsed] = {
     if (array.length == 0)
       empty
     else if (array.length == 1)
       single(array(0))
     else
-      Source.fromGraph(new ArraySource[T](array))
+      fromGraphStage(new ArraySource[T](array))
   }
 
   /**
@@ -456,14 +467,14 @@ object Source {
    * receive new tick elements as soon as it has requested more elements.
    */
   def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: 
T): Source[T, Cancellable] =
-    fromGraph(new TickSource[T](initialDelay, interval, tick))
+    fromGraphStage(new TickSource[T](initialDelay, interval, tick))
 
   /**
    * Create a `Source` with one element.
    * Every connected `Sink` of this stream will see an individual stream 
consisting of one element.
    */
-  def single[T](element: T): Source[T, NotUsed] =
-    fromGraph(new GraphStages.SingleSource(element))
+  @inline def single[T](element: T): Source[T, NotUsed] =
+    fromGraphStage(new GraphStages.SingleSource(element))
 
   /**
    * Create a `Source` from the given elements.
@@ -496,8 +507,8 @@ object Source {
   /**
    * Create a `Source` that will continually emit the given element.
    */
-  def repeat[T](element: T): Source[T, NotUsed] = {
-    fromIterator(() => 
Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
+  @inline def repeat[T](element: T): Source[T, NotUsed] = {
+    fromGraphStage(new GraphStages.RepeatSource(element))
   }
 
   /**
@@ -514,7 +525,7 @@ object Source {
    * }}}
    */
   def unfold[S, E](s: S)(f: S => Option[(S, E)]): Source[E, NotUsed] =
-    Source.fromGraph(new Unfold(s, f))
+    fromGraphStage(new Unfold(s, f))
 
   /**
    * Same as [[unfold]], but uses an async function to generate the next 
state-element tuple.
@@ -532,7 +543,7 @@ object Source {
    * }}}
    */
   def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, 
NotUsed] =
-    Source.fromGraph(new UnfoldAsync(s, f))
+    fromGraphStage(new UnfoldAsync(s, f))
 
   /**
    * Creates a sequential `Source` by iterating with the given predicate and 
function,
@@ -543,27 +554,29 @@ object Source {
    * @since 1.1.0
    */
   def iterate[T](seed: T)(p: T => Boolean, f: T => T): Source[T, NotUsed] =
-    fromIterator(() =>
-      new AbstractIterator[T] {
-        private var first = true
-        private var acc = seed
-        override def hasNext: Boolean = p(acc)
-        override def next(): T = {
-          if (first) {
-            first = false
-          } else {
-            acc = f(acc)
+    fromGraphStage(new IteratorSource[T](
+      () =>
+        new AbstractIterator[T] {
+          private var first = true
+          private var acc = seed
+          override def hasNext: Boolean = p(acc)
+          override def next(): T = {
+            if (first) {
+              first = false
+            } else {
+              acc = f(acc)
+            }
+            acc
           }
-          acc
-        }
-      }).withAttributes(DefaultAttributes.iterateSource)
+        },
+      DefaultAttributes.iterateSource))
 
   /**
    * A `Source` with no elements, i.e. an empty stream that is completed 
immediately for every connected `Sink`.
    */
   def empty[T]: Source[T, NotUsed] = _empty
   private[this] val _empty: Source[Nothing, NotUsed] =
-    Source.fromGraph(EmptySource)
+    fromGraphStage(EmptySource)
 
   /**
    * Create a `Source` which materializes a [[scala.concurrent.Promise]] which 
controls what element
@@ -577,20 +590,21 @@ object Source {
    * with None.
    */
   def maybe[T]: Source[T, Promise[Option[T]]] =
-    Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], 
Promise[Option[T]]]])
+    fromGraphStage(
+      MaybeSource.asInstanceOf[GraphStageWithMaterializedValue[SourceShape[T], 
Promise[Option[T]]]])
 
   /**
    * Create a `Source` that immediately ends the stream with the `cause` error 
to every connected `Sink`.
    */
   def failed[T](cause: Throwable): Source[T, NotUsed] =
-    Source.fromGraph(new FailedSource[T](cause))
+    fromGraphStage(new FailedSource[T](cause))
 
   /**
    * Emits a single value when the given `Future` is successfully completed 
and then completes the stream.
    * The stream fails if the `Future` is completed with a failure.
    */
-  def future[T](futureElement: Future[T]): Source[T, NotUsed] = 
futureElement.value match {
-    case None                           => fromGraph(new 
FutureSource[T](futureElement))
+  @inline def future[T](futureElement: Future[T]): Source[T, NotUsed] = 
futureElement.value match {
+    case None                           => fromGraphStage(new 
FutureSource[T](futureElement))
     case Some(scala.util.Success(null)) => empty[T]
     case Some(scala.util.Success(elem)) => single(elem)
     case Some(scala.util.Failure(ex))   => failed[T](ex)
@@ -601,7 +615,7 @@ object Source {
    * This stream could be useful in tests.
    */
   def never[T]: Source[T, NotUsed] = _never
-  private[this] val _never: Source[Nothing, NotUsed] = 
fromGraph(GraphStages.NeverSource)
+  private[this] val _never: Source[Nothing, NotUsed] = 
fromGraphStage(GraphStages.NeverSource)
 
   /**
    * Emits a single value when the given `CompletionStage` is successfully 
completed and then completes the stream.
@@ -616,8 +630,8 @@ object Source {
    * Turn a `Future[Source]` into a source that will emit the values of the 
source when the future completes successfully.
    * If the `Future` is completed with a failure the stream is failed.
    */
-  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, 
Future[M]] = futureSource.value match {
-    case None                           => fromGraph(new 
FutureFlattenSource(futureSource))
+  @inline def futureSource[T, M](futureSource: Future[Source[T, M]]): 
Source[T, Future[M]] = futureSource.value match {
+    case None                           => fromGraphStage(new 
FutureFlattenSource(futureSource))
     case Some(scala.util.Success(null)) =>
       val exception = new NullPointerException("futureSource completed with 
null")
       Source.failed(exception).mapMaterializedValue(_ => 
Future.failed[M](exception))
@@ -634,7 +648,7 @@ object Source {
    * the laziness and will trigger the factory immediately.
    */
   def lazySingle[T](create: () => T): Source[T, NotUsed] =
-    fromGraph(new LazySingleSource(create))
+    fromGraphStage(new LazySingleSource(create))
 
   /**
    * Defers invoking the `create` function to create a future element until 
there is downstream demand.
@@ -646,7 +660,7 @@ object Source {
    * the laziness and will trigger the factory immediately.
    */
   def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] =
-    fromGraph(new LazyFutureSource(create))
+    fromGraphStage(new LazyFutureSource(create))
 
   /**
    * Defers invoking the `create` function to create a future source until 
there is downstream demand.
@@ -665,7 +679,7 @@ object Source {
    * is failed with a [[pekko.stream.NeverMaterializedException]]
    */
   def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]] =
-    fromGraph(new LazySource(create))
+    fromGraphStage(new LazySource(create))
 
   /**
    * Defers invoking the `create` function to create a future source until 
there is downstream demand.
@@ -738,9 +752,7 @@ object Source {
       overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
     require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
     require(!overflowStrategy.isBackpressure, "Backpressure overflowStrategy 
not supported")
-    Source
-      .fromGraph(new ActorRefSource(bufferSize, overflowStrategy, 
completionMatcher, failureMatcher))
-      .withAttributes(DefaultAttributes.actorRefSource)
+    fromGraphStage(new ActorRefSource(bufferSize, overflowStrategy, 
completionMatcher, failureMatcher))
   }
 
   /**
@@ -751,7 +763,7 @@ object Source {
       ackMessage: Any,
       completionMatcher: PartialFunction[Any, CompletionStrategy],
       failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
-    Source.fromGraph(new ActorRefBackpressureSource(ackTo, ackMessage, 
completionMatcher, failureMatcher))
+    fromGraphStage(new ActorRefBackpressureSource(ackTo, ackMessage, 
completionMatcher, failureMatcher))
   }
 
   /**
@@ -772,7 +784,7 @@ object Source {
       ackMessage: Any,
       completionMatcher: PartialFunction[Any, CompletionStrategy],
       failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef] = {
-    Source.fromGraph(new ActorRefBackpressureSource(None, ackMessage, 
completionMatcher, failureMatcher))
+    fromGraphStage(new ActorRefBackpressureSource(None, ackMessage, 
completionMatcher, failureMatcher))
   }
 
   /**
@@ -910,7 +922,7 @@ object Source {
    * @param bufferSize size of the buffer in number of elements
    */
   def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]] =
-    Source.fromGraph(new BoundedSourceQueueStage[T](bufferSize))
+    fromGraphStage(new BoundedSourceQueueStage[T](bufferSize))
 
   /**
    * Creates a Source that will immediately execute the provided function 
`producer` with a [[BoundedSourceQueue]] when materialized.
@@ -1029,8 +1041,7 @@ object Source {
       bufferSize: Int,
       overflowStrategy: OverflowStrategy,
       maxConcurrentOffers: Int): Source[T, SourceQueueWithComplete[T]] =
-    Source.fromGraph(
-      new QueueSource(bufferSize, overflowStrategy, 
maxConcurrentOffers).withAttributes(DefaultAttributes.queueSource))
+    fromGraphStage(new QueueSource(bufferSize, overflowStrategy, 
maxConcurrentOffers))
 
   /**
    * Start a new `Source` from some resource which can be opened, read and 
closed.
@@ -1063,7 +1074,7 @@ object Source {
    * @tparam R - the resource type.
    */
   def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) 
=> Unit): Source[T, NotUsed] =
-    Source.fromGraph(new UnfoldResourceSource(create, read, close))
+    fromGraphStage(new UnfoldResourceSource(create, read, close))
 
   /**
    * Start a new `Source` from some resource which can be opened, read and 
closed.
@@ -1091,7 +1102,7 @@ object Source {
       create: () => Future[R],
       read: (R) => Future[Option[T]],
       close: (R) => Future[Done]): Source[T, NotUsed] =
-    Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
+    fromGraphStage(new UnfoldResourceSourceAsync(create, read, close))
 
   /**
    * Merge multiple [[Source]]s. Prefer the sources depending on the 
'priority' parameters.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to