Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.

@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:

final ClassLoader contextClassLoader =
    Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    // do stuff
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
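
For example, applied to the client code further down in this thread, it would look roughly like this (untested sketch, just reusing the packagedProgram/flinkConf variables and the createJobGraph call from Flavio's snippet):

final ClassLoader contextClassLoader =
    Thread.currentThread().getContextClassLoader();
try {
    // run the user's main() with the user-code classloader as the context classloader
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    final JobGraph jobGraph =
        PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
    // ... submit the JobGraph with the RestClusterClient as before
} finally {
    // always restore the original context classloader
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}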


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
Any help here?  How can I understand why the classes inside the jar are not found when creating the PackagedProgram?

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:

    In the logs I see that the jar is on the classpath (I'm trying to
    debug the program from the IDE)... isn't it?

    Classpath: [file:/tmp/job-bundle.jar]
    ...

    Best,
    Flavio

    On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler
    <ches...@apache.org <mailto:ches...@apache.org>> wrote:

        * your JobExecutor is _not_ putting it on the classpath.

        On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
        Well it happens on the client before you even hit the
        RestClusterClient, so I assume that either your jar is not
        packaged correctly or your JobExecutor is putting it on
        the classpath.
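
        One quick way to tell which of the two it is: check whether the class
        resolves from the client's own classpath inside your JobExecutor,
        before building the PackagedProgram. Rough sketch, with your class
        name filled in:

            // inside JobExecutor#main, before PackagedProgram.newBuilder()
            try {
              // resolves only if the fat jar is on the JobExecutor's own classpath
              Class.forName("it.test.MyOb");
              System.out.println("it.test.MyOb IS on the client classpath");
            } catch (ClassNotFoundException e) {
              System.out.println("it.test.MyOb is NOT on the client classpath");
            }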

        On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
        Sure. Here it is (org.apache.flink.client.cli.JobExecutor is
        the main class I'm trying to use as a client towards the
        Flink cluster, in session mode).
        it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
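
        To rule out a packaging problem, a standalone check along these
        lines, with my local jar path and the class name taken from the
        error, should tell whether the class entry is really inside the fat
        jar, independently of Flink:

            import java.util.jar.JarFile;

            public class JarCheck {
              public static void main(String[] args) throws Exception {
                try (JarFile jar = new JarFile("/tmp/job-bundle.jar")) {
                  // the entry name mirrors the it/test/MyOb form from the stack trace
                  System.out.println(jar.getEntry("it/test/MyOb.class") != null
                      ? "class entry found in jar"
                      : "class entry MISSING from jar");
                }
              }
            }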

        The code of getBatchEnv is:

        @Deprecated
        public static BatchEnv getBatchEnv() {
          // TODO use the following when ready to convert from/to datastream
          // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          BatchTableEnvironment ret = BatchTableEnvironment.create(env);
          customizeEnv(ret);
          return new BatchEnv(env, ret);
        }

        private static void customizeEnv(TableEnvironment ret) {
          final Configuration conf = ret.getConfig().getConfiguration();
          // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
          conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
          conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
          // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); // NOSONAR
          // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); // NOSONAR
          // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); // NOSONAR
          // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
          conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
          conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
          conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
          conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
          conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
          final List<String> kryoSerializers = new ArrayList<>();
          kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
          kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
          kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
          conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
        }

        Classpath: [file:/tmp/job-bundle.jar]

        System.out: (none)

        System.err: (none)
        at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
        at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
        Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
        at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
        at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
        at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
        ... 3 more
        Caused by: java.lang.ClassNotFoundException: it/test/MyOb
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 13 more

        On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger
        <rmetz...@apache.org <mailto:rmetz...@apache.org>> wrote:

            Hi Flavio,
            can you share the full stacktrace you are seeing? I'm
            wondering if the error happens on the client or server
            side (among other questions I have).

            On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier
            <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:

                Hi to all,
                I was trying to use the RestClusterClient to submit
                my job to the Flink cluster.
                However, when I submit the job, Flink cannot find the
                classes contained in the "fat" jar... what should I
                do? Am I missing something in my code?
                This is the current client code I'm testing:

                public static void main(String[] args) throws MalformedURLException {
                  final Configuration flinkConf = new Configuration();
                  flinkConf.set(RestOptions.ADDRESS, "localhost");
                  flinkConf.set(RestOptions.PORT, 8081);

                  final File jarFile = new File("/tmp/job-bundle.jar");
                  final String jobClass = "it.flink.MyJob";

                  try {
                    final RestClusterClient<StandaloneClusterId> client =
                        new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());

                    final PackagedProgram packagedProgram = PackagedProgram.newBuilder()
                        .setJarFile(jarFile)
                        // .setUserClassPaths(userClassPaths)
                        .setEntryPointClassName(jobClass)
                        .setConfiguration(flinkConf)
                        .build();

                    final JobGraph jobGraph =
                        PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);

                    final DetachedJobExecutionResult jobExecutionResult =
                        client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();

                    System.out.println(jobExecutionResult.getJobID());
                  } catch (Exception ex) {
                    ex.printStackTrace();
                    System.exit(1);
                  }
                }

                Best,
                Flavio



