raboof commented on code in PR #1622:
URL: https://github.com/apache/pekko/pull/1622#discussion_r1901874553


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala:
##########
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
         .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
         .propagateToNestedMaterialization
     val matPromise = Promise[M]()
-    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
-      val accumulated = collection.mutable.Buffer.empty[In]
+    object logic extends GraphStageLogic(shape) with InHandler with OutHandler {
+      private var left = if (n < 0) 0 else n

Review Comment:
   We already have a `require(n >= 0)` above, so I don't think we need to add
   code to deal with the `n < 0` case here.
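
   A minimal sketch of the point, assuming a hypothetical `PrefixCounter` class
   that is not part of Pekko: once `require(n >= 0)` has run in the constructor,
   a negative `n` can never reach the counter, so clamping it would be dead code.

```scala
// Hypothetical stand-in for the stage logic; it only illustrates the precondition.
final class PrefixCounter(n: Int) {
  // Throws IllegalArgumentException for negative n, before any field below is initialized.
  require(n >= 0, s"n must be >= 0, was $n")

  // No `if (n < 0) 0 else n` needed: n is known to be non-negative here.
  private var left: Int = n

  def remaining: Int = left

  // Count one consumed prefix element, never dropping below zero.
  def consume(): Unit = if (left > 0) left -= 1
}
```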



##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala:
##########
@@ -98,12 +101,10 @@ import pekko.util.OptionVal
             // delegate to subSink
             s.pull()
           case _ =>
-            if (accumulated.size < n) pull(in)
-            else if (accumulated.size == n) {
+            if (left > 0) pull(in)
+            else if (left == 0) {
               // corner case for n = 0, can be handled in FlowOps
               materializeFlow()
-            } else {

Review Comment:
   Why not keep this? It should never happen, but that's what the error says.
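
   A rough sketch of what keeping the defensive branch could look like with the
   new counter. The `PullDecision` object, its return values, and the exception
   message are assumptions for illustration, not the code removed in this PR.

```scala
// Hypothetical helper; `left` mirrors the counter introduced in the diff.
object PullDecision {
  def action(left: Int): String =
    if (left > 0) "pull" // still collecting prefix elements
    else if (left == 0) "materialize" // prefix complete (or n = 0)
    else
      // Unreachable if the counter is maintained correctly; fail loudly if it is not.
      throw new IllegalStateException(s"unexpected negative counter: $left")
}
```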



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

