This older email was just brought to my attention.
I did look into it, and it is indeed a bug:
https://issues.apache.org/jira/browse/KAFKA-18478
As a workaround, setting the default value serde in StreamsConfig to the
stream-side value serde should make it work.
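A minimal sketch of that workaround applied to the reproduction quoted below, assuming String serdes on both the stream and table side as in that code. The ConfigException in the quoted stack trace is raised for the key serde, so this sketch sets DEFAULT_KEY_SERDE_CLASS_CONFIG as well as DEFAULT_VALUE_SERDE_CLASS_CONFIG. The object name JoinWorkaround is hypothetical.

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.StreamsConfig

// Hypothetical helper: the same Properties as the reproduction, plus global
// default serdes that the grace-period join buffer can fall back to when its
// own key/value serdes are null.
object JoinWorkaround {
  def props(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
    // The trace fails on the key serde, so set the key default too, not only the value default.
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde])
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde])
    props
  }
}
```

Passing JoinWorkaround.props() to the TopologyTestDriver constructor in place of props should get past the missing-serde error. Because the defaults are global, this presumably only helps when every grace-period join in the application shares the same stream-side serdes, which is the limitation raised in the question.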
-Matthias
On 4/8/24 10:30 PM, Mickey Donaghy wrote:
Hi,
I'm trying to do a join between a stream and a KTable with a grace period,
which requires a versioned table. However, even if I specify a serde
everywhere, it doesn't seem to make it through to the
RocksDBTimeOrderedKeyValueBuffer, and I get an error when
building/starting the topology.
I've put a minimal reproduction below (against kafka-streams 3.7.0, please
excuse the Scala). This fails with:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-LEFTJOIN-0000000003
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:110)
    [...]
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer.setSerdesIfNull(RocksDBTimeOrderedKeyValueBuffer.java:159)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:86)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:102)
Am I missing some other place where the serde should be specified, or is
this a bug? (In the real application I need to do this multiple times for
different types, so setting the default serde class would be difficult.)
Many thanks,
Mickey
Code:
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}
import java.time.Duration
import java.util.Properties

val builder = new StreamsBuilder

// Versioned table (20 min history retention), required for a grace-period join
val table = builder.table(
  "table",
  Materialized
    .as[String, String](
      Stores.persistentVersionedKeyValueStore("table-store", Duration.ofMinutes(20)))
    .withKeySerde(Serdes.stringSerde)
    .withValueSerde(Serdes.stringSerde)
)

val stream = builder.stream("stream", Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))

// Stream-table left join with a 10 min grace period; all serdes passed via Joined
stream
  .leftJoin(
    table,
    (_ + _): ValueJoiner[String, String, String],
    Joined
      .`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
      .withGracePeriod(Duration.ofMinutes(10))
  )
  .to("output", Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))

val topology = builder.build()

// No default serdes are configured here
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

// Fails with the ConfigException above while initializing the join processor
val testDriver = new TopologyTestDriver(topology, props)