[ https://issues.apache.org/jira/browse/KAFKA-19923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040809#comment-18040809 ]

sanghyeok An commented on KAFKA-19923:
--------------------------------------

[~mjsax] thanks for your comments! 

I think that, while rare, such a scenario could occur. For instance:
{code:java}
// For business logic: deserialize payments with a domain serde.
builder.stream("payments",
        Consumed.with(Serdes.String(), paymentSerde))
       .process(this::handlePayment);
// For debugging or log replay: read the same topic as raw bytes.
builder.stream("payments",
        Consumed.with(Serdes.String(), Serdes.ByteArray()))
       .to("payments-raw-debug");
{code}
However, I agree that this is not the recommended pattern for Kafka Streams.
IMHO, a more robust approach would be to consume the raw bytes once and then
transform them per use case, for example:
{code:java}
// Single source node: consume the raw bytes once.
KStream<byte[], byte[]> stream = builder.stream("payments",
        Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()));
// Business logic: deserialize into domain types (toPayment is a placeholder).
stream.map((k, v) -> KeyValue.pair(new String(k), toPayment(v)))
      .process(this::handlePayment);
// Debugging / log replay: forward the raw bytes unchanged.
stream.to("payments-raw-debug",
        Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));
{code}
 

So, yes, IMHO, supporting this feature is not required. (If possible, it would 
be good to throw an exception such as TopologyException at topology build time.)
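
Just to illustrate the idea, here is a minimal sketch of such a build-time 
guard (this is not the actual InternalTopologyBuilder code; the class and 
method names are hypothetical):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: record the serde classes declared for each source
// topic and fail fast when a second source declares different ones.
final class SourceSerdeGuard {
    private final Map<String, String> serdesByTopic = new HashMap<>();

    void registerSource(String topic, Class<?> keySerde, Class<?> valueSerde) {
        String signature = keySerde.getName() + "/" + valueSerde.getName();
        String existing = serdesByTopic.putIfAbsent(topic, signature);
        if (existing != null && !existing.equals(signature)) {
            // In Kafka Streams this could be a TopologyException instead.
            throw new IllegalStateException("Topic " + topic
                    + " is already consumed with serdes " + existing
                    + ", but a new source declares " + signature);
        }
    }
}
{code}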

However, since this pattern is easy to misuse, I think it would be beneficial 
to briefly update the documentation. We could warn about the potential 
ClassCastException and recommend the single-source approach (i.e., not 
creating multiple stream sources for a single topic) as a best practice.

 

What do you think?

> Kafka Streams throws ClassCastException with different Consumed instances.
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-19923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19923
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>
> Kafka Streams throws a ClassCastException when using different Consumed 
> instances for the same topic.
> For example:
> builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
>        .peek((k, v) -> System.out.println(k));
> builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>        .peek((k, v) -> System.out.println(k));
>  
> Since both use the same topic name and the same ConsumedInternal 
> configuration for auto offset reset, these two StreamSourceNodes are merged 
> during topology building.
>  
> As a result, the Topology is built successfully.
>  
> However, once the StreamThread starts and the consumer begins to receive 
> records from the broker, a ClassCastException is thrown at runtime as the 
> records flow through the pipeline.
>  
> In my opinion, we have two options:
>  # Document this behavior.
>  # When merging source nodes, the builder should consider the full 
> ConsumedInternal configuration (for example, key/value SerDes and timestamp 
> extractor), instead of only the auto offset reset policy.
>  
> I think option 1 is also acceptable, because Kafka Streams will fail fast 
> with a ClassCastException before the consumer commits any offsets.
>  
> Option 2 would require more substantial changes in Kafka Streams, because 
> TimestampExtractor and key/value SerDes do not expose a straightforward way 
> to check semantic equivalence.


