Here's the pull request that hopefully fixes your issue:
https://github.com/apache/flink/pull/4690

Best, Fabian

2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Garrett,
>
> I think I identified the problem.
> You said you put the Hive/HCat dependencies into your user fat Jar,
> correct? In this case, they are loaded with Flink's userClassLoader (as
> described before).
>
> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
> loads the user classes with the user class loader.
> However, when the HCatOutputFormat.getOutputCommitter() method is called,
> Hive tries to load additional classes with the current thread class loader
> (see at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.
> java:78)).
> This behavior is actually OK, because we usually set the context
> classloader to be the user classloader before calling user code. However,
> this has not been done here.
> So, this is in fact a bug.
>
> I created this JIRA issue: https://issues.apache.org/
> jira/browse/FLINK-7656 and will open a PR for that.
>
> Thanks for helping to diagnose the issue,
> Fabian
>
> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Fabian,
>>
>>  It looks like hive instantiates both input and output formats when doing
>> either. I use hive 1.2.1, and you can see in HCatUtil.getStorageHandler
>> where it tries to load both.  It looks like its happening after the writes
>> complete and flink is in the finish/finalize stage.  When I watch the
>> counters in the Flink ui, i see all output tasks mark finished along with
>> bytes sent and records sent being exactly what I expect them to be.  The
>> first error also mentions the master, is this the flink jobmanager process
>> then?
>>
>> The expanded stacktrace is:
>>
>> Caused by: java.lang.Exception: Failed to finalize execution on master
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>> xFinished(ExecutionGraph.java:1325)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.exec
>> utionFinished(ExecutionVertex.java:688)
>> at org.apache.flink.runtime.executiongraph.Execution.markFinish
>> ed(Execution.java:797)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>> eState(ExecutionGraph.java:1477)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> ... 8 more
>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to
>> load foster storage handler
>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputForma
>> tBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finaliz
>> eOnMaster(OutputFormatVertex.java:118)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.verte
>> xFinished(ExecutionGraph.java:1320)
>> ... 14 more
>> Caused by: java.io.IOException: Failed to load foster storage handler
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(
>> HCatUtil.java:409)
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(
>> HCatUtil.java:367)
>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getO
>> utputFormat(HCatBaseOutputFormat.java:77)
>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutpu
>> tCommitter(HCatOutputFormat.java:275)
>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputForma
>> tBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>> ... 16 more
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io
>> .orc.OrcInputFormat
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<ini
>> t>(FosterStorageHandler.68)
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>> CatUtil.java:404)
>>
>>
>> Thank you all for any help. :)
>>
>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Garrett,
>>>
>>> Flink distinguishes between two classloaders: 1) the system classloader
>>> which is the main classloader of the process. This classloader loads all
>>> jars in the ./lib folder and 2) the user classloader which loads the job
>>> jar.
>>> AFAIK, the different operators do not have distinct classloaders. So, in
>>> principle all operators should use the same user classloader.
>>>
>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>> found when you try to emit to an ORC file.
>>> This looks suspicious because I would rather expect the OrcOutputFormat
>>> to be the problem than the input format.
>>> Can you post more of the stacktrace? This would help to identify the
>>> spot in the Flink code where the exception is thrown.
>>>
>>> Thanks, Fabian
>>>
>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Hey all,
>>>>
>>>>  I am trying out a POC with flink on yarn.  My simple goal is to read
>>>> from a Hive ORC table, process some data and write to a new Hive ORC table.
>>>>
>>>> Currently I can get Flink to read the source table fine, both with
>>>> using The HCatalog Input format directly, and by using the flink-hcatalog
>>>> wrapper.  Processing the data also works fine. Dumping to console or a text
>>>> file also works fine.
>>>>
>>>> I'm now stuck trying to write the data out, I'm getting
>>>> ClassNotFoundExceptions:
>>>>
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:348)
>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<ini
>>>> t>(FosterStorageHandler.68)
>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>>>> CatUtil.java:404)
>>>>
>>>> Since I read from an Orc table, I know I have that class in my
>>>> classpath.  So I'm wondering if each stage/step in a flink process has some
>>>> kind of special classloader that I am not aware of?  (also its odd that it
>>>> wants the inputformat and not the outputformat, not sure why yet)
>>>>
>>>> My output code looks like this:
>>>>
>>>>
>>>> Job job = Job.getInstance(conf);
>>>>
>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema",
>>>> "table",null));
>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchem
>>>> a(job.getConfiguration());
>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>>
>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>>
>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out = new
>>>> HadoopOutputFormat(outputFormat, job);
>>>>
>>>> // from previous processing step
>>>> hcat.output(out);
>>>> env.execute("run");
>>>>
>>>>
>>>>
>>>> One other thing to note, I had to put 
>>>> flink-hadoop-compatability_2.11-1.3.2.jar
>>>> into the lib folder of the flink distro.  Building my code in a shaded jar
>>>> with that dependency did not work for me.  However when I put the hive/hcat
>>>> jars in the lib folder it caused lots of other errors.  Since the shading
>>>> didn't work for the hadoop-compatability jar it makes me think there is
>>>> some funky class loader stuff going on.  I don't understand why this doesnt
>>>> work.  The orc code is shaded and verified in my jar, the classes are
>>>> present, plus I successfully read from an ORC table.
>>>>
>>>> Any help or explanation into how the classpath/classloading works would
>>>> be wonderful!
>>>>
>>>
>>>
>>
>

Reply via email to