Amit Gurdasani created FLINK-31243:
--------------------------------------
Summary: KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader
Key: FLINK-31243
URL: https://issues.apache.org/jira/browse/FLINK-31243
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.16.1, 1.15.3
Environment: OS: Amazon Linux 2
JVM: Amazon Corretto 11
Reporter: Amit Gurdasani
The
[KryoSerializer|https://github.com/apache/flink/blob/9bf0d9f2c2bcb2bc0c8ab6228bb0a9e76e10ad70/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java]
uses Class.forName() to load the Scala extensions dynamically by name. The
single-argument Class.forName(String) resolves names through the classloader
that defined the calling class, so KryoSerializer only looks for these
extensions through its own classloader. By default, KryoSerializer is loaded by
the application classloader (Flink's own classes are resolved parent-first), so
unless the flink-scala artifact is also available to the application
classloader, the Scala extensions cannot be loaded. As a result, Scala
applications that bundle flink-scala in their own jar cannot benefit from the
Scala extensions to the Kryo serializer.
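A minimal, self-contained sketch of the classloader behavior described above (not Flink code; ForNameDemo, OnlyOnAppClasspath and visibleThrough are hypothetical names). Class.forName(name, false, loader) consults only the given loader and its parents, so a loader cannot resolve a class that lives only in a loader outside its parent chain. Here an isolated URLClassLoader whose parent is the bootstrap loader stands in for the loader that lacks the class; in the issue, the application classloader plays that role for flink-scala classes that only the user code classloader holds.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class; not part of Flink.
public class ForNameDemo {

    // A class that only the application classloader (and its children) can see.
    static class OnlyOnAppClasspath {}

    /** Returns true if the named class can be resolved through the given loader or its parents. */
    static boolean visibleThrough(String name, ClassLoader loader) {
        try {
            // initialize = false: only resolve the class, do not run static initializers.
            Class.forName(name, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = OnlyOnAppClasspath.class.getName();

        // Single-argument forName: resolved through ForNameDemo's own defining loader.
        Class.forName(name);
        System.out.println("caller's own loader: found");

        // A loader with no URLs whose parent is the bootstrap loader (null) cannot see
        // application classes, just as the application classloader cannot see classes
        // that exist only in a child (user code) classloader.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println("isolated loader: "
                    + (visibleThrough(name, isolated) ? "found" : "not found"));
        }
    }
}
{code}

Running it prints "found" for the caller's own loader and "not found" for the isolated loader: the same class name resolves or fails depending only on which loader is consulted.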
The exception looks like this:
{noformat}
java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:315)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
	at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}
Example code that reproduces this issue.
Main class for the Flink application:
{code:java}
package software.amazon.kinesisanalytics.kryotest;

import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;

public class Main {
    // A trivial type; constructing a KryoSerializer for it directly is enough to reproduce.
    private static class Something implements Serializable {
        private static final long serialVersionUID = 289034745902347830L;
    }

    public static void main(String... args) {
        StreamExecutionEnvironment executionEnvironment = new StreamExecutionEnvironment();
        KryoSerializer<Something> serializer =
                new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
        // getKryo() initializes Kryo, which looks up FlinkScalaKryoInstantiator via Class.forName().
        serializer.getKryo();
    }
}
{code}
build.gradle for the Flink application:
{code:java}
plugins {
id 'application'
id 'java'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
group 'software.amazon.kinesisanalytics'
version '0.1'
repositories {
mavenCentral()
}
dependencies {
compileOnly 'org.apache.flink:flink-core:1.15.2'
compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
}
shadowJar {
dependencies {
exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
exclude(dependency('com.twitter:.*:.*'))
exclude(dependency('org.apache.flink:flink-core:.*'))
exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
exclude(dependency('org.scala-lang:.*:.*'))
}
}
mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
{code}
Note that the application jar includes flink-scala, but not Kryo itself or
flink-core.
Placing flink-scala on the classpath of Flink's application classloader
eliminates the error. However, as I understand it, the [point of eliminating
Scala|https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/] from the
Flink application classloader was to allow Scala dependencies to be loaded only
by the user code classloader. This issue prevents that from being achieved for
the Scala extensions to the Kryo serializer.
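One possible direction, sketched purely as an illustration: resolve the extension class through an explicit, ordered list of classloaders, trying the user code classloader first and the serializer's own classloader second. FallbackClassResolver and resolve() are hypothetical names; this is not Flink's API or its actual fix. The sketch assumes the user code classloader is reachable, for example as the thread context classloader.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper; not part of Flink.
public final class FallbackClassResolver {
    private FallbackClassResolver() {}

    /**
     * Tries each loader in order and returns the first class found.
     * Null entries are skipped rather than treated as the bootstrap loader.
     */
    public static Optional<Class<?>> resolve(String className, List<ClassLoader> loaders) {
        for (ClassLoader loader : loaders) {
            if (loader == null) {
                continue;
            }
            try {
                // initialize = false: locate the class without running static initializers.
                return Optional.of(Class.forName(className, false, loader));
            } catch (ClassNotFoundException e) {
                // Not visible through this loader; try the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumption: the user code classloader is installed as the thread context classloader.
        ClassLoader userCode = Thread.currentThread().getContextClassLoader();
        ClassLoader own = FallbackClassResolver.class.getClassLoader();
        String name = "org.apache.flink.runtime.types.FlinkScalaKryoInstantiator";

        Optional<Class<?>> found = resolve(name, Arrays.asList(userCode, own));
        System.out.println(found.isPresent()
                ? "loaded " + found.get().getName()
                : "Scala extensions not visible through any candidate loader");
    }
}
{code}

Ordering the user code classloader first matches the goal of keeping Scala dependencies out of the application classloader, while still finding the extensions when flink-scala is placed there.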