spena opened a new pull request #10462:
URL: https://github.com/apache/kafka/pull/10462


   Fixes the issue reported in https://issues.apache.org/jira/browse/KAFKA-10847.
   
   Note:
   - DO NOT review commit #1. This commit is being reviewed in 
https://github.com/apache/kafka/pull/10331
   
   To fix this problem, the left/outer stream-stream join processor uses a buffer to hold non-joined records until their join window closes. This way, a record is not emitted as a non-joined result if a join is found later during the join window. If a record's window closes and no join was found, the record is emitted and processed by the downstream processor in the topology.
   
   A new time-ordered window store is used to temporarily hold records that do not have a join; it keeps the record keys ordered by time. `KStreamStreamJoin` has a reference to this new store. For every non-joined record seen, the processor writes it to the new state store without forwarding it downstream. When a join is found, the processor deletes the previously buffered record from the store so it is never emitted as a non-joined result.
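   For illustration, here is a minimal sketch of the put/delete behaviour of the buffer, using a `TreeMap` as a stand-in for the time-ordered window store. `NonJoinedBuffer` and `TimestampedKey` are hypothetical names (not classes from this PR), and ordering by (timestamp, side, key) is an assumption made to mimic "keys ordered by time".

```java
import java.util.TreeMap;

// Hypothetical composite key; NOT the PR's actual key class.
final class TimestampedKey implements Comparable<TimestampedKey> {
    final long timestamp;
    final boolean leftSide; // which side of the join buffered the record
    final String key;

    TimestampedKey(long timestamp, boolean leftSide, String key) {
        this.timestamp = timestamp;
        this.leftSide = leftSide;
        this.key = key;
    }

    // Order by timestamp first, so iteration visits the oldest records first.
    @Override
    public int compareTo(TimestampedKey o) {
        int c = Long.compare(timestamp, o.timestamp);
        if (c != 0) return c;
        c = Boolean.compare(leftSide, o.leftSide);
        if (c != 0) return c;
        return key.compareTo(o.key);
    }
}

// Hypothetical stand-in for the time-ordered window store described above.
final class NonJoinedBuffer {
    private final TreeMap<TimestampedKey, String> store = new TreeMap<>();

    // No join partner yet: hold the record instead of forwarding it.
    void putNonJoined(long ts, boolean leftSide, String key, String value) {
        store.put(new TimestampedKey(ts, leftSide, key), value);
    }

    // A join was found: drop the buffered record so it is never emitted as non-joined.
    void deleteJoined(long ts, boolean leftSide, String key) {
        store.remove(new TimestampedKey(ts, leftSide, key));
    }

    int size() {
        return store.size();
    }

    // Oldest buffered record key, or null when the buffer is empty.
    TimestampedKey oldest() {
        return store.isEmpty() ? null : store.firstKey();
    }
}
```

   In this sketch, two records with the same timestamp, side and key overwrite each other; a real store would need to handle such duplicates.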
   
   Records that were never joined by the end of the window plus grace period are emitted to the next processor in the topology. I use stream time (not wall-clock time) to check whether a record has expired, so that results are deterministic. `KStreamStreamJoin` checks for expired records and emits them every time a new record is processed by the join processor.
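   A minimal sketch of the expiry check, assuming a record buffered at timestamp `t` expires once stream time is strictly greater than `t + windowAfterMs + graceMs`. The class `ExpiryEmitter` and the exact boundary condition are hypothetical illustrations, not the PR's code. Because the buffer is ordered by time, the scan can stop at the first record whose window is still open, and an out-of-order record never moves stream time backwards.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of emitting expired non-joined records by stream time.
final class ExpiryEmitter {
    private final long windowAfterMs; // assumed: how far after a record a partner may arrive
    private final long graceMs;       // assumed: extra allowance for out-of-order records
    private long streamTime = Long.MIN_VALUE; // max record timestamp observed so far
    // Buffered non-joined records, ordered by timestamp.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    ExpiryEmitter(long windowAfterMs, long graceMs) {
        this.windowAfterMs = windowAfterMs;
        this.graceMs = graceMs;
    }

    void buffer(long ts, String value) {
        buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
    }

    // Called for every incoming record: advance stream time, then emit expired records.
    List<String> onRecord(long recordTs) {
        streamTime = Math.max(streamTime, recordTs); // never moves backwards
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            // Time order: once one window is still open, all later ones are too.
            if (e.getKey() + windowAfterMs + graceMs >= streamTime) {
                break;
            }
            emitted.addAll(e.getValue());
            it.remove();
        }
        return emitted;
    }
}
```

   For example, with `windowAfterMs = 100` and `graceMs = 10`, a record buffered at `t = 0` is emitted only once a record with timestamp above 110 arrives; a late record at timestamp 90 does not trigger any emission.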
   
   The new state store is shared between the left and right join nodes. The store serializes record keys as a combined `<joinSide-recordKey>` key. This key combination makes it possible to delete a record buffered by the other join side when a joined record is found. Two new serdes are created for this: `KeyAndJoinSideSerde`, which serializes the key together with a boolean that specifies the side the key came from, and `ValueOrOtherValueSerde`, which serializes either V1 or V2 depending on the side the record came from.
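   To illustrate the combined key, here is a standalone sketch of a side-plus-key encoding. It does not implement Kafka's `Serde` interfaces (so it runs without Kafka on the classpath), and the byte layout (one side-flag byte followed by the UTF-8 key) is an assumption, not necessarily what `KeyAndJoinSideSerde` does. The point it shows is that the same record key produces different bytes for each join side, so both sides can share one store without colliding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical combined key; the byte layout is an assumption for illustration.
final class KeyAndJoinSide {
    final boolean leftSide;
    final String key;

    KeyAndJoinSide(boolean leftSide, String key) {
        this.leftSide = leftSide;
        this.key = key;
    }

    // Layout: [1 byte side flag: 1 = left, 0 = right][UTF-8 key bytes]
    byte[] serialize() {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + keyBytes.length)
                .put((byte) (leftSide ? 1 : 0))
                .put(keyBytes)
                .array();
    }

    static KeyAndJoinSide deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        boolean leftSide = buf.get() == 1;
        byte[] keyBytes = new byte[buf.remaining()];
        buf.get(keyBytes);
        return new KeyAndJoinSide(leftSide, new String(keyBytes, StandardCharsets.UTF_8));
    }
}
```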
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

