Fabian, Awesome! After your initial email I got things to work by deploying my fat jar into the flink/lib folder, and voila! it worked. :) I will grab your pull request and give it a go tomorrow.
On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Here's the pull request that hopefully fixes your issue:
> https://github.com/apache/flink/pull/4690
>
> Best, Fabian
>
> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Garrett,
>>
>> I think I identified the problem.
>> You said you put the Hive/HCat dependencies into your user fat jar,
>> correct? In this case, they are loaded with Flink's user classloader
>> (as described before).
>>
>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>> loads the user classes with the user classloader.
>> However, when the HCatOutputFormat.getOutputCommitter() method is
>> called, Hive tries to load additional classes with the current
>> thread's context classloader (see
>> org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
>> This behavior is actually OK, because we usually set the context
>> classloader to the user classloader before calling user code.
>> However, this has not been done here.
>> So, this is in fact a bug.
>>
>> I created this JIRA issue:
>> https://issues.apache.org/jira/browse/FLINK-7656
>> and will open a PR for that.
>>
>> Thanks for helping to diagnose the issue,
>> Fabian
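For illustration, here is a minimal sketch of the pattern Fabian describes (not the actual code from PR 4690): swap the thread's context classloader to the user classloader around a call into library code, and restore it afterwards. The helper name and the Callable-based shape are assumptions made for the sketch.

import java.util.concurrent.Callable;

// Sketch only: temporarily switch the thread's context classloader so that
// library code which resolves classes via the context classloader (as Hive's
// JavaUtils.loadClass does) sees the user classloader instead.
final class ContextClassLoaderUtil {

    static <T> T callWithClassLoader(ClassLoader userClassLoader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.call();
        } finally {
            // Restore the previous classloader even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}

Wrapping the HCatOutputFormat.getOutputCommitter() call in something like this would let Hive's JavaUtils.loadClass() resolve the missing classes from the user jar.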
>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>
>>> Fabian,
>>>
>>> It looks like Hive instantiates both input and output formats when
>>> doing either. I use Hive 1.2.1, and you can see in
>>> HCatUtil.getStorageHandler where it tries to load both. It looks like
>>> it's happening after the writes complete and Flink is in the
>>> finish/finalize stage. When I watch the counters in the Flink UI, I
>>> see all output tasks marked finished, with bytes sent and records
>>> sent being exactly what I expect them to be. The first error also
>>> mentions the master; is that the Flink jobmanager process?
>>>
>>> The expanded stacktrace is:
>>>
>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>     ... 8 more
>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>>>     ... 14 more
>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>>     at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>>>     at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>>>     at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>>>     at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>>     ... 16 more
>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:348)
>>>     at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>     at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>>     at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>>     at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>
>>> Thank you all for any help. :)
>>>
>>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Garrett,
>>>>
>>>> Flink distinguishes between two classloaders: 1) the system
>>>> classloader, which is the main classloader of the process and loads
>>>> all jars in the ./lib folder, and 2) the user classloader, which
>>>> loads the job jar.
>>>> AFAIK, the different operators do not have distinct classloaders.
>>>> So, in principle all operators should use the same user classloader.
>>>>
>>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>>> found when you try to emit to an ORC file.
>>>> This looks suspicious, because I would rather expect the
>>>> OrcOutputFormat to be the problem than the input format.
>>>> Can you post more of the stacktrace? This would help to identify the
>>>> spot in the Flink code where the exception is thrown.
>>>>
>>>> Thanks, Fabian
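A quick way to see which of these two classloaders actually serves a class at runtime is a probe along these lines (a sketch; the class name is the one from the stacktrace above). Running it once from user code, e.g. inside a map function, and once from the failing code path makes a mismatch visible:

// Sketch: report which classloader provides OrcInputFormat and what the
// current thread's context classloader is at this point.
public final class ClassLoaderProbe {
    public static void main(String[] args) {
        try {
            Class<?> orc = Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
            System.out.println("OrcInputFormat loaded by: " + orc.getClassLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("OrcInputFormat is not visible to this class's classloader");
        }
        System.out.println("context classloader: " + Thread.currentThread().getContextClassLoader());
        System.out.println("system classloader:  " + ClassLoader.getSystemClassLoader());
    }
}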
>>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I am trying out a POC with Flink on YARN. My simple goal is to read
>>>>> from a Hive ORC table, process some data, and write to a new Hive
>>>>> ORC table.
>>>>>
>>>>> Currently I can get Flink to read the source table fine, both by
>>>>> using the HCatalog input format directly and by using the
>>>>> flink-hcatalog wrapper. Processing the data also works fine.
>>>>> Dumping to console or a text file also works fine.
>>>>>
>>>>> I'm now stuck trying to write the data out; I'm getting
>>>>> ClassNotFoundExceptions:
>>>>>
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>     at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>>     at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>>>>     at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>>>>     at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>>
>>>>> Since I read from an ORC table, I know I have that class in my
>>>>> classpath, so I'm wondering if each stage/step in a Flink process
>>>>> has some kind of special classloader that I am not aware of. (It's
>>>>> also odd that it wants the input format and not the output format;
>>>>> not sure why yet.)
>>>>>
>>>>> My output code looks like this:
>>>>>
>>>>> Job job = Job.getInstance(conf);
>>>>>
>>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
>>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
>>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>>>
>>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>>>
>>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out = new HadoopOutputFormat(outputFormat, job);
>>>>>
>>>>> // from previous processing step
>>>>> hcat.output(out);
>>>>> env.execute("run");
>>>>>
>>>>> One other thing to note: I had to put
>>>>> flink-hadoop-compatibility_2.11-1.3.2.jar into the lib folder of
>>>>> the Flink distro. Building my code in a shaded jar with that
>>>>> dependency did not work for me. However, when I put the hive/hcat
>>>>> jars in the lib folder it caused lots of other errors. Since the
>>>>> shading didn't work for the hadoop-compatibility jar, it makes me
>>>>> think there is some funky classloader stuff going on. I don't
>>>>> understand why this doesn't work. The ORC code is shaded and
>>>>> verified in my jar, the classes are present, plus I successfully
>>>>> read from an ORC table.
>>>>>
>>>>> Any help or explanation into how the classpath/classloading works
>>>>> would be wonderful!
>>>>
>>>
>>
>
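For reference, a self-contained sketch of the write path from the snippet above, with the imports and types spelled out. The "schema" and "table" names are placeholders, and buildRecords() stands in for whatever previous processing step produces the DataSet called hcat:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class HCatOrcSinkJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Tell HCatalog which table to write to and register its schema.
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
        HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
        HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);

        // HCatOutputFormat is declared as OutputFormat<WritableComparable<?>, HCatRecord>,
        // so the wrapper is constructed with the raw type here, as in the snippet above.
        @SuppressWarnings({"rawtypes", "unchecked"})
        HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
                new HadoopOutputFormat(new HCatOutputFormat(), job);

        // The Flink wrapper writes Tuple2<key, value> records.
        DataSet<Tuple2<NullWritable, DefaultHCatRecord>> hcat = buildRecords(env);
        hcat.output(out);
        env.execute("run");
    }

    // Placeholder for the real processing pipeline that produces the records.
    private static DataSet<Tuple2<NullWritable, DefaultHCatRecord>> buildRecords(ExecutionEnvironment env) {
        throw new UnsupportedOperationException("replace with the real previous processing step");
    }
}

As the rest of the thread shows, the wiring itself was fine; the failure came from the jobmanager-side finalize step not seeing the Hive classes until they were either placed in flink/lib or the FLINK-7656 fix was applied.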