Thank you for the information. We are using the Kafka 3.0 client library.
We are able to reliably reproduce this issue in our test environment now. I
removed my timestamp extractor, and I set the max.task.idle.ms to 2000. I
also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins.

>From the stream-thread log messages we saw the event message, our stream
missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread printed out
our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and the
stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:

> Timestamp synchronization is not perfect, and as a matter of fact, we
> fixed a few gaps in 3.0.0 release. We actually hope, that we closed the
> last gaps in 3.0.0... *fingers-crossed* :)
>
> > We are using a timestamp extractor that returns 0.
>
> You can do this, and it effectively "disables" timestamp synchronization
> as records on the KTable side don't have a timeline any longer. As a
> side effect it also allows you to "bootstrap" the table, as records with
> timestamp zero will always be processed first (as they are smaller). Of
> course, you also don't have time synchronization for "future" data and
> your program becomes non-deterministic if you reprocess old data.
>
> > his seemed to be the only
> > way to bootstrap enough records at startup to avoid the missed join.
>
> Using 3.0.0 and enabling timestamp synchronization via
> `max.task.idle.ms` config, should allow you to get the correct behavior
> without the zero-extractor (of course, your KTable data must have
> smaller timestamps that your KStream data).
>
> > If I use "timestamp synchronization" do I have to remove the zero
> > timestamp extractor? If I remove the zero timestamp extractor will
> > timestamp synchronization take care of the missed join issue on startup?
>
> To be more precise: timestamp synchronization is _always_ on. The
> question is just how strict it is applied. By default, we do the weakest
> from which is only best effort.
>
> > I'm guessing the issue here is that occasionally the poll request is not
> > returning the matching record for the KTable side of the join before the
> > task goes off and starts processing records.
>
> Yes, because of default best effort approach. That is why you should
> increase `max.task.idle.ms` to detect this case and "skip" processing
> and let KS do another poll() to get KTable data.
>
> 2.8 and earlier:
>
> max.task.idle.ms=0 -> best effort (no poll() retry)
> max.task.idle.ms>0 -> try to do another poll() until data is there or
> idle time passed
>
> Note: >0 might still "fail" even if there is data, because consumer
> fetch behavior is not predictable.
>
>
> 3.0:
>
> max.task.idle.ms=-1 -> best effort (no poll() retry)
> max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
> until you get the data
> max.task.idle.ms>0 -> even if there is not data broker side, wait until
> data becomes available or the idle time passed
>
>
> Hope this helps.
>
>
> -Matthias
>
> On 11/1/21 4:29 PM, Guozhang Wang wrote:
> > Hello Chad,
> >
> >  From your earlier comment, you mentioned "In my scenario the records
> were
> > written to the KTable topic before the record was written to the KStream
> > topic." So I think Matthias and others have excluded this possibility
> while
> > trying to help investigate.
> >
> > If only the matching records from KStream are returned via a single a
> > consumer poll call but not the other records from KTable, then you would
> > miss this matched join result.
> >
> > Guozhang
> >
> >
> > On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com>
> > wrote:
> >
> >> Thank you for your response and the links to the presentations.
> >>
> >>
> >> *However, this seems tobe orthogonal to your issue?*
> >>
> >> Yes. From what I see in the code it looks like you have a single
> consumer
> >> subscribed to multiple topics. Please correct me if I'm wrong.
> >>
> >>
> >> *By default, timestamp synchronization is disabled. Maybeenabling it
> would
> >> help?*
> >>
> >> We are using a timestamp extractor that returns 0. We did that because
> we
> >> were almost always missing joins on startup, and this seemed to be the
> only
> >> way to bootstrap enough records at startup to avoid the missed join. We
> >> found a post that said doing that would make the KTable act like the
> >> GlobalKTable at startup. So far this works great, we never miss a join
> on a
> >> startup. If I use "timestamp synchronization" do I have to remove the
> zero
> >> timestamp extractor? If I remove the zero timestamp extractor will
> >> timestamp synchronization take care of the missed join issue on startup?
> >>
> >> I'm guessing the issue here is that occasionally the poll request is not
> >> returning the matching record for the KTable side of the join before the
> >> task goes off and starts processing records. Later when we put the same
> >> record on the topic and the KTable has had a chance to load more records
> >> the join works and everything is good to go. Because of the way our
> system
> >> works no new status records have been written and so the new record
> joins
> >> against the correct status.
> >>
> >> Do you agree that the poll request is returning the KStream record but
> not
> >> returning the KTable record and therefore the join is getting missed? If
> >> you don't agree, what do you think is going on? Is there a way to prove
> >> this out?
> >>
> >> Thanks,
> >> Chad
> >>
> >> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>
> >>> Yes, a StreamThread has one consumer. The number of StreamThreads per
> >>> instance is configurable via `num.stream.threads`. Partitions are
> >>> assigned to threads similar to consumer is a plain consumer group.
> >>>
> >>> It seems you run with the default of one thread per instance. As you
> >>> spin up 12 instances, it results in 12 threads for the application. As
> >>> you have 12 partitions, using more threads won't be useful as no
> >>> partitions are left for them to process.
> >>>
> >>> For a stream-table joins, there will be one task per "partition pair"
> >>> that computes the join for those partitions. So you get 12 tasks, and
> >>> each thread processes one task in your setup. Ie, a thread consumer is
> >>> reading data for both input topics.
> >>>
> >>> Pausing happens on a per-partition bases: for joins there is two
> buffers
> >>> per task (one for each input topic partition). It's possible that one
> >>> partition is paused while the other is processed. However, this seems
> to
> >>> be orthogonal to your issue?
> >>>
> >>> For a GlobalKTable, you get an additional GlobalThread that only reads
> >>> the data from the "global topic" to update the GlobalKTable. Semantics
> >>> of KStream-KTable and KStream-GlobalKTable joins are different: Cf
> >>>
> >>>
> >>
> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >>>
> >>> For the timestamp synchronization, you may checkout `max.task.idle.ms`
> >>> config. By default, timestamp synchronization is disabled. Maybe
> >>> enabling it would help?
> >>>
> >>> You may also check out slides 34-38:
> >>>
> >>>
> >>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>
> >>> There is one corner case: if two records with the same timestamp come
> >>> it, it's not defined which one will be processed first.
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 10/30/21 6:45 AM, Chad Preisler wrote:
> >>>> Yes, this helped. I have some additional questions.
> >>>>
> >>>> Does StreamThread have one consumer? (Looks like it, but just want to
> >>>> confirm)
> >>>> Is there a separate StreamThread for each topic including the KTable?
> >>>> If a KTable is a StreamThread and there is a  StreamTask for that
> >> KTable,
> >>>> could my buffer be getting filled up, and the mainConsumer for the
> >> KTable
> >>>> be getting paused? I see this code in StreamTask#addRecords.
> >>>>
> >>>> // if after adding these records, its partition queue's buffered size
> >> has
> >>>> been
> >>>>           // increased beyond the threshold, we can then pause the
> >>>> consumption for this partition
> >>>>           if (newQueueSize > maxBufferedSize) {
> >>>>               mainConsumer.pause(singleton(partition));
> >>>>           }
> >>>>
> >>>> Is there any specific logging that I can set to debug or trace that
> >> would
> >>>> help me troubleshoot? I'd prefer not to turn debug and/or trace on for
> >>>> every single class.
> >>>>
> >>>> Thanks,
> >>>> Chad
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> >>>>
> >>>>> Hi Chad,
> >>>>>> I'm wondering if someone can point me to the Kafka streams internal
> >>> code
> >>>>> that reads records for the join?
> >>>>> --> You can check StreamThread#pollPhase, where stream thread (main
> >>>>> consumer) periodically poll records. And then, it'll process each
> >>> topology
> >>>>> node with these polled records in stream tasks (StreamTask#process).
> >>>>>
> >>>>> Hope that helps.
> >>>>>
> >>>>> Thanks.
> >>>>> Luke
> >>>>>
> >>>>>
> >>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> >>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>>>>
> >>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re seeing
> >>>>>>
> >>>>>
> >>>
> >>
> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>> <
> >>>>>>
> >>>>>
> >>>
> >>
> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>>>
> >>>>>>
> >>>>>> Gilles
> >>>>>>
> >>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I have a stream application that does a KStream to KTable left
> join.
> >>> We
> >>>>>>> seem to be occasionally missing joins (KTable side is null).
> >>>>>>>
> >>>>>>> I'm wondering if someone can point me to the Kafka streams internal
> >>>>> code
> >>>>>>> that reads records for the join? I've poked around the Kafka code
> >>> base,
> >>>>>> but
> >>>>>>> there is a lot there. I imagine there is some consumer poll for
> each
> >>>>> side
> >>>>>>> of the join, and possibly a background thread for reading the
> KTable
> >>>>>> topic.
> >>>>>>>
> >>>>>>> I figure there are several possible causes of this issue, and since
> >>>>>> nothing
> >>>>>>> is really jumping out in my code, I was going to start looking at
> >> the
> >>>>>> Kafka
> >>>>>>> code to see if there is something I can do to fix this.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Chad
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Funding Circle Limited is authorised and regulated by the Financial
> >>>>>> Conduct Authority under firm registration number 722513. Funding
> >> Circle
> >>>>> is
> >>>>>> not covered by the Financial Services Compensation Scheme.
> Registered
> >>> in
> >>>>>> England (Co. No. 06968588) with registered office at 71 Queen
> >> Victoria
> >>>>>> Street, London EC4V 4AY.
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
> >
>

Reply via email to