spena commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r658928595
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
@SuppressWarnings("unchecked")
private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+ if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
Review comment:
Why subtract both joinAfterMs and joinBeforeMs? A record expires when
`window().start() + joinAfterMs + joinGraceMs` is lower than maxStreamTime.
For instance, say we have a record in the shared state store with time = 1. Now
a new record arrives with time = 17.
```
inputRecordTime = 17
maxObservedTime = 17
minTime = 1
window = 10 (beforeMs = 10, afterMs = 10)
grace = 5
```
Isn't record 1 supposed to expire and be emitted, because 1 + 10 (afterMs)
+ 5 (grace) = 16, which is lower than maxStreamTime (17)?
With the condition you have, the registered minTime is 1, so
`(1 >= 17 - 10 - 10 - 5)`, i.e. `1 >= -8`, is true. The method therefore returns
early and does not emit record 1 until stream time advances another 10 ms (to 27).
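A minimal Java sketch comparing the two early-exit checks with the numbers above. It assumes plain `long` values in place of `minTime.minTime` and `maxObservedStreamTime.get()`; the class and method names (`OuterJoinExpiryCheck`, `skipScanAsWritten`, `skipScanSuggested`, `firstScanTime`) are hypothetical and this is not the Kafka code.

```java
import java.util.function.BiPredicate;

// Simplified model of the early-exit check in emitNonJoinedOuterRecords();
// hypothetical names, not the Kafka implementation.
final class OuterJoinExpiryCheck {
    static final long JOIN_BEFORE_MS = 10;
    static final long JOIN_AFTER_MS = 10;
    static final long JOIN_GRACE_MS = 5;

    // Check as written in the PR: also subtracts joinBeforeMs.
    static boolean skipScanAsWritten(long minTime, long maxObservedStreamTime) {
        return minTime >= maxObservedStreamTime - JOIN_AFTER_MS - JOIN_BEFORE_MS - JOIN_GRACE_MS;
    }

    // Check the review argues for: a record expires once
    // start + afterMs + graceMs < maxObservedStreamTime.
    static boolean skipScanSuggested(long minTime, long maxObservedStreamTime) {
        return minTime >= maxObservedStreamTime - JOIN_AFTER_MS - JOIN_GRACE_MS;
    }

    // Smallest stream time at which the given check stops returning early.
    static long firstScanTime(long minTime, BiPredicate<Long, Long> skip) {
        long t = minTime;
        while (skip.test(minTime, t)) {
            t++;
        }
        return t;
    }

    public static void main(String[] args) {
        long minTime = 1;
        System.out.println("as written, skip at 17: " + skipScanAsWritten(minTime, 17)); // true
        System.out.println("suggested, skip at 17:  " + skipScanSuggested(minTime, 17)); // false
        System.out.println("as written, first scan: "
                + firstScanTime(minTime, OuterJoinExpiryCheck::skipScanAsWritten)); // 27
        System.out.println("suggested, first scan:  "
                + firstScanTime(minTime, OuterJoinExpiryCheck::skipScanSuggested)); // 17
    }
}
```

The gap between the two first-scan times (27 vs 17) is exactly joinBeforeMs, which is the delay described above.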
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
@SuppressWarnings("unchecked")
private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+ if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+ return;
+ }
+
try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
while (it.hasNext()) {
final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
final LeftOrRightValue value = record.value;
+ minTime.minTime = windowedKey.window().start();
Review comment:
There's one extra thing to do. We should set `minTime = MAX` before
opening `store.all()`, so that the minimum is reset in case the iterator has no
records. Here is an example I ran:
I have a few records in the shared state store (1, 5, 7). Then a new record
arrives that expires all three of them, for instance record 50. While iterating,
minTime is set to 1, then 5, then 7, and it stays at 7 after all three are emitted.
Now for every new record after 50 that is still part of the window, the
condition at the beginning of this method, `minTime >= maxObservedStreamTime - ...`,
is false, so the iterator is opened again and again.
If we reset minTime to MAX, the next time the iterator is opened it finds no
records, so minTime stays at MAX. Future records that do not expire then skip
the iterator, because `minTime (MAX) >= maxObservedStreamTime - ...` is true.
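A minimal Java sketch of the scenario above, under simplifying assumptions: a `TreeMap` ordered by timestamp stands in for the `WindowStore`, the early exit uses `joinAfterMs + joinGraceMs` only, the scan stops at the first record with `start + joinAfterMs + joinGraceMs >= maxObservedStreamTime`, and records 51 to 60 only advance stream time without being stored. The class `MinTimeResetSketch`, its fields, and the `resetBeforeScan` flag are hypothetical, not Kafka code.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of emitNonJoinedOuterRecords(); hypothetical, not the Kafka code.
final class MinTimeResetSketch {
    static final long JOIN_AFTER_MS = 10;
    static final long JOIN_GRACE_MS = 5;

    private final boolean resetBeforeScan;
    // Stand-in for the shared outer-join store, ordered by record time.
    private final TreeMap<Long, String> store = new TreeMap<>();
    private long minTime = Long.MAX_VALUE;
    int scans = 0; // how many times the store iterator was opened

    MinTimeResetSketch(boolean resetBeforeScan) {
        this.resetBeforeScan = resetBeforeScan;
    }

    void put(long time, String value) {
        store.put(time, value);
        minTime = Math.min(minTime, time);
    }

    void emitNonJoinedOuterRecords(long maxObservedStreamTime) {
        if (minTime >= maxObservedStreamTime - JOIN_AFTER_MS - JOIN_GRACE_MS) {
            return; // the oldest stored record cannot have expired yet
        }
        if (resetBeforeScan) {
            minTime = Long.MAX_VALUE; // stays MAX if the scan finds nothing
        }
        scans++;
        Iterator<Map.Entry<Long, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            long start = it.next().getKey();
            minTime = start; // mirrors `minTime.minTime = windowedKey.window().start()`
            if (start + JOIN_AFTER_MS + JOIN_GRACE_MS >= maxObservedStreamTime) {
                break; // first record that has not expired
            }
            it.remove(); // "emit" the expired record as non-joined
        }
    }

    static int scansFor(boolean resetBeforeScan) {
        MinTimeResetSketch join = new MinTimeResetSketch(resetBeforeScan);
        join.put(1, "a");
        join.put(5, "b");
        join.put(7, "c");
        // Record 50 expires 1, 5 and 7; records 51..60 only advance stream time.
        for (long t = 50; t <= 60; t++) {
            join.emitNonJoinedOuterRecords(t);
        }
        return join.scans;
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + scansFor(false) + " scans"); // 11
        System.out.println("with reset:    " + scansFor(true) + " scans");  // 2
    }
}
```

Under these assumptions the reset costs one extra, empty scan (at stream time 51), after which every call returns early; without it, every call from 50 to 60 opens the iterator because minTime is stuck at 7.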