Hi,
We're using kafka streams to map a topic into another (copy data exchanging
formats). Both are having AVRO values.

I will start with the compile error, and then progress with the code
samples:

could not find implicit value for parameter consumed:
org.apache.kafka.streams.scala.kstream.Consumed[String,
custom.UserEvent][error]       .stream[String,
UserEvent]("schma.avsc")

We are having these dependencies:

libraryDependencies += "org.apache.kafka"       %%
"kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           %
"kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           %
"kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"
          % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"
          % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"
          % "3.0.4"

We use a code generator to generate Scala case classes out of AVRO schema
files. One such generated case class has, as one of its fields, an Either
value. In AVRO schema this is expressed with type=[t1,t2] so the generation
seems to be decent, that is a sum type: can be type t1 or type t2.

The question becomes what is missing on the deserialization path from topic
to case class (binary -> Avro Map -> case class).

The first thought was kafka-streams-avro-serde, but it may be that this
library only ensures the Serde[GenericRecord] for AVRO Map, not for case
classes. So one of the other dependencies is helping with the AVRO
GenericRecord to case classes mapping and back. We also have some hand
written code that generates case classes out of schemas, that seems to work
directly with spray json.

I'm thinking that in the (binary <-> Avro GenericRecord <-> case class
instance) transformations, there is a gap, and it could be the fact that in
the case class there is an Either field?

I'm taking a path now to try to create a Serde[UserEvent] instance. So that
in my understanding would involve converting between UserEvent and AVRO
GenericRecord, similar to Map, and then between AVRO Record and binary -
which is likely covered by the kafka-streams-avro-serde dependency, like
there should be a Serde[GenericRecord] or similar.

Imports wise, we have this to import implicits:

import org.apache.kafka.common.serialization.Serdeimport
org.apache.kafka.streams.Topologyimport
org.apache.kafka.streams.scala.ImplicitConversions._import
org.apache.kafka.streams.scala.Serdesimport
org.apache.kafka.streams.scala.Serdes._import
org.apache.kafka.streams.scala.kstream.Consumed

For more (up to date) details:
https://stackoverflow.com/questions/61103804/why-do-i-get-this-compilation-error-could-not-find-implicit-value-for-kstream

https://www.quora.com/unanswered/Kafka-streams-avro-serde-Is-Scala-Either-type-supported

Thank you very much,

-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Reply via email to