Hi, In fact an import was missing. It works now: import org.apache.kafka.streams.Topology import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.Serdes._
On Thu, 9 Apr 2020 at 11:49, Nicolae Marasoiu < nicolae.maras...@ovoenergy.com> wrote: > Hi, > We're using kafka streams to map a topic into another (copy data > exchanging formats). Both are having AVRO values. > > I will start with the compile error, and then progress with the code > samples: > > could not find implicit value for parameter consumed: > org.apache.kafka.streams.scala.kstream.Consumed[String, > custom.UserEvent][error] .stream[String, UserEvent]("schma.avsc") > > We are having these dependencies: > > libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" > % kafkaVersion > libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" > % confluentVersion > libraryDependencies += "io.confluent" % > "kafka-schema-registry-client" % confluentVersion > libraryDependencies += "ch.qos.logback" % "logback-classic" > % "1.2.3" > libraryDependencies += "com.typesafe" % "config" > % "1.4.0" > libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" > % "3.0.4" > > We use a code generator to generate Scala case classes out of AVRO schema > files. One such generated case class has, as one of its fields, an Either > value. In AVRO schema this is expressed with type=[t1,t2] so the generation > seems to be decent, that is a sum type: can be type t1 or type t2. > > The question becomes what is missing on the deserialization path from > topic to case class (binary -> Avro Map -> case class). > > The first thought was kafka-streams-avro-serde, but it may be that this > library only ensures the Serde[GenericRecord] for AVRO Map, not for case > classes. So one of the other dependencies is helping with the AVRO > GenericRecord to case classes mapping and back. We also have some hand > written code that generates case classes out of schemas, that seems to work > directly with spray json. > > I'm thinking that in the (binary <-> Avro GenericRecord <-> case class > instance) transformations, there is a gap, and it could be the fact that in > the case class there is an Either field? > > I'm taking a path now to try to create a Serde[UserEvent] instance. So > that in my understanding would involve converting between UserEvent and > AVRO GenericRecord, similar to Map, and then between AVRO Record and binary > - which is likely covered by the kafka-streams-avro-serde dependency, like > there should be a Serde[GenericRecord] or similar. > > Imports wise, we have this to import implicits: > > import org.apache.kafka.common.serialization.Serdeimport > org.apache.kafka.streams.Topologyimport > org.apache.kafka.streams.scala.ImplicitConversions._import > org.apache.kafka.streams.scala.Serdesimport > org.apache.kafka.streams.scala.Serdes._import > org.apache.kafka.streams.scala.kstream.Consumed > > For more (up to date) details: > > https://stackoverflow.com/questions/61103804/why-do-i-get-this-compilation-error-could-not-find-implicit-value-for-kstream > > > https://www.quora.com/unanswered/Kafka-streams-avro-serde-Is-Scala-Either-type-supported > > Thank you very much, > > -- > Thank you, > Nicolae Marasoiu > Scala Engineer > Orion, OVO Group > > -- Thank you, Nicolae Marasoiu Scala Engineer Orion, OVO Group