See one small edit below...

On Tue, May 21, 2024 at 10:25 AM Chad Preisler <chad.preis...@gmail.com>
wrote:

> Hello,
>
> I think the issue is related to certain partitions not getting records to
> advance stream time (because of low volume). Can someone confirm that each
> partition has its own stream time and that the stream time for a partition
> only advances when a record is written to the partition after the window
> closes?
>
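[Editor's note: a rough way to picture per-partition stream time, as asked about above. This is a hedged, self-contained sketch; the class and method names are hypothetical and this is not Kafka's actual implementation, only the "max observed timestamp per partition" idea.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each partition tracks its own "stream time" as the
// maximum record timestamp observed so far on that partition. It only
// advances when a new record with a larger timestamp arrives on THAT
// partition; a low-volume partition's stream time can stall indefinitely.
public class PartitionStreamTime {
    private final Map<Integer, Long> streamTimePerPartition = new HashMap<>();

    // Record an observation; stream time never moves backwards.
    public long observe(int partition, long recordTimestampMs) {
        return streamTimePerPartition.merge(partition, recordTimestampMs, Math::max);
    }

    public long streamTime(int partition) {
        return streamTimePerPartition.getOrDefault(partition, Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PartitionStreamTime st = new PartitionStreamTime();
        st.observe(0, 1_000L);
        st.observe(0, 5_000L);
        st.observe(0, 3_000L); // out-of-order: does not move stream time back
        st.observe(1, 2_000L); // partition 1 advances independently
        System.out.println(st.streamTime(0)); // 5000
        System.out.println(st.streamTime(1)); // 2000
    }
}
```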
> If I use the repartition method on each input topic to reduce the number
> of partitions for those streams, how many instances of the application will
> process records? For example, if the input topics each have 6 partitions,
> and I use the repartition method to set the number of partitions for the
> streams to 2, how many instances of the application will process records?
>
> Thanks,
> Chad
>
>
> On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> >>> How do you know this?
>> >> First thing we do is write a log message in the value joiner. We
>> >> don't see the log message for the missed records.
>>
>> Well, for left/right join results, the ValueJoiner would only be called
>> when the window is closed... And invalid input (or late records, ie,
>> records which arrive out-of-order after their window was already closed)
>> would be dropped right away. So you cannot really infer whether a record
>> made it into the join or not, or what happened after it made it into
>> the `Processor`.
>>
>> -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>
>> `dropped-records-total` is the name of the metric.
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 5/1/24 11:35 AM, Chad Preisler wrote:
>> > Hello,
>> >
>> > We did some testing in our test environment today. We are seeing some
>> > records processed where only one side of the join has a record. So
>> > that's good. However, we are still seeing some records get skipped.
>> > They never hit the value joiner (we write a log message first thing in
>> > the value joiner). During the test we were putting some load on the
>> > system, so stream time was advancing. We did notice that the join
>> > windows were taking much longer than 30 minutes to close and process
>> > records. Thirty minutes is the window plus grace.
>> >
>> >> How do you know this?
>> > First thing we do is write a log message in the value joiner. We
>> > don't see the log message for the missed records.
>> >
>> > I will try pushing the same records locally. However, we don't see any
>> > errors in our logs and the stream does process one-sided joins after
>> > the skipped record. Do you have any docs on the "dropped records"
>> > metric? I did a Google search and didn't find many good results for
>> > that.
>> >
>> > Thanks,
>> >
>> > Chad
>> >
>> > On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> >>>> Thanks for the information. I ran the code using Kafka locally.
>> >>>> After submitting some records inside and outside of the time window
>> >>>> and grace, the join performed as expected when running locally.
>> >>
>> >> That gives some hope :)
>> >>
>> >>
>> >>
>> >>> However, they never get into the join.
>> >>
>> >> How do you know this?
>> >>
>> >>
>> >> Did you check the metric for dropped records? Maybe records are
>> >> considered malformed and dropped? Are you using the same records in
>> >> production and in your local test?
>> >>
>> >>
>> >>>> Are there any settings for the stream client that would affect the
>> >>>> join?
>> >>
>> >> Not that I can think of... There is one more internal config, but as
>> >> long as data is flowing, it should not impact the result you see.
>> >>
>> >>
>> >>>> Are there any settings on the broker side that would affect the join?
>> >>
>> >> No. The join is computed client side. Broker configs should not have
>> >> any impact.
>> >>
>> >>> If I increase the log level for the streams API, would that
>> >>> shed some light on what is happening?
>> >>
>> >> I don't think it would help much. The code in question is
>> >> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but
>> >> it does not do any logging except the already mentioned WARN for
>> >> dropped malformed records, which is also recorded via JMX.
>> >>
>> >>> WARN: "Skipping record due to null key or value. "
>> >>
>> >>
>> >> If you can identify a specific record from the input which would
>> >> produce an output, but does not, maybe you can try to feed it into
>> >> your local test env and try to reproduce the issue?
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 4/30/24 11:38 AM, Chad Preisler wrote:
>> >>> Matthias,
>> >>>
>> >>> Thanks for the information. I ran the code using Kafka locally.
>> >>> After submitting some records inside and outside of the time window
>> >>> and grace, the join performed as expected when running locally.
>> >>>
>> >>> I'm not sure why the join is not working as expected when running
>> >>> against our actual brokers. We are peeking at the records for the
>> >>> streams and we are seeing the records get pulled. However, they never
>> >>> get into the join. It's been over 24 hours since the expected records
>> >>> were created, and there has been plenty of traffic to advance the
>> >>> stream time. Only records that have both a left and right side match
>> >>> are getting processed by the join.
>> >>>
>> >>> Are there any settings for the stream client that would affect the
>> >>> join?
>> >>>
>> >>> Are there any settings on the broker side that would affect the join?
>> >>>
>> >>> The outer join is just one part of the topology. Compared to running
>> >>> it locally, there is a lot more data going through the app when
>> >>> running on our actual servers. If I increase the log level for the
>> >>> streams API, would that shed some light on what is happening? Does
>> >>> anyone know if there are specific packages that I should increase the
>> >>> log level for? Any specific log message I can hone in on to tell me
>> >>> what is going on?
>> >>>
>> >>> Basically, I'm looking for some pointers on where I can start looking.
>> >>>
>> >>> Thanks,
>> >>> Chad
>> >>>
>> >>>
>> >>> On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax <mj...@apache.org>
>> >> wrote:
>> >>>
>> >>>>>> I expect the join to
>> >>>>>> execute after the 25 minutes with one side of the join containing
>> >>>>>> a record and the other being null
>> >>>>
>> >>>> Given that you also have a grace period of 5 minutes, the result
>> >>>> will only be emitted after the grace period has passed and the
>> >>>> window is closed (not when the window end time is reached).
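[Editor's note: the emission rule described above can be sketched as a tiny simulation. This is a hedged approximation, not Kafka Streams' actual code (which lives in KStreamKStreamJoin); the class, method, and constant names are hypothetical.]

```java
// Sketch of the rule: a left/right (outer) join result for an unmatched
// record is emitted only once stream time reaches record time + window +
// grace, not when the window end alone is reached.
public class OuterJoinEmission {
    static final long WINDOW_MS = 25 * 60_000L; // 25-minute time difference
    static final long GRACE_MS  = 5 * 60_000L;  // 5-minute grace

    static boolean shouldEmitOuterResult(long recordTimestampMs, long streamTimeMs) {
        return streamTimeMs >= recordTimestampMs + WINDOW_MS + GRACE_MS;
    }

    public static void main(String[] args) {
        long recordTs = 0L;
        // Window end reached, grace not yet elapsed: no emission.
        System.out.println(shouldEmitOuterResult(recordTs, 25 * 60_000L)); // false
        // Window end plus grace reached: window is closed, result emitted.
        System.out.println(shouldEmitOuterResult(recordTs, 30 * 60_000L)); // true
    }
}
```

Since `streamTimeMs` only advances with new input records, a quiet topic can leave this condition unsatisfied indefinitely, which matches the "stream-time will stop to advance" caveat later in this message.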
>> >>>>
>> >>>>>> One has a
>> >>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there,
>> >>>>>> but I'm not sure how to decode that message to see what is in it.
>> >>>>>> What is the purpose of those messages?
>> >>>>
>> >>>> It's an internal store that holds all records which are subject to
>> >>>> being emitted as a left/right join result, ie, if there is no inner
>> >>>> join result.
>> >>>> The format used is internal:
>> >>>>
>> >>>>
>> >>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>> >>>>
>> >>>> Also note: time is based on event-time, ie, if the input stream
>> >>>> stops sending new records, "stream-time" will stop advancing and the
>> >>>> result might not be emitted because the window does not get closed.
>> >>>>
>> >>>> (Lastly, there is an internal wall-clock delay of one second before
>> >>>> emitting results, for performance reasons...)
>> >>>>
>> >>>> HTH.
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 4/30/24 6:51 AM, Chad Preisler wrote:
>> >>>>> Hello,
>> >>>>>
>> >>>>> I have a KStream to KStream outer join with a time difference of 25
>> >>>>> minutes and 5 minutes of grace. When I get a record for one side of
>> >>>>> the join, but don't get a record on the other side of the join, I
>> >>>>> expect the join to execute after the 25 minutes with one side of
>> >>>>> the join containing a record and the other being null. Is that
>> >>>>> correct? If it is correct, it's not working for me.
>> >>>>>
>> >>>>> I was poking around on the broker and saw some internal topics. I
>> >>>>> see the key I expected to execute the join on some of those topics.
>> >>>>> One has a naming convention of "KSTREAM_OUTERSHARED". I see a
>> >>>>> record there, but I'm not sure how to decode that message to see
>> >>>>> what is in it. What is the purpose of those messages? If I decode
>> >>>>> the message will it help me see when the join should have been
>> >>>>> executed?
>> >>>>>
>> >>>>> I also see the key on a topic with the naming convention
>> >>>>> "KSTREAM_OUTERTHIS".
>> >>>>>
>> >>>>> Are there any other topics that I should be looking at to
>> >>>>> troubleshoot this issue?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Chad
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
