This is a part of my Gradle config: ext { scalaVersion = '2.12' flinkVersion = '1.9.1' scalaBuildVersion = "${scalaVersion}.10" scalaMockVersion = '4.4.0' circeGenericVersion = '0.12.3' circeExtrasVersion = '0.12.2' pardiseVersion = '2.1.1' slf4jVersion = '1.7.7' log4jVersion = '1.2.17' sourceDir = 'src/main/scala' testDir = 'src/test/scala' } repositories { mavenCentral() //maven { url " https://repository.apache.org/content/repositories/snapshots/" } } configurations { scalaCompilerPlugin } dependencies { implementation "org.scala-lang:scala-library:${scalaBuildVersion}" // -------------------------------------------------------------- // Compile-time dependencies that should NOT be part of the // shadow jar and are provided in the lib folder of Flink // -------------------------------------------------------------- //compile "org.apache.flink:flink-java:${flinkVersion}" implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}" implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}" // -------------------------------------------------------------- // Dependencies that should be part of the shadow jar, e.g. // connectors. These must be in the flinkShadowJar configuration! // -------------------------------------------------------------- //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}" // https://mvnrepository.com/artifact/io.circe/ implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}" implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}" implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}" // https://mvnrepository.com/artifact/org.scalamacros/paradise scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}" implementation "log4j:log4j:${log4jVersion}" implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}" // Add test dependencies here. //testImplementation "junit:junit:4.12" testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0" // https://mvnrepository.com/artifact/org.scalamock/scalamock testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}" }
So all are with the same scala version. I cannot share the code, but the main app looks like: val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env .addSource(KAFKA_STREAM) // this will get us a stream with our object model which is like this: case class A(a:Map[String, other_case_class_obj], b: List[other_case_class_obj], c: String) .flatMap(CustomFlatMap()) .print Thank you, Alex On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <twal...@apache.org> wrote: > That's sounds like a classloading or most likely dependency issue. > > Are all dependencies including Flink use the same Scala version? Could > you maybe share reproducible some code with us? > > Regards, > Timo > > > On 19.12.19 13:53, Alexandru Vasiu wrote: > > I'm sorry for my last message, it might be incomplete. > > > > So I used case classed for my objects, but it doesn't work. > > > > Riching this error: "Exception in thread "main" > > > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError: > > > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm > > trying to apply the map/flatMap function over the stream (which is from > > a Kafka consumer). > > > > > > Alex > > > > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu > > <alexandru.ava...@gmail.com <mailto:alexandru.ava...@gmail.com>> wrote: > > > > I used `case class` for example case class A(a: Map[String, String]) > > so it should work > > > > Alex > > > > On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <twal...@apache.org > > <mailto:twal...@apache.org>> wrote: > > > > Hi Alex, > > > > the problem is that `case class` classes are analyzed by Scala > > specific > > code whereas `class` classes are analyzed with Java specific > > code. So I > > would recommend to use a case class to make sure you stay in the > > "Scala > > world" otherwise the fallback is the Java-based TypeExtractor. > > > > For your custom Map, you can simply ignore this error message. > > It will > > fallback to the Java-based TypeExtractor and treat it as a > > generic type > > because it is not a POJO. > > > > I hope this helps. > > > > Regards, > > Timo > > > > > > On 19.12.19 12:41, Alexandru Vasiu wrote: > > > Hi, > > > > > > I use flink-scala version 1.9.1 and scala 2.12.10, and I > > defined a data > > > type which is a bit more complex: it has a list in it and > even a > > > dictionary. When I try to use a custom map I got this error: > > > > > > INFO org.apache.flink.api.java.typeutils.TypeExtractor - > > class A does > > > not contain a setter for field fields > > > INFO org.apache.flink.api.java.typeutils.TypeExtractor - > > class A cannot > > > be used as a POJO type because not all fields are valid POJO > > fields, and > > > must be processed as GenericType. Please read the Flink > > documentation on > > > "Data Types & Serialization" for details of the effect on > > performance. > > > > > > Is there a fix for this? Or a workaround? > > > > > > Thank you, > > > Alex > > > >