Hi all, I will have a look at the whole stack trace in a bit.
@Chesnay Schepler I think that we are setting the correct classloader during
jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Always the same problem.
>
> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     ... 10 more
>
> I've also tried with
>
> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>
> but nothing changes.
>
> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote:
>>
>> hmm..it appears as if PackagedProgramUtils#createJobGraph does some things
>> outside the usercode classloader (getPipelineFromProgram()), specifically
>> the call to the main method.
>>
>> @klou This seems like wrong behavior?
>>
>> @Flavio What you could try in the meantime is to wrap the call to
>> createJobGraph like this:
>>
>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>> try {
>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>     // do stuff
>> } finally {
>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>> }
>>
>>
>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>
>> Any help here? How can I understand why the classes inside the jar are not
>> found when creating the PackagedProgram?
>>
>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>>
>>> In the logs I see that the jar is on the classpath (I'm trying to debug the
>>> program from the IDE), isn't it?
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>> ...
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>>
>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>
>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>
>>>> Well it happens on the client before you even hit the RestClusterClient,
>>>> so I assume that either your jar is not packaged correctly or your
>>>> JobExecutor is putting it on the classpath.
>>>>
>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>
>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class
>>>> I'm trying to use as a client towards the Flink cluster - session mode).
>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
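One way to rule out the "jar is not packaged correctly" possibility raised above is to look inside the jar for the class file directly. The following is only a minimal sketch using the JDK's `java.util.jar.JarFile`; the class name `JarEntryCheck` and the helper `containsClass` are hypothetical names introduced for illustration, and the defaults (`/tmp/job-bundle.jar`, `it.test.MyOb`) are taken from the messages in this thread.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink: checks whether a jar holds a given class file.
public class JarEntryCheck {

    /** Returns true if the jar has a .class entry for the fully qualified class name. */
    static boolean containsClass(File jar, String className) throws IOException {
        // Inside a jar, class files use '/' as package separator and end in ".class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults mirror the jar path and class name quoted in this thread.
        File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String className = args.length > 1 ? args[1] : "it.test.MyOb";
        System.out.println(className + " in " + jar + ": " + containsClass(jar, className));
    }
}
```

If the entry is present, the packaging is probably fine and the question shifts to which classloader is asked for the class; if it is absent, the build of the fat jar is the place to look.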
>>>>
>>>> The code of getBatchEnv is:
>>>>
>>>> @Deprecated
>>>> public static BatchEnv getBatchEnv() {
>>>>   // TODO use the following when ready to convert from/to datastream
>>>>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>   customizeEnv(ret);
>>>>   return new BatchEnv(env, ret);
>>>> }
>>>>
>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>   final Configuration conf = ret.getConfig().getConfiguration();
>>>>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>   final List<String> kryoSerializers = new ArrayList<>();
>>>>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>
>>>> }
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>
>>>> System.out: (none)
>>>>
>>>> System.err: (none)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>     at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>     at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>     ... 3 more
>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>     ... 13 more
>>>>
>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>> Hi Flavio,
>>>>> can you share the full stacktrace you are seeing? I'm wondering if the
>>>>> error happens on the client or server side (among other questions I have).
>>>>>
>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>>
>>>>>> Hi to all,
>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink
>>>>>> cluster.
>>>>>> However when I submit the job Flink cannot find the classes contained in
>>>>>> the "fat" jar..what should I do? Am I missing something in my code?
>>>>>> This is the current client code I'm testing:
>>>>>>
>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>   final Configuration flinkConf = new Configuration();
>>>>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>   flinkConf.set(RestOptions.PORT, 8081);
>>>>>>
>>>>>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>   final String jobClass = "it.flink.MyJob";
>>>>>>
>>>>>>   try {
>>>>>>     final RestClusterClient<StandaloneClusterId> client =
>>>>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>
>>>>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>         .setJarFile(jarFile)//
>>>>>>         // .setUserClassPaths(userClassPaths)
>>>>>>         .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>         .build();
>>>>>>
>>>>>>     final JobGraph jobGraph =
>>>>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>
>>>>>>     final DetachedJobExecutionResult jobExecutionResult =
>>>>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>
>>>>>>     System.out.println(jobExecutionResult.getJobID());
>>>>>>   } catch (Exception ex) {
>>>>>>     ex.printStackTrace();
>>>>>>     System.exit(1);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
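
The workaround suggested earlier in this thread (temporarily setting the thread context classloader around the call to createJobGraph) can be factored into a small reusable helper. This is only a sketch of that save/set/restore pattern using plain JDK classes; `ContextClassLoaderSketch` and `callWithContextClassLoader` are hypothetical names, and the empty `URLClassLoader` in `main` merely stands in for `packagedProgram.getUserCodeClassLoader()` so the example runs without Flink on the classpath.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink.
public class ContextClassLoaderSketch {

    /** Runs the action with the given context classloader, then restores the previous one. */
    static <T> T callWithContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Restore even if the action throws, as in the try/finally from the thread.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the user-code classloader (assumption for this demo only).
        try (URLClassLoader userCodeLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader())) {
            ClassLoader seen = callWithContextClassLoader(
                    userCodeLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("action saw the user-code loader: " + (seen == userCodeLoader));
        }
    }
}
```

Applied to the client code above, the call would presumably look like `callWithContextClassLoader(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true))`. Whether this resolves the ClassNotFoundException is not confirmed anywhere in this thread.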