> The two streams were read in separately:
> instead of together:

If you want to join two streams, reading both topics separately sounds correct.

> There are twenty partitions per topic. It seems as if it is not reading
> equally fast from all topic partitions.

This should not affect the correctness of the join, if your grace period and retention time are large enough.

> If time is tracked on a per-record basis. Do you then take the max timestamp
> across all partitions and topics as the current event time?

We track the maximum timestamp per task. In your case, you get 20 tasks, each task joining 2 partitions (one from each of the two input topics). Each task tracks time progress independently of other tasks, and across both of its partitions.

> If so, if you read faster from one partition than from the other when
> processing old data, what can you do to make sure this does not get discarded
> besides putting a very high grace period?

This should not happen (at least since the 2.1 release) because records are processed in timestamp order. A task compares the timestamps of the "head" records of both partitions and picks the one with the lower timestamp for processing.

Prior to 2.1, it was a best-effort approach; however, the consumer should fetch data "round robin" across all partitions. This is only best-effort synchronization though. For this case, you might need to set a higher grace-period/retention-time.

More details in the corresponding KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

The config John mentioned is related; however, it's more relevant for a "live" run, when you process data from the tail of a topic and producers write data in bursts. For the reprocessing of existing data as you described it, it should not have a huge impact (maybe on startup while buffers are warmed up). However, even then, the grace period is the more important configuration for an inner stream-stream join.

> My join window is one second, grace period 50ms, retention time is default.

Why not set the grace period to the retention time?
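
To illustrate the two points above (timestamp-ordered processing and the role of the grace period), here is a small self-contained Scala model. It is only a sketch and not the Kafka Streams implementation: the names `JoinLatenessModel`, `countLate`, `runAhead` and `timestampOrdered` are made up for this example, and the lateness rule (a record counts as late once it is older than stream time minus window size minus grace) is a simplifying assumption that ignores store retention and the real join internals.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified model of one task that joins two partitions.
object JoinLatenessModel {
  val windowMs = 1000L // join window size used in this thread

  // Two partitions holding old data, timestamps in milliseconds.
  val partition1: Seq[Long] = (0 to 100).map(i => i * 100L)      // 0, 100, ..., 10000
  val partition2: Seq[Long] = (0 to 100).map(i => i * 100L + 50) // 50, 150, ..., 10050

  // Worst case of unsynchronized reading: partition 1 is drained before partition 2.
  def runAhead(p1: Seq[Long], p2: Seq[Long]): Seq[Long] = p1 ++ p2

  // Timestamp synchronization as described above: always take the head record
  // with the lower timestamp.
  def timestampOrdered(p1: Seq[Long], p2: Seq[Long]): Seq[Long] = {
    @tailrec
    def loop(a: List[Long], b: List[Long], acc: List[Long]): List[Long] = (a, b) match {
      case (Nil, rest)                     => acc.reverse ::: rest
      case (rest, Nil)                     => acc.reverse ::: rest
      case (x :: xs, y :: _) if x <= y     => loop(xs, b, x :: acc)
      case (_, y :: ys)                    => loop(a, ys, y :: acc)
    }
    loop(p1.toList, p2.toList, Nil)
  }

  // Counts records this model treats as too late for the given processing order.
  def countLate(processingOrder: Seq[Long], graceMs: Long): Int = {
    var streamTime = Long.MinValue // maximum timestamp observed so far by the task
    var late = 0
    for (ts <- processingOrder) {
      streamTime = math.max(streamTime, ts)
      // Assumed lateness rule for this sketch only.
      if (ts < streamTime - windowMs - graceMs) late += 1
    }
    late
  }

  def main(args: Array[String]): Unit = {
    println(countLate(runAhead(partition1, partition2), graceMs = 50L))
    println(countLate(timestampOrdered(partition1, partition2), graceMs = 50L))
    println(countLate(runAhead(partition1, partition2), graceMs = 600000L))
  }
}
```

In this model, running ahead on one partition with a 50 ms grace period marks most of the second partition as late, while processing in timestamp order or using a grace period longer than the time span of the backlog marks nothing as late.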
There is no drawback to a higher grace period for stream-stream joins. I think this low grace period is the root cause of the missing data.

> I am also wondering about what to do with the commit interval? In normal
> cases this should be on 1000 ms but in the case of this initial startup burst
> it should output faster?

The commit interval should not affect the join result at all.


-Matthias

On 6/20/19 9:25 AM, John Roesler wrote:
> Hi!
>
> You might also want to set MAX_TASK_IDLE_MS_CONFIG =
> "max.task.idle.ms" to a non-zero value. This will instruct Streams to
> wait the configured amount of time to buffer incoming events on all
> topics before choosing any records to process. In turn, this should
> cause records to be processed in roughly timestamp order across all
> your topics. Without it, Streams might run ahead on one of the topics
> before processing events from the others.
>
> You _might_ want to set the idle time higher than your
> MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms", to be sure that
> you actually get a chance to poll for more records before giving up on
> the idle.
>
> For any operator that is time-dependent, the maximum _observed_
> timestamp is considered the current stream time.
>
> I didn't follow the question about commit interval. It's a fixed
> configuration, so you can't make it commit more frequently during the
> initial catch-up, but then again, why would you want to? It seems like
> you'd want the initial load to go as fast as possible, but committing
> more frequently will only slow it down.
>
> I hope this helps,
> -John
>
> On Thu, Jun 20, 2019 at 9:00 AM giselle.vandon...@klarrio.com
> <giselle.vandon...@klarrio.com> wrote:
>>
>> It seems like there were multiple issues:
>> 1. The two streams were read in separately:
>> val stream1: KStream[String, String] = builder.stream[String, String](Set("topic1"))
>> val stream2: KStream[String, String] = builder.stream[String, String](Set("topic2"))
>> instead of together:
>> val rawStreams: KStream[String, String] = builder.stream[String, String](Set("topic1", "topic2"))
>> This second option got much more output but still not complete.
>> 2. There are twenty partitions per topic. It seems as if it is not reading
>> equally fast from all topic partitions. When printing the input per thread
>> the timestamps do not accumulate nicely across partitions. If time is
>> tracked on a per-record basis. Do you then take the max timestamp across all
>> partitions and topics as the current event time? If so, if you read faster
>> from one partition than from the other when processing old data, what can
>> you do to make sure this does not get discarded besides putting a very high
>> grace period?
>>
>> My join window is one second, grace period 50ms, retention time is default.
>> I use the timestamp inside the observations. But I have tried with the
>> default TimestampExtractor (log append time) as well, which still did not
>> give all the wanted output.
>>
>> I am also wondering about what to do with the commit interval? In normal
>> cases this should be on 1000 ms but in the case of this initial startup
>> burst it should output faster?
>>
>> On 2019/06/17 16:47:11, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>> I verified keys and timestamps and they match.
>>>
>>> Did you verify the timestamps client side, i.e., in your Streams application?
>>>
>>>> When is the watermark for the grace period advanced?
>>>
>>> There is nothing like a watermark. Time is tracked on a per-record basis.
>>>
>>>> the event time is the Kafka log append time.
>>>
>>> If it's log append time, then the broker sets the timestamp. Do you use
>>> the embedded record timestamp for the join (default)?
>>> Or do you have an embedded timestamp in the value and use a custom
>>> `TimestampExtractor`?
>>>
>>> How large is your join-window, what is your grace period and what's your
>>> store retention time?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 6/17/19 5:24 AM, giselle.vandon...@klarrio.com wrote:
>>>> I verified keys and timestamps and they match. If I start the publisher
>>>> and processor at the same time, the join has entirely correct output with
>>>> 6000 messages coming in and 3000 coming out.
>>>> Putting the grace period to a higher value has no effect.
>>>> When is the watermark for the grace period advanced? Per commit interval?
>>>> I read from 5 Kafka brokers and the event time is the Kafka log append
>>>> time. Could the ordering across brokers have something to do with it?
>>>>
>>>> On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>> How do you know that the result should be 900,000 messages? Did you
>>>>> verify that the keys match and that the timestamps are correct?
>>>>>
>>>>> Did you try to remove grace-period or set a higher value? Maybe there is
>>>>> an issue with out-of-order data?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
>>>>>> I have two streams of data flowing into a Kafka cluster. I want to
>>>>>> process this data with Kafka streams. The stream producers are started
>>>>>> at some time t.
>>>>>>
>>>>>> I start up the Kafka Streams job 5 minutes later and start reading from
>>>>>> earliest from both topics (about 900 000 messages already on each
>>>>>> topic). The job parses the data and joins the two streams. On the
>>>>>> intermediate topics, I see that all the earlier 2x900000 events are
>>>>>> flowing through until the join. However, only 250 000 are outputted from
>>>>>> the join, instead of 900 000.
>>>>>>
>>>>>> After processing the burst, the code works perfect on the new incoming
>>>>>> events.
>>>>>>
>>>>>> Grace ms of the join is put on 50 millis but putting it on 5 minutes or
>>>>>> 10 minutes makes no difference. My other settings:
>>>>>>
>>>>>> auto.offset.reset = earliest
>>>>>> commit.interval.ms = 1000
>>>>>> batch.size = 204800
>>>>>> max.poll.records = 500000
>>>>>>
>>>>>> The join is on the following window:
>>>>>>
>>>>>> JoinWindows.of(Duration.ofMillis(1000))
>>>>>> .grace(Duration.ofMillis(50))
>>>>>>
>>>>>> As far as I understand these joins should be done on event time. What
>>>>>> can cause these results to be discarded?
>>>>>>
>>>>>
>>>>>
>>>
>>>