Here's the pull request that hopefully fixes your issue: https://github.com/apache/flink/pull/4690
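The gist of the change is to set the thread context classloader to the user
classloader before Flink calls into the output format on the master, i.e. the
usual pattern (simplified sketch with placeholder names like
userCodeClassLoader; not the literal diff of the PR):

// format is a FinalizeOnMaster, e.g. Flink's HadoopOutputFormatBase
ClassLoader previous = Thread.currentThread().getContextClassLoader();
try {
    // Hive resolves classes through the context classloader, so it must
    // point to the user classloader while user code runs.
    Thread.currentThread().setContextClassLoader(userCodeClassLoader);
    format.finalizeGlobal(parallelism);
} finally {
    // always restore the previous context classloader
    Thread.currentThread().setContextClassLoader(previous);
}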
Best,
Fabian

2017-09-20 16:15 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Garrett,
>
> I think I identified the problem. You said you put the Hive/HCat
> dependencies into your user fat jar, correct? In that case, they are
> loaded by Flink's user classloader (as described before).
>
> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
> loads the user classes with the user classloader. However, when the
> HCatOutputFormat.getOutputCommitter() method is called, Hive tries to
> load additional classes with the current thread's context classloader
> (see org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
> That behavior is fine in itself, because we usually set the context
> classloader to the user classloader before calling user code. However,
> this has not been done here. So, this is in fact a bug.
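>
> For reference, the loading on the Hive side boils down to roughly the
> following (paraphrased from memory, not the exact JavaUtils source):
>
> ClassLoader cl = Thread.currentThread().getContextClassLoader();
> if (cl == null) {
>     cl = JavaUtils.class.getClassLoader(); // fallback
> }
> // The lookup goes through the *context* classloader, so it throws the
> // ClassNotFoundException you saw if that is still the system classloader
> // and the ORC classes only live in the user jar.
> Class<?> clazz = Class.forName(
>     "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", true, cl);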
>
> I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
> and will open a PR for that.
>
> Thanks for helping to diagnose the issue,
> Fabian
>
> 2017-09-19 22:05 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Fabian,
>>
>> It looks like Hive instantiates both input and output formats when doing
>> either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler
>> where it tries to load both. It happens after the writes complete, when
>> Flink is in the finish/finalize stage. When I watch the counters in the
>> Flink UI, I see all output tasks marked finished, with bytes sent and
>> records sent being exactly what I expect them to be. The first error
>> also mentions the master; is that the Flink JobManager process?
>>
>> The expanded stacktrace is:
>>
>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>   at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>>   at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>>   at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>>   at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>   ... 8 more
>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>>   at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>   at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>>   at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>>   ... 14 more
>> Caused by: java.io.IOException: Failed to load foster storage handler
>>   at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>>   at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>>   at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>>   at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>>   at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>   ... 16 more
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>   at java.lang.Class.forName0(Native Method)
>>   at java.lang.Class.forName(Class.java:348)
>>   at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>   at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>   at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>   at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>
>> Thank you all for any help. :)
>>
>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Garrett,
>>>
>>> Flink distinguishes between two classloaders: 1) the system classloader,
>>> which is the main classloader of the process and loads all jars in the
>>> ./lib folder, and 2) the user classloader, which loads the job jar.
>>> AFAIK, the different operators do not have distinct classloaders, so in
>>> principle all operators should use the same user classloader.
>>>
>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>> found when you try to emit to an ORC file. This looks suspicious,
>>> because I would rather expect the OrcOutputFormat to be the problem
>>> than the input format. Can you post more of the stacktrace? That would
>>> help to identify the spot in the Flink code where the exception is
>>> thrown.
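>>>
>>> As a quick check, you could also log which classloader is in play at
>>> runtime, e.g. with something like this (diagnostic sketch only):
>>>
>>> // e.g. in the open() method of a RichMapFunction
>>> ClassLoader ctx = Thread.currentThread().getContextClassLoader();
>>> System.out.println("context classloader: " + ctx);
>>> try {
>>>     Class<?> orc = Class.forName(
>>>         "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", false, ctx);
>>>     System.out.println("OrcInputFormat loaded by: " + orc.getClassLoader());
>>> } catch (ClassNotFoundException e) {
>>>     System.out.println("OrcInputFormat not visible from context classloader");
>>> }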
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2017-09-18 18:42 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Hey all,
>>>>
>>>> I am trying out a POC with Flink on YARN. My simple goal is to read
>>>> from a Hive ORC table, process some data, and write to a new Hive ORC
>>>> table.
>>>>
>>>> Currently I can get Flink to read the source table fine, both by using
>>>> the HCatalog input format directly and by using the flink-hcatalog
>>>> wrapper (rough sketch of the read side at the end of this mail).
>>>> Processing the data also works fine, and dumping to the console or a
>>>> text file works fine too.
>>>>
>>>> I'm now stuck trying to write the data out; I'm getting
>>>> ClassNotFoundExceptions:
>>>>
>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>   at java.lang.Class.forName0(Native Method)
>>>>   at java.lang.Class.forName(Class.java:348)
>>>>   at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>   at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>>>   at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>>>   at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>
>>>> Since I read from an ORC table, I know that class is on my classpath,
>>>> so I'm wondering if each stage/step in a Flink job has some kind of
>>>> special classloader that I am not aware of. (It's also odd that it
>>>> wants the input format and not the output format; not sure why yet.)
>>>>
>>>> My output code looks like this:
>>>>
>>>> Job job = Job.getInstance(conf);
>>>>
>>>> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
>>>> HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
>>>> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>>>>
>>>> HCatOutputFormat outputFormat = new HCatOutputFormat();
>>>>
>>>> HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
>>>>     new HadoopOutputFormat<>(outputFormat, job);
>>>>
>>>> // from the previous processing step
>>>> hcat.output(out);
>>>> env.execute("run");
>>>>
>>>> One other thing to note: I had to put
>>>> flink-hadoop-compatibility_2.11-1.3.2.jar into the lib folder of the
>>>> Flink distribution. Building my code into a shaded jar with that
>>>> dependency did not work for me, and when I put the Hive/HCat jars into
>>>> the lib folder instead, that caused lots of other errors. Since the
>>>> shading didn't work for the hadoop-compatibility jar, it makes me
>>>> think there is some funky classloading going on that I don't
>>>> understand. The ORC code is shaded into my jar and the classes are
>>>> verified present, plus I successfully read from an ORC table.
>>>>
>>>> Any help or explanation of how the classpath/classloading works would
>>>> be wonderful!
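>>>>
>>>> For reference, the read side that works looks roughly like this
>>>> (sketch from memory; "schema" and "table" are placeholders, and the
>>>> DataSet/Tuple2 imports are omitted):
>>>>
>>>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>>>> import org.apache.hadoop.io.WritableComparable;
>>>> import org.apache.hadoop.mapreduce.Job;
>>>> import org.apache.hive.hcatalog.data.HCatRecord;
>>>> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
>>>>
>>>> Job readJob = Job.getInstance(conf);
>>>> // tell HCatalog which table to scan
>>>> HCatInputFormat.setInput(readJob, "schema", "table");
>>>>
>>>> // wrap the mapreduce input format in Flink's Hadoop compatibility layer
>>>> HadoopInputFormat<WritableComparable, HCatRecord> in =
>>>>     new HadoopInputFormat<>(new HCatInputFormat(),
>>>>         WritableComparable.class, HCatRecord.class, readJob);
>>>>
>>>> DataSet<Tuple2<WritableComparable, HCatRecord>> records = env.createInput(in);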