You are right Chesnay, but I'm doing this stuff in parallel with two other things and I messed up the jar name, sorry for that. For the code after env.execute() I'll try to use the new JobListener interface in the next few days. I hope it will be sufficient (I just have to call an external service to update the status of the job).
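A minimal sketch of the JobListener approach mentioned above (assuming Flink 1.10+, where `ExecutionEnvironment#registerJobListener` exists; the `notifyStatusService` method is a hypothetical stand-in for the external status-update service):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

public class ListenerExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Called on the client side when the job is submitted and when it terminates,
        // so code can run "after env.execute()" even for detached submissions.
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (throwable == null) {
                    notifyStatusService(jobClient.getJobID().toString(), "SUBMITTED");
                } else {
                    notifyStatusService(null, "SUBMISSION_FAILED");
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                notifyStatusService(
                    result != null ? result.getJobID().toString() : null,
                    throwable == null ? "FINISHED" : "FAILED");
            }
        });

        // ... build the batch pipeline here ...
        env.execute("my-job");
    }

    // Hypothetical: call whatever external service tracks the job status.
    private static void notifyStatusService(String jobId, String status) {
        System.out.println("job " + jobId + " -> " + status);
    }
}
```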
Best,
Flavio

On Fri, Oct 30, 2020 at 12:40 PM Chesnay Schepler <ches...@apache.org> wrote:

> Yes, it is definitely way easier to upload & run jars instead of
> submitting JobGraphs.
>
> But I thought this was not viable for you because you cannot execute
> anything after env.execute()? I believe this limitation still exists. Or
> are you referring here to error handling in case env.execute() throws an
> exception (which I think should work)?
> Finally, I have to point out that I already entertained the possibility
> of the jar not being packaged correctly twice in this thread, 2 and 3 days
> ago respectively. We could've saved some time here had you checked whether
> the jar actually contains the class.
>
> On 10/30/2020 12:24 PM, Flavio Pompermaier wrote:
>
> I just discovered that I was using the "slim" jar instead of the "fat"
> one... sorry. Now I'm able to successfully run the program on the remote
> cluster.
> However, the fact of generating the job graph on the client side is
> something I really don't like at all because it requires access to the
> Flink dist, to env variables (such as the Hadoop ones) and to the user
> jar, which is not really neat to me.
> But if I understood correctly, in Flink 1.12 I'll be able to use only the
> Job Manager REST API to run the job (since the limitation of job
> submission failure could be handled), is it correct?
>
> Thanks for the support,
> Flavio
>
> On Fri, Oct 30, 2020 at 11:37 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Can you give me more information on your packaging setup / project
>> structure? Is "it.test.MyOb" a test class? Does the dependency containing
>> this class have a "test" scope?
>>
>> On 10/30/2020 11:34 AM, Chesnay Schepler wrote:
>>
>> It is irrelevant whether it contains transitive dependencies or not;
>> that's a Maven concept, not a classloading one.
>>
>> The WordCount main class, which is only contained in that jar, could be
>> found, so the classloading is working.
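Checking whether a class actually made it into the fat jar, as Chesnay suggests, needs no cluster at all. A small sketch using only the JDK (the jar path and class name below are just the ones from this thread):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

public class JarClassCheck {

    // True if the given set of jar entry names contains the class.
    public static boolean containsClass(Set<String> entries, String className) {
        return entries.contains(className.replace('.', '/') + ".class");
    }

    // Reads all entry names out of a jar file on disk.
    public static Set<String> entryNames(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.stream().map(e -> e.getName()).collect(Collectors.toSet());
        }
    }

    public static void main(String[] args) throws IOException {
        Path jar = Path.of("/tmp/job-bundle.jar"); // the fat jar from this thread
        if (Files.exists(jar)) {
            System.out.println("contains it.test.MyOb? "
                + containsClass(entryNames(jar), "it.test.MyOb"));
        }
    }
}
```

The same check is what `jar tf job-bundle.jar | grep MyOb` would do on the command line.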
If any other class that is supposed
>> to be in the jar cannot be found, then that means the class is either not
>> in the jar, or some other transitive dependency is missing from the jar
>> (classloading exceptions can be a bit misleading at times, particularly
>> when accessing transitive dependencies in static fields IIRC).
>>
>> > Actually I was able to use the REST API without creating the JobGraph
>>
>> I'm not debating that, and pointed that out myself.
>> > [without a job graph you] cannot use the REST API *(outside of
>> uploading jars)*
>>
>> On 10/30/2020 11:22 AM, Flavio Pompermaier wrote:
>>
>> Yes, with the WordCount it works but that jar is not a "fat" jar (it
>> does not include transitive dependencies).
>> Actually I was able to use the REST API without creating the JobGraph:
>> you just have to tell the run API the jar id, the main class to run and
>> the optional parameters.
>> For this I don't use any official Flink client; I use the Spring REST
>> template and I've implemented the calls to the services myself.
>>
>> On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> If you aren't setting up the classpath correctly then you cannot create
>>> a JobGraph, and cannot use the REST API (outside of uploading jars).
>>> In other words, you _will_ have to solve this issue, one way or another.
>>>
>>> FYI, I just tried your code to submit a WordCount jar to a cluster (the
>>> one in the distribution), and it worked flawlessly. Please triple check
>>> your packaging and class references.
>>>
>>> On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
>>>
>>> For "REST-only client" I mean using only the REST API to interact with
>>> the Flink cluster, i.e. without creating any PackagedProgram and thus
>>> running into classpath problems.
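The "run API" Flavio describes is the JobManager's web endpoint: upload the fat jar with `POST /jars/upload`, then `POST /jars/<jarId>/run` with the entry class, and the job graph is built server-side. A minimal sketch with the JDK's HttpClient (the host, jar id and job class are placeholders; the multipart upload step is omitted):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestJobRunner {

    // Builds the URI of Flink's "run jar" endpoint for a previously uploaded jar.
    public static URI runJarUri(String baseUrl, String jarId) {
        return URI.create(baseUrl + "/jars/" + jarId + "/run");
    }

    // Builds the JSON body selecting the main class and parallelism.
    public static String runJarBody(String entryClass, int parallelism) {
        return "{\"entryClass\":\"" + entryClass + "\",\"parallelism\":" + parallelism + "}";
    }

    public static void main(String[] args) {
        // Placeholder values; adapt to your cluster and the id returned by /jars/upload.
        URI uri = runJarUri("http://localhost:8081", "abc123_job-bundle.jar");
        HttpRequest request = HttpRequest.newBuilder(uri)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(runJarBody("it.flink.MyJob", 1)))
            .build();
        try {
            // On success the response body contains {"jobid": "..."}, which
            // can then be polled for status.
            HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        } catch (Exception e) {
            System.out.println("submission failed (is the cluster up?): " + e.getMessage());
        }
    }
}
```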
>>> I've implemented a running job server that was using the REST API to
>>> upload the job jar and execute the run command, but then I gave up
>>> because I was not able to run any code after env.execute(), so I ended
>>> up using SSH to the remote server and using the CLI client. This has the
>>> limitation of not being able to get the job id and monitor the job
>>> status or get back exceptions when deploying the job.
>>> So now I was trying to explore this new way of submitting the job (that
>>> computes the JobGraph on the client side and submits it to the cluster).
>>>
>>> On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> To clarify, if the job creation fails on the JM side, in 1.11 the job
>>>> submission will fail, in 1.12 it will succeed but the job will be in a
>>>> failed state.
>>>>
>>>> On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
>>>>
>>>> 1) the job still reaches a failed state, which you can poll for, see 2)
>>>> 2) polling is your only way.
>>>>
>>>> What do you mean with "REST-only client"? Do you mean a plain HTTP
>>>> client, not something that Flink provides?
>>>>
>>>> On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
>>>>
>>>> Nothing to do also with IntelliJ... do you have any sample project I
>>>> can reuse to test the job submission to a cluster?
>>>> I can't really understand why the classes within the fat jar are not
>>>> found when generating the PackagedProgram.
>>>> Ideally, I'd prefer to use a REST-only client (so no need to build
>>>> packaged programs and introduce classpath problems), but I have 2
>>>> questions:
>>>>
>>>> - I remember that when submitting jobs via REST there's no way to
>>>> detect failures in the job creation (like missing classes, classpath
>>>> problems, etc). Am I wrong?
>>>> - I'd like to monitor the progress of my batch job (for example I
>>>> can count the number of completed vertices wrt the total count of
>>>> vertices).
Is there any suggested way to do that apart from polling?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> I'm running the code from Eclipse, the jar exists and it contains the
>>>>> classes Flink is not finding... maybe I can try to use IntelliJ in the
>>>>> afternoon.
>>>>>
>>>>> On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @Kostas: Ah, I missed that.
>>>>>>
>>>>>> @Flavio: the only alternative I can think of is that your jar does
>>>>>> not contain the classes, or does not exist at all on the machine your
>>>>>> application is run on.
>>>>>>
>>>>>> On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
>>>>>> > Hi all,
>>>>>> >
>>>>>> > I will have a look at the whole stack trace in a bit.
>>>>>> >
>>>>>> > @Chesnay Schepler I think that we are setting the correct
>>>>>> > classloader during jobgraph creation [1]. Is that what you mean?
>>>>>> >
>>>>>> > Cheers,
>>>>>> > Kostas
>>>>>> >
>>>>>> > [1]
>>>>>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>>>>>> >
>>>>>> > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
>>>>>> > <pomperma...@okkam.it> wrote:
>>>>>> >> Always the same problem.
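Polling, as Chesnay suggests, boils down to repeated calls to `GET /jobs/<jobId>` (whose `state` field reports the job status, and whose `vertices` array gives per-vertex status for progress counting). The polling loop itself is transport-agnostic; a minimal sketch where the status supplier is a placeholder for whatever HTTP call fetches the state:

```java
import java.util.Set;
import java.util.function.Supplier;

public class JobStatusPoller {

    private static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    public static boolean isTerminal(String state) {
        return TERMINAL.contains(state);
    }

    // Polls the supplied status source until the job reaches a terminal state
    // or the attempt budget is exhausted; returns the last state seen.
    public static String pollUntilTerminal(Supplier<String> fetchState,
                                           int maxAttempts,
                                           long sleepMillis) {
        String state = fetchState.get();
        for (int i = 1; i < maxAttempts && !isTerminal(state); i++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return state;
            }
            state = fetchState.get();
        }
        return state;
    }

    public static void main(String[] args) {
        // Stubbed status source standing in for GET /jobs/<jobId>.
        java.util.Iterator<String> states =
            java.util.List.of("CREATED", "RUNNING", "RUNNING", "FINISHED").iterator();
        System.out.println(pollUntilTerminal(states::next, 10, 1)); // prints FINISHED
    }
}
```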
>>>>>> >>
>>>>>> >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>>>>> >>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>> >>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>> >>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>> >>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>> >>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>> >>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>> >>   ... 10 more
>>>>>> >>
>>>>>> >> I've also tried with
>>>>>> >>
>>>>>> >> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>>>>> >>
>>>>>> >> but nothing changes.
>>>>>> >>
>>>>>> >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>> >>> Hmm... it appears as if PackagedProgramUtils#createJobGraph does
>>>>>> >>> some things outside the usercode classloader (getPipelineFromProgram()),
>>>>>> >>> specifically the call to the main method.
>>>>>> >>>
>>>>>> >>> @klou This seems like wrong behavior?
>>>>>> >>>
>>>>>> >>> @Flavio What you could try in the meantime is to wrap the call to
>>>>>> >>> createJobGraph like this:
>>>>>> >>>
>>>>>> >>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>>>>> >>> try {
>>>>>> >>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>>>> >>>     // do stuff
>>>>>> >>> } finally {
>>>>>> >>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>>>> >>>
>>>>>> >>> Any help here?
How can I understand why the classes inside the
>>>>>> >>> jar are not found when creating the PackagedProgram?
>>>>>> >>>
>>>>>> >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>> >>>> In the logs I see that the jar is on the classpath (I'm trying to
>>>>>> >>>> debug the program from the IDE)... isn't it?
>>>>>> >>>>
>>>>>> >>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>> >>>> ...
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Flavio
>>>>>> >>>>
>>>>>> >>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>> >>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>> >>>>>
>>>>>> >>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>> >>>>>
>>>>>> >>>>> Well, it happens on the client before you even hit the
>>>>>> >>>>> RestClusterClient, so I assume that either your jar is not packaged
>>>>>> >>>>> correctly or your JobExecutor is putting it on the classpath.
>>>>>> >>>>>
>>>>>> >>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>> >>>>>
>>>>>> >>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my
>>>>>> >>>>> main class I'm trying to use as a client towards the Flink cluster,
>>>>>> >>>>> session mode).
>>>>>> >>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>> >>>>>
>>>>>> >>>>> The code of getBatchEnv is:
>>>>>> >>>>>
>>>>>> >>>>> @Deprecated
>>>>>> >>>>> public static BatchEnv getBatchEnv() {
>>>>>> >>>>>   // TODO use the following when ready to convert from/to datastream
>>>>>> >>>>>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>> >>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>> >>>>>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>> >>>>>   customizeEnv(ret);
>>>>>> >>>>>   return new BatchEnv(env, ret);
>>>>>> >>>>> }
>>>>>> >>>>>
>>>>>> >>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>> >>>>>   final Configuration conf = ret.getConfig().getConfiguration();
>>>>>> >>>>>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>> >>>>>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>> >>>>>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>> >>>>>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>> >>>>>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); //NOSONAR
>>>>>> >>>>>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); //NOSONAR
>>>>>> >>>>>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); //NOSONAR
>>>>>> >>>>>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>>>> >>>>>   final List<String> kryoSerializers = new ArrayList<>();
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>> >>>>>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>> >>>>> }
>>>>>> >>>>>
>>>>>> >>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>> >>>>>
>>>>>> >>>>> System.out: (none)
>>>>>> >>>>>
>>>>>> >>>>> System.err: (none)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>> >>>>>   at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>>> >>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>> >>>>>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>> >>>>>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>> >>>>>   at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>> >>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> >>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> >>>>>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> >>>>>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>> >>>>>   ... 3 more
>>>>>> >>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>> >>>>>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>> >>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>> >>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>> >>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>> >>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>> >>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>> >>>>>   ... 13 more
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> >>>>>> Hi Flavio,
>>>>>> >>>>>> can you share the full stacktrace you are seeing? I'm
>>>>>> >>>>>> wondering if the error happens on the client or server side (among
>>>>>> >>>>>> other questions I have).
>>>>>> >>>>>>
>>>>>> >>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>> >>>>>>> Hi to all,
>>>>>> >>>>>>> I was trying to use the RestClusterClient to submit my job to
>>>>>> >>>>>>> the Flink cluster.
>>>>>> >>>>>>> However when I submit the job, Flink cannot find the classes
>>>>>> >>>>>>> contained in the "fat" jar... what should I do? Am I missing
>>>>>> >>>>>>> something in my code?
>>>>>> >>>>>>> This is the current client code I'm testing:
>>>>>> >>>>>>>
>>>>>> >>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>> >>>>>>>   final Configuration flinkConf = new Configuration();
>>>>>> >>>>>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>> >>>>>>>   flinkConf.set(RestOptions.PORT, 8081);
>>>>>> >>>>>>>
>>>>>> >>>>>>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>> >>>>>>>   final String jobClass = "it.flink.MyJob";
>>>>>> >>>>>>>
>>>>>> >>>>>>>   try {
>>>>>> >>>>>>>     final RestClusterClient<StandaloneClusterId> client =
>>>>>> >>>>>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()
>>>>>> >>>>>>>         .setJarFile(jarFile)
>>>>>> >>>>>>>         // .setUserClassPaths(userClassPaths)
>>>>>> >>>>>>>         .setEntryPointClassName(jobClass)
>>>>>> >>>>>>>         .setConfiguration(flinkConf)
>>>>>> >>>>>>>         .build();
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final JobGraph jobGraph =
>>>>>> >>>>>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final DetachedJobExecutionResult jobExecutionResult =
>>>>>> >>>>>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>> >>>>>>>
>>>>>> >>>>>>>     System.out.println(jobExecutionResult.getJobID());
>>>>>> >>>>>>>   } catch (Exception ex) {
>>>>>> >>>>>>>     ex.printStackTrace();
>>>>>> >>>>>>>     System.exit(1);
>>>>>> >>>>>>>   }
>>>>>> >>>>>>> }
>>>>>> >>>>>>>
>>>>>> >>>>>>> Best,
>>>>>> >>>>>>> Flavio