Hey all, I am trying out a POC with Flink on YARN. My simple goal is to read from a Hive ORC table, process some data, and write to a new Hive ORC table.
Currently I can get Flink to read the source table fine, both by using the HCatalog input format directly and by using the flink-hcatalog wrapper. Processing the data also works fine, and dumping to the console or a text file works fine too. I'm now stuck trying to write the data out; I'm getting ClassNotFoundExceptions:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
    at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
    at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
    at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)

Since I read from an ORC table, I know I have that class on my classpath. So I'm wondering if each stage/step in a Flink job has some kind of special classloader that I'm not aware of? (It's also odd that it wants the InputFormat and not the OutputFormat; I'm not sure why yet.)

My output code looks like this:

Job job = Job.getInstance(conf);
HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
HCatOutputFormat outputFormat = new HCatOutputFormat();
HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
        new HadoopOutputFormat<>(outputFormat, job);

// hcat is the DataSet from the previous processing step
hcat.output(out);
env.execute("run");

One other thing to note: I had to put flink-hadoop-compatibility_2.11-1.3.2.jar into the lib folder of the Flink distro; building my code as a shaded jar with that dependency did not work for me. However, when I put the Hive/HCatalog jars in the lib folder it caused lots of other errors. Since the shading didn't work for the hadoop-compatibility jar, it makes me think there is some funky classloader stuff going on.

I don't understand why this doesn't work. The ORC code is shaded into my jar and verified (the classes are present), plus I successfully read from an ORC table. Any help or explanation of how the classpath/classloading works here would be wonderful!
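
For reference, the read side that works for me looks roughly like this (a sketch from memory; the class name and the "schema"/"source_table" names are just placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;

public class ReadOrcTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Configuration conf = new Configuration();

        // flink-hcatalog wrapper; returns HCatRecords when no field projection is set
        HCatInputFormat<HCatRecord> in =
                new HCatInputFormat<>("schema", "source_table", conf);
        DataSet<HCatRecord> source = env.createInput(in);

        // processing / dumping to console or a text file works fine from here
        source.print();
    }
}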
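
And in case it's useful, the kind of quick sanity check I've been thinking of adding (both in the driver code and inside an operator) to see which classloader is in play is just this sketch, nothing Flink-specific:

// Can the current code see the class the stack trace complains about,
// and which classloader serves it? Class.forName throws a checked
// ClassNotFoundException, so this has to run where that can be thrown or caught.
Class<?> orcIf = Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
System.out.println(orcIf.getName() + " loaded by " + orcIf.getClassLoader());
System.out.println("Context classloader: " + Thread.currentThread().getContextClassLoader());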