> There's a subtle race condition in AggregatePublisher.AggregateSubscription. > The AggregatePublisher will subscribe to downstream publishers in turn, > subscribing to the next publisher when the previous publisher onComplete() > has been called. > > The request() method passed to the upstream subscriber detects that all > subscribers have completed if the current downstream publisher is null and > the queue of publisher yet to subscribe to is empty. > > The event loop is responsible for subscribing to the next publisher, and does > so by polling the queue and assigning the returned value to this.publisher, > and then subscribe to that publisher. However, when subscribing to the last > publisher in the queue, there is a small time window where the queue can be > observed to be empty, before this.publisher is assigned. > > The fix adds synchronization to make sure a consistent state is observed when > it matters. Typically - setting `this.publisher` or taking a snapshot of a > publisher and its subscription, should be done within synchronized.
Daniel Fuchs has updated the pull request incrementally with one additional commit since the last revision: Review feedback ------------- Changes: - all: https://git.openjdk.org/jdk/pull/23204/files - new: https://git.openjdk.org/jdk/pull/23204/files/fb5c155b..ff503a4d Webrevs: - full: https://webrevs.openjdk.org/?repo=jdk&pr=23204&range=02 - incr: https://webrevs.openjdk.org/?repo=jdk&pr=23204&range=01-02 Stats: 1 line in 1 file changed: 0 ins; 0 del; 1 mod Patch: https://git.openjdk.org/jdk/pull/23204.diff Fetch: git fetch https://git.openjdk.org/jdk.git pull/23204/head:pull/23204 PR: https://git.openjdk.org/jdk/pull/23204