I see a mismatch between scalaBuildVersion and scalaVersion could this
be the issue?
Regards,
Timo
On 19.12.19 14:33, Alexandru Vasiu wrote:
This is a part of my Gradle config:
ext {
scalaVersion = '2.12'
flinkVersion = '1.9.1'
scalaBuildVersion = "${scalaVersion}.10"
scalaMockVersion = '4.4.0'
circeGenericVersion = '0.12.3'
circeExtrasVersion = '0.12.2'
pardiseVersion = '2.1.1'
slf4jVersion = '1.7.7'
log4jVersion = '1.2.17'
sourceDir = 'src/main/scala'
testDir = 'src/test/scala'
}
repositories {
mavenCentral()
//maven { url
"https://repository.apache.org/content/repositories/snapshots/" }
}
configurations {
scalaCompilerPlugin
}
dependencies {
implementation "org.scala-lang:scala-library:${scalaBuildVersion}"
// --------------------------------------------------------------
// Compile-time dependencies that should NOT be part of the
// shadow jar and are provided in the lib folder of Flink
// --------------------------------------------------------------
//compile "org.apache.flink:flink-java:${flinkVersion}"
implementation
"org.apache.flink:flink-streaming-scala_${scalaVersion}:${flinkVersion}"
implementation
"org.apache.flink:flink-connector-kafka_${scalaVersion}:${flinkVersion}"
// --------------------------------------------------------------
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --------------------------------------------------------------
//flinkShadowJar
"org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
// https://mvnrepository.com/artifact/io.circe/
implementation
"io.circe:circe-generic_${scalaVersion}:${circeGenericVersion}"
implementation
"io.circe:circe-generic-extras_${scalaVersion}:${circeExtrasVersion}"
implementation
"io.circe:circe-parser_${scalaVersion}:${circeGenericVersion}"
// https://mvnrepository.com/artifact/org.scalamacros/paradise
scalaCompilerPlugin
"org.scalamacros:paradise_${scalaBuildVersion}:${pardiseVersion}"
implementation "log4j:log4j:${log4jVersion}"
implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
// Add test dependencies here.
//testImplementation "junit:junit:4.12"
testImplementation "org.scalatest:scalatest_${scalaVersion}:3.1.0"
// https://mvnrepository.com/artifact/org.scalamock/scalamock
testImplementation
"org.scalamock:scalamock_${scalaVersion}:${scalaMockVersion}"
}
So all are with the same scala version. I cannot share the code, but the
main app looks like:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
.addSource(KAFKA_STREAM) // this will get us a stream with our
object model which is like this: case class A(a:Map[String,
other_case_class_obj], b: List[other_case_class_obj], c: String)
.flatMap(CustomFlatMap())
.print
Thank you,
Alex
On Thu, Dec 19, 2019 at 3:14 PM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
That's sounds like a classloading or most likely dependency issue.
Are all dependencies including Flink use the same Scala version? Could
you maybe share reproducible some code with us?
Regards,
Timo
On 19.12.19 13:53, Alexandru Vasiu wrote:
> I'm sorry for my last message, it might be incomplete.
>
> So I used case classed for my objects, but it doesn't work.
>
> Riching this error: "Exception in thread "main"
>
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
> java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9" when
I'm
> trying to apply the map/flatMap function over the stream (which
is from
> a Kafka consumer).
>
>
> Alex
>
> On Thu, Dec 19, 2019 at 2:24 PM Alexandru Vasiu
> <alexandru.ava...@gmail.com <mailto:alexandru.ava...@gmail.com>
<mailto:alexandru.ava...@gmail.com
<mailto:alexandru.ava...@gmail.com>>> wrote:
>
> I used `case class` for example case class A(a: Map[String,
String])
> so it should work
>
> Alex
>
> On Thu, Dec 19, 2019 at 2:18 PM Timo Walther
<twal...@apache.org <mailto:twal...@apache.org>
> <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote:
>
> Hi Alex,
>
> the problem is that `case class` classes are analyzed by
Scala
> specific
> code whereas `class` classes are analyzed with Java specific
> code. So I
> would recommend to use a case class to make sure you stay
in the
> "Scala
> world" otherwise the fallback is the Java-based
TypeExtractor.
>
> For your custom Map, you can simply ignore this error
message.
> It will
> fallback to the Java-based TypeExtractor and treat it as a
> generic type
> because it is not a POJO.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 19.12.19 12:41, Alexandru Vasiu wrote:
> > Hi,
> >
> > I use flink-scala version 1.9.1 and scala 2.12.10, and I
> defined a data
> > type which is a bit more complex: it has a list in it
and even a
> > dictionary. When I try to use a custom map I got this
error:
> >
> > INFO org.apache.flink.api.java.typeutils.TypeExtractor -
> class A does
> > not contain a setter for field fields
> > INFO org.apache.flink.api.java.typeutils.TypeExtractor -
> class A cannot
> > be used as a POJO type because not all fields are
valid POJO
> fields, and
> > must be processed as GenericType. Please read the Flink
> documentation on
> > "Data Types & Serialization" for details of the effect on
> performance.
> >
> > Is there a fix for this? Or a workaround?
> >
> > Thank you,
> > Alex
>