Hi Garrett,

Flink distinguishes between two classloaders: 1) the system classloader,
which is the main classloader of the process and loads all jars in the
./lib folder, and 2) the user classloader, which loads the job jar.
AFAIK, the different operators do not have distinct classloaders, so in
principle all operators of a job should use the same user classloader.
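
If it helps to narrow this down, a quick way to see which classloader
actually provides the class is to probe it from inside one of your
functions. This is just a debugging sketch (the String types and the probe
class are placeholders, not part of your job):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// hypothetical probe: logs which classloaders are involved at runtime
public class ClassLoaderProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // classloader that loads the job jar (the user classloader)
        ClassLoader userCl = getRuntimeContext().getUserCodeClassLoader();
        // classloader that actually resolves the ORC input format
        Class<?> orc = Class.forName(
            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", false, userCl);
        System.out.println("user classloader: " + userCl);
        System.out.println("OrcInputFormat loaded by: " + orc.getClassLoader());
    }

    @Override
    public String map(String value) {
        return value;
    }
}

If OrcInputFormat resolves fine against the user classloader there, but
Hive's JavaUtils tries to load it through the application classloader (the
sun.misc.Launcher$AppClassLoader frame in your trace), that would be
consistent with the error you see.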

According to the stacktrace you posted, the OrcInputFormat cannot be found
when you try to emit to an ORC file.
This looks suspicious because I would rather expect the OrcOutputFormat,
not the input format, to be the problem.
Can you post more of the stacktrace? That would help to identify the spot
in the Flink code where the exception is thrown.

Thanks, Fabian

2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:

> Hey all,
>
> I am trying out a POC with Flink on YARN. My simple goal is to read from
> a Hive ORC table, process some data, and write to a new Hive ORC table.
>
> Currently I can get Flink to read the source table fine, both by using
> the HCatalog InputFormat directly and by using the flink-hcatalog
> wrapper. Processing the data also works fine, as does dumping to the
> console or to a text file.
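>
> For reference, the read side that works looks roughly like this
> (simplified sketch, exception handling omitted; the database/table names
> are placeholders and conf is the same Hadoop Configuration I use below):
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.hcatalog.java.HCatInputFormat;
> import org.apache.hive.hcatalog.data.HCatRecord;
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> // flink-hcatalog wrapper around HCatInputFormat, reading the source ORC table
> DataSet<HCatRecord> source = env.createInput(
>     new HCatInputFormat<HCatRecord>("schema", "source_table", conf));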
>
> I'm now stuck trying to write the data out; I'm getting a
> ClassNotFoundException:
>
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>
> Since I read from an ORC table, I know I have that class in my classpath.
> So I'm wondering if each stage/step in a Flink job has some kind of
> special classloader that I am not aware of? (Also, it's odd that it wants
> the InputFormat and not the OutputFormat; I'm not sure why yet.)
>
> My output code looks like this:
>
>
> Job job = Job.getInstance(conf);
>
> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>
> HCatOutputFormat outputFormat = new HCatOutputFormat();
>
> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
>     new HadoopOutputFormat<>(outputFormat, job);
>
> // from previous processing step
> hcat.output(out);
> env.execute("run");
>
>
>
> One other thing to note: I had to put flink-hadoop-compatibility_2.11-1.3.2.jar
> into the lib folder of the Flink distro. Building my code in a shaded jar
> with that dependency did not work for me. However, when I put the hive/hcat
> jars in the lib folder, it caused lots of other errors. Since the shading
> didn't work for the hadoop-compatibility jar, it makes me think there is
> some funky classloader stuff going on. I don't understand why this doesn't
> work. The ORC code is shaded and verified in my jar, the classes are
> present, and I can successfully read from an ORC table.
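>
> A quick way to confirm which jar actually provides the class is to load
> it by hand in the client before submitting anything (just a debugging
> sketch, not part of the job):
>
> // sanity check in the driver: which jar does OrcInputFormat come from?
> Class<?> orc = Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
> // getCodeSource() may be null for bootstrap classes, but not for a jar on the classpath
> System.out.println("OrcInputFormat loaded from: "
>     + orc.getProtectionDomain().getCodeSource().getLocation());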
>
> Any help or explanation of how the classpath/classloading works would be
> wonderful!
>
