Hi team,

I discovered that the child-first class loader is always used to load the
main program when a job is submitted to a YARN cluster in application
mode, regardless of the value of classloader.resolve-order in
flink-conf.yaml. This is not the case when I submit the same job with the
same config to a local cluster: there the config is honored and the
correct class loader is used to load the main program. Here is the log from
the local cluster:

2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend [] -  Starting Command Line Client (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[log trimmed]
*2021-05-30 15:01:16,616 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first*
2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
[log trimmed]
2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
*2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount [] - Loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*

Here is the log from the YARN cluster:

2021-05-30 07:20:14,434 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - --------------------------------------------------------------------------------
2021-05-30 07:20:14,438 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[log trimmed]
2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 2048m
*2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first*
2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.scope.jm, flink.jobmanager
[log trimmed]
*2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount [] - Loaded by org.apache.flink.util.ChildFirstClassLoader@3da30852*
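
The two logs differ only in which user-code class loader loaded io.demo.flink.WordCount: FlinkUserCodeClassLoaders$ParentFirstClassLoader locally versus org.apache.flink.util.ChildFirstClassLoader on YARN, and the practical difference between them is the delegation order. As a rough illustration only, here is a simplified, hypothetical child-first loader in plain Java. It is not Flink's implementation, and the list of always-parent-first prefixes is merely in the spirit of Flink's classloader.parent-first-patterns.default option. Parent-first is just the standard delegation that a plain URLClassLoader already follows.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstSketch {

    /**
     * Simplified child-first loader (hypothetical, not Flink's ChildFirstClassLoader):
     * searches its own URLs before asking the parent, except for a few prefixes
     * that are always delegated to the parent.
     */
    static final class ChildFirstLoader extends URLClassLoader {

        // Illustrative prefixes that are always loaded parent-first.
        private static final String[] ALWAYS_PARENT_FIRST = {"java.", "org.apache.flink.", "org.slf4j"};

        ChildFirstLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    for (String prefix : ALWAYS_PARENT_FIRST) {
                        if (name.startsWith(prefix)) {
                            // Standard parent-first delegation for these packages.
                            return super.loadClass(name, resolve);
                        }
                    }
                    try {
                        // Child first: search this loader's own URLs.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not found locally: fall back to the parent.
                        return super.loadClass(name, resolve);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ChildFirstSketch.class.getClassLoader();
        String name = ChildFirstSketch.class.getName();
        // Parent-first is plain URLClassLoader delegation: parent first, then own URLs.
        try (URLClassLoader parentFirst = new URLClassLoader(new URL[0], parent);
             ChildFirstLoader childFirst = new ChildFirstLoader(new URL[0], parent)) {
            // With no user jars on either loader, both resolve the class via the parent.
            System.out.println(parentFirst.loadClass(name) == ChildFirstSketch.class); // true
            System.out.println(childFirst.loadClass(name) == ChildFirstSketch.class);  // true
        }
    }
}
```

With a user jar on its URLs, a child-first loader would pick up the jar's copy of a class even when the parent also has one, which is why the resolve order matters for dependency conflicts.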

Here is a job that reproduces the problem:

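package io.demo.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {

    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // log which class loader loaded the user main class
        LOG.info("Loaded by {}", WordCount.class.getClassLoader());

        // get input data
        DataStreamSource<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");

        text.print();
        env.execute("demo job");
    }
}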


Flink version 1.12.1

I believe the inconsistency is caused by the user-defined flink-conf.yaml
not being passed to PackagedProgram, which then falls back to the default
configuration:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
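
To make the suspected root cause concrete: if the configuration that reaches PackagedProgram does not contain classloader.resolve-order, the option falls back to its default, which is child-first in Flink. The plain-Java sketch below models only that lookup, using a Map as a stand-in for Flink's Configuration; the class and method names are hypothetical and no Flink API is called.

```java
import java.util.HashMap;
import java.util.Map;

public class ResolveOrderSketch {

    // Key and default value of Flink's classloader.resolve-order option.
    static final String RESOLVE_ORDER_KEY = "classloader.resolve-order";
    static final String RESOLVE_ORDER_DEFAULT = "child-first";

    /** Returns the configured resolve order, or the default if the key is absent. */
    static String resolveOrder(Map<String, String> conf) {
        return conf.getOrDefault(RESOLVE_ORDER_KEY, RESOLVE_ORDER_DEFAULT);
    }

    public static void main(String[] args) {
        // Stand-in for the configuration loaded from flink-conf.yaml.
        Map<String, String> userConf = new HashMap<>();
        userConf.put(RESOLVE_ORDER_KEY, "parent-first");

        // Configuration passed through: the user's setting wins.
        System.out.println(resolveOrder(userConf));        // parent-first
        // Fresh, empty configuration: the default is used.
        System.out.println(resolveOrder(new HashMap<>())); // child-first
    }
}
```

If this hypothesis holds, handing the cluster's configuration to the PackagedProgram built in ClassPathPackagedProgramRetriever should make application mode honor the setting.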

I'm not sure whether this is expected, i.e. whether the main program is
simply never assumed to be loaded with the configured class loader.
-- 
Regards,
Tao
