Hi,

We're using Kafka Streams to map one topic into another (copying the data while changing its format). Both topics have Avro values.
I will start with the compile error, and then go through the code samples:

    could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
    [error] .stream[String, UserEvent]("schma.avsc")

We have these dependencies:

    libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
    libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
    libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
    libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
    libraryDependencies += "com.typesafe" % "config" % "1.4.0"
    libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"

We use a code generator to generate Scala case classes from Avro schema files. One such generated case class has an Either value as one of its fields. In the Avro schema this is expressed with type=[t1,t2], so the generation seems reasonable: it is a sum type that can be either type t1 or type t2.

The question is what is missing on the deserialization path from topic to case class (binary -> Avro Map -> case class). My first thought was kafka-streams-avro-serde, but it may be that this library only provides the Serde[GenericRecord] for the Avro Map, not for case classes. So one of the other dependencies presumably handles the mapping from Avro GenericRecord to case classes and back. We also have some hand-written code that generates case classes out of schemas, which seems to work directly with spray json.

I suspect there is a gap somewhere in the (binary <-> Avro GenericRecord <-> case class instance) transformations. Could it be caused by the Either field in the case class?

I am now trying to create a Serde[UserEvent] instance.
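
To make that concrete, here is a rough sketch of the Serde I am aiming for. It is an outline under assumptions, not working project code: the UserEvent below is a hypothetical two-field stand-in for our generated class, the UserEventSerde object and the registry URL are made up for illustration, and it assumes that avro4s can derive a RecordFormat for a case class with an Either field and that the schema avro4s derives is compatible with the one the topic was written with.

```scala
import java.util.Collections

import com.sksamuel.avro4s.RecordFormat
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

// Hypothetical stand-in for the generated class; the real one is custom.UserEvent.
final case class UserEvent(userId: String, payload: Either[String, Long])

// Hypothetical helper: builds a Serde[UserEvent] in two hops,
// case class <-> GenericRecord (avro4s) and GenericRecord <-> bytes (Confluent serde).
object UserEventSerde {
  def apply(schemaRegistryUrl: String): Serde[UserEvent] = {
    // avro4s derives the conversion between the case class and an Avro record.
    val format = RecordFormat[UserEvent]

    // The Confluent serde handles GenericRecord <-> binary via the schema registry.
    val inner = new GenericAvroSerde
    inner.configure(
      Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
      false // isKey = false: this serde is used for record values
    )

    Serdes.fromFn[UserEvent](
      // Serialize: case class -> Avro record -> bytes (null passes through as a tombstone).
      (topic: String, event: UserEvent) =>
        if (event == null) null
        else inner.serializer().serialize(topic, format.to(event)),
      // Deserialize: bytes -> Avro record -> case class (null bytes become None).
      (topic: String, bytes: Array[Byte]) =>
        Option(bytes).map(b => format.from(inner.deserializer().deserialize(topic, b)))
    )
  }
}
```

With `implicit val userEventSerde: Serde[UserEvent] = UserEventSerde("http://localhost:8081")` in scope next to the ImplicitConversions._ import, the compiler should be able to derive the missing Consumed[String, UserEvent], since that conversion only needs an implicit Serde for the key type and one for the value type.
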
Creating that Serde would, in my understanding, involve converting between UserEvent and Avro GenericRecord (similar to a Map), and then between the Avro record and binary. The second step is likely covered by the kafka-streams-avro-serde dependency: there should be a Serde[GenericRecord] or similar.

Import-wise, we have this to bring the implicits into scope:

    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.Topology
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.kstream.Consumed

For more (up to date) details:
https://stackoverflow.com/questions/61103804/why-do-i-get-this-compilation-error-could-not-find-implicit-value-for-kstream
https://www.quora.com/unanswered/Kafka-streams-avro-serde-Is-Scala-Either-type-supported

Thank you very much,

--
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group