You are right Chesnay, but I'm doing this in parallel with two other
things and I messed up the jar name, sorry for that.
For the code after env.execute() I'll try to use the new JobListener
interface in the next few days. I hope it will be sufficient (I just have
to call an external service to update the status of the job).
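For reference, a minimal sketch of what that could look like (JobListener
lives in org.apache.flink.core.execution, available since Flink 1.10 if I
recall correctly; StatusClient is a hypothetical wrapper around the
external status service):

  import org.apache.flink.api.common.JobExecutionResult;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.core.execution.JobClient;
  import org.apache.flink.core.execution.JobListener;

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
      // called right after submission; throwable is non-null if it failed
      if (jobClient != null) {
        StatusClient.update(jobClient.getJobID().toString(), "SUBMITTED"); // hypothetical call
      }
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
      // called when env.execute() completes; throwable is non-null on failure
      if (result != null) {
        StatusClient.update(result.getJobID().toString(), "FINISHED"); // hypothetical call
      }
    }
  });
  // then env.execute(...) as usual; the listener fires around it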

Best,
Flavio

On Fri, Oct 30, 2020 at 12:40 PM Chesnay Schepler <ches...@apache.org>
wrote:

> Yes, it is definitely way easier to upload&run jars instead of submitting
> JobGraphs.
>
> But I thought this was not viable for you because you cannot execute
> anything after env.execute()? I believe this limitation still exists. Or
> are you referring here to error-handling in case env.execute() throws an
> exception (which I think should work)?
> Finally, I have to point out that I already entertained the possibility of the
> jar not being packaged correctly twice in this thread, 2 and 3 days ago
> respectively. We could've saved some time here had you checked whether the
> jar actually contains the class.
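> As a quick sanity check for next time, the plain JDK is enough to verify
> that (a minimal sketch; the path and class name are the ones from this
> thread, and IOException handling is left to the caller):
>
>   // prints true only if the class really made it into the fat jar
>   try (java.util.jar.JarFile jar = new java.util.jar.JarFile("/tmp/job-bundle.jar")) {
>     System.out.println(jar.getEntry("it/test/MyOb.class") != null);
>   }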
>
> On 10/30/2020 12:24 PM, Flavio Pompermaier wrote:
>
> I just discovered that I was using the "slim" jar instead of the "fat"
> one...sorry. Now I'm able to successfully run the program on the remote
> cluster.
> However, generating the job graph on the client side is something I
> really don't like at all, because it requires access to the Flink dist,
> to env variables (such as the Hadoop ones) and to the user jar, which is
> not really neat to me.
> But if I understood correctly, in Flink 1.12 I'll be able to use only the
> Job Manager REST API to run the job (since the limitation around detecting
> job submission failures can then be handled), is that correct?
>
> Thanks for the support,
> Flavio
>
> On Fri, Oct 30, 2020 at 11:37 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Can you give me more information on your packaging setup / project
>> structure? Is "it.test.MyOb" a test class? Does the dependency containing
>> this class have a "test" scope?
>>
>> On 10/30/2020 11:34 AM, Chesnay Schepler wrote:
>>
>> It is irrelevant whether it contains transitive dependencies or not;
>> that's a Maven concept, not a classloading one.
>>
>> The WordCount main class, which is only contained in that jar, could be
>> found, so the classloading is working. If any other class that is supposed
>> to be in the jar cannot be found, then that class is either not in the
>> jar, or some other transitive dependency is missing from the jar.
>> (Classloading exceptions can be a bit misleading at times, particularly
>> when accessing transitive dependencies in static fields, IIRC.)
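>> A minimal illustration of that last point (the names here are
>> hypothetical):
>>
>>   // suppose MyOb lives in a dependency that did not make it into the jar
>>   public class Helper {
>>     static final MyOb DEFAULT = new MyOb(); // runs during Helper's class initialization
>>   }
>>
>>   // the first code that touches Helper then fails with
>>   // java.lang.NoClassDefFoundError: MyOb, and the stack trace points at
>>   // the caller of Helper, not at whatever left MyOb out of the jar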
>>
>> > Actually I was able to use the REST API without creating the JobGraph
>>
>> I'm not debating that, and pointed that out myself.
>> > [without a job graph you] cannot use the REST API *(outside of
>> uploading jars)*
>>
>> On 10/30/2020 11:22 AM, Flavio Pompermaier wrote:
>>
>> Yes, with the WordCount it works, but that jar is not a "fat" jar (it
>> does not include transitive dependencies).
>> Actually I was able to use the REST API without creating the JobGraph:
>> you just have to tell the run API the jar id, the main class to run and
>> the optional parameters.
>> For this I don't use any official Flink client; I use the Spring
>> RestTemplate and I've implemented the calls to the services myself.
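>> In case it is useful to someone else, a minimal sketch of those two calls
>> with Spring's RestTemplate (the /jars/upload and /jars/:jarid/run
>> endpoints are the standard JobManager REST API; error handling is
>> omitted, a JSON converter such as Jackson is assumed on the classpath,
>> and the responses are read as raw Maps instead of typed DTOs):
>>
>>   import org.springframework.core.io.FileSystemResource;
>>   import org.springframework.http.*;
>>   import org.springframework.util.LinkedMultiValueMap;
>>   import org.springframework.util.MultiValueMap;
>>   import org.springframework.web.client.RestTemplate;
>>   import java.util.*;
>>
>>   RestTemplate rest = new RestTemplate();
>>
>>   // 1) upload the fat jar (multipart/form-data, form field "jarfile")
>>   MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
>>   form.add("jarfile", new FileSystemResource("/tmp/job-bundle.jar"));
>>   HttpHeaders headers = new HttpHeaders();
>>   headers.setContentType(MediaType.MULTIPART_FORM_DATA);
>>   Map<?, ?> upload = rest.postForObject("http://localhost:8081/jars/upload",
>>       new HttpEntity<>(form, headers), Map.class);
>>
>>   // the returned "filename" is a full path; the jar id is its last segment
>>   String filename = (String) upload.get("filename");
>>   String jarId = filename.substring(filename.lastIndexOf('/') + 1);
>>
>>   // 2) run it, passing the entry class (and optionally programArgs, parallelism)
>>   Map<String, Object> body = new HashMap<>();
>>   body.put("entryClass", "it.flink.MyJob");
>>   Map<?, ?> run = rest.postForObject("http://localhost:8081/jars/" + jarId + "/run",
>>       body, Map.class);
>>   System.out.println(run.get("jobid")); // id to use for status polling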
>>
>> On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> If you aren't setting up the classpath correctly then you cannot create
>>> a JobGraph, and cannot use the REST API (outside of uploading jars).
>>> In other words, you _will_ have to solve this issue, one way or another.
>>>
>>> FYI, I just tried your code to submit a WordCount jar to a cluster (the
>>> one in the distribution), and it worked flawlessly. Please triple check
>>> your packaging and class references.
>>>
>>> On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
>>>
>>> For "REST only client" I mean using only the REST API to interact with
>>> the Flink cluster, i.e. without creating any PackagedProgram and thus
>>> incurring into classpath problems.
>>> I've implemented a running job server that was using the REST API to
>>> upload the job jar and execute the run command but then I gave up because I
>>> was not able to run any code after env.execute..so I ended up using SSH to
>>> the remote server and using the CLI client. This has the limitation of not
>>> being able to get the job id and monitor the job status or get back
>>> exceptions when deploying the job.
>>> So now I was trying to explore this new way of submitting the job (that
>>> computes the jobGraph on the client side and submit it to the cluster).
>>>
>>>
>>>
>>> On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> To clarify: if the job creation fails on the JM side, in 1.11 the job
>>>> submission will fail, while in 1.12 it will succeed but the job will be
>>>> in a failed state.
>>>>
>>>> On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
>>>>
>>>> 1) the job still reaches a failed state, which you can poll for, see 2)
>>>> 2) polling is your only way.
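>>>> As an illustration, a minimal polling sketch against the REST API (a
>>>> GET on /jobs/:jobid returns a top-level "state" field plus a "vertices"
>>>> array, so the same call also gives a rough progress indicator for batch
>>>> jobs; jobId is assumed to come from the submission, and interruption
>>>> and error handling are omitted):
>>>>
>>>>   // assumes Spring's RestTemplate, as used elsewhere in this thread
>>>>   RestTemplate rest = new RestTemplate();
>>>>   String state;
>>>>   do {
>>>>     Thread.sleep(5_000);
>>>>     Map<?, ?> job = rest.getForObject("http://localhost:8081/jobs/" + jobId, Map.class);
>>>>     state = (String) job.get("state");
>>>>     List<?> vertices = (List<?>) job.get("vertices");
>>>>     long finished = vertices.stream()
>>>>         .filter(v -> "FINISHED".equals(((Map<?, ?>) v).get("status")))
>>>>         .count();
>>>>     System.out.printf("state=%s, %d/%d vertices finished%n", state, finished, vertices.size());
>>>>   } while (!"FINISHED".equals(state) && !"FAILED".equals(state) && !"CANCELED".equals(state));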
>>>>
>>>> What do you mean with "REST only client"? Do you mean a plain http
>>>> client, not something that Flink provides?
>>>>
>>>> On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
>>>>
>>>> No luck with IntelliJ either. Do you have any sample project I can
>>>> reuse to test the job submission to a cluster?
>>>> I can't really understand why the classes within the fat jar are not
>>>> found when generating the PackagedProgram.
>>>> Ideally, I'd prefer to use a REST-only client (so no need to build
>>>> packaged programs and introduce classpath problems), but I have 2
>>>> questions:
>>>>
>>>>    - I remember that when submitting jobs from REST there's no way to
>>>>    detect failures in the job creation (like missing classes, classpath
>>>>    problems, etc). Am I wrong?
>>>>    - I'd like to monitor the progress of my batch job (for example I
>>>>    can count the number of completed vertices with respect to the total
>>>>    count of vertices). Is there any suggested way to do that apart from
>>>>    polling?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> I'm running the code from Eclipse; the jar exists and it contains the
>>>>> classes Flink is not finding. Maybe I can try with IntelliJ in the
>>>>> afternoon.
>>>>>
>>>>> On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @Kostas: Ah, I missed that.
>>>>>>
>>>>>> @Flavio: the only alternative I can think of is that your jar does
>>>>>> not contain the classes, or does not exist at all on the machine your
>>>>>> application is run on.
>>>>>>
>>>>>> On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
>>>>>> > Hi all,
>>>>>> >
>>>>>> > I will have a look at the whole stack trace in a bit.
>>>>>> >
>>>>>> > @Chesnay Schepler I think that we are setting the correct
>>>>>> classloader
>>>>>> > during jobgraph creation [1]. Is that what you mean?
>>>>>> >
>>>>>> > Cheers,
>>>>>> > Kostas
>>>>>> >
>>>>>> > [1]
>>>>>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>>>>>> >
>>>>>> > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
>>>>>> > <pomperma...@okkam.it> wrote:
>>>>>> >> Always the same problem.
>>>>>> >>
>>>>>> >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>>>>> >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>> >> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>> >> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>> >> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>> >> ... 10 more
>>>>>> >>
>>>>>> >> I've also tried with
>>>>>> >>
>>>>>> >>     flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>>>>> >>
>>>>>> >> but nothing changes.
>>>>>> >>
>>>>>> >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <
>>>>>> ches...@apache.org> wrote:
>>>>>> >>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the usercode classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>>>> >>>
>>>>>> >>> @klou This seems like wrong behavior?
>>>>>> >>>
>>>>>> >>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>>>> >>>
>>>>>> >>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>>>>> >>> try {
>>>>>> >>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>>>> >>>     // do stuff
>>>>>> >>> } finally {
>>>>>> >>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>>>> >>>
>>>>>> >>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>>>> >>>
>>>>>> >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>> >>>> In the logs I see that the jar is in the classpath (I'm trying to debug the program from the IDE), isn't it?
>>>>>> >>>>
>>>>>> >>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>> >>>> ...
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Flavio
>>>>>> >>>>
>>>>>> >>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <
>>>>>> ches...@apache.org> wrote:
>>>>>> >>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>> >>>>>
>>>>>> >>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>> >>>>>
>>>>>> >>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>> >>>>>
>>>>>> >>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>> >>>>>
>>>>>> >>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster - session mode).
>>>>>> >>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>> >>>>>
>>>>>> >>>>> The code of getBatchEnv is:
>>>>>> >>>>>
>>>>>> >>>>> @Deprecated
>>>>>> >>>>> public static BatchEnv getBatchEnv() {
>>>>>> >>>>>   // TODO use the following when ready to convert from/to datastream
>>>>>> >>>>>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>> >>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>> >>>>>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>> >>>>>   customizeEnv(ret);
>>>>>> >>>>>   return new BatchEnv(env, ret);
>>>>>> >>>>> }
>>>>>> >>>>>
>>>>>> >>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>> >>>>>   final Configuration conf = ret.getConfig().getConfiguration();
>>>>>> >>>>>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>> >>>>>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>> >>>>>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>> >>>>>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>> >>>>>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); //NOSONAR
>>>>>> >>>>>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); //NOSONAR
>>>>>> >>>>>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); //NOSONAR
>>>>>> >>>>>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>>>> >>>>>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>>>> >>>>>   final List<String> kryoSerializers = new ArrayList<>();
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>> >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>> >>>>>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>> >>>>> }
>>>>>> >>>>>
>>>>>> >>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>> >>>>>
>>>>>> >>>>> System.out: (none)
>>>>>> >>>>>
>>>>>> >>>>> System.err: (none)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>> >>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>>> >>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>> >>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>> >>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>> >>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>> >>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> >>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> >>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> >>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>> >>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>> >>>>> ... 3 more
>>>>>> >>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>> >>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>> >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>> >>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>> >>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>> >>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>> >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>> >>>>> ... 13 more
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <
>>>>>> rmetz...@apache.org> wrote:
>>>>>> >>>>>> Hi Flavio,
>>>>>> >>>>>> can you share the full stacktrace you are seeing? I'm
>>>>>> wondering if the error happens on the client or server side (among other
>>>>>> questions I have).
>>>>>> >>>>>>
>>>>>> >>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>> >>>>>>> Hi to all,
>>>>>> >>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>> >>>>>>> However, when I submit the job, Flink cannot find the classes contained in the "fat" jar. What should I do? Am I missing something in my code?
>>>>>> >>>>>>> This is the current client code I'm testing:
>>>>>> >>>>>>>
>>>>>> >>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>> >>>>>>>   final Configuration flinkConf = new Configuration();
>>>>>> >>>>>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>> >>>>>>>   flinkConf.set(RestOptions.PORT, 8081);
>>>>>> >>>>>>>
>>>>>> >>>>>>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>> >>>>>>>   final String jobClass = "it.flink.MyJob";
>>>>>> >>>>>>>
>>>>>> >>>>>>>   try {
>>>>>> >>>>>>>     final RestClusterClient<StandaloneClusterId> client =
>>>>>> >>>>>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()
>>>>>> >>>>>>>         .setJarFile(jarFile)
>>>>>> >>>>>>>         // .setUserClassPaths(userClassPaths)
>>>>>> >>>>>>>         .setEntryPointClassName(jobClass)
>>>>>> >>>>>>>         .setConfiguration(flinkConf)
>>>>>> >>>>>>>         .build();
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final JobGraph jobGraph =
>>>>>> >>>>>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>> >>>>>>>
>>>>>> >>>>>>>     final DetachedJobExecutionResult jobExecutionResult =
>>>>>> >>>>>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>> >>>>>>>
>>>>>> >>>>>>>     System.out.println(jobExecutionResult.getJobID());
>>>>>> >>>>>>>   } catch (Exception ex) {
>>>>>> >>>>>>>     ex.printStackTrace();
>>>>>> >>>>>>>     System.exit(1);
>>>>>> >>>>>>>   }
>>>>>> >>>>>>> }
>>>>>> >>>>>>>
>>>>>> >>>>>>> Best,
>>>>>> >>>>>>> Flavio
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
