Here's the pull request that hopefully fixes your issue:

Best, Fabian
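
For illustration, the pattern described in the quoted message below (set the thread's context classloader to the user classloader before calling user code, then restore it) can be sketched roughly as follows. This is a minimal sketch and not the code of the actual pull request: the class name ContextClassLoaderSketch and the helper runWithClassLoader are hypothetical, and an empty URLClassLoader stands in for Flink's user classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSketch {

    /**
     * Runs the action with the given loader installed as the current thread's
     * context class loader and restores the previous loader afterwards,
     * even if the action throws. (Hypothetical helper, not a Flink API.)
     */
    public static <T> T runWithClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Assumption: an empty URLClassLoader stands in for the user class loader.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        // Inside the action, libraries that consult the context class loader see userLoader.
        ClassLoader seen = runWithClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user loader visible inside action: " + (seen == userLoader));
        System.out.println("previous loader restored afterwards: "
                + (Thread.currentThread().getContextClassLoader() != userLoader));
    }
}
```

The try/finally matters here: without it, an exception in the wrapped call would leave the user classloader installed on a thread that is later reused for other work.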

2017-09-20 16:15 GMT+02:00 Fabian Hueske <>:

> Hi Garrett,
> I think I identified the problem.
> You said you put the Hive/HCat dependencies into your user fat Jar,
> correct? In this case, they are loaded with Flink's userClassLoader (as
> described before).
> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
> loads the user classes with the user class loader.
> However, when the HCatOutputFormat.getOutputCommitter() method is called,
> Hive tries to load additional classes with the current thread's context
> class loader (see
> org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
> This behavior is actually OK, because we usually set the context
> classloader to be the user classloader before calling user code. However,
> this has not been done here.
> So, this is in fact a bug.
> I created this JIRA issue:
> jira/browse/FLINK-7656 and will open a PR for that.
> Thanks for helping to diagnose the issue,
> Fabian
> 2017-09-19 22:05 GMT+02:00 Garrett Barton <>:
>> Fabian,
>>  It looks like Hive instantiates both input and output formats when doing
>> either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler
>> where it tries to load both.  It looks like it's happening after the writes
>> complete and Flink is in the finish/finalize stage.  When I watch the
>> counters in the Flink UI, I see all output tasks marked finished, with
>> bytes sent and records sent being exactly what I expect them to be.  The
>> first error also mentions the master; is this the Flink JobManager process
>> then?
>> The expanded stacktrace is:
>> Caused by: java.lang.Exception: Failed to finalize execution on master
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(
>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> ... 8 more
>> Caused by: java.lang.RuntimeException: Failed to load foster storage handler
>> at
>> tBase.finalizeGlobal(
>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(
>> ... 14 more
>> Caused by: Failed to load foster storage handler
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(
>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(
>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(
>> at
>> tBase.finalizeGlobal(
>> ... 16 more
>> Caused by: java.lang.ClassNotFoundException:
>> .orc.OrcInputFormat
>> at
>> at java.lang.ClassLoader.loadClass(
>> at sun.misc.Launcher$AppClassLoader.loadClass(
>> at java.lang.ClassLoader.loadClass(
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.68)
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>> Thank you all for any help. :)
>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <>
>> wrote:
>>> Hi Garrett,
>>> Flink distinguishes between two classloaders: 1) the system classloader,
>>> which is the main classloader of the process and loads all jars in the
>>> ./lib folder, and 2) the user classloader, which loads the job jar.
>>> AFAIK, the different operators do not have distinct classloaders. So, in
>>> principle all operators should use the same user classloader.
>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>> found when you try to emit to an ORC file.
>>> This looks suspicious because I would rather expect the OrcOutputFormat
>>> to be the problem than the input format.
>>> Can you post more of the stacktrace? This would help to identify the
>>> spot in the Flink code where the exception is thrown.
>>> Thanks, Fabian
>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <>:
>>>> Hey all,
>>>>  I am trying out a POC with Flink on YARN.  My simple goal is to read
>>>> from a Hive ORC table, process some data, and write to a new Hive ORC table.
>>>> Currently I can get Flink to read the source table fine, both by
>>>> using the HCatalog input format directly and by using the flink-hcatalog
>>>> wrapper.  Processing the data also works fine. Dumping to console or a text
>>>> file also works fine.
>>>> I'm now stuck trying to write the data out; I'm getting
>>>> ClassNotFoundExceptions:
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> at
>>>> at java.lang.ClassLoader.loadClass(
>>>> at sun.misc.Launcher$AppClassLoader.loadClass(
>>>> at java.lang.ClassLoader.loadClass(
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(
>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(
>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.68)
>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>>> Since I read from an ORC table, I know I have that class in my
>>>> classpath.  So I'm wondering if each stage/step in a Flink process has some
>>>> kind of special classloader that I am not aware of?  (Also, it's odd that it
>>>> wants the input format and not the output format; not sure why yet.)
>>>> My output code looks like this:
>>>> Job job = Job.getInstance(conf);
>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
>>>>     new HadoopOutputFormat(outputFormat, job);
>>>> // from previous processing step
>>>> hcat.output(out);
>>>> env.execute("run");
>>>> One other thing to note: I had to put
>>>> flink-hadoop-compatibility_2.11-1.3.2.jar
>>>> into the lib folder of the Flink distro.  Building my code in a shaded jar
>>>> with that dependency did not work for me.  However, when I put the Hive/HCat
>>>> jars in the lib folder, it caused lots of other errors.  Since the shading
>>>> didn't work for the hadoop-compatibility jar, it makes me think there is
>>>> some funky classloader stuff going on.  I don't understand why this doesn't
>>>> work.  The ORC code is shaded and verified in my jar, the classes are
>>>> present, and I can successfully read from an ORC table.
>>>> Any help or explanation of how the classpath/classloading works would
>>>> be wonderful!
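
As a rough illustration of the classloader question raised in the original message: a child classloader (like the one that loads the job jar) can see everything its parent sees, but the parent cannot see what only the child has. The sketch below is an assumption-laden stand-in, not Flink code. It uses a temporary directory with a marker resource in place of a user jar, and the names ClassLoaderVisibilitySketch and newUserLoader are hypothetical.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClassLoaderVisibilitySketch {

    /**
     * Creates a child loader over a fresh temp directory that contains a single
     * marker resource. The directory is a stand-in for a user jar.
     */
    public static URLClassLoader newUserLoader(ClassLoader parent, String resourceName)
            throws IOException {
        Path dir = Files.createTempDirectory("user-jar-standin");
        Files.write(dir.resolve(resourceName), new byte[] {1});
        return new URLClassLoader(new URL[] {dir.toUri().toURL()}, parent);
    }

    public static void main(String[] args) throws Exception {
        String marker = "only-in-user-jar.marker";
        ClassLoader system = ClassLoader.getSystemClassLoader();

        try (URLClassLoader user = newUserLoader(system, marker)) {
            // The child finds its own resource ...
            System.out.println("user loader sees marker:   " + (user.getResource(marker) != null));
            // ... but the parent does not know about the child's entries.
            System.out.println("system loader sees marker: " + (system.getResource(marker) != null));
            // The child still resolves parent-visible classes through delegation.
            System.out.println("user loader resolves String: "
                    + (Class.forName("java.lang.String", false, user) == String.class));
        }
    }
}
```

The same asymmetry applies to classes: code that looks up a class through the system classloader, or through a context classloader that still points at it, will not find classes that exist only in the job jar.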
