It seems like I have 2 options to work around this issue.
- Keep the KTable and have another process running that puts the missed join message back on the event topic.
- Switch to GlobalKTable.

Any other solutions/workarounds are welcome.

Thanks,
Chad

On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler <chad.preis...@gmail.com> wrote:

> enforced-processing-total is zero for all missed join occurrences. I logged all the metrics out at the time my stream processed the missed join, so let me know if there are any other metrics that would help.
>
> On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler <chad.preis...@gmail.com> wrote:
>
>> I'm not sure. When I ran with trace logging turned on I saw a bunch of messages like the ones below. Do those messages indicate "enforced-processing"? It gets logged right after the call to enforcedProcessingSensor.record.
>>
>> Continuing to process although some partitions are empty on the broker. There may be out-of-order processing for this task as a result. Partitions with local data: [status-5]. Partitions we gave up waiting for, with their corresponding deadlines: {event-5=1635881287722}. Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.
>>
>> Continuing to process although some partitions are empty on the broker. There may be out-of-order processing for this task as a result. Partitions with local data: [event-5]. Partitions we gave up waiting for, with their corresponding deadlines: {status-5=1635881272754}. Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
>>
>> On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Can you check if the program ever does "enforced processing", ie, `max.task.idle.ms` passed, and we process despite an empty input buffer.
>>>
>>> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>>
>>> As long as there is input data, we should never do "enforced processing" and the metric should stay at zero.
>>>
>>> -Matthias
>>>
>>> On 11/3/21 2:41 PM, Chad Preisler wrote:
>>> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no effect on this issue.
>>> >
>>> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com> wrote:
>>> >
>>> >> No unfortunately it is not the case. The table record is written about 20 seconds before the stream record. I’ll crank up the time tomorrow and see what happens.
>>> >>
>>> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>
>>> >>> Hard to tell, but as it seems that you can reproduce the issue, it might be worth a try to increase the idle time further.
>>> >>>
>>> >>> I guess one corner case for stream-table join that is not resolved yet is when stream and table record have the same timestamp... For this case, the table record might not be processed first.
>>> >>>
>>> >>> Could you hit this case?
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>>> >>>> Thank you for the information. We are using the Kafka 3.0 client library. We are able to reliably reproduce this issue in our test environment now. I removed my timestamp extractor, and I set the max.task.idle.ms to 2000. I also turned on trace logging for package org.apache.kafka.streams.processor.internals.
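(A minimal sketch of the configuration described in the paragraph above, assuming a plain Java Properties setup; the class name, application id, and bootstrap servers are placeholders, not taken from the thread:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        // placeholders -- substitute real values for your environment
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-status-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // wait up to 2 seconds for lagging input partitions before processing
        // the other side of the join (timestamp synchronization)
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);
        // the trace logging mentioned above is configured separately, e.g. in
        // log4j.properties:
        // log4j.logger.org.apache.kafka.streams.processor.internals=TRACE
        return props;
    }
}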
>>> >>>>
>>> >>>> To create the issue we stopped the application and ran enough data to create a lag of 400 messages. We saw 5 missed joins.
>>> >>>>
>>> >>>> From the stream-thread log messages we saw the event message, our stream missed the join, and then several milliseconds later we saw the stream-thread print out the status message. The stream-thread printed out our status message a total of 5 times.
>>> >>>>
>>> >>>> Given that only a few milliseconds passed between missing the join and the stream-thread printing the status message, would increasing the max.task.idle.ms help?
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Chad
>>> >>>>
>>> >>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>>>
>>> >>>>> Timestamp synchronization is not perfect, and as a matter of fact, we fixed a few gaps in the 3.0.0 release. We actually hope that we closed the last gaps in 3.0.0... *fingers-crossed* :)
>>> >>>>>
>>> >>>>>> We are using a timestamp extractor that returns 0.
>>> >>>>>
>>> >>>>> You can do this, and it effectively "disables" timestamp synchronization as records on the KTable side don't have a timeline any longer. As a side effect it also allows you to "bootstrap" the table, as records with timestamp zero will always be processed first (as they are smaller). Of course, you also don't have time synchronization for "future" data, and your program becomes non-deterministic if you reprocess old data.
>>> >>>>>
>>> >>>>>> this seemed to be the only way to bootstrap enough records at startup to avoid the missed join.
>>> >>>>>
>>> >>>>> Using 3.0.0 and enabling timestamp synchronization via the `max.task.idle.ms` config should allow you to get the correct behavior without the zero-extractor (of course, your KTable data must have smaller timestamps than your KStream data).
>>> >>>>>
>>> >>>>>> If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor will timestamp synchronization take care of the missed join issue on startup?
>>> >>>>>
>>> >>>>> To be more precise: timestamp synchronization is _always_ on. The question is just how strictly it is applied. By default, we do the weakest form, which is only best effort.
>>> >>>>>
>>> >>>>>> I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records.
>>> >>>>>
>>> >>>>> Yes, because of the default best-effort approach. That is why you should increase `max.task.idle.ms` to detect this case, "skip" processing, and let KS do another poll() to get KTable data.
>>> >>>>>
>>> >>>>> 2.8 and earlier:
>>> >>>>>
>>> >>>>> max.task.idle.ms=0 -> best effort (no poll() retry)
>>> >>>>> max.task.idle.ms>0 -> try to do another poll() until data is there or the idle time passed
>>> >>>>>
>>> >>>>> Note: >0 might still "fail" even if there is data, because consumer fetch behavior is not predictable.
>>> >>>>>
>>> >>>>> 3.0:
>>> >>>>>
>>> >>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
>>> >>>>> max.task.idle.ms=0 -> if there is data broker side, repeat poll() until you get the data
>>> >>>>> max.task.idle.ms>0 -> even if there is no data broker side, wait until data becomes available or the idle time passed
>>> >>>>>
>>> >>>>> Hope this helps.
>>> >>>>>
>>> >>>>> -Matthias
>>> >>>>>
>>> >>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
>>> >>>>>> Hello Chad,
>>> >>>>>>
>>> >>>>>> From your earlier comment, you mentioned "In my scenario the records were written to the KTable topic before the record was written to the KStream topic." So I think Matthias and others have excluded this possibility while trying to help investigate.
>>> >>>>>>
>>> >>>>>> If only the matching records from the KStream are returned via a single consumer poll call but not the other records from the KTable, then you would miss this matched join result.
>>> >>>>>>
>>> >>>>>> Guozhang
>>> >>>>>>
>>> >>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>>> Thank you for your response and the links to the presentations.
>>> >>>>>>>
>>> >>>>>>> *However, this seems to be orthogonal to your issue?*
>>> >>>>>>>
>>> >>>>>>> Yes. From what I see in the code it looks like you have a single consumer subscribed to multiple topics. Please correct me if I'm wrong.
>>> >>>>>>>
>>> >>>>>>> *By default, timestamp synchronization is disabled. Maybe enabling it would help?*
>>> >>>>>>>
>>> >>>>>>> We are using a timestamp extractor that returns 0. We did that because we were almost always missing joins on startup, and this seemed to be the only way to bootstrap enough records at startup to avoid the missed join. We found a post that said doing that would make the KTable act like a GlobalKTable at startup. So far this works great; we never miss a join on startup. If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor will timestamp synchronization take care of the missed join issue on startup?
>>> >>>>>>>
>>> >>>>>>> I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records. Later when we put the same record on the topic and the KTable has had a chance to load more records, the join works and everything is good to go. Because of the way our system works no new status records have been written, and so the new record joins against the correct status.
>>> >>>>>>>
>>> >>>>>>> Do you agree that the poll request is returning the KStream record but not returning the KTable record and therefore the join is getting missed? If you don't agree, what do you think is going on? Is there a way to prove this out?
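(A sketch of the kind of KStream-KTable left join and zero-returning timestamp extractor discussed above; the topic names, String serdes, and output topic are assumptions based on the thread, not the actual application code:)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class JoinTopologySketch {

    // Returning 0 makes every table record "older" than any stream record, so
    // the table side is processed first, at the cost of meaningful timestamp
    // synchronization for that side.
    static class ZeroTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            return 0L;
        }
    }

    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // "status" is the table side; "event" is the stream side
        KTable<String, String> status = builder.table("status",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withTimestampExtractor(new ZeroTimestampExtractor()));

        KStream<String, String> events = builder.stream("event",
                Consumed.with(Serdes.String(), Serdes.String()));

        // a missed join shows up here as a null status value
        events.leftJoin(status, (event, st) -> event + " -> " + st)
              .to("event-with-status");

        return builder;
    }
}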
>>> >>>>>>>
>>> >>>>>>> Thanks,
>>> >>>>>>> Chad
>>> >>>>>>>
>>> >>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>>>>>>
>>> >>>>>>>> Yes, a StreamThread has one consumer. The number of StreamThreads per instance is configurable via `num.stream.threads`. Partitions are assigned to threads similar to how partitions are assigned to consumers in a plain consumer group.
>>> >>>>>>>>
>>> >>>>>>>> It seems you run with the default of one thread per instance. As you spin up 12 instances, it results in 12 threads for the application. As you have 12 partitions, using more threads won't be useful as no partitions are left for them to process.
>>> >>>>>>>>
>>> >>>>>>>> For a stream-table join, there will be one task per "partition pair" that computes the join for those partitions. So you get 12 tasks, and each thread processes one task in your setup. Ie, a thread's consumer is reading data for both input topics.
>>> >>>>>>>>
>>> >>>>>>>> Pausing happens on a per-partition basis: for joins there are two buffers per task (one for each input topic partition). It's possible that one partition is paused while the other is processed. However, this seems to be orthogonal to your issue?
>>> >>>>>>>>
>>> >>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only reads the data from the "global topic" to update the GlobalKTable. Semantics of KStream-KTable and KStream-GlobalKTable joins are different; cf
>>> >>>>>>>>
>>> >>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
>>> >>>>>>>>
>>> >>>>>>>> For the timestamp synchronization, you may check out the `max.task.idle.ms` config. By default, timestamp synchronization is disabled. Maybe enabling it would help?
>>> >>>>>>>>
>>> >>>>>>>> You may also check out slides 34-38:
>>> >>>>>>>>
>>> >>>>>>>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>>> >>>>>>>>
>>> >>>>>>>> There is one corner case: if two records with the same timestamp come in, it's not defined which one will be processed first.
>>> >>>>>>>>
>>> >>>>>>>> Hope this helps.
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
>>> >>>>>>>>> Yes, this helped. I have some additional questions.
>>> >>>>>>>>>
>>> >>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just want to confirm.)
>>> >>>>>>>>> Is there a separate StreamThread for each topic, including the KTable?
>>> >>>>>>>>> If a KTable is a StreamThread and there is a StreamTask for that KTable, could my buffer be getting filled up, and the mainConsumer for the KTable be getting paused? I see this code in StreamTask#addRecords.
>>> >>>>>>>>>
>>> >>>>>>>>> // if after adding these records, its partition queue's buffered size
>>> >>>>>>>>> // has been increased beyond the threshold, we can then pause the
>>> >>>>>>>>> // consumption for this partition
>>> >>>>>>>>> if (newQueueSize > maxBufferedSize) {
>>> >>>>>>>>>     mainConsumer.pause(singleton(partition));
>>> >>>>>>>>> }
>>> >>>>>>>>>
>>> >>>>>>>>> Is there any specific logging that I can set to debug or trace that would help me troubleshoot? I'd prefer not to turn debug and/or trace on for every single class.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks,
>>> >>>>>>>>> Chad
>>> >>>>>>>>>
>>> >>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi Chad,
>>> >>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join?
>>> >>>>>>>>>> --> You can check StreamThread#pollPhase, where the stream thread (main consumer) periodically polls records. And then, it'll process each topology node with these polled records in stream tasks (StreamTask#process).
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hope that helps.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks.
>>> >>>>>>>>>> Luke
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart <gilles.philipp...@fundingcircle.com.invalid> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re seeing:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Gilles
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Hello,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I have a stream application that does a KStream to KTable left join. We seem to be occasionally missing joins (KTable side is null).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join? I've poked around the Kafka code base, but there is a lot there. I imagine there is some consumer poll for each side of the join, and possibly a background thread for reading the KTable topic.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I figure there are several possible causes of this issue, and since nothing is really jumping out in my code, I was going to start looking at the Kafka code to see if there is something I can do to fix this.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>> Chad
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> --
>>> >>>>>>>>>>> Funding Circle Limited is authorised and regulated by the Financial Conduct Authority under firm registration number 722513. Funding Circle is not covered by the Financial Services Compensation Scheme. Registered in England (Co. No. 06968588) with registered office at 71 Queen Victoria Street, London EC4V 4AY.