mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645
##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
+ upstreamSubscription.sendNext(7)
Review Comment:
> **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
>
> The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**

I am not saying that this is wrong, just that it's a completely separate
issue from what this PR is changing/solving. To put it differently, this
behaviour is the same as in current Pekko, since at this point in the test, i.e.
```scala
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
val substream2 = subscriber.expectNext()
val substreamPuppet2 =
StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(7)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
```
we are past recovering from an exception. Furthermore, if we were somehow
changing some fundamental behaviour of `SplitAfter`, other tests would
fail, but they all pass without any changes. The critical part of the
test, the part that specifically deals with recovering from the exception and resuming, is
```scala
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
```
This part is completely unchanged from how the test was originally
written.
Given that, I think it makes sense to file a separate issue if the
behaviour around completing after a split needs to change, and to tackle it
separately. The only exception I can think of is that some state
is not being reset correctly
```scala
decider(ex) match {
  case Supervision.Resume  => pull(in)
  case Supervision.Stop    => onUpstreamFailure(ex)
  case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}
```
in the `case Supervision.Resume => pull(in)` block. But if that's the case,
it would also error out earlier, since that block is only executed when
recovering from exceptions in `onPush` (and again, that only happens when `elem` is 3,
not 6).
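
To make the argument concrete, here is a minimal stdlib-only sketch. It is not the actual Pekko stage: `SplitAfterResumeSketch`, `Directive`, and `run` are hypothetical names, and the predicate is assumed to have the same shape as the one in the test (throw on 3, otherwise split after multiples of 3). It only models which elements reach the `Resume` branch and where the split happens.

```scala
import scala.util.{Failure, Success, Try}

object SplitAfterResumeSketch {
  // Hypothetical stand-in for the supervision directives; not Pekko's Supervision.
  sealed trait Directive
  case object Resume extends Directive
  case object Stop extends Directive

  // Assumed test predicate: throws on 3, otherwise splits after multiples of 3.
  def splitAfterPredicate(elem: Int): Boolean =
    if (elem == 3) throw new RuntimeException("test") else elem % 3 == 0

  // Returns the emitted substreams and the elements that hit the Resume branch.
  def run(elems: Seq[Int], decider: Throwable => Directive): (Vector[Vector[Int]], Vector[Int]) = {
    var substreams = Vector.empty[Vector[Int]]
    var current = Vector.empty[Int]
    var resumedOn = Vector.empty[Int]
    var stopped = false
    val it = elems.iterator
    while (it.hasNext && !stopped) {
      val elem = it.next()
      Try(splitAfterPredicate(elem)) match {
        case Success(split) =>
          current :+= elem
          // Split *after* the matching element: close the current substream.
          if (split) { substreams :+= current; current = Vector.empty }
        case Failure(ex) =>
          decider(ex) match {
            case Resume => resumedOn :+= elem // element is dropped, processing continues
            case Stop   => stopped = true
          }
      }
    }
    if (current.nonEmpty) substreams :+= current
    (substreams, resumedOn)
  }

  def main(args: Array[String]): Unit = {
    val (substreams, resumedOn) = run(1 to 7, _ => Resume)
    println(substreams) // Vector(Vector(1, 2, 4, 5, 6), Vector(7))
    println(resumedOn)  // Vector(3)
  }
}
```

In this model the `Resume` branch is reached only for element 3, so anything observed around 6 and 7 comes from the ordinary split path and not from exception recovery.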
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]