Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com>
wrote:

> No unfortunately it is not the case. The table record is written about 20
> seconds before the stream record. I’ll crank up the time tomorrow and see
> what happens.
>
> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Hard to tell, but as it seems that you can reproduce the issue, it might
>> be worth a try to increase the idle time further.
>>
>> I guess one corner case for stream-table join that is not resolved yet
>> is when stream and table record have the same timestamp... For this
>> case, the table record might not be processed first.
>>
>> Could you hit this case?
>>
>>
>> -Matthias
>>
>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>> > Thank you for the information. We are using the Kafka 3.0 client
>> library.
>> > We are able to reliably reproduce this issue in our test environment
>> now. I
>> > removed my timestamp extractor, and I set the max.task.idle.ms to
>> 2000. I
>> > also turned on trace logging for package
>> > org.apache.kafka.streams.processor.internals.
>> >
>> > To create the issue we stopped the application and ran enough data to
>> > create a lag of 400 messages. We saw 5 missed joins.
>> >
>> >  From the stream-thread log messages we saw the event message, our
>> stream
>> > missed the join, and then several milliseconds later we saw the
>> > stream-thread print out the status message. The stream-thread printed
>> out
>> > our status message a total of 5 times.
>> >
>> > Given that only a few milliseconds passed between missing the join and
>> the
>> > stream-thread printing the status message, would increasing the
>> > max.task.idle.ms help?
>> >
>> > Thanks,
>> > Chad
>> >
>> > On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> >> Timestamp synchronization is not perfect, and as a matter of fact, we
>> >> fixed a few gaps in 3.0.0 release. We actually hope, that we closed the
>> >> last gaps in 3.0.0... *fingers-crossed* :)
>> >>
>> >>> We are using a timestamp extractor that returns 0.
>> >>
>> >> You can do this, and it effectively "disables" timestamp
>> synchronization
>> >> as records on the KTable side don't have a timeline any longer. As a
>> >> side effect it also allows you to "bootstrap" the table, as records
>> with
>> >> timestamp zero will always be processed first (as they are smaller). Of
>> >> course, you also don't have time synchronization for "future" data and
>> >> your program becomes non-deterministic if you reprocess old data.
>> >>
>> >>> his seemed to be the only
>> >>> way to bootstrap enough records at startup to avoid the missed join.
>> >>
>> >> Using 3.0.0 and enabling timestamp synchronization via
>> >> `max.task.idle.ms` config, should allow you to get the correct
>> behavior
>> >> without the zero-extractor (of course, your KTable data must have
>> >> smaller timestamps that your KStream data).
>> >>
>> >>> If I use "timestamp synchronization" do I have to remove the zero
>> >>> timestamp extractor? If I remove the zero timestamp extractor will
>> >>> timestamp synchronization take care of the missed join issue on
>> startup?
>> >>
>> >> To be more precise: timestamp synchronization is _always_ on. The
>> >> question is just how strict it is applied. By default, we do the
>> weakest
>> >> from which is only best effort.
>> >>
>> >>> I'm guessing the issue here is that occasionally the poll request is
>> not
>> >>> returning the matching record for the KTable side of the join before
>> the
>> >>> task goes off and starts processing records.
>> >>
>> >> Yes, because of default best effort approach. That is why you should
>> >> increase `max.task.idle.ms` to detect this case and "skip" processing
>> >> and let KS do another poll() to get KTable data.
>> >>
>> >> 2.8 and earlier:
>> >>
>> >> max.task.idle.ms=0 -> best effort (no poll() retry)
>> >> max.task.idle.ms>0 -> try to do another poll() until data is there or
>> >> idle time passed
>> >>
>> >> Note: >0 might still "fail" even if there is data, because consumer
>> >> fetch behavior is not predictable.
>> >>
>> >>
>> >> 3.0:
>> >>
>> >> max.task.idle.ms=-1 -> best effort (no poll() retry)
>> >> max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
>> >> until you get the data
>> >> max.task.idle.ms>0 -> even if there is not data broker side, wait
>> until
>> >> data becomes available or the idle time passed
>> >>
>> >>
>> >> Hope this helps.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 11/1/21 4:29 PM, Guozhang Wang wrote:
>> >>> Hello Chad,
>> >>>
>> >>>   From your earlier comment, you mentioned "In my scenario the records
>> >> were
>> >>> written to the KTable topic before the record was written to the
>> KStream
>> >>> topic." So I think Matthias and others have excluded this possibility
>> >> while
>> >>> trying to help investigate.
>> >>>
>> >>> If only the matching records from KStream are returned via a single a
>> >>> consumer poll call but not the other records from KTable, then you
>> would
>> >>> miss this matched join result.
>> >>>
>> >>> Guozhang
>> >>>
>> >>>
>> >>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <
>> chad.preis...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Thank you for your response and the links to the presentations.
>> >>>>
>> >>>>
>> >>>> *However, this seems tobe orthogonal to your issue?*
>> >>>>
>> >>>> Yes. From what I see in the code it looks like you have a single
>> >> consumer
>> >>>> subscribed to multiple topics. Please correct me if I'm wrong.
>> >>>>
>> >>>>
>> >>>> *By default, timestamp synchronization is disabled. Maybeenabling it
>> >> would
>> >>>> help?*
>> >>>>
>> >>>> We are using a timestamp extractor that returns 0. We did that
>> because
>> >> we
>> >>>> were almost always missing joins on startup, and this seemed to be
>> the
>> >> only
>> >>>> way to bootstrap enough records at startup to avoid the missed join.
>> We
>> >>>> found a post that said doing that would make the KTable act like the
>> >>>> GlobalKTable at startup. So far this works great, we never miss a
>> join
>> >> on a
>> >>>> startup. If I use "timestamp synchronization" do I have to remove the
>> >> zero
>> >>>> timestamp extractor? If I remove the zero timestamp extractor will
>> >>>> timestamp synchronization take care of the missed join issue on
>> startup?
>> >>>>
>> >>>> I'm guessing the issue here is that occasionally the poll request is
>> not
>> >>>> returning the matching record for the KTable side of the join before
>> the
>> >>>> task goes off and starts processing records. Later when we put the
>> same
>> >>>> record on the topic and the KTable has had a chance to load more
>> records
>> >>>> the join works and everything is good to go. Because of the way our
>> >> system
>> >>>> works no new status records have been written and so the new record
>> >> joins
>> >>>> against the correct status.
>> >>>>
>> >>>> Do you agree that the poll request is returning the KStream record
>> but
>> >> not
>> >>>> returning the KTable record and therefore the join is getting
>> missed? If
>> >>>> you don't agree, what do you think is going on? Is there a way to
>> prove
>> >>>> this out?
>> >>>>
>> >>>> Thanks,
>> >>>> Chad
>> >>>>
>> >>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org>
>> >> wrote:
>> >>>>
>> >>>>> Yes, a StreamThread has one consumer. The number of StreamThreads
>> per
>> >>>>> instance is configurable via `num.stream.threads`. Partitions are
>> >>>>> assigned to threads similar to consumer is a plain consumer group.
>> >>>>>
>> >>>>> It seems you run with the default of one thread per instance. As you
>> >>>>> spin up 12 instances, it results in 12 threads for the application.
>> As
>> >>>>> you have 12 partitions, using more threads won't be useful as no
>> >>>>> partitions are left for them to process.
>> >>>>>
>> >>>>> For a stream-table joins, there will be one task per "partition
>> pair"
>> >>>>> that computes the join for those partitions. So you get 12 tasks,
>> and
>> >>>>> each thread processes one task in your setup. Ie, a thread consumer
>> is
>> >>>>> reading data for both input topics.
>> >>>>>
>> >>>>> Pausing happens on a per-partition bases: for joins there is two
>> >> buffers
>> >>>>> per task (one for each input topic partition). It's possible that
>> one
>> >>>>> partition is paused while the other is processed. However, this
>> seems
>> >> to
>> >>>>> be orthogonal to your issue?
>> >>>>>
>> >>>>> For a GlobalKTable, you get an additional GlobalThread that only
>> reads
>> >>>>> the data from the "global topic" to update the GlobalKTable.
>> Semantics
>> >>>>> of KStream-KTable and KStream-GlobalKTable joins are different: Cf
>> >>>>>
>> >>>>>
>> >>>>
>> >>
>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
>> >>>>>
>> >>>>> For the timestamp synchronization, you may checkout `
>> max.task.idle.ms`
>> >>>>> config. By default, timestamp synchronization is disabled. Maybe
>> >>>>> enabling it would help?
>> >>>>>
>> >>>>> You may also check out slides 34-38:
>> >>>>>
>> >>>>>
>> >>>>
>> >>
>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>> >>>>>
>> >>>>> There is one corner case: if two records with the same timestamp
>> come
>> >>>>> it, it's not defined which one will be processed first.
>> >>>>>
>> >>>>> Hope this helps.
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>>
>> >>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
>> >>>>>> Yes, this helped. I have some additional questions.
>> >>>>>>
>> >>>>>> Does StreamThread have one consumer? (Looks like it, but just want
>> to
>> >>>>>> confirm)
>> >>>>>> Is there a separate StreamThread for each topic including the
>> KTable?
>> >>>>>> If a KTable is a StreamThread and there is a  StreamTask for that
>> >>>> KTable,
>> >>>>>> could my buffer be getting filled up, and the mainConsumer for the
>> >>>> KTable
>> >>>>>> be getting paused? I see this code in StreamTask#addRecords.
>> >>>>>>
>> >>>>>> // if after adding these records, its partition queue's buffered
>> size
>> >>>> has
>> >>>>>> been
>> >>>>>>            // increased beyond the threshold, we can then pause the
>> >>>>>> consumption for this partition
>> >>>>>>            if (newQueueSize > maxBufferedSize) {
>> >>>>>>                mainConsumer.pause(singleton(partition));
>> >>>>>>            }
>> >>>>>>
>> >>>>>> Is there any specific logging that I can set to debug or trace that
>> >>>> would
>> >>>>>> help me troubleshoot? I'd prefer not to turn debug and/or trace on
>> for
>> >>>>>> every single class.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Chad
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com>
>> wrote:
>> >>>>>>
>> >>>>>>> Hi Chad,
>> >>>>>>>> I'm wondering if someone can point me to the Kafka streams
>> internal
>> >>>>> code
>> >>>>>>> that reads records for the join?
>> >>>>>>> --> You can check StreamThread#pollPhase, where stream thread
>> (main
>> >>>>>>> consumer) periodically poll records. And then, it'll process each
>> >>>>> topology
>> >>>>>>> node with these polled records in stream tasks
>> (StreamTask#process).
>> >>>>>>>
>> >>>>>>> Hope that helps.
>> >>>>>>>
>> >>>>>>> Thanks.
>> >>>>>>> Luke
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
>> >>>>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re
>> seeing
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>
>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>> >>>>>>>> <
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>
>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Gilles
>> >>>>>>>>
>> >>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <
>> chad.preis...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hello,
>> >>>>>>>>>
>> >>>>>>>>> I have a stream application that does a KStream to KTable left
>> >> join.
>> >>>>> We
>> >>>>>>>>> seem to be occasionally missing joins
>> <https://www.google.com/maps/search/%3E%3E%3E%3E%3E+seem+to+be+occasionally+missing+joins?entry=gmail&source=g>
>> (KTable side is null).
>> >>>>>>>>>
>> >>>>>>>>> I'm wondering if someone can point me to the Kafka streams
>> internal
>> >>>>>>> code
>> >>>>>>>>> that reads records for the join? I've poked around the Kafka
>> code
>> >>>>> base,
>> >>>>>>>> but
>> >>>>>>>>> there is a lot there. I imagine there is some consumer poll for
>> >> each
>> >>>>>>> side
>> >>>>>>>>> of the join, and possibly a background thread for reading the
>> >> KTable
>> >>>>>>>> topic.
>> >>>>>>>>>
>> >>>>>>>>> I figure there are several possible causes of this issue, and
>> since
>> >>>>>>>> nothing
>> >>>>>>>>> is really jumping out in my code, I was going to start looking
>> at
>> >>>> the
>> >>>>>>>> Kafka
>> >>>>>>>>> code to see if there is something I can do to fix this.
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Chad
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Funding Circle Limited is authorised and regulated by the
>> Financial
>> >>>>>>>> Conduct Authority under firm registration number 722513. Funding
>> >>>> Circle
>> >>>>>>> is
>> >>>>>>>> not covered by the Financial Services Compensation Scheme.
>> >> Registered
>> >>>>> in
>> >>>>>>>> England (Co. No. 06968588) with registered office at 71 Queen
>> >>>> Victoria
>> >>>>>>>> Street, London EC4V 4AY.
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>
>> >
>>
>

Reply via email to