Roiocam commented on code in PR #199: URL: https://github.com/apache/pekko-persistence-jdbc/pull/199#discussion_r1618115027
##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala:
##########
@@ -94,7 +94,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
       JournalSequenceActor.props(readJournalDao, readJournalConfig.journalSequenceRetrievalConfiguration),
       s"$configPath.pekko-persistence-jdbc-journal-sequence-actor")
   private val delaySource =
-    Source.tick(0.seconds, readJournalConfig.refreshInterval, 0).take(1)
+    Source.tick(readJournalConfig.refreshInterval, readJournalConfig.refreshInterval, 0).take(1)

Review Comment:

I did some testing locally and fixed the issue of `delaySource` not actually throttling the stream with the following replacement.

```diff
 Source
   .repeat(0)
+  .buffer(1, OverflowStrategy.backpressure)
+  .throttle(1, readJournalConfig.refreshInterval)
+  .flatMapConcat(_ => currentPersistenceIds())
-  .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))
```

However, it may introduce another problem: `repeat + buffer + throttle` may cause the stream to never end (I guess because there is always a buffered element waiting to be processed).

In conclusion, I spent some time investigating and found that in `persistenceIds()` each execution of `flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))` actually works with a new `delaySource`, and because of `take(1)` only its first tick is ever used. With `initialDelay` set to 0 that tick fires immediately, so no delay is applied. Removing `take(1)` or changing the `initialDelay` parameter of `tick()` to `refreshInterval` achieves the desired flow control effect.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
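The effect of `initialDelay` discussed in the review comment above can be reproduced with a minimal sketch like the following. It is not the project's code: `refreshInterval`, `currentIds()` and `poll` are hypothetical stand-ins for `readJournalConfig.refreshInterval`, `currentPersistenceIds()` and the polling part of `persistenceIds()`, and it assumes `pekko-stream` is on the classpath.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object DelaySourceSketch {
  // Hypothetical stand-ins for readJournalConfig.refreshInterval and currentPersistenceIds().
  val refreshInterval: FiniteDuration = 200.millis
  def currentIds(): Source[String, NotUsed] = Source(List("a", "b"))

  // Runs `rounds` polling rounds and prints the elapsed time at which each id is emitted.
  def poll(initialDelay: FiniteDuration, rounds: Int)(implicit system: ActorSystem): Future[Seq[String]] = {
    // flatMapConcat materializes delaySource anew for every upstream element, and
    // take(1) completes it after the first tick, so only initialDelay is ever observed.
    val delaySource = Source.tick(initialDelay, refreshInterval, 0).take(1)
    val start = System.nanoTime()
    Source
      .repeat(0)
      .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentIds()))
      .take(rounds * 2L) // currentIds() emits two ids per round
      .map { id =>
        println(s"${(System.nanoTime() - start) / 1000000} ms: $id")
        id
      }
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("delay-source-sketch")
    try {
      println("initialDelay = 0 (old behaviour):")
      Await.result(poll(Duration.Zero, rounds = 3), 10.seconds)
      println("initialDelay = refreshInterval (proposed change):")
      Await.result(poll(refreshInterval, rounds = 3), 10.seconds)
    } finally system.terminate()
  }
}
```

With a zero initial delay the rounds should print back to back, while with `initialDelay = refreshInterval` each round should start roughly one interval after the previous one. Exact timings depend on the scheduler.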
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org