Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no effect on this issue.
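[Editor's note: for readers following along, a minimal sketch of how such an idle-time setting is typically applied in a plain Java Streams application. The class name, application id, and bootstrap servers below are placeholders, not values from the thread.]

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class IdleTimeConfigSketch {
        // Sketch: allow each task to idle and re-poll before processing when one
        // side of the join has no buffered data yet. Values are illustrative only.
        static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-app"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
            props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 10000L);                // the 10 s value tried above
            return props;
        }
    }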
On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com> wrote:

No, unfortunately that is not the case. The table record is written about 20 seconds before the stream record. I'll crank up the time tomorrow and see what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org> wrote:

Hard to tell, but as it seems you can reproduce the issue, it might be worth a try to increase the idle time further.

I guess one corner case for the stream-table join that is not resolved yet is when the stream and table records have the same timestamp... For this case, the table record might not be processed first.

Could you be hitting this case?

-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client library. We are able to reliably reproduce this issue in our test environment now. I removed my timestamp extractor, and I set max.task.idle.ms to 2000. I also turned on trace logging for the package org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to create a lag of 400 messages. We saw 5 missed joins.

From the stream-thread log messages we saw the event message, our stream missed the join, and then several milliseconds later we saw the stream-thread print out the status message. The stream-thread printed out our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and the stream-thread printing the status message, would increasing max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:

Timestamp synchronization is not perfect, and as a matter of fact, we fixed a few gaps in the 3.0.0 release. We actually hope that we closed the last gaps in 3.0.0... *fingers-crossed* :)

> We are using a timestamp extractor that returns 0.

You can do this, and it effectively "disables" timestamp synchronization, as records on the KTable side don't have a meaningful timeline any longer. As a side effect it also allows you to "bootstrap" the table, as records with timestamp zero will always be processed first (because they are smaller). Of course, you also don't have time synchronization for "future" data, and your program becomes non-deterministic if you reprocess old data.

> This seemed to be the only way to bootstrap enough records at startup to avoid the missed join.

Using 3.0.0 and enabling timestamp synchronization via the `max.task.idle.ms` config should allow you to get the correct behavior without the zero-extractor (of course, your KTable data must have smaller timestamps than your KStream data).

> If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor will timestamp synchronization take care of the missed join issue on startup?

To be more precise: timestamp synchronization is _always_ on. The question is just how strictly it is applied. By default, we apply the weakest form, which is only best effort.

> I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records.

Yes, because of the default best-effort approach. That is why you should increase `max.task.idle.ms` to detect this case, "skip" processing, and let Kafka Streams do another poll() to get the KTable data.

2.8 and earlier:

max.task.idle.ms=0  -> best effort (no poll() retry)
max.task.idle.ms>0  -> try to do another poll() until data is there or the idle time has passed

Note: >0 might still "fail" even if there is data, because consumer fetch behavior is not predictable.

3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0  -> if there is data broker-side, repeat poll() until you get the data
max.task.idle.ms>0  -> even if there is no data broker-side, wait until data becomes available or the idle time has passed

Hope this helps.

-Matthias
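[Editor's note: the "timestamp extractor that returns 0" discussed above would look roughly like the sketch below; the class name is made up for illustration.]

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch: every record on the topic this extractor is attached to gets
    // timestamp 0, so those records always sort before the stream-side records
    // and are processed first at startup.
    public class ZeroTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            return 0L;
        }
    }

Such an extractor would typically be attached to the table's source topic via Consumed.with(...).withTimestampExtractor(...), which is what makes the table behave like a bootstrapped GlobalKTable at startup, at the cost of the non-determinism Matthias describes.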
On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

From your earlier comment, you mentioned "In my scenario the records were written to the KTable topic before the record was written to the KStream topic." So I think Matthias and others have excluded this possibility while trying to help investigate.

If only the matching records from the KStream are returned via a single consumer poll call, but not the other records from the KTable, then you would miss this matched join result.

Guozhang

On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com> wrote:

Thank you for your response and the links to the presentations.

> However, this seems to be orthogonal to your issue?

Yes. From what I see in the code it looks like you have a single consumer subscribed to multiple topics. Please correct me if I'm wrong.

> By default, timestamp synchronization is disabled. Maybe enabling it would help?

We are using a timestamp extractor that returns 0. We did that because we were almost always missing joins on startup, and this seemed to be the only way to bootstrap enough records at startup to avoid the missed join. We found a post that said doing that would make the KTable act like a GlobalKTable at startup. So far this works great; we never miss a join on startup. If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor, will timestamp synchronization take care of the missed join issue on startup?

I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records. Later, when we put the same record on the topic and the KTable has had a chance to load more records, the join works and everything is good to go. Because of the way our system works, no new status records have been written, so the new record joins against the correct status.

Do you agree that the poll request is returning the KStream record but not the KTable record, and therefore the join is getting missed? If you don't agree, what do you think is going on? Is there a way to prove this out?

Thanks,
Chad
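[Editor's note: one low-tech way to "prove this out" — a sketch only, not something from the thread — is to have the join's ValueJoiner itself count how often the table side arrives as null, so the rate can be compared before and after config changes. Types and names below are placeholders.]

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.streams.kstream.ValueJoiner;

    // Sketch: a joiner that counts missed joins (table side null).
    public class CountingLeftJoiner implements ValueJoiner<String, String, String> {
        private static final AtomicLong missedJoins = new AtomicLong();

        @Override
        public String apply(final String streamValue, final String tableValue) {
            if (tableValue == null) {
                // Left join fired without a table record; log or count it here.
                System.err.println("Missed join, total so far: " + missedJoins.incrementAndGet());
            }
            return streamValue + "|" + tableValue;
        }
    }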
On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:

Yes, a StreamThread has one consumer. The number of StreamThreads per instance is configurable via `num.stream.threads`. Partitions are assigned to threads similarly to how partitions are assigned to consumers in a plain consumer group.

It seems you run with the default of one thread per instance. As you spin up 12 instances, that results in 12 threads for the application. As you have 12 partitions, using more threads won't be useful, as no partitions would be left for them to process.

For stream-table joins, there will be one task per "partition pair" that computes the join for those partitions. So you get 12 tasks, and each thread processes one task in your setup. I.e., a thread's consumer reads data for both input topics.

Pausing happens on a per-partition basis: for joins there are two buffers per task (one for each input topic partition). It's possible that one partition is paused while the other is processed. However, this seems to be orthogonal to your issue?

For a GlobalKTable, you get an additional GlobalThread that only reads the data from the "global topic" to update the GlobalKTable. Semantics of KStream-KTable and KStream-GlobalKTable joins are different; cf.
https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

For the timestamp synchronization, you may check out the `max.task.idle.ms` config. By default, timestamp synchronization is disabled. Maybe enabling it would help?

You may also check out slides 34-38:
https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/

There is one corner case: if two records with the same timestamp come in, it's not defined which one will be processed first.

Hope this helps.

-Matthias

On 10/30/21 6:45 AM, Chad Preisler wrote:

Yes, this helped. I have some additional questions.

Does StreamThread have one consumer? (Looks like it, but just want to confirm.)

Is there a separate StreamThread for each topic, including the KTable?

If a KTable is a StreamThread and there is a StreamTask for that KTable, could my buffer be getting filled up, and the mainConsumer for the KTable be getting paused? I see this code in StreamTask#addRecords:

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > maxBufferedSize) {
        mainConsumer.pause(singleton(partition));
    }

Is there any specific logging that I can set to debug or trace that would help me troubleshoot? I'd prefer not to turn debug and/or trace on for every single class.

Thanks,
Chad
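[Editor's note: on the buffering question, the maxBufferedSize threshold in the snippet above appears to come from the buffered.records.per.partition Streams config (default 1000), so the pause threshold can be adjusted without code changes. A hedged sketch; the class name and value are illustrative.]

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class BufferConfigSketch {
        // Sketch: raising the per-partition buffer threshold that triggers the
        // mainConsumer.pause(...) call shown above.
        static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 2000); // illustrative value
            return props;
        }
    }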
On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:

Hi Chad,

> I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join?

You can check StreamThread#pollPhase, where the stream thread (main consumer) periodically polls records. It then processes each topology node with these polled records in the stream tasks (StreamTask#process).

Hope that helps.

Thanks.
Luke

On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart <gilles.philipp...@fundingcircle.com.invalid> wrote:

Hi Chad, this talk around 24:00 clearly explains what you're seeing:
https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/

Gilles

On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:

Hello,

I have a stream application that does a KStream to KTable left join. We seem to be occasionally missing joins (the KTable side is null).

I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join? I've poked around the Kafka code base, but there is a lot there. I imagine there is some consumer poll for each side of the join, and possibly a background thread for reading the KTable topic.

I figure there are several possible causes of this issue, and since nothing is really jumping out in my code, I was going to start looking at the Kafka code to see if there is something I can do to fix this.

Thanks,
Chad
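[Editor's note: for completeness, a minimal sketch of the kind of topology under discussion. Topic names, serdes, and the joiner are placeholders, not the application from the thread.]

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    // Sketch: KStream-KTable left join. If no table record has been processed yet
    // for a key, the joiner sees null on the table side -- the "missed join".
    public class JoinTopologySketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> events = builder.stream("events",   // placeholder topic
                    Consumed.with(Serdes.String(), Serdes.String()));
            KTable<String, String> status = builder.table("status",     // placeholder topic
                    Consumed.with(Serdes.String(), Serdes.String()));
            events.leftJoin(status, (event, st) -> event + "|" + st)
                  .to("joined-events", Produced.with(Serdes.String(), Serdes.String()));
            return builder.build();
        }
    }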