[ 
https://issues.apache.org/jira/browse/KAFKA-19923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-19923:
---------------------------------
    Description: 
Kafka Streams throws a ClassCastException when using different Consumed 
instances for the same topic.

For example:
{code:java}
builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
       .peek((k, v) -> System.out.println(k));
builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
       .peek((k, v) -> System.out.println(k));
{code}
 

Since both use the same topic name and the same ConsumedInternal configuration 
for auto offset reset, these two StreamSourceNodes are merged during topology 
building.

 

As a result, the Topology is built successfully.

 

However, when the StreamThread starts, the consumer begins to receive records 
from the broker, and the records flow through the pipeline, a 
ClassCastException is thrown at runtime.

 

In my opinion, we have two options:
 # Document this behavior.
 # When merging source nodes, the builder should consider the full 
ConsumedInternal configuration (for example, key/value SerDes and timestamp 
extractor), instead of only the auto offset reset policy.

 

I think option 1 is also acceptable, because Kafka Streams will fail fast with 
a ClassCastException before the consumer commits any offsets.

 

Option 2 would require more substantial changes in Kafka Streams, because 
TimestampExtractor and key/value SerDes do not expose a straightforward way to 
check semantic equivalence.

  was:
Kafka Streams throws a ClassCastException when using different Consumed 
instances for the same topic.

For example:
{code:java}
builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
       .peek((k, v) -> System.out.println(k));
builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
       .peek((k, v) -> System.out.println(k));
// 코드 자리 표시자
{code}
 

 

 

Since both use the same topic name and the same ConsumedInternal configuration 
for auto offset reset, these two StreamSourceNodes are merged during topology 
building.

 

As a result, the Topology is built successfully.

 

However, when the StreamThread starts, the consumer begins to receive records 
from the broker, and the records flow through the pipeline, a 
ClassCastException is thrown at runtime.

 

In my opinion, we have two options:
 # Document this behavior.
 # When merging source nodes, the builder should consider the full 
ConsumedInternal configuration (for example, key/value SerDes and timestamp 
extractor), instead of only the auto offset reset policy.

 

I think option 1 is also acceptable, because Kafka Streams will fail fast with 
a ClassCastException before the consumer commits any offsets.

 

Option 2 would require more substantial changes in Kafka Streams, because 
TimestampExtractor and key/value SerDes do not expose a straightforward way to 
check semantic equivalence.


> Kafka Streams throws ClassCastException with different Consumed instances.
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-19923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19923
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>
> Kafka Streams throws a ClassCastException when using different Consumed 
> instances for the same topic.
> For example:
> {code:java}
> builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
>        .peek((k, v) -> System.out.println(k));
> builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>        .peek((k, v) -> System.out.println(k));
> {code}
>  
> Since both use the same topic name and the same ConsumedInternal 
> configuration for auto offset reset, these two StreamSourceNodes are merged 
> during topology building.
>  
> As a result, the Topology is built successfully.
>  
> However, when the StreamThread starts, the consumer begins to receive records 
> from the broker, and the records flow through the pipeline, a 
> ClassCastException is thrown at runtime.
>  
> In my opinion, we have two options:
>  # Document this behavior.
>  # When merging source nodes, the builder should consider the full 
> ConsumedInternal configuration (for example, key/value SerDes and timestamp 
> extractor), instead of only the auto offset reset policy.
>  
> I think option 1 is also acceptable, because Kafka Streams will fail fast 
> with a ClassCastException before the consumer commits any offsets.
>  
> Option 2 would require more substantial changes in Kafka Streams, because 
> TimestampExtractor and key/value SerDes do not expose a straightforward way 
> to check semantic equivalence.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to