I see a mismatch between scalaBuildVersion and scalaVersion. Could this be the issue?
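
A minimal sketch of how one might check this, using only the Scala standard library. The object and method names (ScalaVersionCheck, binaryVersion, sameBinaryVersion) are hypothetical and not part of Flink or Gradle. It compares the binary version used in artifact suffixes ("2.12") with the full version of the scala-library that is actually loaded at runtime. Whether a patch-level difference is the cause of the error in this thread is an assumption, not something the sketch proves.

```scala
import scala.util.Properties

object ScalaVersionCheck {
  // Derive the binary version ("2.12") from a full version ("2.12.10").
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  // True when an artifact suffix such as "2.12" matches the full version.
  def sameBinaryVersion(full: String, artifactSuffix: String): Boolean =
    binaryVersion(full) == artifactSuffix

  def main(args: Array[String]): Unit = {
    // Full version of the scala-library that is loaded at runtime.
    val runtime = Properties.versionNumberString
    println(s"scala-library on classpath: $runtime (binary ${binaryVersion(runtime)})")
    println(s"matches _2.12 artifacts: ${sameBinaryVersion(runtime, "2.12")}")
  }
}
```

Running this with the same classpath as the job shows the full Scala version in use, which can then be compared with the version the other dependencies were built against.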

Regards,
Timo


On 19.12.19 14:33, Alexandru Vasiu wrote:
This is a part of my Gradle config:

ext {
    scalaVersion = '2.12'
    flinkVersion = '1.9.1'
    scalaBuildVersion = "${scalaVersion}.10"
    scalaMockVersion = '4.4.0'
    circeGenericVersion = '0.12.3'
    circeExtrasVersion = '0.12.2'
    pardiseVersion = '2.1.1'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
    sourceDir = 'src/main/scala'
    testDir = 'src/test/scala'
}
repositories {
    mavenCentral()
    //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}
configurations {
    scalaCompilerPlugin
}
dependencies {
    implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    //compile "org.apache.flink:flink-java:${flinkVersion}"
    implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
    // https://mvnrepository.com/artifact/io.circe/
    implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
    implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
    implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
    // https://mvnrepository.com/artifact/org.scalamacros/paradise
    scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
    implementation "log4j:log4j:${log4jVersion}"
    implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
    // Add test dependencies here.
    //testImplementation "junit:junit:4.12"
    testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
    // https://mvnrepository.com/artifact/org.scalamock/scalamock
    testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
}

So they all use the same Scala version. I cannot share the code, but the main app looks like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
  // This gives us a stream of our object model, which looks like:
  // case class A(a: Map[String, other_case_class_obj], b: List[other_case_class_obj], c: String)
  .addSource(KAFKA_STREAM)
  .flatMap(CustomFlatMap())
  .print
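
Since the reported error is a NoClassDefFoundError for a class under scala.math, a small diagnostic like the following may help to see which scala-library the job actually loads. This is only a sketch using standard JVM reflection. The names ClassOrigin, originOf and isLoadable are hypothetical helpers, not Flink or Scala APIs, and the result depends entirely on the classpath it is run with.

```scala
object ClassOrigin {
  // Location (jar or directory) a class was loaded from, if the JVM exposes it.
  def originOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString

  // Whether a class with the given binary name can be loaded (without initializing it).
  def isLoadable(name: String): Boolean =
    try {
      Class.forName(name, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Class name taken from the error message in this thread.
    val missing = "scala.math.Ordering$$anon$9"
    val origin = originOf(classOf[scala.math.Ordering[_]]).getOrElse("unknown")
    println("scala.math.Ordering loaded from: " + origin)
    println(missing + " loadable: " + isLoadable(missing))
  }
}
```

If the reported location is not the scala-library version declared in the build, that would point to a second Scala library on the classpath.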

Thank you,
Alex

On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <twal...@apache.org> wrote:

    That sounds like a classloading or, more likely, a dependency issue.

    Do all dependencies, including Flink, use the same Scala version? Could
    you maybe share some reproducible code with us?

    Regards,
    Timo


    On 19.12.19 13:53, Alexandru Vasiu wrote:
     > I'm sorry for my last message, it might be incomplete.
     >
     > So I used case classes for my objects, but it doesn't work.
     >
     > I get this error: "Exception in thread "main"
     > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
     > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
     > trying to apply the map/flatMap function over the stream (which is from
     > a Kafka consumer).
     >
     >
     > Alex
     >
     > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
     > <alexandru.ava...@gmail.com> wrote:
     >
     >     I used `case class`, for example case class A(a: Map[String, String]),
     >     so it should work.
     >
     >     Alex
     >
     >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <twal...@apache.org> wrote:
     >
     >         Hi Alex,
     >
     >         the problem is that `case class` classes are analyzed by
     >         Scala-specific code, whereas `class` classes are analyzed by
     >         Java-specific code. So I would recommend using a case class to
     >         make sure you stay in the "Scala world"; otherwise the fallback
     >         is the Java-based TypeExtractor.
     >
     >         For your custom Map, you can simply ignore this error message.
     >         It will fall back to the Java-based TypeExtractor and treat it
     >         as a generic type because it is not a POJO.
     >
     >         I hope this helps.
     >
     >         Regards,
     >         Timo
     >
     >
     >         On 19.12.19 12:41, Alexandru Vasiu wrote:
     >          > Hi,
     >          >
     >          > I use flink-scala version 1.9.1 and Scala 2.12.10, and I defined a data
     >          > type which is a bit more complex: it has a list in it and even a
     >          > dictionary. When I try to use a custom map I get this error:
     >          >
     >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A does
     >          > not contain a setter for field fields
     >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class A cannot
     >          > be used as a POJO type because not all fields are valid POJO fields, and
     >          > must be processed as GenericType. Please read the Flink documentation on
     >          > "Data Types & Serialization" for details of the effect on performance.
     >          >
     >          > Is there a fix for this? Or a workaround?
     >          >
     >          > Thank you,
     >          > Alex
     >

