This older email was just brought to my attention.

I did look into it, and it is indeed a bug: https://issues.apache.org/jira/browse/KAFKA-18478

As a workaround, setting the default value serde in StreamsConfig to the stream-side value serde should make it work.
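A minimal sketch of that workaround, assuming the String serdes and the test properties from the reproduction quoted below. The `JSerdes` import alias is only there to avoid clashing with the Scala `Serdes` object used in the reproduction. The default key serde line is an assumption based on the exception message, which names DEFAULT_KEY_SERDE_CLASS_CONFIG; it may not be needed.

```scala
import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.StreamsConfig

import java.util.Properties

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

// Workaround: make the default value serde match the stream-side value serde
// (String in the reproduction).
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde].getName)

// Assumption: the exception complains about a missing default key serde,
// so setting it as well may be required.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde].getName)
```

Since the defaults are application-wide, this only helps directly when the affected joins share the same stream-side types.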


-Matthias


On 4/8/24 10:30 PM, Mickey Donaghy wrote:
Hi,

I'm trying to do a join between a stream and a KTable with a grace period,
which requires a versioned table. However even if I specify a serializer
everywhere, it seems this doesn't quite make it through to
the RocksDBTimeOrderedKeyValueBuffer and I get an error when
building/starting the topology.

I've put a minimal reproduction below (against kafka-streams 3.7.0, please
excuse the Scala). This fails with:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-LEFTJOIN-0000000003
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:110)
[...]
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
  at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
  at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
  at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
  at org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer.setSerdesIfNull(RocksDBTimeOrderedKeyValueBuffer.java:159)
  at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:86)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:102)

Am I missing some other place where the serde should be specified, or is
this a bug? (In the real application I need to do this multiple times for
different types, so setting the default serde class would be difficult)

Many thanks,
Mickey

Code:

import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}

import java.time.Duration
import java.util.Properties

val builder = new StreamsBuilder
val table   = builder.table(
   "table",
   Materialized
     .as[String, String](Stores.persistentVersionedKeyValueStore("table-store", Duration.ofMinutes(20)))
     .withKeySerde(Serdes.stringSerde)
     .withValueSerde(Serdes.stringSerde)
)
val stream  = builder.stream("stream", Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
stream
   .leftJoin(
     table,
     (_ + _): ValueJoiner[String, String, String],
     Joined
       .`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
       .withGracePeriod(Duration.ofMinutes(10))
   )
   .to("output", Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))

val topology   = builder.build()
val props      = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
val testDriver = new TopologyTestDriver(topology, props)

