Thanks for the details! It seems Guozhang did a PR to improve the behavior: https://github.com/apache/kafka/pull/7573
(I saw that you reviewed the PR already; just following up to close the loop for others and make them aware of the change.)

-Matthias

On 10/15/19 1:17 PM, Javier Holguera wrote:
> Hi Matthias,
>
> In our case, we are doing a left join between a KStream and a KTable. The
> data can arrive a few milliseconds or seconds apart. If the KTable record
> arrives first, that's fine. However, if the KStream record arrives first, we
> might "incorrectly" push downstream a result with null for the KTable side
> only because the record arrived "late".
>
> If idleStartTime were reset after the partitions were empty,
> `max.task.idle.ms` would apply "again" and we would wait x milliseconds,
> meaning we would be able to use the record from the KTable, assuming its
> timestamp was in fact earlier than the one in the KStream.
>
> However, since, as you said, once the task goes into "forced processing mode"
> the only way to get "out" is to populate both partitions, we might end up
> consuming one partition, then the other, without ever getting to the point
> where both partitions have records at the same time.
>
> Maybe we are using the wrong semantics here, but I'm curious why, by design,
> idleStartTime is not reset when partitions are empty.
>
> Thanks.
>
> On Tue, 15 Oct 2019 at 18:36, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Javier,
>>
>> If a task is put into "forced processing mode" it will stay there until
>> all partitions have data at the same time. That is by design.
>>
>> Why is this behavior problematic for your use case?
>>
>> -Matthias
>>
>> On 10/14/19 7:44 AM, Javier Holguera wrote:
>>> Hi,
>>>
>>> We have a KStream and a KTable that we are left-joining. The KTable has a
>>> "backlog" of records that we want to consume before any of the entries in
>>> the KStream are processed. To guarantee that, we have played with the
>>> timestamp extraction, setting the time for those records in the "distant"
>>> past to guarantee they will be consumed before any of the records in the
>>> KStream are processed.
>>>
>>> This is working as expected, forcing the KTable to be ingested before the
>>> KStream. However, an unexpected side effect that we have noticed is that
>>> this "delay" is only applied once, when the application starts. Going
>>> through the code in the StreamTask method
>>> (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L333-L351),
>>> it seems that the problem is the fact that, once all records in all
>>> partitions are consumed, idleStartTime is not set back to UNKNOWN. That
>>> means that the first record that arrives through either of the two
>>> partitions will be processed immediately.
>>>
>>> Is this by design?
>>>
>>> Thanks.
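The idling behavior discussed in this thread can be sketched as a small Java model. This is a hypothetical simplification, not the actual StreamTask code: the class `IdleModel`, its methods, and the `resetOnEmpty` flag are illustrative assumptions. It tracks how many records each input partition has buffered plus an `idleStartTime` that starts as `UNKNOWN`. When every partition has data, the task leaves forced-processing mode. When only some partitions have data, it waits until `maxTaskIdleMs` (standing in for `max.task.idle.ms`) has elapsed. When nothing is buffered, it keeps `idleStartTime`, as Javier describes, unless `resetOnEmpty` is set, which models the alternative he asks about.

```java
import java.util.Arrays;

// Hypothetical, simplified model of the per-task idling decision discussed
// in this thread. NOT the actual Kafka Streams StreamTask implementation.
final class IdleModel {
    static final long UNKNOWN = -1L;

    private final int[] buffered;       // records buffered per input partition
    private final long maxTaskIdleMs;   // stands in for max.task.idle.ms
    private final boolean resetOnEmpty; // hypothetical alternative behavior
    private long idleStartTime = UNKNOWN;

    IdleModel(int partitions, long maxTaskIdleMs, boolean resetOnEmpty) {
        this.buffered = new int[partitions];
        this.maxTaskIdleMs = maxTaskIdleMs;
        this.resetOnEmpty = resetOnEmpty;
    }

    // A record arrives on the given partition.
    void add(int partition) { buffered[partition]++; }

    // All buffered records were processed, so every partition is empty again.
    void drainAll() { Arrays.fill(buffered, 0); }

    boolean isProcessable(long now) {
        boolean all = Arrays.stream(buffered).allMatch(n -> n > 0);
        boolean any = Arrays.stream(buffered).anyMatch(n -> n > 0);
        if (all) {
            // Every partition has data: leave forced-processing mode.
            idleStartTime = UNKNOWN;
            return true;
        }
        if (!any) {
            // Nothing to process. By default the idle clock is kept running,
            // which is the behavior questioned in the thread.
            if (resetOnEmpty) {
                idleStartTime = UNKNOWN;
            }
            return false;
        }
        // Some, but not all, partitions have data: wait up to maxTaskIdleMs.
        if (idleStartTime == UNKNOWN) {
            idleStartTime = now;
        }
        return now - idleStartTime >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            IdleModel task = new IdleModel(2, 100, reset);
            task.add(0);                             // stream record arrives first
            boolean t0 = task.isProcessable(0);      // starts the idle clock
            boolean t100 = task.isProcessable(100);  // idle time elapsed: forced processing
            task.drainAll();
            task.isProcessable(200);                 // both partitions empty
            task.add(0);                             // next stream record; table record still in flight
            boolean t300 = task.isProcessable(300);
            System.out.println("resetOnEmpty=" + reset + ": " + t0 + " " + t100 + " " + t300);
        }
    }
}
```

With `resetOnEmpty=false` the model prints `resetOnEmpty=false: false true true`: the second stream record is processed at t=300 without any wait, which is the effect Javier reports. With `resetOnEmpty=true` it prints `resetOnEmpty=true: false true false`, meaning the idle wait applies again after the buffers drain.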