raboof commented on code in PR #1622:
URL: https://github.com/apache/pekko/pull/1622#discussion_r1901874553

##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala:
##########
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
         .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
         .propagateToNestedMaterialization
     val matPromise = Promise[M]()
-    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
-      val accumulated = collection.mutable.Buffer.empty[In]
+    object logic extends GraphStageLogic(shape) with InHandler with OutHandler {
+      private var left = if (n < 0) 0 else n

Review Comment:
   We already have a `require(n >= 0)` above, so I don't think we need to add code to deal with the `n < 0` case here.

##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala:
##########
@@ -98,12 +101,10 @@ import pekko.util.OptionVal
             // delegate to subSink
             s.pull()
           case _ =>
-            if (accumulated.size < n) pull(in)
-            else if (accumulated.size == n) {
+            if (left > 0) pull(in)
+            else if (left == 0) {
               // corner case for n = 0, can be handled in FlowOps
               materializeFlow()
-            } else {

Review Comment:
   Why not keep this? It should never happen, but that's what the error says.
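
For illustration, the two suggestions above can be sketched outside of Pekko as follows. This is a minimal stand-in, not the code from the PR: `PrefixCounter`, `Action`, `PullUpstream`, `MaterializeFlow`, and the exception message are all hypothetical names. It assumes the stage validates `n` with `require(n >= 0)` before the counter is created, so `left` can be initialised directly from `n`, and it keeps the final `else` branch as a defensive check for a state that should be unreachable.

```scala
// Hypothetical stand-in for the stage logic; not the Pekko implementation.
sealed trait Action
case object PullUpstream extends Action
case object MaterializeFlow extends Action

final class PrefixCounter(n: Int) {
  // The precondition rejects negative n up front, so no clamping
  // such as `if (n < 0) 0 else n` is needed when initialising `left`.
  require(n >= 0, s"n must be >= 0, but was $n")

  // Number of prefix elements still expected.
  private var left: Int = n

  /** Record that one prefix element has been received. */
  def onElement(): Unit = left -= 1

  /** Decide what to do when downstream pulls before the nested flow exists. */
  def onPull(): Action =
    if (left > 0) PullUpstream
    else if (left == 0) MaterializeFlow // e.g. the n = 0 corner case
    else
      // Should never happen; kept so a broken invariant fails loudly
      // instead of being silently ignored. The message is made up.
      throw new IllegalStateException(s"Unexpected remaining prefix count: $left")
}
```

With this shape, `new PrefixCounter(-1)` fails at construction with an `IllegalArgumentException` from `require`, and the last branch only triggers if `left` is decremented past zero, which would indicate a bug elsewhere.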