> The two streams were read in separately:
>  instead of together:

If you want to join two streams, reading both topic separately sound
correct.



> There are twenty partitions per topic. It seems as if it is not reading 
> equally fast from all topic partitions.

This should not affect the correctness of the join, if you grace-period,
retention-time is large enough.



> If time is tracked on a per-record basis. Do you then take the max timestamp 
> across all partitions and topics as the current event time?

We track the maximum timestamp per task. In you case, you get 20 tasks,
each task joining 2 partition of the two input topics. Each tasks,
tracks time progress independently of other tasks, and across both
partitions.



> If so, if you read faster from one partition than from the other when 
> processing old data, what can you do to make sure this does not get discarded 
> besides putting a very high grace period?

This should not happen (as least since 2.1 release) because records a
processed in timestamp order. A task compare the timestamp of the "head"
record of both partitions and picks the one with lower timestamp for
processing.

Prior to 2.1, it was a best effort approach, however, consumer should
fetch data "round robin" across all partitions. This is only best effort
synchronization though. For this case, you might need to set a higher
grace-period/retention-time.

More details in the corresponding KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

The config John mentioned, is related, however, it's more relevant for a
"live" run, when you process data from the tail of a topic and producers
write data "in-bursts". For the reprocessing of existing data as you
described it, it should not have a huge impact (maybe on startup while
buffers are warmed up). However, even than, the grace-period is the more
important configuration for an inner stream-stream join.



> My join window is one second, grace period 50ms, retention time is default.

Why not setting grace-period to retention-time? There is no drawback in
a higher grace-period for stream-stream joins. I think this low grace
period is the root cause if missing data.



> I am also wondering about what to do with the commit interval? In normal 
> cases this should be on 1000 ms but in the case of this initial startup burst 
> it should output faster?

Commit interval should not affect the joins result at all.




-Matthias



On 6/20/19 9:25 AM, John Roesler wrote:
> Hi!
> 
> You might also want to set MAX_TASK_IDLE_MS_CONFIG =
> "max.task.idle.ms" to a non-zero value. This will instruct Streams to
> wait the configured amount of time to buffer incoming events on all
> topics before choosing any records to process. In turn, this should
> cause records to be processed in roughly timestamp order across all
> your topics. Without it, Streams might run ahead on one of the topics
> before processing events from the others.
> 
> You _might_ want to set the idle time higher than your
> MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms", to be sure that
> you actually get a chance to poll for more records before giving up on
> the idle.
> 
> For any operator that is time-dependent, the maximum _observed_
> timestamp is considered the current stream time.
> 
> I didn't follow the question about commit interval. It's a fixed
> configuration, so you can't make it commit more frequently during the
> initial catch-up, but then again, why would you want to? It seems like
> you'd want the initial load to go as fast as possible, but committing
> more frequently will only slow it down.
> 
> I hope this helps,
> -John
> 
> On Thu, Jun 20, 2019 at 9:00 AM giselle.vandon...@klarrio.com
> <giselle.vandon...@klarrio.com> wrote:
>>
>> It seems like there were multiple issues:
>> 1. The two streams were read in separately:
>>     val stream1: KStream[String, String] = builder.stream[String, 
>> String](Set("topic1"))
>>     val stream2: KStream[String, String] = builder.stream[String, 
>> String](Set("topic2"))
>>  instead of together:
>>     val rawStreams: KStream[String, String] = builder.stream[String, 
>> String](Set("topic1", "topic2"))
>> This second option got much more output but still not complete.
>> 2.  There are twenty partitions per topic. It seems as if it is not reading 
>> equally fast from all topic partitions. When printing the input per thread 
>> the timestamps do not accumulate nicely across partitions. If time is 
>> tracked on a per-record basis. Do you then take the max timestamp across all 
>> partitions and topics as the current event time? If so, if you read faster 
>> from one partition than from the other when processing old data, what can 
>> you do to make sure this does not get discarded besides putting a very high 
>> grace period?
>>
>> My join window is one second, grace period 50ms, retention time is default.
>> I use the timestamp inside the observations. But I have tried with the 
>> default TimestampExtractor (log append time) as well, which still did not 
>> give all the wanted output.
>>
>> I am also wondering about what to do with the commit interval? In normal 
>> cases this should be on 1000 ms but in the case of this initial startup 
>> burst it should output faster?
>>
>> On 2019/06/17 16:47:11, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>> I verified keys and timestamps and they match.
>>>
>>> Did you verify the timestamps client side, ie, in your Streams application?
>>>
>>>> When is the watermark for the grace period advanced?
>>>
>>> There is nothing like a watermark. Time is tracked on a per-record basis.
>>>
>>>> the event time is the Kafka log append time.
>>>
>>> If it's log append time, that the broker sets the timestamp. Do you use
>>> the embedded record timestamp for the join (default)? Or do you have an
>>> embedded timestamps in the value and use an custom `TimestampExtractor`?
>>>
>>> How large is your join-window, what is your grace period and what's your
>>> store retention time?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 6/17/19 5:24 AM, giselle.vandon...@klarrio.com wrote:
>>>> I verified keys and timestamps and they match.  If I start the publisher 
>>>> and processor at the same time, the join has entirely correct output with 
>>>> 6000 messages coming in and 3000 coming out.
>>>> Putting the grace period to a higher value has no effect.
>>>> When is the watermark for the grace period advanced? Per commit interval?
>>>> I read from 5 Kafka brokers and the event time is the Kafka log append 
>>>> time. Could the ordering across brokers have something to do with it?
>>>>
>>>> On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>> How do you know that the result should be 900,000 messages? Did you
>>>>> verify that the keys match and that the timestamps are correct?
>>>>>
>>>>> Did you try to remove grace-period or set a higher value? Maybe there is
>>>>> an issue with ouf-of-order data?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
>>>>>> I have two streams of data flowing into a Kafka cluster. I want to 
>>>>>> process this data with Kafka streams. The stream producers are started 
>>>>>> at some time t.
>>>>>>
>>>>>> I start up the Kafka Streams job 5 minutes later and start reading from 
>>>>>> earliest from both topics (about 900 000 messages already on each 
>>>>>> topic). The job parses the data and joins the two streams. On the 
>>>>>> intermediate topics, I see that all the earlier 2x900000 events are 
>>>>>> flowing through until the join. However, only 250 000 are outputted from 
>>>>>> the join, instead of 900 000.
>>>>>>
>>>>>> After processing the burst, the code works perfect on the new incoming 
>>>>>> events.
>>>>>>
>>>>>> Grace ms of the join is put on 50 millis but putting it on 5 minutes or 
>>>>>> 10 minutes makes no difference. My other settings:
>>>>>>
>>>>>>     auto.offset.reset = earliest
>>>>>>     commit.interval.ms = 1000
>>>>>>     batch.size = 204800
>>>>>>     max.poll.records = 500000
>>>>>>
>>>>>> The join is on the following window:
>>>>>>
>>>>>> JoinWindows.of(Duration.ofMillis(1000))
>>>>>>       .grace(Duration.ofMillis(50))
>>>>>>
>>>>>> As far as I understand these joins should be done on event time. What 
>>>>>> that can cause these results to be discarded?
>>>>>>
>>>>>
>>>>>
>>>
>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to