Fabian,
 Awesome!  After your initial email I got things to work by deploying my
fat jar into the flink/lib folder, and voilà! It worked. :)  I will grab
your pull request and give it a go tomorrow.

On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Here's the pull request that hopefully fixes your issue:
> https://github.com/apache/flink/pull/4690
>
> Best, Fabian
>
> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Garrett,
>>
>> I think I identified the problem.
>> You said you put the Hive/HCat dependencies into your user fat Jar,
>> correct? In this case, they are loaded with Flink's userClassLoader (as
>> described before).
>>
>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>> loads the user classes with the user class loader.
>> However, when the HCatOutputFormat.getOutputCommitter() method is
>> called, Hive tries to load additional classes with the current thread class
>> loader (see org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
>> This behavior is actually OK, because we usually set the context
>> classloader to be the user classloader before calling user code. However,
>> this has not been done here.
>> So, this is in fact a bug.
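
The pattern described above (set the thread's context class loader to the
user class loader before calling user code, then restore it) can be sketched
in plain Java. This is only an illustration under stated assumptions, not the
change made in the Flink pull request: the class name
ContextClassLoaderSketch, the helper runWithContextClassLoader, and the empty
URLClassLoader standing in for Flink's user class loader are all hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ContextClassLoaderSketch {

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader and restores the previous loader afterwards, even if the
     * action throws.
     */
    static void runWithContextClassLoader(ClassLoader userLoader, Runnable action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userLoader);
        try {
            action.run();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the user class loader; a real one would
        // be created with the URL of the job jar instead of an empty array.
        ClassLoader userLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        runWithContextClassLoader(userLoader, () -> {
            // Library code that resolves classes through the context class
            // loader would see the user loader while this action runs.
            ClassLoader seen = Thread.currentThread().getContextClassLoader();
            System.out.println("context loader is user loader: " + (seen == userLoader));
        });
    }
}
```

Restoring the previous loader in a finally block keeps the calling thread
unchanged for whatever runs after the user code.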
>>
>> I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
>> and will open a PR for that.
>>
>> Thanks for helping to diagnose the issue,
>> Fabian
>>
>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>
>>> Fabian,
>>>
>>>  It looks like Hive instantiates both the input and output formats when
>>> doing either a read or a write. I use Hive 1.2.1, and you can see in
>>> HCatUtil.getStorageHandler where it tries to load both.  It looks like it's
>>> happening after the writes complete and Flink is in the finish/finalize
>>> stage.  When I watch the counters in the Flink UI, I see all output tasks
>>> marked finished, with bytes sent and records sent being exactly what I
>>> expect them to be.  The first error also mentions the master; is this the
>>> Flink JobManager process then?
>>>
>>> The expanded stacktrace is:
>>>
>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>> ... 8 more
>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>>> ... 14 more
>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>> ... 16 more
>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>
>>>
>>> Thank you all for any help. :)
>>>
>>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Garrett,
>>>>
>>>> Flink distinguishes between two classloaders: 1) the system classloader
>>>> which is the main classloader of the process. This classloader loads all
>>>> jars in the ./lib folder and 2) the user classloader which loads the job
>>>> jar.
>>>> AFAIK, the different operators do not have distinct classloaders. So,
>>>> in principle all operators should use the same user classloader.
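
The two-loader setup described above can be illustrated with plain JDK class
loaders. This is a rough sketch, not Flink's actual loader setup: the class
ClassLoaderHierarchySketch, the helper canLoad, and the class name
com.example.OnlyInJobJar are hypothetical, and an empty URLClassLoader stands
in for the user class loader that would normally point at the job jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ClassLoaderHierarchySketch {

    /** Returns true if the named class can be resolved through the given loader. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            // 'false' skips static initialization; only resolution is needed here.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader system = ClassLoader.getSystemClassLoader();

        // Hypothetical user class loader: a child of the system class loader.
        try (URLClassLoader user = new URLClassLoader(new URL[0], system)) {
            // The child delegates to its parent, so it can see everything the
            // system class loader can see.
            System.out.println("user loader sees java.lang.String: "
                    + canLoad(user, "java.lang.String"));

            // The reverse does not hold: a class that exists only in the job
            // jar (hypothetical name) cannot be resolved by the system loader.
            System.out.println("system loader sees job-jar class: "
                    + canLoad(system, "com.example.OnlyInJobJar"));
        }
    }
}
```

This asymmetry matches the stack trace in this thread, where the lookup goes
through the AppClassLoader and fails for a class that was packaged only in
the job jar.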
>>>>
>>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>>> found when you try to emit to an ORC file.
>>>> This looks suspicious because I would rather expect the OrcOutputFormat
>>>> to be the problem than the input format.
>>>> Can you post more of the stacktrace? This would help to identify the
>>>> spot in the Flink code where the exception is thrown.
>>>>
>>>> Thanks, Fabian
>>>>
>>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>
>>>>> Hey all,
>>>>>
>>>>>  I am trying out a POC with Flink on YARN.  My simple goal is to read
>>>>> from a Hive ORC table, process some data and write to a new Hive ORC
>>>>> table.
>>>>>
>>>>> Currently I can get Flink to read the source table fine, both by using
>>>>> the HCatalog input format directly and by using the flink-hcatalog
>>>>> wrapper.  Processing the data also works fine. Dumping to the console or
>>>>> a text file also works fine.
>>>>>
>>>>> I'm now stuck trying to write the data out; I'm getting
>>>>> ClassNotFoundExceptions:
>>>>>
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:348)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>>
>>>>> Since I read from an ORC table, I know I have that class in my
>>>>> classpath.  So I'm wondering if each stage/step in a Flink job has some
>>>>> kind of special classloader that I am not aware of?  (Also, it's odd that
>>>>> it wants the input format and not the output format; not sure why yet.)
>>>>>
>>>>> My output code looks like this:
>>>>>
>>>>>
>>>>> // Hadoop job that carries the HCatalog output configuration
>>>>> Job job = Job.getInstance(conf);
>>>>>
>>>>> // Target database and table; null means no partition values
>>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
>>>>> // Reuse the target table's schema as the output schema
>>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
>>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>>>
>>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>>>
>>>>> // Wrap the Hadoop output format so Flink can use it as a sink
>>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out = new
>>>>> HadoopOutputFormat(outputFormat, job);
>>>>>
>>>>> // hcat is the DataSet from the previous processing step
>>>>> hcat.output(out);
>>>>> env.execute("run");
>>>>>
>>>>>
>>>>>
>>>>> One other thing to note: I had to put
>>>>> flink-hadoop-compatibility_2.11-1.3.2.jar
>>>>> into the lib folder of the Flink distro.  Building my code into a shaded jar
>>>>> with that dependency did not work for me.  However, when I put the hive/hcat
>>>>> jars in the lib folder, it caused lots of other errors.  Since the shading
>>>>> didn't work for the hadoop-compatibility jar, it makes me think there is
>>>>> some funky class loader stuff going on.  I don't understand why this doesn't
>>>>> work.  The ORC code is shaded and verified in my jar, the classes are
>>>>> present, plus I successfully read from an ORC table.
>>>>>
>>>>> Any help or explanation into how the classpath/classloading works
>>>>> would be wonderful!
>>>>>
>>>>
>>>>
>>>
>>
>
