Hi Matthias,

In our case, we are doing a left join between a KStream and a KTable. The records for the two sides can arrive a few milliseconds or seconds apart. If the KTable record arrives first, that's fine. However, if the KStream record arrives first, we might "incorrectly" push a result downstream with a null KTable value, only because the table record arrived "late".
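To make the race concrete, here is a minimal plain-Java model. It is not Kafka Streams code, and `LeftJoinRace`, `Event` and the sample keys and values are hypothetical. It assumes a simplified stream-table left join in which each stream record is joined against whatever the table holds for its key at the moment the stream record is processed. Processing the two inputs in arrival order yields a null table value. Processing them in timestamp order, which is roughly what waiting until both partitions have data is meant to achieve, yields the match.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftJoinRace {

    // One input record: which side it belongs to, its key, value and event timestamp.
    static final class Event {
        final boolean fromTable; // true for a KTable update, false for a KStream record
        final String key;
        final String value;
        final long timestamp;

        Event(boolean fromTable, String key, String value, long timestamp) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Simplified left join: each stream record is joined against whatever the
    // table holds for its key at the moment the stream record is processed.
    static List<String> process(List<Event> events) {
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Event e : events) {
            if (e.fromTable) {
                table.put(e.key, e.value);
            } else {
                // get() returns null if the table record has not been processed yet.
                joined.add(e.value + "," + table.get(e.key));
            }
        }
        return joined;
    }

    // Process records in the order they arrived.
    static List<String> inArrivalOrder(List<Event> arrivals) {
        return process(arrivals);
    }

    // Process records by event timestamp, as if both had been buffered first.
    static List<String> inTimestampOrder(List<Event> arrivals) {
        List<Event> sorted = new ArrayList<>(arrivals);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return process(sorted);
    }

    public static void main(String[] args) {
        // The stream record (ts=105) arrives before the table record (ts=100).
        List<Event> arrivals = List.of(
                new Event(false, "k1", "order-1", 105L),
                new Event(true, "k1", "customer-A", 100L));
        System.out.println(inArrivalOrder(arrivals));   // [order-1,null]
        System.out.println(inTimestampOrder(arrivals)); // [order-1,customer-A]
    }
}
```

The timestamp-order result only helps if the table record's timestamp really is earlier than the stream record's, which is the assumption stated in the next paragraph.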
If idleStartTime were reset after the partitions became empty, `max.task.idle.ms` would apply "again" and we would wait x milliseconds, which means we would be able to use the record from the KTable, assuming its timestamp was in fact earlier than the one in the KStream. However, since, as you said, once the task goes into "forced processing mode" the only way to get "out" is for both partitions to have data, we might end up consuming one partition, then the other, without ever reaching the point where both partitions have records at the same time.

Maybe we are using the wrong semantics here, but I'm curious why, by design, idleStartTime is not reset when the partitions are empty.

Thanks.

On Tue, 15 Oct 2019 at 18:36, Matthias J. Sax <matth...@confluent.io> wrote:
> Javier,
>
> If a task is put into "forced processing mode" it will stay there until
> all partitions have data at the same time. That is by design.
>
> Why is this behavior problematic for your use case?
>
>
> -Matthias
>
>
> On 10/14/19 7:44 AM, Javier Holguera wrote:
> > Hi,
> >
> > We have a KStream and a KTable that we are left-joining. The KTable has a
> > "backlog" of records that we want to consume before any of the entries in
> > the KStream are processed. To guarantee that, we have played with the
> > timestamp extraction, setting the time for those records in the "distant"
> > past to guarantee they will be consumed before any of the records in the
> > KStream are processed.
> >
> > This is working as expected, forcing the KTable to be ingested before the
> > KStream. However, an unexpected side effect that we have noticed is that this
> > "delay" is only applied once, when the application starts.
> > Going through the code of the relevant StreamTask method
> > (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L333-L351),
> > it seems that the problem is that, once all records in all partitions are
> > consumed, idleStartTime is not set back to UNKNOWN. That means that the
> > first record that arrives through either of the two partitions will be
> > processed immediately.
> >
> > Is this by design?
> >
> > Thanks.
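The idle-timer behaviour discussed in this thread can be modelled in a few lines of plain Java. This is a simplified sketch based only on the description above, not the actual StreamTask code. `IdleTimerModel`, `canProcess` and the `resetWhenEmpty` flag are hypothetical; the flag stands for the alternative Javier asks about (resetting idleStartTime when all partitions are empty). With max.task.idle.ms set to 100 ms, the replayed scenario shows that, without a reset, a stream-only record arriving long after the first forced-processing episode is processed immediately, whereas with the reset it would wait for the idle timeout again.

```java
public class IdleTimerModel {
    static final long UNKNOWN = -1L;

    private final long maxTaskIdleMs;
    // Hypothetical switch: reset the idle timer when no partition has buffered data.
    private final boolean resetWhenEmpty;
    private long idleStartTime = UNKNOWN;

    IdleTimerModel(long maxTaskIdleMs, boolean resetWhenEmpty) {
        this.maxTaskIdleMs = maxTaskIdleMs;
        this.resetWhenEmpty = resetWhenEmpty;
    }

    // buffered[i] is the number of records currently buffered for input partition i.
    boolean canProcess(long now, int[] buffered) {
        boolean allBuffered = true;
        int total = 0;
        for (int n : buffered) {
            if (n == 0) {
                allBuffered = false;
            }
            total += n;
        }
        if (allBuffered) {
            // Every partition has data: process normally and clear the timer.
            idleStartTime = UNKNOWN;
            return true;
        }
        if (total > 0) {
            // Some, but not all, partitions have data: start the timer if needed
            // and force processing once max.task.idle.ms has elapsed.
            if (idleStartTime == UNKNOWN) {
                idleStartTime = now;
            }
            return now - idleStartTime >= maxTaskIdleMs;
        }
        // Nothing buffered at all. As described in the thread, the timer is
        // left untouched here unless the hypothetical reset is enabled.
        if (resetWhenEmpty) {
            idleStartTime = UNKNOWN;
        }
        return false;
    }

    // Replays the scenario from the thread with max.task.idle.ms = 100 and reports
    // whether a stream-only record arriving at t=1000 is processed right away.
    static boolean lateRecordProcessedImmediately(boolean resetWhenEmpty) {
        IdleTimerModel task = new IdleTimerModel(100L, resetWhenEmpty);
        int[] streamOnly = {1, 0}; // {KStream partition, KTable partition}
        int[] empty = {0, 0};
        task.canProcess(0L, streamOnly);   // starts the idle timer at t=0
        task.canProcess(100L, streamOnly); // timer expired: forced processing
        task.canProcess(150L, empty);      // both partitions drained
        return task.canProcess(1000L, streamOnly);
    }

    public static void main(String[] args) {
        System.out.println("no reset:   " + lateRecordProcessedImmediately(false)); // true
        System.out.println("with reset: " + lateRecordProcessedImmediately(true));  // false
    }
}
```

The model ignores record timestamps and the order in which buffered records are picked; it only captures when the task is allowed to process at all.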