Hi,
In fact an import was missing. It works now:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._


On Thu, 9 Apr 2020 at 11:49, Nicolae Marasoiu <
nicolae.maras...@ovoenergy.com> wrote:

> Hi,
> We're using kafka streams to map a topic into another (copy data
> exchanging formats). Both are having AVRO values.
>
> I will start with the compile error, and then progress with the code
> samples:
>
> could not find implicit value for parameter consumed: 
> org.apache.kafka.streams.scala.kstream.Consumed[String, 
> custom.UserEvent][error]       .stream[String, UserEvent]("schma.avsc")
>
> We are having these dependencies:
>
> libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"      
>    % kafkaVersion
> libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"  
>    % confluentVersion
> libraryDependencies += "io.confluent"           % 
> "kafka-schema-registry-client" % confluentVersion
> libraryDependencies += "ch.qos.logback"         % "logback-classic"           
>    % "1.2.3"
> libraryDependencies += "com.typesafe"           % "config"                    
>    % "1.4.0"
> libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"              
>    % "3.0.4"
>
> We use a code generator to generate Scala case classes out of AVRO schema
> files. One such generated case class has, as one of its fields, an Either
> value. In AVRO schema this is expressed with type=[t1,t2] so the generation
> seems to be decent, that is a sum type: can be type t1 or type t2.
>
> The question becomes what is missing on the deserialization path from
> topic to case class (binary -> Avro Map -> case class).
>
> The first thought was kafka-streams-avro-serde, but it may be that this
> library only ensures the Serde[GenericRecord] for AVRO Map, not for case
> classes. So one of the other dependencies is helping with the AVRO
> GenericRecord to case classes mapping and back. We also have some hand
> written code that generates case classes out of schemas, that seems to work
> directly with spray json.
>
> I'm thinking that in the (binary <-> Avro GenericRecord <-> case class
> instance) transformations, there is a gap, and it could be the fact that in
> the case class there is an Either field?
>
> I'm taking a path now to try to create a Serde[UserEvent] instance. So
> that in my understanding would involve converting between UserEvent and
> AVRO GenericRecord, similar to Map, and then between AVRO Record and binary
> - which is likely covered by the kafka-streams-avro-serde dependency, like
> there should be a Serde[GenericRecord] or similar.
>
> Imports wise, we have this to import implicits:
>
> import org.apache.kafka.common.serialization.Serdeimport 
> org.apache.kafka.streams.Topologyimport 
> org.apache.kafka.streams.scala.ImplicitConversions._import 
> org.apache.kafka.streams.scala.Serdesimport 
> org.apache.kafka.streams.scala.Serdes._import 
> org.apache.kafka.streams.scala.kstream.Consumed
>
> For more (up to date) details:
>
> https://stackoverflow.com/questions/61103804/why-do-i-get-this-compilation-error-could-not-find-implicit-value-for-kstream
>
>
> https://www.quora.com/unanswered/Kafka-streams-avro-serde-Is-Scala-Either-type-supported
>
> Thank you very much,
>
> --
> Thank you,
> Nicolae Marasoiu
> Scala Engineer
> Orion, OVO Group
>
>

-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Reply via email to