See one small edit below...

On Tue, May 21, 2024 at 10:25 AM Chad Preisler <chad.preis...@gmail.com> wrote:
> Hello,
>
> I think the issue is related to certain partitions not getting records to
> advance stream time (because of low volume). Can someone confirm that each
> partition has its own stream time and that the stream time for a partition
> only advances when a record is written to the partition after the window
> closes?
>
> If I use the repartition method on each input topic to reduce the number
> of partitions for those streams, how many instances of the application will
> process records? For example, if the input topics each have 6 partitions,
> and I use the repartition method to set the number of partitions for the
> streams to 2, how many instances of the application will process records?
>
> Thanks,
> Chad
>
>
> On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> >>> How do you know this?
>> >> First thing we do is write a log message in the value joiner. We don't see
>> >> the log message for the missed records.
>>
>> Well, for left/right join results, the ValueJoiner would only be called
>> when the window is closed... And for invalid input (or late records, i.e.,
>> records that arrive out-of-order after their window was already closed),
>> records would be dropped right away. So you cannot really infer whether a
>> record made it into the join or not, or what happens if it did make it into
>> the `Processor`.
>>
>> -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>>
>> `dropped-records-total` is the name of the metric.
>>
>> -Matthias
>>
>> On 5/1/24 11:35 AM, Chad Preisler wrote:
>> > Hello,
>> >
>> > We did some testing in our test environment today. We are seeing some
>> > records processed where only one side of the join has a record. So that's
>> > good. However, we are still seeing some records get skipped. They never hit
>> > the value joiner (we write a log message first thing in the value joiner).
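Chad's low-volume hypothesis and Matthias's point that the ValueJoiner only runs for a left/right result once the window closes can be illustrated with a small model. The Java sketch below is a simplified, hypothetical model (the class and method names are made up for illustration; it is not the `KStreamKStreamJoin` code). It assumes that stream time for a partition is the largest timestamp seen on that partition, that a one-sided record with timestamp `ts` is emitted only once that stream time passes `ts` + 25 minutes + 5 minutes of grace, and that this check runs only when a new record arrives on the same partition. In Kafka Streams, stream time is tracked per task, and for a co-partitioned join a task reads the same partition number from both inputs. The sketch ignores real matches, late records, and the one-second wall-clock throttling mentioned later in the thread, and treating "passes" as strictly greater is an assumption.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-partition stream time for an outer join.
// NOT the Kafka Streams implementation: no matches, no late records, no wall-clock throttling.
final class StreamTimeSketch {
    // Window settings from the thread: 25 minutes time difference plus 5 minutes grace.
    static final long AFTER_MS = Duration.ofMinutes(25).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(5).toMillis();

    // Stream time per partition = largest record timestamp seen on that partition.
    private final Map<Integer, Long> streamTime = new HashMap<>();
    // Timestamps of one-sided records still waiting to be emitted, per partition.
    private final Map<Integer, List<Long>> pending = new HashMap<>();

    // Processes a one-sided record; returns timestamps emitted as outer-join results.
    List<Long> process(int partition, long timestampMs) {
        // Stream time only moves forward, and only when this partition receives a record.
        final long now = Math.max(streamTime.getOrDefault(partition, Long.MIN_VALUE), timestampMs);
        streamTime.put(partition, now);

        List<Long> waiting = pending.computeIfAbsent(partition, p -> new ArrayList<>());
        waiting.add(timestampMs);

        List<Long> emitted = new ArrayList<>();
        // ASSUMPTION: a window counts as closed once stream time is strictly past ts + 25 min + 5 min.
        waiting.removeIf(ts -> {
            boolean closed = ts + AFTER_MS + GRACE_MS < now;
            if (closed) {
                emitted.add(ts);
            }
            return closed;
        });
        return emitted;
    }

    // Number of one-sided records on a partition that have not been emitted yet.
    int pendingCount(int partition) {
        return pending.getOrDefault(partition, List.of()).size();
    }
}
```

In this model, if partitions 0 and 1 each receive a one-sided record at time 0 and only partition 0 later receives a record at 31 minutes, the record on partition 0 is emitted while the one on partition 1 stays pending for as long as that partition stays quiet, no matter how busy the other partitions are.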
>> > During the test we were putting some load on the system, so stream time was
>> > advancing. We did notice that the join windows were taking much longer than
>> > 30 minutes to close and process records. Thirty minutes is the window plus
>> > grace.
>> >
>> >> How do you know this?
>> > First thing we do is write a log message in the value joiner. We don't see
>> > the log message for the missed records.
>> >
>> > I will try pushing the same records locally. However, we don't see any
>> > errors in our logs and the stream does process one-sided joins after the
>> > skipped record. Do you have any docs on the "dropped records" metric? I did
>> > a Google search and didn't find many good results for that.
>> >
>> > Thanks,
>> >
>> > Chad
>> >
>> > On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> >>>> Thanks for the information. I ran the code using Kafka locally. After
>> >>>> submitting some records inside and outside of the time window and grace,
>> >>>> the join performed as expected when running locally.
>> >>
>> >> That gives some hope :)
>> >>
>> >>> However, they never get into the join.
>> >>
>> >> How do you know this?
>> >>
>> >> Did you check the metric for dropped records? Maybe records are
>> >> considered malformed and dropped? Are you using the same records in
>> >> production and in your local test?
>> >>
>> >>>> Are there any settings for the stream client that would affect the join?
>> >>
>> >> Not that I can think of... There is one more internal config, but as
>> >> long as data is flowing, it should not impact the result you see.
>> >>
>> >>>> Are there any settings on the broker side that would affect the join?
>> >>
>> >> No. The join is computed client side. Broker configs should not have any
>> >> impact.
>> >>
>> >>>> If I increase the log level for the streams API would that
>> >>>> shed some light on what is happening?
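One way to check Matthias's question about records being considered malformed and dropped is to log such records yourself before they reach the join. The Java sketch below is a hypothetical diagnostic guard (`NullGuard`, `accept`, and `droppedRecordsTotal` are illustrative names, not a Kafka Streams API) that mirrors the behavior described in this thread: a record with a null key or value is skipped with a WARN and counted, loosely like the `dropped-records-total` metric. Exactly which records Kafka Streams drops can differ between versions, so treat this only as a way to make the suspected case visible.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical diagnostic guard; NOT a Kafka Streams class.
// Mirrors the thread's description: null key or value -> WARN + counted as dropped.
final class NullGuard<K, V> {
    private static final Logger LOG = Logger.getLogger(NullGuard.class.getName());
    private final AtomicLong dropped = new AtomicLong();

    // Returns true if the record would be passed on, false if it would be skipped.
    boolean accept(K key, V value) {
        if (key == null || value == null) {
            LOG.warning("Skipping record due to null key or value. key=" + key);
            dropped.incrementAndGet();
            return false;
        }
        return true;
    }

    // Running count, loosely analogous to the dropped-records-total metric.
    long droppedRecordsTotal() {
        return dropped.get();
    }
}
```

In a topology, something like this could be called from a `peek` placed in front of the join. That does not change what the join drops; it only makes the suspected records show up in your own logs.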
>> >>
>> >> I don't think it would help much. The code in question is
>> >> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
>> >> does not do any logging except a WARN for the already mentioned "dropping
>> >> malformed" records, which are also recorded via JMX.
>> >>
>> >>> WARN: "Skipping record due to null key or value. "
>> >>
>> >> If you can identify a specific record from the input which would produce
>> >> an output, but does not, maybe you can try to feed it into your local
>> >> test env and try to reproduce the issue?
>> >>
>> >> -Matthias
>> >>
>> >> On 4/30/24 11:38 AM, Chad Preisler wrote:
>> >>> Matthias,
>> >>>
>> >>> Thanks for the information. I ran the code using Kafka locally. After
>> >>> submitting some records inside and outside of the time window and grace,
>> >>> the join performed as expected when running locally.
>> >>>
>> >>> I'm not sure why the join is not working as expected when running against
>> >>> our actual brokers. We are peeking at the records for the streams and we
>> >>> are seeing the records get pulled. However, they never get into the join.
>> >>> It's been over 24 hours since the expected records were created, and there
>> >>> has been plenty of traffic to advance the stream time. Only records that
>> >>> have both a left and right side match are getting processed by the join.
>> >>>
>> >>> Are there any settings for the stream client that would affect the join?
>> >>>
>> >>> Are there any settings on the broker side that would affect the join?
>> >>>
>> >>> The outer join is just one part of the topology. Compared to running it
>> >>> locally there is a lot more data going through the app when running on our
>> >>> actual servers. If I increase the log level for the streams API would that
>> >>> shed some light on what is happening? Does anyone know if there are
>> >>> specific packages that I should increase the log level for?
>> >>> Any specific log message I can hone in on to tell me what is going on?
>> >>>
>> >>> Basically, I'm looking for some pointers on where I can start looking.
>> >>>
>> >>> Thanks,
>> >>> Chad
>> >>>
>> >>>
>> >>> On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>
>> >>>>> I expect the join to
>> >>>>>> execute after the 25 minutes with one side of the join containing a record and the
>> >>>>>> other being null
>> >>>>
>> >>>> Given that you also have a grace period of 5 minutes, the result will
>> >>>> only be emitted after the grace period has passed and the window is closed
>> >>>> (not when the window end time is reached).
>> >>>>
>> >>>>> One has a
>> >>>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
>> >>>>>> not sure how to decode that message to see what is in it. What is the
>> >>>>>> purpose of those messages?
>> >>>>
>> >>>> It's an internal store that holds all records which may have to be
>> >>>> emitted as a left/right join result, i.e., if there is no inner join result.
>> >>>> The format used is internal:
>> >>>>
>> >>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java
>> >>>>
>> >>>> Also note: time is based on event-time, i.e., if the input stream stops
>> >>>> sending new records, "stream-time" will stop advancing and the result
>> >>>> might not be emitted because the window does not get closed.
>> >>>>
>> >>>> (Last, for performance reasons, there is an internal wall-clock time delay
>> >>>> of one second before results are emitted...)
>> >>>>
>> >>>> HTH.
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 4/30/24 6:51 AM, Chad Preisler wrote:
>> >>>>> Hello,
>> >>>>>
>> >>>>> I have a KStream to KStream outer join with a time difference of 25 minutes
>> >>>>> and 5 minutes of grace.
>> >>>>> When I get a record for one side of the join, but don't get a record on
>> >>>>> the other side of the join, I expect the join to execute after the 25
>> >>>>> minutes with one side of the join containing a record and the other being
>> >>>>> null. Is that correct? If it is correct, it's not working for me.
>> >>>>>
>> >>>>> I was poking around on the broker and saw some internal topics. I see the
>> >>>>> key I expected to execute the join on some of those topics. One has a
>> >>>>> naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
>> >>>>> not sure how to decode that message to see what is in it. What is the
>> >>>>> purpose of those messages? If I decode the message will it help me see when
>> >>>>> the join should have been executed?
>> >>>>>
>> >>>>> I also see the key on a topic with the naming convention
>> >>>>> "KSTREAM_OUTERTHIS".
>> >>>>>
>> >>>>> Are there any other topics that I should be looking at to troubleshoot this
>> >>>>> issue?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Chad
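For the question about decoding the `KSTREAM_OUTERSHARED` data, here is a minimal Java sketch based on one reading of the `LeftOrRightValueSerde.java` file linked in the thread. It assumes a value is stored as a single flag byte (1 for a left-side value, otherwise a right-side value) followed by the bytes produced by the join's own value serde, and it further assumes that serde was a UTF-8 string serde. The class and method names are hypothetical. Depending on the Kafka Streams version, the store and its changelog topic may wrap these bytes further, and keys use their own internal format, so check the source for your version before relying on this.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical decoder for one LeftOrRightValue payload.
// ASSUMED layout: [1 flag byte: 1 = left-side value, otherwise right][raw value bytes].
final class OuterSharedValueDecoder {
    static final class Decoded {
        final boolean leftSide;
        final String value;

        Decoded(boolean leftSide, String value) {
            this.leftSide = leftSide;
            this.value = value;
        }
    }

    static Decoded decode(byte[] data) {
        if (data == null || data.length < 1) {
            throw new IllegalArgumentException("expected at least one flag byte");
        }
        boolean leftSide = data[0] == 1;
        byte[] raw = Arrays.copyOfRange(data, 1, data.length);
        // ASSUMPTION: the join's value serde was a UTF-8 String serde; swap in your own.
        return new Decoded(leftSide, new String(raw, StandardCharsets.UTF_8));
    }
}
```

Even if the bytes decode cleanly, a pending entry in this store only shows that a one-sided record is waiting; when it is emitted still depends on stream time for that task advancing past the window end plus grace.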