Roiocam commented on code in PR #199:
URL: 
https://github.com/apache/pekko-persistence-jdbc/pull/199#discussion_r1618115027


##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala:
##########
@@ -94,7 +94,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
     JournalSequenceActor.props(readJournalDao, readJournalConfig.journalSequenceRetrievalConfiguration),
     s"$configPath.pekko-persistence-jdbc-journal-sequence-actor")
   private val delaySource =
-    Source.tick(0.seconds, readJournalConfig.refreshInterval, 0).take(1)
+    Source.tick(readJournalConfig.refreshInterval, readJournalConfig.refreshInterval, 0).take(1)

Review Comment:
   I did some local testing and fixed the issue of delaySource not actually
   throttling the flow by using the following replacement.
   
   ```diff
       Source
         .repeat(0)
   +      .buffer(1, OverflowStrategy.backpressure)
   +      .throttle(1, readJournalConfig.refreshInterval)
   +      .flatMapConcat(_ => currentPersistenceIds())
   -      .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))
   
   ```
   
   However, this may introduce another problem: `repeat + buffer + throttle`
   may cause the stream to never complete (my guess is that there is always a
   buffered element waiting to be processed).
   
   
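   In conclusion, after some investigation I found that in persistenceIds(),
   each execution of
   `flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))`
   materializes a new delaySource, which completes after a single tick because
   of `take(1)`. With an `initialDelay` of 0, every new delaySource emits that
   tick immediately, so no delay is applied between rounds.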
   
   Removing `take(1)` or changing the `initialDelay` parameter of `tick()` to
   `refreshInterval` can achieve the desired flow control effect.
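
   To make the difference concrete, here is a minimal standalone sketch (not
   the code in this PR) that times a few polling rounds with each
   `initialDelay`. `DelaySourceSketch`, `currentIds`, `elapsedMillis`, and the
   200 ms `refreshInterval` are hypothetical stand-ins for illustration; only
   `Source.tick`, `take`, `repeat`, and `flatMapConcat` are the real Pekko
   Streams operators discussed above.

   ```scala
   import org.apache.pekko.NotUsed
   import org.apache.pekko.actor.ActorSystem
   import org.apache.pekko.stream.scaladsl.{ Sink, Source }

   import scala.concurrent.Await
   import scala.concurrent.duration._

   object DelaySourceSketch {
     // Assumption: stand-in for readJournalConfig.refreshInterval.
     val refreshInterval: FiniteDuration = 200.millis

     // Assumption: stand-in for currentPersistenceIds(), a finite source that completes.
     def currentIds: Source[String, NotUsed] = Source(List("pid-1", "pid-2"))

     // Runs `rounds` polling rounds and returns the elapsed wall-clock time in milliseconds.
     def elapsedMillis(initialDelay: FiniteDuration, rounds: Int)(implicit system: ActorSystem): Long = {
       // Same shape as delaySource in JdbcReadJournal: one tick, then complete.
       val delaySource = Source.tick(initialDelay, refreshInterval, 0).take(1)
       val start = System.nanoTime()
       val done = Source
         .repeat(0)
         .take(rounds) // bound the otherwise infinite stream so the sketch terminates
         // delaySource is materialized anew for every outer element
         .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentIds))
         .runWith(Sink.ignore)
       Await.result(done, 30.seconds)
       (System.nanoTime() - start) / 1000000
     }

     def main(args: Array[String]): Unit = {
       implicit val system: ActorSystem = ActorSystem("delay-source-sketch")
       try {
         println(s"initialDelay = 0:               ${elapsedMillis(Duration.Zero, 5)} ms")
         println(s"initialDelay = refreshInterval: ${elapsedMillis(refreshInterval, 5)} ms")
       } finally system.terminate()
     }
   }
   ```

   With an `initialDelay` of 0 the rounds should run back to back, while with
   `initialDelay = refreshInterval` each round should wait roughly one
   `refreshInterval` before querying, so the second measurement should be on
   the order of `rounds * refreshInterval`.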
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

