Hi,

thanks for reporting this issue. I did look into it, and it is indeed a bug: https://issues.apache.org/jira/browse/KAFKA-18478

The workaround should be to set the StreamsConfig value serde to the stream-side value serde to make it work.


-Matthias

On 1/10/25 5:36 AM, Hagiu Alexandru wrote:
Hello,

Kind reminder :)

Also, I want to add that I am trying to add a grace period for my
kstream-ktable join, but I encounter an exception.
It seems the Serde that I am providing in the join description is not taken
into consideration when creating the *RocksDBTimeOrderedKeyValueBuffer*.

*<I see some similar issue posted
here: https://lists.apache.org/thread/0vnqkq61tg50rpc826fvq863b2nr96m2
<https://lists.apache.org/thread/0vnqkq61tg50rpc826fvq863b2nr96m2>>*

*The code:*
KStream<String, ContextAvro> contextKStream =
streamsBuilder.stream(contextTopic);

         // Define the versioned state store supplier with a retention period
         VersionedBytesStoreSupplier storeSupplier =
Stores.persistentVersionedKeyValueStore(
             contextStoreName,        // Store name
             Duration.ofMinutes(2)  // History retention period
         );

         // Build the KTable with the versioned state store
         Materialized<String, ContextAvro, KeyValueStore<Bytes, byte[]>>
materialized = Materialized.<String, ContextAvro>as(
                 storeSupplier)
             .withKeySerde(Serdes.String())
             .withValueSerde(getSpecificAvroSerde());

         KTable<String, ContextAvro> contextKTable =
contextKStream.toTable(materialized);


         FixedKeyProcessorSupplier<String, PaymentOperationAvro,
ValueAndTrace<PaymentOperationAvro>> valueAndTraceProcessorSupplier =
             KafkaTracingValueAndTraceProcessor::new;

         // ValuAndTrace is a custom class which encapsulated the message V
value and some tracingData
         // ValuAndTraceSerde is its Serde

          KStream<String, PaymentOperationProcessingResult>
processingResultStream = adyenPaymentOperationKStream
             .processValues(valueAndTraceProcessorSupplier)
             .join(contextKTable,
                 operationJoiner,




*Joined.with(Serdes.String(),                    new
ValueAndTraceSerde<>(getSpecificAvroSerde()),
getSpecificAvroSerde(),                     null,
Duration.ofSeconds(2)))*
             .peek(this::logFollowupOperation)
             .mapValues(operationHandler::handle);

         // operationJoiner is a custom joiner which decapsulate the value
from ValueAndTrace back

*The error:*
org.apache.kafka.streams.errors.StreamsException: ClassCastException
invoking processor: KSTREAM-JOIN-0000000039. Do the Processor's input types
match the deserialized types? Check the Serde setup and change the default
Serdes in StreamConfig or provide correct Serdes via method parameters.
Make sure the Processor can accept the deserialized input of type key:
java.lang.String, and value: com.nestle.nestpay.tracing.ValueAndTrace.
Note that although incorrect Serdes are a common cause of error, the cast
exception might have another cause (in user code, for example). For
example, if a processor wires in a store, but casts the generics
incorrectly, a class cast exception could be raised during processing, but
the cause would not be wrong Serdes.

at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:165)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
at
org.apache.kafka.streams.TopologyTestDriver.completeAllProcessableWork(TopologyTestDriver.java:601)
at
org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:558)
at
org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:847)
at
org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
at
org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
at
com.nestle.nestpay.adyenadapter.KafkaStreamsServiceTest.whenAdyenFollowupOperationAndFollowupOperationContextEventsArePresentForTheSameKey_thenShouldConsumeTheEvents(KafkaStreamsServiceTest.java:279)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at
io.quarkus.test.junit.QuarkusTestExtension.runExtensionMethod(QuarkusTestExtension.java:1013)
at
io.quarkus.test.junit.QuarkusTestExtension.interceptTestMethod(QuarkusTestExtension.java:827)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: java.lang.ClassCastException: class
com.nestle.nestpay.tracing.ValueAndTrace cannot be cast to class
org.apache.avro.specific.SpecificRecord
(com.nestle.nestpay.tracing.ValueAndTrace and
org.apache.avro.specific.SpecificRecord are in unnamed module of loader
io.quarkus.bootstrap.classloading.QuarkusClassLoader @47c64cfe)
at
io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer.put(RocksDBTimeOrderedKeyValueBuffer.java:281)
at
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:101)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
... 19 more"


On Sun, 29 Dec 2024 at 09:45, Hagiu Alexandru <alexhagi...@gmail.com> wrote:

Hello,

I'm developing a Quarkus microservice that utilizes Kafka Streams to
process messages from multiple topics. Specifically, I'm attempting to join
a KStream and a KTable derived from two of these topics. Under normal
conditions, the join operates as expected. However, under increased load,
the join occasionally fails to produce results, even though both messages
appear to be processed, but the ones from the KTable seems to be processed
slightly after the ones of the KStream(even if the actual timestamp of the
produced message is before).

Code Example:

KStream<String, OperationAvro> operationKStream = 
streamsBuilder.stream(operationTopic);
KStream<String, ContextAvro> contextKStream = 
streamsBuilder.stream(contextTopic);
KTable<String, ContextAvro> contextKTable = 
contextKStream.toTable(Materialized.as(contextStoreName));

// both KTable and KStream use a custom processor which logs each message and 
its timestamp

KStream<String, Result> processingResultStream = operationKStream
     .filter((key, value) -> isEligible(value))
     .join(
         contextKTable,
         (operation, context) -> topicsJoiner(operation, context),
         Joined.with(Serdes.String(), new 
ValueAndTraceSerde<>(getSpecificAvroSerde()), getSpecificAvroSerde())
     )
     .peek(this::logTheOutputOfJoiner)
     .mapValues(handler::processTheJoinedData);


*Issue:*

    - Under normal load, the join between operationKStream and
    contextKTable functions correctly.
    - Under higher load, there are instances where the join does not
    produce results, because when the KStream's message is processed, the
    corresponding KTable's message is not yet present, even if it is actually
    already sent from the producer. *Note:* when the KTable's message is
    processed and logged(after the join was tried and skipped), the timestamp
    of it(attached by the producer when the message was created) is before the
    timestamp of the corresponding message from the KSTream.

*Troubleshooting Steps Taken:*

    - Checked Partition Assignment: Ensured that messages with the same
    key are assigned to the same partition across topics to facilitate correct
    joining(co-partitioning).
    - Adjusted num.stream.threads: Set to 6 as all the topics I'm
    subscribed to have 6 partitions. This didn't solve the issue as the
    messages with the same key will be distributed to the same stream task
    which is on a single thread for co-partitioning.
    - Adjusted max.task.idle.ms: Set to 2000 ms to allow for out-of-order
    message handling(
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization).
    This did not resolve the issue.

My question is why this issue happen as the timestamp of the messages
seems to be in correct order, but it happen for the messages from the
KStream to be processed before the ones of KTable, which make the join to
be skipped.


Thank you,

Alex Hagiu




Reply via email to