Seems this question was cross-posted on SO: https://stackoverflow.com/questions/79299571/inconsistent-results-with-kstream-ktable-join-under-load

Left an answer there.

On 12/28/24 11:45 PM, Hagiu Alexandru wrote:
Hello,

I'm developing a Quarkus microservice that uses Kafka Streams to process
messages from multiple topics. Specifically, I'm joining a KStream and a
KTable derived from two of these topics. Under normal conditions, the join
works as expected. Under increased load, however, the join occasionally
fails to produce results even though both messages appear to be processed:
the KTable records seem to be processed slightly after the corresponding
KStream records, even though the timestamps the producer attached to them
are earlier.

Code Example:

KStream<String, OperationAvro> operationKStream =
    streamsBuilder.stream(operationTopic);
KStream<String, ContextAvro> contextKStream =
    streamsBuilder.stream(contextTopic);
KTable<String, ContextAvro> contextKTable =
    contextKStream.toTable(Materialized.as(contextStoreName));

// both the KTable and the KStream use a custom processor which logs
// each message and its timestamp

KStream<String, Result> processingResultStream = operationKStream
     .filter((key, value) -> isEligible(value))
     .join(
         contextKTable,
         (operation, context) -> topicsJoiner(operation, context),
         Joined.with(
             Serdes.String(),
             new ValueAndTraceSerde<>(getSpecificAvroSerde()),
             getSpecificAvroSerde())
     )
     .peek(this::logTheOutputOfJoiner)
     .mapValues(handler::processTheJoinedData);


*Issue:*

    - Under normal load, the join between operationKStream and contextKTable
    works correctly.
    - Under higher load, there are cases where the join produces no result:
    when the KStream record is processed, the corresponding KTable record is
    not yet present in the state store, even though the producer has already
    sent it. *Note:* when the KTable record is eventually processed and
    logged (after the join was attempted and skipped), its timestamp
    (attached by the producer when the message was created) is earlier than
    the timestamp of the corresponding KStream record.

*Troubleshooting Steps Taken:*

    - Checked partition assignment: ensured that messages with the same key
    are assigned to the same partition across topics (co-partitioning), so
    the join can match them.
    - Adjusted num.stream.threads: set to 6, matching the 6 partitions of
    each subscribed topic. This didn't solve the issue, since co-partitioned
    messages with the same key are routed to the same stream task, which
    runs on a single thread.
    - Adjusted max.task.idle.ms: set to 2000 ms to allow for out-of-order
    message handling (
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization).
    This did not resolve the issue.
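
For reference, the settings above can be sketched as plain Streams
properties. This is a minimal sketch using the standard Kafka Streams
configuration keys ("num.stream.threads", "max.task.idle.ms"); the class
name and structure are just for illustration:

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Builds the subset of Streams properties discussed above.
    public static Properties buildProps() {
        Properties props = new Properties();
        // one stream thread per input partition, as described above
        props.setProperty("num.stream.threads", "6");
        // let each task idle up to 2s waiting for data on its other input
        // partitions before processing (KIP-353 timestamp synchronization)
        props.setProperty("max.task.idle.ms", "2000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println("num.stream.threads="
            + props.getProperty("num.stream.threads"));
        System.out.println("max.task.idle.ms="
            + props.getProperty("max.task.idle.ms"));
    }
}
```

These properties would normally be merged into the same Properties object
passed to the KafkaStreams constructor (or, in Quarkus, supplied via the
application configuration).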

My question is why this happens: the message timestamps appear to be in the
correct order, yet the KStream records are sometimes processed before the
corresponding KTable records, which causes the join to be skipped.


Thank you,

Alex Hagiu
