Hi team, I discovered that child first class loader is always used to initialize the main program when submitting the job to a yarn cluster using application mode regardless of what value classloader.resolve-order is set in flink-conf.yaml. But this is not the case if I submit the same job with the same config to the local cluster which honors the config and use the correct class loader to load the main program. Here is the log from local cluster
2021-05-30 15:01:16,372 INFO org.apache.flink.client.cli.CliFrontend [] - -------------------------------------------------------------------------------- 2021-05-30 15:01:16,375 INFO org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) [trim down the log] *2021-05-30 15:01:16,616 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first* 2021-05-30 15:01:16,763 WARN org.apache.flink.runtime.util.HadoopUtils [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables). [trim down the log] 2021-05-30 15:01:16,830 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false) *2021-05-30 15:01:16,871 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6* Here is the log from yarn cluster 2021-05-30 07:20:14,434 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -------------------------------------------------------------------------------- 2021-05-30 07:20:14,438 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) [trim down the log] 2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 2048m *2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first* 2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.scope.jm, flink.jobmanager [trim down the log] *2021-05-30 07:20:21,383 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.util.ChildFirstClassLoader@3da30852* Here is the job to reproduce the problem public static void main(String[] args) throws Exception { // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); LOG.info("Loaded by {}", WordCount.class.getClassLoader()); // get input data DataStreamSource<String> text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles," ); text.print(); env.execute("demo job"); } Flink version 1.12.1 I believe the inconsistency is the result of user defined flink-conf not passed to PackageProgram which uses default config instead https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109 Not sure if this is expected behavior that we never assume the main program is loaded with the configured class loader -- Regards, Tao