hmm..it appears as if PackagedProgramUtils#createJobGraph does some
things outside the usercode classlodaer (getPipelineFromProgram()),
specifically the call to the main method.
@klou This seems like wrong behavior?
@Flavio What you could try in the meantime is wrap the call to
createJobGraph like this:
final ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader(); try {
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
// do tstuff}finally {
Thread.currentThread().setContextClassLoader(contextClassLoader); }
On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
Any help here? How can I understand why the classes inside the jar
are not found when creating the PackagedProgram?
On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier
<pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:
In the logs I see that the jar is the classpath (I'm trying to
debug the program from the IDE)..isn'it?
Classpath: [file:/tmp/job-bundle.jar]
...
Best,
Flavio
On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:
* your JobExecutor is _not_ putting it on the classpath.
On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
Well it happens on the client before you even hit the
RestClusterClient, so I assume that either your jar is not
packaged correctly or you your JobExecutor is putting it on
the classpath.
On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is
my main class I'm trying to use as a client towards the
Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
The code of getBatchEnv is:
@Deprecated
public static BatchEnv getBatchEnv() {
// TODO use the following when ready to convert from/to
datastream
// return
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment ret =
BatchTableEnvironment.create(env);
customizeEnv(ret);
return new BatchEnv(env, ret);
}
private static void customizeEnv(TableEnvironment ret) {
final Configuration conf =
ret.getConfig().getConfiguration();
//
conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
FLINK_TEST_TMP_DIR);
//
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
4); //NOSONAR
//
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
0.4f);//NOSONAR
//
conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
32768 * 2);//NOSONAR
//
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
32768 * 2);// NOSONAR
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
0);// NOSONAR
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
conf.set(ClientOptions.CLIENT_TIMEOUT,
Duration.ofMinutes(10));// NOSONAR
final List<String> kryoSerializers = new ArrayList<>();
kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
JodaDateTimeSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
TBaseSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
TBaseSerializer.class));
conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS,
kryoSerializers);
}
Classpath: [file:/tmp/job-bundle.jar]
System.out: (none)
System.err: (none)
at
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at
org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at
it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at
java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more
On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger
<rmetz...@apache.org <mailto:rmetz...@apache.org>> wrote:
Hi Flavio,
can you share the full stacktrace you are seeing? I'm
wondering if the error happens on the client or server
side (among other questions I have).
On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier
<pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:
Hi to all,
I was trying to use the RestClusterClient to submit
my job to the Flink cluster.
However when I submit the job Flink cannot find the
classes contained in the "fat" jar..what should I
do? Am I missing something in my code?
This is the current client code I'm testing:
public static void main(String[] args) throws
MalformedURLException {
final Configuration flinkConf = new Configuration();
flinkConf.set(RestOptions.ADDRESS, "localhost");
flinkConf.set(RestOptions.PORT, 8081);
final File jarFile = new
File("/tmp/job-bundle.jar");
final String jobClass = "it.flink.MyJob";
try {
final RestClusterClient<StandaloneClusterId>
client =
new RestClusterClient<>(flinkConf,
StandaloneClusterId.getInstance());
final PackagedProgram packagedProgram =
PackagedProgram.newBuilder()//
.setJarFile(jarFile)//
// .setUserClassPaths(userClassPaths)
.setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
.build();
final JobGraph jobGraph =
PackagedProgramUtils.createJobGraph(packagedProgram,
flinkConf, 1, true);
final DetachedJobExecutionResult
jobExecutionResult =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
System.out.println(jobExecutionResult.getJobID());
} catch (Exception ex) {
ex.printStackTrace();
System.exit(1);
}
}
Best,
Flavio