Hi team,
I discovered that the child-first class loader is always used to load the
main program when a job is submitted to a YARN cluster in application
mode, regardless of the value of classloader.resolve-order in
flink-conf.yaml. This is not the case when I submit the same job with the
same config to a local cluster, which honors the setting and uses the
configured class loader to load the main program. Here is the log from the
local cluster:
2021-05-30 15:01:16,372 INFO org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2021-05-30 15:01:16,375 INFO org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
*2021-05-30 15:01:16,616 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first*
2021-05-30 15:01:16,763 WARN org.apache.flink.runtime.util.HadoopUtils [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
[trim down the log]
2021-05-30 15:01:16,830 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
*2021-05-30 15:01:16,871 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*
Here is the log from the YARN cluster:
2021-05-30 07:20:14,434 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
2021-05-30 07:20:14,438 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 2048m
*2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first*
2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.scope.jm, flink.jobmanager
[trim down the log]
*2021-05-30 07:20:21,383 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.util.ChildFirstClassLoader@3da30852*
Here is the job that reproduces the problem:
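    package io.demo.flink;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class WordCount {
        // LOG is assumed to be an SLF4J logger; its declaration was not part of the original snippet
        private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

        public static void main(String[] args) throws Exception {
            // set up the execution environment
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // report which class loader loaded the main class
            LOG.info("Loaded by {}", WordCount.class.getClassLoader());
            // get input data
            DataStreamSource<String> text = env.fromElements(
                    "To be, or not to be,--that is the question:--",
                    "Whether 'tis nobler in the mind to suffer",
                    "The slings and arrows of outrageous fortune",
                    "Or to take arms against a sea of troubles,"
            );
            text.print();
            env.execute("demo job");
        }
    }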
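
To see more than the immediate loader, a small helper can walk the whole
parent chain. The sketch below uses only the JDK; ClassLoaderChain and
describe are hypothetical names chosen for illustration and are not part of
Flink. Calling describe(WordCount.class) from the job's main method would
list the loader that loaded the main class followed by each of its parents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not a Flink class): lists the class loader
// of a class and every parent loader above it.
public class ClassLoaderChain {

    /** Returns loader class names from the class's own loader up to the bootstrap loader. */
    public static List<String> describe(Class<?> clazz) {
        List<String> chain = new ArrayList<>();
        ClassLoader loader = clazz.getClassLoader();
        // A null loader denotes the bootstrap loader, so the walk stops there.
        while (loader != null) {
            chain.add(loader.getClass().getName());
            loader = loader.getParent();
        }
        chain.add("bootstrap");
        return chain;
    }

    public static void main(String[] args) {
        // Print the chain for this class, one loader per line.
        describe(ClassLoaderChain.class).forEach(System.out::println);
    }
}
```

Logging the chain instead of a single loader makes it easier to tell whether
the user-code loader wraps the application class path or replaces it.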
Flink version: 1.12.1
I believe the inconsistency arises because the user-defined flink-conf.yaml
is not passed to PackagedProgram, which falls back to the default
configuration instead (where classloader.resolve-order defaults to
child-first):
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
I am not sure whether this is expected behavior, i.e. whether we should never
assume that the main program is loaded with the configured class loader.
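
The suspected mechanism can be illustrated with a minimal stand-alone sketch.
It does not use Flink's code: ResolveOrderSketch and its methods are
hypothetical, and a plain Map stands in for the Flink configuration. The only
Flink facts assumed are the key name classloader.resolve-order and its
documented default, child-first. If the component that builds the user-code
class loader receives an empty configuration, the user's parent-first setting
cannot take effect.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration only; this is not Flink's implementation.
public class ResolveOrderSketch {
    static final String KEY = "classloader.resolve-order";
    static final String DEFAULT_ORDER = "child-first"; // documented Flink default

    /** Looks up the resolve order, falling back to the default when the key is absent. */
    public static String resolveOrder(Map<String, String> config) {
        return config.getOrDefault(KEY, DEFAULT_ORDER);
    }

    /** Models the suspected bug: the user config is ignored and an empty one is used. */
    public static String orderWhenConfigDropped(Map<String, String> userConfig) {
        return resolveOrder(Collections.emptyMap());
    }

    /** Models the expected behavior: the user config is forwarded. */
    public static String orderWhenConfigPassed(Map<String, String> userConfig) {
        return resolveOrder(userConfig);
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = Map.of(KEY, "parent-first");
        System.out.println("config dropped: " + orderWhenConfigDropped(userConfig));
        System.out.println("config passed:  " + orderWhenConfigPassed(userConfig));
    }
}
```

Under these assumptions the first call yields child-first and the second
yields parent-first, which matches the difference between the YARN and local
logs above.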
--
Regards,
Tao