It seems like I have 2 options to work around this issue.

   - Keep the KTable and have another process running that puts the missed
   join message back on the event topic.
   - Switch to GlobalKTable.

Any other solutions/workarounds are welcome.

Thanks,
Chad

On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler <chad.preis...@gmail.com>
wrote:

> enforced-processing-total is zero for all missed join occurrences. I
> logged all the metrics out at the time my stream processed the missed join,
> so let me know if there are any other metics that would help.
>
> On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler <chad.preis...@gmail.com>
> wrote:
>
>> I'm not sure. When I ran with trace logging turned on I saw a bunch of
>> messages like the ones below. Do those messages indicate
>> "enforced-processing"? It gets logged right after the call
>> to enforcedProcessingSensor.record.
>>
>> Continuing to process although some partitions are empty on the broker.
>> There may be out-of-order processing for this task as a result. Partitions
>> with local data: [status-5]. Partitions we gave up waiting for, with their
>> corresponding deadlines: {event-5=1635881287722}. Configured
>> max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.
>>
>> Continuing to process although some partitions are empty on the broker.
>> There may be out-of-order processing for this task as a result. Partitions
>> with local data: [event-5]. Partitions we gave up waiting for, with their
>> corresponding deadlines: {status-5=1635881272754}. Configured
>> max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
>>
>> On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Can you check if the program ever does "enforced processing", ie,
>>> `max.task.idle.ms` passed, and we process despite an empty input buffer.
>>>
>>> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>>
>>> As long as there is input data, we should never do "enforced processing"
>>> and the metric should stay at zero.
>>>
>>>
>>> -Matthias
>>>
>>> On 11/3/21 2:41 PM, Chad Preisler wrote:
>>> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds)
>>> had no
>>> > effect on this issue.
>>> >
>>> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com>
>>> > wrote:
>>> >
>>> >> No unfortunately it is not the case. The table record is written
>>> about 20
>>> >> seconds before the stream record. I’ll crank up the time tomorrow and
>>> see
>>> >> what happens.
>>> >>
>>> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>> >>
>>> >>> Hard to tell, but as it seems that you can reproduce the issue, it
>>> might
>>> >>> be worth a try to increase the idle time further.
>>> >>>
>>> >>> I guess one corner case for stream-table join that is not resolved
>>> yet
>>> >>> is when stream and table record have the same timestamp... For this
>>> >>> case, the table record might not be processed first.
>>> >>>
>>> >>> Could you hit this case?
>>> >>>
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>>> >>>> Thank you for the information. We are using the Kafka 3.0 client
>>> >>> library.
>>> >>>> We are able to reliably reproduce this issue in our test environment
>>> >>> now. I
>>> >>>> removed my timestamp extractor, and I set the max.task.idle.ms to
>>> >>> 2000. I
>>> >>>> also turned on trace logging for package
>>> >>>> org.apache.kafka.streams.processor.internals.
>>> >>>>
>>> >>>> To create the issue we stopped the application and ran enough data
>>> to
>>> >>>> create a lag of 400 messages. We saw 5 missed joins.
>>> >>>>
>>> >>>>   From the stream-thread log messages we saw the event message, our
>>> >>> stream
>>> >>>> missed the join, and then several milliseconds later we saw the
>>> >>>> stream-thread print out the status message. The stream-thread
>>> printed
>>> >>> out
>>> >>>> our status message a total of 5 times.
>>> >>>>
>>> >>>> Given that only a few milliseconds passed between missing the join
>>> and
>>> >>> the
>>> >>>> stream-thread printing the status message, would increasing the
>>> >>>> max.task.idle.ms help?
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Chad
>>> >>>>
>>> >>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org>
>>> >>> wrote:
>>> >>>>
>>> >>>>> Timestamp synchronization is not perfect, and as a matter of fact,
>>> we
>>> >>>>> fixed a few gaps in 3.0.0 release. We actually hope, that we
>>> closed the
>>> >>>>> last gaps in 3.0.0... *fingers-crossed* :)
>>> >>>>>
>>> >>>>>> We are using a timestamp extractor that returns 0.
>>> >>>>>
>>> >>>>> You can do this, and it effectively "disables" timestamp
>>> >>> synchronization
>>> >>>>> as records on the KTable side don't have a timeline any longer. As
>>> a
>>> >>>>> side effect it also allows you to "bootstrap" the table, as records
>>> >>> with
>>> >>>>> timestamp zero will always be processed first (as they are
>>> smaller). Of
>>> >>>>> course, you also don't have time synchronization for "future" data
>>> and
>>> >>>>> your program becomes non-deterministic if you reprocess old data.
>>> >>>>>
>>> >>>>>> his seemed to be the only
>>> >>>>>> way to bootstrap enough records at startup to avoid the missed
>>> join.
>>> >>>>>
>>> >>>>> Using 3.0.0 and enabling timestamp synchronization via
>>> >>>>> `max.task.idle.ms` config, should allow you to get the correct
>>> >>> behavior
>>> >>>>> without the zero-extractor (of course, your KTable data must have
>>> >>>>> smaller timestamps that your KStream data).
>>> >>>>>
>>> >>>>>> If I use "timestamp synchronization" do I have to remove the zero
>>> >>>>>> timestamp extractor? If I remove the zero timestamp extractor will
>>> >>>>>> timestamp synchronization take care of the missed join issue on
>>> >>> startup?
>>> >>>>>
>>> >>>>> To be more precise: timestamp synchronization is _always_ on. The
>>> >>>>> question is just how strict it is applied. By default, we do the
>>> >>> weakest
>>> >>>>> from which is only best effort.
>>> >>>>>
>>> >>>>>> I'm guessing the issue here is that occasionally the poll request
>>> is
>>> >>> not
>>> >>>>>> returning the matching record for the KTable side of the join
>>> before
>>> >>> the
>>> >>>>>> task goes off and starts processing records.
>>> >>>>>
>>> >>>>> Yes, because of default best effort approach. That is why you
>>> should
>>> >>>>> increase `max.task.idle.ms` to detect this case and "skip"
>>> processing
>>> >>>>> and let KS do another poll() to get KTable data.
>>> >>>>>
>>> >>>>> 2.8 and earlier:
>>> >>>>>
>>> >>>>> max.task.idle.ms=0 -> best effort (no poll() retry)
>>> >>>>> max.task.idle.ms>0 -> try to do another poll() until data is
>>> there or
>>> >>>>> idle time passed
>>> >>>>>
>>> >>>>> Note: >0 might still "fail" even if there is data, because consumer
>>> >>>>> fetch behavior is not predictable.
>>> >>>>>
>>> >>>>>
>>> >>>>> 3.0:
>>> >>>>>
>>> >>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
>>> >>>>> max.task.idle.ms=0 -> if there is data broker side, repeat to
>>> poll()
>>> >>>>> until you get the data
>>> >>>>> max.task.idle.ms>0 -> even if there is not data broker side, wait
>>> >>> until
>>> >>>>> data becomes available or the idle time passed
>>> >>>>>
>>> >>>>>
>>> >>>>> Hope this helps.
>>> >>>>>
>>> >>>>>
>>> >>>>> -Matthias
>>> >>>>>
>>> >>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
>>> >>>>>> Hello Chad,
>>> >>>>>>
>>> >>>>>>    From your earlier comment, you mentioned "In my scenario the
>>> records
>>> >>>>> were
>>> >>>>>> written to the KTable topic before the record was written to the
>>> >>> KStream
>>> >>>>>> topic." So I think Matthias and others have excluded this
>>> possibility
>>> >>>>> while
>>> >>>>>> trying to help investigate.
>>> >>>>>>
>>> >>>>>> If only the matching records from KStream are returned via a
>>> single a
>>> >>>>>> consumer poll call but not the other records from KTable, then you
>>> >>> would
>>> >>>>>> miss this matched join result.
>>> >>>>>>
>>> >>>>>> Guozhang
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <
>>> >>> chad.preis...@gmail.com>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>>> Thank you for your response and the links to the presentations.
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> *However, this seems tobe orthogonal to your issue?*
>>> >>>>>>>
>>> >>>>>>> Yes. From what I see in the code it looks like you have a single
>>> >>>>> consumer
>>> >>>>>>> subscribed to multiple topics. Please correct me if I'm wrong.
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> *By default, timestamp synchronization is disabled.
>>> Maybeenabling it
>>> >>>>> would
>>> >>>>>>> help?*
>>> >>>>>>>
>>> >>>>>>> We are using a timestamp extractor that returns 0. We did that
>>> >>> because
>>> >>>>> we
>>> >>>>>>> were almost always missing joins on startup, and this seemed to
>>> be
>>> >>> the
>>> >>>>> only
>>> >>>>>>> way to bootstrap enough records at startup to avoid the missed
>>> join.
>>> >>> We
>>> >>>>>>> found a post that said doing that would make the KTable act like
>>> the
>>> >>>>>>> GlobalKTable at startup. So far this works great, we never miss a
>>> >>> join
>>> >>>>> on a
>>> >>>>>>> startup. If I use "timestamp synchronization" do I have to
>>> remove the
>>> >>>>> zero
>>> >>>>>>> timestamp extractor? If I remove the zero timestamp extractor
>>> will
>>> >>>>>>> timestamp synchronization take care of the missed join issue on
>>> >>> startup?
>>> >>>>>>>
>>> >>>>>>> I'm guessing the issue here is that occasionally the poll
>>> request is
>>> >>> not
>>> >>>>>>> returning the matching record for the KTable side of the join
>>> before
>>> >>> the
>>> >>>>>>> task goes off and starts processing records. Later when we put
>>> the
>>> >>> same
>>> >>>>>>> record on the topic and the KTable has had a chance to load more
>>> >>> records
>>> >>>>>>> the join works and everything is good to go. Because of the way
>>> our
>>> >>>>> system
>>> >>>>>>> works no new status records have been written and so the new
>>> record
>>> >>>>> joins
>>> >>>>>>> against the correct status.
>>> >>>>>>>
>>> >>>>>>> Do you agree that the poll request is returning the KStream
>>> record
>>> >>> but
>>> >>>>> not
>>> >>>>>>> returning the KTable record and therefore the join is getting
>>> >>> missed? If
>>> >>>>>>> you don't agree, what do you think is going on? Is there a way to
>>> >>> prove
>>> >>>>>>> this out?
>>> >>>>>>>
>>> >>>>>>> Thanks,
>>> >>>>>>> Chad
>>> >>>>>>>
>>> >>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <
>>> mj...@apache.org>
>>> >>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Yes, a StreamThread has one consumer. The number of
>>> StreamThreads
>>> >>> per
>>> >>>>>>>> instance is configurable via `num.stream.threads`. Partitions
>>> are
>>> >>>>>>>> assigned to threads similar to consumer is a plain consumer
>>> group.
>>> >>>>>>>>
>>> >>>>>>>> It seems you run with the default of one thread per instance.
>>> As you
>>> >>>>>>>> spin up 12 instances, it results in 12 threads for the
>>> application.
>>> >>> As
>>> >>>>>>>> you have 12 partitions, using more threads won't be useful as no
>>> >>>>>>>> partitions are left for them to process.
>>> >>>>>>>>
>>> >>>>>>>> For a stream-table joins, there will be one task per "partition
>>> >>> pair"
>>> >>>>>>>> that computes the join for those partitions. So you get 12
>>> tasks,
>>> >>> and
>>> >>>>>>>> each thread processes one task in your setup. Ie, a thread
>>> consumer
>>> >>> is
>>> >>>>>>>> reading data for both input topics.
>>> >>>>>>>>
>>> >>>>>>>> Pausing happens on a per-partition bases: for joins there is two
>>> >>>>> buffers
>>> >>>>>>>> per task (one for each input topic partition). It's possible
>>> that
>>> >>> one
>>> >>>>>>>> partition is paused while the other is processed. However, this
>>> >>> seems
>>> >>>>> to
>>> >>>>>>>> be orthogonal to your issue?
>>> >>>>>>>>
>>> >>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only
>>> >>> reads
>>> >>>>>>>> the data from the "global topic" to update the GlobalKTable.
>>> >>> Semantics
>>> >>>>>>>> of KStream-KTable and KStream-GlobalKTable joins are different:
>>> Cf
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>
>>> >>>
>>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
>>> >>>>>>>>
>>> >>>>>>>> For the timestamp synchronization, you may checkout `
>>> >>> max.task.idle.ms`
>>> >>>>>>>> config. By default, timestamp synchronization is disabled. Maybe
>>> >>>>>>>> enabling it would help?
>>> >>>>>>>>
>>> >>>>>>>> You may also check out slides 34-38:
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>
>>> >>>
>>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>>> >>>>>>>>
>>> >>>>>>>> There is one corner case: if two records with the same timestamp
>>> >>> come
>>> >>>>>>>> it, it's not defined which one will be processed first.
>>> >>>>>>>>
>>> >>>>>>>> Hope this helps.
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
>>> >>>>>>>>> Yes, this helped. I have some additional questions.
>>> >>>>>>>>>
>>> >>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just
>>> want
>>> >>> to
>>> >>>>>>>>> confirm)
>>> >>>>>>>>> Is there a separate StreamThread for each topic including the
>>> >>> KTable?
>>> >>>>>>>>> If a KTable is a StreamThread and there is a  StreamTask for
>>> that
>>> >>>>>>> KTable,
>>> >>>>>>>>> could my buffer be getting filled up, and the mainConsumer for
>>> the
>>> >>>>>>> KTable
>>> >>>>>>>>> be getting paused? I see this code in StreamTask#addRecords.
>>> >>>>>>>>>
>>> >>>>>>>>> // if after adding these records, its partition queue's
>>> buffered
>>> >>> size
>>> >>>>>>> has
>>> >>>>>>>>> been
>>> >>>>>>>>>             // increased beyond the threshold, we can then
>>> pause the
>>> >>>>>>>>> consumption for this partition
>>> >>>>>>>>>             if (newQueueSize > maxBufferedSize) {
>>> >>>>>>>>>                 mainConsumer.pause(singleton(partition));
>>> >>>>>>>>>             }
>>> >>>>>>>>>
>>> >>>>>>>>> Is there any specific logging that I can set to debug or trace
>>> that
>>> >>>>>>> would
>>> >>>>>>>>> help me troubleshoot? I'd prefer not to turn debug and/or
>>> trace on
>>> >>> for
>>> >>>>>>>>> every single class.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks,
>>> >>>>>>>>> Chad
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com>
>>> >>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi Chad,
>>> >>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
>>> >>> internal
>>> >>>>>>>> code
>>> >>>>>>>>>> that reads records for the join?
>>> >>>>>>>>>> --> You can check StreamThread#pollPhase, where stream thread
>>> >>> (main
>>> >>>>>>>>>> consumer) periodically poll records. And then, it'll process
>>> each
>>> >>>>>>>> topology
>>> >>>>>>>>>> node with these polled records in stream tasks
>>> >>> (StreamTask#process).
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hope that helps.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks.
>>> >>>>>>>>>> Luke
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
>>> >>>>>>>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re
>>> >>> seeing
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>
>>> >>>
>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>>> >>>>>>>>>>> <
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>
>>> >>>
>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Gilles
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <
>>> >>> chad.preis...@gmail.com>
>>> >>>>>>>>>> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Hello,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I have a stream application that does a KStream to KTable
>>> left
>>> >>>>> join.
>>> >>>>>>>> We
>>> >>>>>>>>>>>> seem to be occasionally missing joins
>>> >>> <
>>> https://www.google.com/maps/search/%3E%3E%3E%3E%3E+seem+to+be+occasionally+missing+joins?entry=gmail&source=g
>>> >
>>> >>> (KTable side is null).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
>>> >>> internal
>>> >>>>>>>>>> code
>>> >>>>>>>>>>>> that reads records for the join? I've poked around the Kafka
>>> >>> code
>>> >>>>>>>> base,
>>> >>>>>>>>>>> but
>>> >>>>>>>>>>>> there is a lot there. I imagine there is some consumer poll
>>> for
>>> >>>>> each
>>> >>>>>>>>>> side
>>> >>>>>>>>>>>> of the join, and possibly a background thread for reading
>>> the
>>> >>>>> KTable
>>> >>>>>>>>>>> topic.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I figure there are several possible causes of this issue,
>>> and
>>> >>> since
>>> >>>>>>>>>>> nothing
>>> >>>>>>>>>>>> is really jumping out in my code, I was going to start
>>> looking
>>> >>> at
>>> >>>>>>> the
>>> >>>>>>>>>>> Kafka
>>> >>>>>>>>>>>> code to see if there is something I can do to fix this.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>> Chad
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> --
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Funding Circle Limited is authorised and regulated by the
>>> >>> Financial
>>> >>>>>>>>>>> Conduct Authority under firm registration number 722513.
>>> Funding
>>> >>>>>>> Circle
>>> >>>>>>>>>> is
>>> >>>>>>>>>>> not covered by the Financial Services Compensation Scheme.
>>> >>>>> Registered
>>> >>>>>>>> in
>>> >>>>>>>>>>> England (Co. No. 06968588) with registered office at 71 Queen
>>> >>>>>>> Victoria
>>> >>>>>>>>>>> Street, London EC4V 4AY.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>

Reply via email to