If you want to join two streams, reading both topics separately sounds

This should not affect the correctness of the join, as long as your grace
period and retention time are large enough.

We track the maximum timestamp per task. In your case, you get 20 tasks,
each task joining two partitions, one from each of the two input topics.
Each task tracks time progress independently of other tasks, and across both
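
A minimal sketch of this per-task bookkeeping under a simplified model. The
class `TaskStreamTime` is hypothetical, not a Kafka Streams API: stream time is
the maximum timestamp a task has observed, it never moves backwards, and each
task keeps its own value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each task keeps its own stream time, defined as the
// maximum record timestamp observed so far across all of its input partitions.
class TaskStreamTime {
    private final Map<Integer, Long> streamTimeByTask = new HashMap<>();

    // Advances the task's stream time if the record is newer; an older
    // (out-of-order) record never moves stream time backwards.
    long observe(int taskId, long recordTimestamp) {
        long updated = Math.max(streamTime(taskId), recordTimestamp);
        streamTimeByTask.put(taskId, updated);
        return updated;
    }

    // Returns Long.MIN_VALUE for a task that has not seen any record yet.
    long streamTime(int taskId) {
        return streamTimeByTask.getOrDefault(taskId, Long.MIN_VALUE);
    }
}
```

For example, after `observe(0, 1000)`, a later `observe(0, 400)` still returns
1000 for task 0, while task 1 is unaffected.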

This should not happen (at least since the 2.1 release) because records are
processed in timestamp order. A task compares the timestamps of the "head"
records of both partitions and picks the one with the lower timestamp for
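
The head-record comparison can be sketched as a merge of two buffered queues.
This is a simplified, hypothetical model (`HeadRecordChooser` is not Kafka
Streams code): it ignores fetching and idling entirely, and once one queue is
empty it simply drains the other.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of timestamp-ordered processing within one task: each
// input partition is a queue of buffered record timestamps, and the task
// always processes the partition whose head record is oldest.
class HeadRecordChooser {

    // Consumes both deques and returns timestamps in processing order.
    static List<Long> processInTimestampOrder(Deque<Long> left, Deque<Long> right) {
        List<Long> order = new ArrayList<>();
        while (!left.isEmpty() || !right.isEmpty()) {
            Deque<Long> next;
            if (left.isEmpty()) {
                next = right;
            } else if (right.isEmpty()) {
                next = left;
            } else {
                // Compare the "head" records and pick the lower timestamp.
                next = left.peekFirst() <= right.peekFirst() ? left : right;
            }
            order.add(next.pollFirst());
        }
        return order;
    }
}
```

With left `[1, 5, 9]` and right `[2, 3, 10]`, the processing order is
`[1, 2, 3, 5, 9, 10]`.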

Prior to 2.1, it was a best-effort approach: the consumer should fetch
data "round robin" across all partitions, but this is only best-effort
synchronization. For this case, you might need to set a higher

More details in the corresponding KIP:

The config John mentioned is related; however, it's more relevant for a
"live" run, when you process data from the tail of a topic and producers
write data in bursts. For the reprocessing of existing data as you
described it, it should not have a huge impact (maybe on startup while
buffers are warmed up). However, even then, the grace period is the more
important configuration for an inner stream-stream join.
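
For reference, a sketch of how these settings might be collected. The
application id, bootstrap servers, and idle value are placeholders
(assumptions, not values from this thread). Plain string keys are used so it
compiles without the Kafka dependency; a real application would typically use
constants such as `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` and pass the
properties to `new KafkaStreams(topology, props)`.

```java
import java.util.Properties;

// Sketch of Streams settings discussed in this thread (placeholder values).
class JoinAppConfig {
    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical placeholders: adjust to your deployment.
        props.put("application.id", "stream-join-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Let a task wait up to this long for data on all of its input
        // partitions before choosing the next record (illustrative value).
        props.put("max.task.idle.ms", "1000");
        // Settings from the original question; the commit interval controls
        // commit frequency only and should not change join results.
        props.put("commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```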

Why not set the grace period to the retention time? There is no drawback
to a higher grace period for stream-stream joins. I think the low grace
period is the root cause of the missing data.
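
To illustrate why a small grace period can lose join results when one input
runs ahead of the other, here is a toy model. It is an assumption-laden
simplification (`GraceJoinModel` and its expiry rule are hypothetical, not
Kafka Streams internals): stream time is the maximum timestamp seen, and any
record older than `streamTime - retentionMs` is dropped or evicted, where
`retentionMs` loosely stands in for window size plus grace period.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of an inner stream-stream join on one key.
// Assumptions: stream time = max timestamp seen; a record is expired if
// ts < streamTime - retentionMs; expired incoming records are dropped and
// expired stored records are evicted, so neither produces join results.
class GraceJoinModel {

    // side: 'L' for the left input, anything else for the right input.
    static final class Rec {
        final char side;
        final long ts;
        Rec(char side, long ts) { this.side = side; this.ts = ts; }
    }

    // Processes records in the given order; returns the number of joins.
    static int countJoins(List<Rec> input, long windowMs, long retentionMs) {
        List<Long> left = new ArrayList<>();
        List<Long> right = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        int joins = 0;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts);
            final long cutoff = streamTime - retentionMs;
            left.removeIf(ts -> ts < cutoff);
            right.removeIf(ts -> ts < cutoff);
            if (r.ts < cutoff) {
                continue; // too late: dropped without joining
            }
            List<Long> own = r.side == 'L' ? left : right;
            List<Long> other = r.side == 'L' ? right : left;
            for (long otherTs : other) {
                if (Math.abs(otherTs - r.ts) <= windowMs) {
                    joins++;
                }
            }
            own.add(r.ts);
        }
        return joins;
    }
}
```

With ten records per side at 1-second spacing and a 1000 ms window,
processing in timestamp order with `retentionMs = 1050` yields 28 joins;
processing all left records first with the same retention yields only 4;
raising `retentionMs` to 10 000 restores all 28. The numbers are illustrative
and do not reproduce the figures reported in this thread.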

The commit interval should not affect the join result at all.


> You might also want to set MAX_TASK_IDLE_MS_CONFIG =
> "max.task.idle.ms" to a non-zero value. This will instruct Streams to
> wait the configured amount of time to buffer incoming events on all
> topics before choosing any records to process. In turn, this should
> cause records to be processed in roughly timestamp order across all
> your topics. Without it, Streams might run ahead on one of the topics
> before processing events from the others.
>
> You _might_ want to set the idle time higher than your
> MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms", to be sure that
> you actually get a chance to poll for more records before giving up on
> the idle.
>
> For any operator that is time-dependent, the maximum _observed_
> timestamp is considered the current stream time.
>
> I didn't follow the question about commit interval. It's a fixed
> configuration, so you can't make it commit more frequently during the
> initial catch-up, but then again, why would you want to? It seems like
> you'd want the initial load to go as fast as possible, but committing
> more frequently will only slow it down.
>>>> I verified keys and timestamps and they match.
>>> Did you verify the timestamps client side, i.e., in your Streams application?
>>>> When is the watermark for the grace period advanced?
>>> There is nothing like a watermark. Time is tracked on a per-record basis.
>>>> the event time is the Kafka log append time.
>>> If it's log append time, then the broker sets the timestamp. Do you use
>>> the embedded record timestamp for the join (default)? Or do you have
>>> embedded timestamps in the value and use a custom `TimestampExtractor`?
>>> How large is your join-window, what is your grace period and what's your
>>> store retention time?
>>>> I verified keys and timestamps and they match. If I start the publisher
>>>> and processor at the same time, the join has entirely correct output with
>>>> 6000 messages coming in and 3000 coming out.
>>>> Setting the grace period to a higher value has no effect.
>>>> When is the watermark for the grace period advanced? Per commit interval?
>>>> I read from 5 Kafka brokers and the event time is the Kafka log append
>>>> time. Could the ordering across brokers have something to do with it?
>>>>> How do you know that the result should be 900,000 messages? Did you
>>>>> verify that the keys match and that the timestamps are correct?
>>>>> Did you try to remove the grace period or set a higher value? Maybe there
>>>>> is an issue with out-of-order data?
>>>>>> I have two streams of data flowing into a Kafka cluster. I want to 
>>>>>> process this data with Kafka streams. The stream producers are started 
>>>>>> at some time t.
>>>>>> I start up the Kafka Streams job 5 minutes later and start reading from 
>>>>>> earliest from both topics (about 900 000 messages already on each 
>>>>>> topic). The job parses the data and joins the two streams. On the 
>>>>>> intermediate topics, I see that all the earlier 2x900000 events are 
>>>>>> flowing through until the join. However, only 250 000 are outputted from 
>>>>>> the join, instead of 900 000.
>>>>>> After processing the burst, the code works perfectly on the new incoming
>>>>>> events.
>>>>>> The grace period of the join is set to 50 ms, but setting it to 5 or
>>>>>> 10 minutes makes no difference. My other settings:
>>>>>>     auto.offset.reset = earliest
>>>>>>     commit.interval.ms = 1000
>>>>>>     batch.size = 204800
>>>>>>     max.poll.records = 500000
>>>>>> The join is on the following window:
>>>>>> JoinWindows.of(Duration.ofMillis(1000))
>>>>>>       .grace(Duration.ofMillis(50))
>>>>>> As far as I understand, these joins should be done on event time. What
>>>>>> can cause these results to be discarded?

