gmethvin commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867145212


##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
     (stageLogic, promise.future)
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that 
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike 
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and 
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: 
Future[Sink[T, M]])
+    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+  val in = Inlet[T]("eagerFutureSink.in")
+  override def initialAttributes = DefaultAttributes.eagerFutureSink
+  override val shape: SinkShape[T] = SinkShape.of(in)
+
+  override def toString: String = "EagerFutureSink"
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[M]) = {
+    val promise = Promise[M]()
+    val stageLogic = new GraphStageLogic(shape) with InHandler {
+      private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+      private var bufferedElement: OptionVal[T] = OptionVal.none
+      private var upstreamClosed = false

Review Comment:
   Good catch. Added a `postStop` override that fails the promise, consistent 
with other stages in this file.



##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
     (stageLogic, promise.future)
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that 
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike 
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and 
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: 
Future[Sink[T, M]])
+    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+  val in = Inlet[T]("eagerFutureSink.in")
+  override def initialAttributes = DefaultAttributes.eagerFutureSink
+  override val shape: SinkShape[T] = SinkShape.of(in)
+
+  override def toString: String = "EagerFutureSink"
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[M]) = {
+    val promise = Promise[M]()
+    val stageLogic = new GraphStageLogic(shape) with InHandler {
+      private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+      private var bufferedElement: OptionVal[T] = OptionVal.none
+      private var upstreamClosed = false
+      private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+
+      override def preStart(): Unit = {
+        pull(in)
+        val cb: AsyncCallback[Try[Sink[T, M]]] =
+          getAsyncCallback {
+            case Success(sink) => onSinkReady(sink)
+            case Failure(e)    =>
+              promise.tryFailure(e)
+              failStage(e)
+          }
+        try {
+          future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+        } catch {
+          case NonFatal(e) =>
+            promise.tryFailure(e)
+            failStage(e)
+        }
+      }
+
+      override def onPush(): Unit = {
+        sinkReady match {
+          case OptionVal.Some(sink) =>
+            switchTo(sink, OptionVal.Some(grab(in)))
+          case OptionVal.None =>
+            bufferedElement = OptionVal.Some(grab(in))
+        }
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        upstreamClosed = true
+        sinkReady match {
+          case OptionVal.Some(sink) =>
+            switchTo(sink, OptionVal.none)
+          case OptionVal.None =>
+            setKeepGoing(true)
+        }
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        upstreamFailed = OptionVal.Some(ex)
+        upstreamClosed = true
+        sinkReady match {
+          case OptionVal.Some(_) =>
+            promise.tryFailure(ex)
+            failStage(ex)
+          case OptionVal.None =>
+            setKeepGoing(true)

Review Comment:
   Good catch. `postStop` now fails the promise with 
`AbruptStageTerminationException`, consistent with other promise-materializing 
stages in this file. Also added a test for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to