Thank you for the information. We are using the Kafka 3.0 client library. We are able to reliably reproduce this issue in our test environment now. I removed my timestamp extractor, and I set the max.task.idle.ms to 2000. I also turned on trace logging for package org.apache.kafka.streams.processor.internals.
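For reference, a minimal sketch of the configuration described above; the application id and bootstrap servers are placeholders, and the log4j line is for the package we enabled trace logging on:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-repro");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // Wait up to 2 seconds for lagging inputs before processing a task.
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);
    // Note: no default.timestamp.extractor override here; the custom extractor
    // was removed, so the records' own timestamps are used.

    // log4j.properties entry for the trace logging mentioned above:
    // log4j.logger.org.apache.kafka.streams.processor.internals=TRACE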
To create the issue, we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins. From the
stream-thread log messages we saw the event message, our stream missed
the join, and then several milliseconds later we saw the stream-thread
print out the status message. The stream-thread printed our status
message a total of 5 times. Given that only a few milliseconds passed
between missing the join and the stream-thread printing the status
message, would increasing max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:

> Timestamp synchronization is not perfect, and as a matter of fact, we
> fixed a few gaps in the 3.0.0 release. We actually hope that we closed
> the last gaps in 3.0.0... *fingers-crossed* :)
>
> > We are using a timestamp extractor that returns 0.
>
> You can do this, and it effectively "disables" timestamp
> synchronization, as records on the KTable side don't have a timeline
> any longer. As a side effect it also allows you to "bootstrap" the
> table, as records with timestamp zero will always be processed first
> (as they are smaller). Of course, you also don't have time
> synchronization for "future" data, and your program becomes
> non-deterministic if you reprocess old data.
>
> > this seemed to be the only
> > way to bootstrap enough records at startup to avoid the missed join.
>
> Using 3.0.0 and enabling timestamp synchronization via the
> `max.task.idle.ms` config should allow you to get the correct behavior
> without the zero-extractor (of course, your KTable data must have
> smaller timestamps than your KStream data).
>
> > If I use "timestamp synchronization" do I have to remove the zero
> > timestamp extractor? If I remove the zero timestamp extractor will
> > timestamp synchronization take care of the missed join issue on startup?
>
> To be more precise: timestamp synchronization is _always_ on. The
> question is just how strictly it is applied. By default, we use the
> weakest form, which is only best effort.
>
> > I'm guessing the issue here is that occasionally the poll request is not
> > returning the matching record for the KTable side of the join before the
> > task goes off and starts processing records.
>
> Yes, because of the default best-effort approach. That is why you
> should increase `max.task.idle.ms` to detect this case, "skip"
> processing, and let Kafka Streams do another poll() to get the KTable
> data.
>
> 2.8 and earlier:
>
> max.task.idle.ms=0 -> best effort (no poll() retry)
> max.task.idle.ms>0 -> try to do another poll() until data is there or
> the idle time has passed
>
> Note: >0 might still "fail" even if there is data, because consumer
> fetch behavior is not predictable.
>
> 3.0:
>
> max.task.idle.ms=-1 -> best effort (no poll() retry)
> max.task.idle.ms=0  -> if there is data broker-side, repeat poll()
> until you get the data
> max.task.idle.ms>0  -> even if there is no data broker-side, wait until
> data becomes available or the idle time has passed
>
> Hope this helps.
>
> -Matthias
>
> On 11/1/21 4:29 PM, Guozhang Wang wrote:
> > Hello Chad,
> >
> > From your earlier comment, you mentioned "In my scenario the records
> > were written to the KTable topic before the record was written to the
> > KStream topic." So I think Matthias and others have excluded this
> > possibility while trying to help investigate.
> >
> > If only the matching records from the KStream are returned via a
> > single consumer poll call, but not the other records from the KTable,
> > then you would miss this matched join result.
> >
> > Guozhang
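A minimal sketch of the "zero extractor" discussed above (the class name is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Returning 0 for every record makes the KTable input always sort first
    // during timestamp synchronization, effectively bootstrapping the table,
    // at the cost of the non-determinism Matthias describes above.
    public class ZeroTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long partitionTime) {
            return 0L;
        }
    }

It would be wired in via the default.timestamp.extractor config, e.g. props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ZeroTimestampExtractor.class).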
> >
> > On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com>
> > wrote:
> >
> >> Thank you for your response and the links to the presentations.
> >>
> >> *However, this seems to be orthogonal to your issue?*
> >>
> >> Yes. From what I see in the code, it looks like you have a single
> >> consumer subscribed to multiple topics. Please correct me if I'm wrong.
> >>
> >> *By default, timestamp synchronization is disabled. Maybe enabling it
> >> would help?*
> >>
> >> We are using a timestamp extractor that returns 0. We did that because
> >> we were almost always missing joins on startup, and this seemed to be
> >> the only way to bootstrap enough records at startup to avoid the missed
> >> join. We found a post that said doing that would make the KTable act
> >> like a GlobalKTable at startup. So far this works great; we never miss
> >> a join on startup. If I use "timestamp synchronization" do I have to
> >> remove the zero timestamp extractor? If I remove the zero timestamp
> >> extractor, will timestamp synchronization take care of the missed join
> >> issue on startup?
> >>
> >> I'm guessing the issue here is that occasionally the poll request does
> >> not return the matching record for the KTable side of the join before
> >> the task goes off and starts processing records. Later, when we put the
> >> same record on the topic and the KTable has had a chance to load more
> >> records, the join works and everything is good to go. Because of the
> >> way our system works, no new status records have been written, and so
> >> the new record joins against the correct status.
> >>
> >> Do you agree that the poll request is returning the KStream record but
> >> not the KTable record, and that the join is therefore getting missed?
> >> If you don't agree, what do you think is going on? Is there a way to
> >> prove this out?
> >>
> >> Thanks,
> >> Chad
> >>
> >> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >>> Yes, a StreamThread has one consumer. The number of StreamThreads per
> >>> instance is configurable via `num.stream.threads`. Partitions are
> >>> assigned to threads similar to how partitions are assigned to
> >>> consumers in a plain consumer group.
> >>>
> >>> It seems you run with the default of one thread per instance. As you
> >>> spin up 12 instances, this results in 12 threads for the application.
> >>> As you have 12 partitions, using more threads won't be useful, as no
> >>> partitions would be left for them to process.
> >>>
> >>> For a stream-table join, there will be one task per "partition pair"
> >>> that computes the join for those partitions. So you get 12 tasks, and
> >>> each thread processes one task in your setup. I.e., a thread's
> >>> consumer is reading data for both input topics.
> >>>
> >>> Pausing happens on a per-partition basis: for joins, there are two
> >>> buffers per task (one for each input topic partition). It's possible
> >>> that one partition is paused while the other is processed. However,
> >>> this seems to be orthogonal to your issue?
> >>>
> >>> For a GlobalKTable, you get an additional GlobalThread that only reads
> >>> the data from the "global topic" to update the GlobalKTable. The
> >>> semantics of KStream-KTable and KStream-GlobalKTable joins are
> >>> different; cf.
> >>>
> >>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >>>
> >>> For the timestamp synchronization, you may check out the
> >>> `max.task.idle.ms` config. By default, timestamp synchronization is
> >>> disabled. Maybe enabling it would help?
> >>>
> >>> You may also check out slides 34-38:
> >>>
> >>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>
> >>> There is one corner case: if two records with the same timestamp come
> >>> in, it's not defined which one will be processed first.
> >>>
> >>> Hope this helps.
> >>>
> >>> -Matthias
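A short sketch contrasting the two join flavors compared above; the topic names and String value types are hypothetical:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> events = builder.stream("events"); // hypothetical topic
    KTable<String, String> status = builder.table("status");   // hypothetical topic

    // KStream-KTable join: computed per "partition pair" by a task, relies on
    // timestamp synchronization, and is subject to the poll behavior
    // discussed in this thread.
    KStream<String, String> joined =
        events.leftJoin(status, (event, st) -> event + "/" + st);

    // KStream-GlobalKTable join: a dedicated GlobalThread bootstraps the
    // global topic before processing starts; no timestamp synchronization.
    GlobalKTable<String, String> globalStatus = builder.globalTable("status-global");
    KStream<String, String> joinedGlobal =
        events.leftJoin(globalStatus, (key, event) -> key, (event, st) -> event + "/" + st);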
> >>>
> >>> On 10/30/21 6:45 AM, Chad Preisler wrote:
> >>>> Yes, this helped. I have some additional questions.
> >>>>
> >>>> Does a StreamThread have one consumer? (It looks like it, but I just
> >>>> want to confirm.)
> >>>> Is there a separate StreamThread for each topic, including the KTable?
> >>>> If a KTable is a StreamThread and there is a StreamTask for that
> >>>> KTable, could my buffer be getting filled up, and the mainConsumer for
> >>>> the KTable be getting paused? I see this code in StreamTask#addRecords:
> >>>>
> >>>> // if after adding these records, its partition queue's buffered size
> >>>> // has been increased beyond the threshold, we can then pause the
> >>>> // consumption for this partition
> >>>> if (newQueueSize > maxBufferedSize) {
> >>>>     mainConsumer.pause(singleton(partition));
> >>>> }
> >>>>
> >>>> Is there any specific logging that I can set to debug or trace that
> >>>> would help me troubleshoot? I'd prefer not to turn debug and/or trace
> >>>> on for every single class.
> >>>>
> >>>> Thanks,
> >>>> Chad
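One note on the snippet above: the maxBufferedSize threshold it checks corresponds to the buffered.records.per.partition config (default 1000), and the pause is transient; the partition is resumed as the buffered records get processed. A sketch of raising it, with an illustrative value:

    // Allow more records to queue per partition before the consumer pauses it.
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 2000);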
> >>>>
> >>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> >>>>
> >>>>> Hi Chad,
> >>>>>> I'm wondering if someone can point me to the Kafka streams internal
> >>>>>> code that reads records for the join?
> >>>>> --> You can check StreamThread#pollPhase, where the stream thread
> >>>>> (main consumer) periodically polls records. It then processes each
> >>>>> topology node with these polled records in the stream tasks
> >>>>> (StreamTask#process).
> >>>>>
> >>>>> Hope that helps.
> >>>>>
> >>>>> Thanks.
> >>>>> Luke
> >>>>>
> >>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> >>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>>>>
> >>>>>> Hi Chad, this talk, around 24:00, clearly explains what you're seeing:
> >>>>>>
> >>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>>
> >>>>>> Gilles
> >>>>>>
> >>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I have a stream application that does a KStream to KTable left
> >>>>>>> join. We seem to be occasionally missing joins (the KTable side is
> >>>>>>> null).
> >>>>>>>
> >>>>>>> I'm wondering if someone can point me to the Kafka Streams internal
> >>>>>>> code that reads records for the join? I've poked around the Kafka
> >>>>>>> code base, but there is a lot there. I imagine there is some
> >>>>>>> consumer poll for each side of the join, and possibly a background
> >>>>>>> thread for reading the KTable topic.
> >>>>>>>
> >>>>>>> I figure there are several possible causes of this issue, and since
> >>>>>>> nothing is really jumping out in my code, I was going to start
> >>>>>>> looking at the Kafka code to see if there is something I can do to
> >>>>>>> fix this.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Chad
> >>>>>>
> >>>>>> --
> >>>>>> Funding Circle Limited is authorised and regulated by the Financial
> >>>>>> Conduct Authority under firm registration number 722513. Funding
> >>>>>> Circle is not covered by the Financial Services Compensation Scheme.
> >>>>>> Registered in England (Co. No. 06968588) with registered office at
> >>>>>> 71 Queen Victoria Street, London EC4V 4AY.
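Finally, a minimal sketch, with hypothetical topic names, of the kind of KStream-KTable left join described in the original post, logging the "missed join" case where the table side comes back null:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> events = builder.stream("events");    // hypothetical topic
    KTable<String, String> statusTable = builder.table("status"); // hypothetical topic

    events
        .leftJoin(statusTable, (event, status) -> {
            if (status == null) {
                // Table side not available at join time: the
                // occasionally-missed join this thread is about.
                System.err.println("missed join for event: " + event);
            }
            return event + "/" + status;
        })
        .to("joined-events"); // hypothetical output topic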