This is part of my Gradle config:

ext {
    scalaVersion = '2.12'
    flinkVersion = '1.9.1'
    scalaBuildVersion = "${scalaVersion}.10"
    scalaMockVersion = '4.4.0'
    circeGenericVersion = '0.12.3'
    circeExtrasVersion = '0.12.2'
    paradiseVersion = '2.1.1'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
    sourceDir = 'src/main/scala'
    testDir = 'src/test/scala'
}
repositories {
    mavenCentral()
    //maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}
configurations {
    scalaCompilerPlugin
}
dependencies {
    implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    //compile "org.apache.flink:flink-java:${flinkVersion}"
    implementation "org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
    // https://mvnrepository.com/artifact/io.circe/
    implementation "io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
    implementation "io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
    implementation "io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
    // https://mvnrepository.com/artifact/org.scalamacros/paradise
    scalaCompilerPlugin "org.scalamacros:paradise_${scalaBuildVersion}:${paradiseVersion}"
    implementation "log4j:log4j:${log4jVersion}"
    implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
    // Add test dependencies here.
    //testImplementation "junit:junit:4.12"
    testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
    // https://mvnrepository.com/artifact/org.scalamock/scalamock
    testImplementation "org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
}
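
For completeness, the scalaCompilerPlugin configuration is wired into
scalac roughly like this (this is a sketch of the usual Gradle wiring, not
a verbatim copy of my build file):

tasks.withType(ScalaCompile) {
    // Pass every jar in the scalaCompilerPlugin configuration (here, the
    // macro paradise plugin) to scalac as an -Xplugin argument.
    scalaCompileOptions.additionalParameters = [
        "-Xplugin:" + configurations.scalaCompilerPlugin.asPath
    ]
}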

So all of them use the same Scala version. I cannot share the code, but the
main app looks like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
  .addSource(KAFKA_STREAM) // this gives us a stream of our object model, which looks like:
                           // case class A(a: Map[String, other_case_class_obj], b: List[other_case_class_obj], c: String)
  .flatMap(CustomFlatMap())
  .print
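
To give a clearer picture of the job's shape, here is a minimal
self-contained sketch; the case class fields, OtherThing, and the body of
CustomFlatMap are placeholders I made up for illustration, not my real code:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._ // provides the implicit TypeInformation for case classes
import org.apache.flink.util.Collector

// Placeholder types standing in for our real object model.
case class OtherThing(id: String)
case class A(a: Map[String, OtherThing], b: List[OtherThing], c: String)

// Placeholder flat map; the real logic is different.
case class CustomFlatMap() extends RichFlatMapFunction[A, String] {
  override def flatMap(value: A, out: Collector[String]): Unit =
    value.b.foreach(t => out.collect(t.id))
}

object Job {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements(A(Map("k" -> OtherThing("x")), List(OtherThing("y")), "z")) // stand-in for the Kafka source
      .flatMap(CustomFlatMap())
      .print()
    env.execute("example")
  }
}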

Thank you,
Alex

On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <twal...@apache.org> wrote:

> That sounds like a classloading or, most likely, a dependency issue.
>
> Do all dependencies, including Flink, use the same Scala version? Could
> you maybe share some reproducible code with us?
>
> Regards,
> Timo
>
>
> On 19.12.19 13:53, Alexandru Vasiu wrote:
> > I'm sorry about my last message; it might have been incomplete.
> >
> > So I used case classes for my objects, but it doesn't work.
> >
> > I'm getting this error: "Exception in thread "main"
> > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> > java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when I'm
> > trying to apply the map/flatMap function over the stream (which comes
> > from a Kafka consumer).
> >
> >
> > Alex
> >
> > On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> > <alexandru.ava...@gmail.com <mailto:alexandru.ava...@gmail.com>> wrote:
> >
> >     I used `case class`, for example case class A(a: Map[String, String]),
> >     so it should work.
> >
> >     Alex
> >
> >     On Thu, Dec 19, 2019 at 2:18 PM Timo Walther <twal...@apache.org
> >     <mailto:twal...@apache.org>> wrote:
> >
> >         Hi Alex,
> >
> >         the problem is that `case class` classes are analyzed by
> >         Scala-specific code whereas `class` classes are analyzed with
> >         Java-specific code. So I would recommend using a case class to
> >         make sure you stay in the "Scala world"; otherwise the fallback
> >         is the Java-based TypeExtractor.
> >
> >         For your custom Map, you can simply ignore this error message.
> >         It will fall back to the Java-based TypeExtractor and treat it
> >         as a generic type because it is not a POJO.
> >
> >         I hope this helps.
> >
> >         Regards,
> >         Timo
> >
> >
> >         On 19.12.19 12:41, Alexandru Vasiu wrote:
> >          > Hi,
> >          >
> >          > I use flink-scala version 1.9.1 and Scala 2.12.10, and I
> >          > defined a data type which is a bit more complex: it has a
> >          > list in it and even a dictionary. When I try to use a custom
> >          > map, I get this error:
> >          >
> >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
> >          > class A does not contain a setter for field fields
> >          > INFO  org.apache.flink.api.java.typeutils.TypeExtractor -
> >          > class A cannot be used as a POJO type because not all fields
> >          > are valid POJO fields, and must be processed as GenericType.
> >          > Please read the Flink documentation on "Data Types &
> >          > Serialization" for details of the effect on performance.
> >          >
> >          > Is there a fix for this? Or a workaround?
> >          >
> >          > Thank you,
> >          > Alex
> >
>
>
