spena opened a new pull request #10462:
URL: https://github.com/apache/kafka/pull/10462
Fixes the issue described in https://issues.apache.org/jira/browse/KAFKA-10847.

Note:
- DO NOT review commit #1. This commit is being reviewed in https://github.com/apache/kafka/pull/10331.

To fix the above problem, the left/outer stream-stream join processor buffers non-joined records until their join window closes. This way, a record is not emitted as a non-joined result if a join is found for it later in the join window. If a record's window closes and no join was found, the record is emitted to the downstream processor in the topology.

A new time-ordered window store temporarily holds records that have not been joined and keeps them ordered by time. The `KStreamStreamJoin` processor has a reference to this new store. For every non-joined record seen, the processor writes the record to the store without forwarding it. When a join is found, the processor deletes the matching record from the store so that it is not emitted later. Records that are still unjoined at the end of the window plus the grace period are emitted to the next processor in the topology.

I use stream time, rather than wall-clock time, to check for expired records so that the results are deterministic. The `KStreamStreamJoin` processor checks for expired records and emits them every time it processes a new record.

The new state store is shared by the left and right join nodes, so it serializes record keys as a combined `<joinSide-recordKey>` key. Including the join side in the key lets one join node delete a record buffered by the other node when a join is found. Two new serdes are created for this: `KeyAndJoinSideSerde`, which serializes a boolean value that specifies the side where the key was found, and `ValueOrOtherValueSerde`, which serializes either V1 or V2 depending on where the key was found.
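The buffering and expiry behavior described above can be modeled in plain Java. This is a minimal sketch, not the code in this PR: plain collections stand in for the state stores, and all names (`OuterJoinBufferSketch`, `Rec`, `process`, `OuterJoinBufferDemo`) are hypothetical. It models the outer-join case, where records from both sides are buffered. It assumes a symmetric join window of `windowMs` on each side, stream time equal to the largest timestamp seen so far, and a buffered record expiring once stream time passes `timestamp + windowMs + graceMs`. It also assumes records arriving more than `graceMs` behind stream time are dropped, so that an expired record can never be joined afterwards.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the outer-join buffer; not the real KStreamStreamJoin.
final class OuterJoinBufferSketch {
    // One input record; left == true means it arrived on the left stream.
    static final class Rec {
        final boolean left;
        final String key;
        final String value;
        final long ts;

        Rec(boolean left, String key, String value, long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // no record seen yet
    private final List<Rec> seen = new ArrayList<>(); // stands in for both join window stores
    private final TreeMap<Long, List<Rec>> nonJoined = new TreeMap<>(); // time-ordered buffer
    final List<String> output = new ArrayList<>(); // results forwarded downstream

    OuterJoinBufferSketch(long windowMs, long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    void process(Rec rec) {
        // Assumption: drop records that arrive more than graceMs behind stream time.
        if (streamTime != Long.MIN_VALUE && rec.ts < streamTime - graceMs) {
            return;
        }
        streamTime = Math.max(streamTime, rec.ts); // stream time never moves backwards
        boolean joined = false;
        for (Rec other : seen) {
            if (other.left != rec.left && other.key.equals(rec.key)
                    && Math.abs(other.ts - rec.ts) <= windowMs) {
                joined = true;
                output.add(format(rec, other));
                removeNonJoined(other); // the buffered record has now found a join
            }
        }
        seen.add(rec);
        if (!joined) {
            nonJoined.computeIfAbsent(rec.ts, t -> new ArrayList<>()).add(rec);
        }
        emitExpired(); // checked every time a record is processed
    }

    private void removeNonJoined(Rec other) {
        List<Rec> atTs = nonJoined.get(other.ts);
        if (atTs != null && atTs.remove(other) && atTs.isEmpty()) {
            nonJoined.remove(other.ts);
        }
    }

    private void emitExpired() {
        Iterator<Map.Entry<Long, List<Rec>>> it = nonJoined.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Rec>> entry = it.next();
            if (entry.getKey() + windowMs + graceMs >= streamTime) {
                break; // time-ordered, so every later entry is still open
            }
            for (Rec r : entry.getValue()) {
                output.add(format(r, null)); // window closed without a join
            }
            it.remove();
        }
    }

    // Renders "key:leftValue+rightValue", using "null" for a missing side.
    private static String format(Rec a, Rec b) {
        Rec l = a.left ? a : b;
        Rec r = a.left ? b : a;
        return a.key + ":" + (l == null ? "null" : l.value) + "+" + (r == null ? "null" : r.value);
    }
}

class OuterJoinBufferDemo {
    public static void main(String[] args) {
        OuterJoinBufferSketch join = new OuterJoinBufferSketch(10, 5);
        join.process(new OuterJoinBufferSketch.Rec(true, "k", "L1", 0));    // buffered
        join.process(new OuterJoinBufferSketch.Rec(false, "k", "R1", 5));   // joins L1
        join.process(new OuterJoinBufferSketch.Rec(true, "k2", "L2", 10));  // buffered
        join.process(new OuterJoinBufferSketch.Rec(false, "k3", "R3", 30)); // L2 expires
        System.out.println(join.output); // [k:L1+R1, k2:L2+null]
    }
}
```

Because the buffer is ordered by timestamp, the expiry scan can stop at the first entry whose window is still open instead of visiting every buffered record.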
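The combined key and the either-V1-or-V2 value can be illustrated with a byte-level sketch. This is not the PR's `KeyAndJoinSideSerde` or `ValueOrOtherValueSerde`, and it does not use Kafka's `Serde` interface. The classes `KeyAndJoinSideCodec`, `ValueOrOtherValue` (a simplified stand-in), and `SerdeSketchDemo` are hypothetical. The sketch assumes a one-byte flag prefix (1 = left/this side, 0 = right/other side) ahead of the already-serialized key or value, and assumes values are non-null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical key codec: one flag byte for the join side (1 = left, 0 = right),
// followed by the already-serialized record key.
final class KeyAndJoinSideCodec {
    private KeyAndJoinSideCodec() {}

    static byte[] encode(boolean leftSide, byte[] rawKey) {
        return ByteBuffer.allocate(1 + rawKey.length)
                .put((byte) (leftSide ? 1 : 0))
                .put(rawKey)
                .array();
    }

    static boolean isLeftSide(byte[] data) {
        return data[0] == 1;
    }

    static byte[] rawKey(byte[] data) {
        return Arrays.copyOfRange(data, 1, data.length);
    }
}

// Hypothetical value holder: exactly one of the two values is set (values are
// assumed non-null), and a flag byte tells the reader which deserializer to use.
final class ValueOrOtherValue<V1, V2> {
    final V1 thisValue;  // set when the record came from this side
    final V2 otherValue; // set when the record came from the other side

    private ValueOrOtherValue(V1 thisValue, V2 otherValue) {
        this.thisValue = thisValue;
        this.otherValue = otherValue;
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> ofThis(V1 value) {
        return new ValueOrOtherValue<>(value, null);
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> ofOther(V2 value) {
        return new ValueOrOtherValue<>(null, value);
    }

    byte[] encode(Function<V1, byte[]> thisSer, Function<V2, byte[]> otherSer) {
        boolean isThis = thisValue != null;
        byte[] raw = isThis ? thisSer.apply(thisValue) : otherSer.apply(otherValue);
        return ByteBuffer.allocate(1 + raw.length).put((byte) (isThis ? 1 : 0)).put(raw).array();
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> decode(byte[] data,
            Function<byte[], V1> thisDe, Function<byte[], V2> otherDe) {
        byte[] raw = Arrays.copyOfRange(data, 1, data.length);
        return data[0] == 1
                ? ValueOrOtherValue.<V1, V2>ofThis(thisDe.apply(raw))
                : ValueOrOtherValue.<V1, V2>ofOther(otherDe.apply(raw));
    }
}

class SerdeSketchDemo {
    public static void main(String[] args) {
        byte[] key = KeyAndJoinSideCodec.encode(true, "user-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(KeyAndJoinSideCodec.isLeftSide(key)); // true
        System.out.println(new String(KeyAndJoinSideCodec.rawKey(key), StandardCharsets.UTF_8)); // user-1

        // Other-side value: V1 = String, V2 = Integer (written as 4 bytes).
        ValueOrOtherValue<String, Integer> v = ValueOrOtherValue.ofOther(42);
        byte[] bytes = v.encode(s -> s.getBytes(StandardCharsets.UTF_8),
                i -> ByteBuffer.allocate(4).putInt(i).array());
        ValueOrOtherValue<String, Integer> back = ValueOrOtherValue.decode(bytes,
                b -> new String(b, StandardCharsets.UTF_8), b -> ByteBuffer.wrap(b).getInt());
        System.out.println(back.otherValue); // 42
    }
}
```

Putting the side flag in the key means the same record key from the left and right streams maps to two distinct store keys. A join node can therefore address and delete the other side's buffered entry directly.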
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)