mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not 
expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue 
because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the 
behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is wrong, just that its a completely separate 
issue from what this PR is changing/solving. Or to put it differently, this 
behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = 
StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception (furthermore if we are somehow 
changing some fundamental behaviour with `SplitAfter` then other tests would 
fail, but they are all passing without any changes). The critical part of the 
test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally 
written.
   
   Given that, I think it makes sense to file a separate issue if this 
behaviour about completing after a split needs to change and hence to tackle it 
separately. The only exception to this that I can think of is that some state 
is not being reset correctly
   
   ```scala
   decider(ex) match {
     case Supervision.Resume  => pull(in)
     case Supervision.Stop    => onUpstreamFailure(ex)
     case Supervision.Restart => onUpstreamFailure(ex) // TODO implement 
restart?
   }
   ``` 
   
   in the `case Supervision.Resume  => pull(in)` block, but if thats the case 
it would also error out earlier since that block is only executed when 
recovering from exceptions in `onPush` (and again thats only when `elem` is 3, 
not 6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to