sanghyeok An created KAFKA-19923:
------------------------------------
Summary: Kafka Streams throws ClassCastException with different
Consumed instances.
Key: KAFKA-19923
URL: https://issues.apache.org/jira/browse/KAFKA-19923
Project: Kafka
Issue Type: Bug
Reporter: sanghyeok An
Assignee: sanghyeok An
Kafka Streams throws a ClassCastException when using different Consumed
instances for the same topic.
For example:
builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
.peek((k, v) -> System.out.println(k));
builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
.peek((k, v) -> System.out.println(k));
Since both use the same topic name and the same ConsumedInternal configuration
for auto offset reset, these two StreamSourceNodes are merged during topology
building.
As a result, the Topology is built successfully.
However, when the StreamThread starts, the consumer begins to receive records
from the broker, and the records flow through the pipeline, a
ClassCastException is thrown at runtime.
In my opinion, we have two options:
# Document this behavior.
# When merging source nodes, the builder should consider the full
ConsumedInternal configuration (for example, key/value SerDes and timestamp
extractor), instead of only the auto offset reset policy.
I think option 1 is also acceptable, because Kafka Streams will fail fast with
a ClassCastException before the consumer commits any offsets.
Option 2 would require more substantial changes in Kafka Streams, because
TimestampExtractor and key/value SerDes do not expose a straightforward way to
check semantic equivalence.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)