Hi Flavio! Is this on Flink 0.9-SNAPSHOT or 0.8.1?
Stephan

On Fri, Mar 20, 2015 at 6:03 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> To make it work I had to clone the Flink repo, import the flink-java
> project, and modify HadoopOutputFormatBase in open() and finalizeGlobal() to call
>
> if (this.mapreduceOutputFormat instanceof Configurable) {
>     ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
> }
>
> otherwise the "mapred.output.dir" property was always null :(
>
> On Fri, Mar 20, 2015 at 10:27 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Hi guys,
>>
>> I was trying to insert into an HBase table with Flink 0.8.1 and it seems
>> not to be possible without creating a custom version of the HBase
>> TableOutputFormat that specializes Mutation to Put.
>> This is my code using the standard Flink APIs:
>>
>> myds.output(new HadoopOutputFormat<Text, Put>(new
>> TableOutputFormat<Text>(), job));
>>
>> and this is the exception I get:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and
>> abstract classes are not valid types: class
>> org.apache.hadoop.hbase.client.Mutation
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
>> ....
>>
>> So I had to copy the TableOutputFormat, rename it to
>> HBaseTableOutputFormat, and change Mutation to Put as the TableOutputFormat
>> type argument.
>> However, the table field is not initialized because setConf is never called.
>> Is this a bug in the HadoopOutputFormat wrapper, in that it does not check whether the
>> outputFormat is an instance of Configurable and call setConf (as it does
>> for the inputSplit)?
>>
>> Best,
>> Flavio
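For reference, the check Flavio describes boils down to forwarding the job configuration to output formats that implement Configurable. A minimal sketch of that idea, pulled out into a standalone helper rather than quoting the actual HadoopOutputFormatBase code (the class and method names here are made up for illustration; only the Hadoop types are real):

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputFormat;

public final class ConfigurableForwarder {

    private ConfigurableForwarder() {}

    /**
     * Forwards the wrapper's Configuration to output formats that implement
     * Configurable. Per Flavio's report, without this call HBase's
     * TableOutputFormat never sees its settings (e.g. "mapred.output.dir"
     * stayed null), because it only initializes itself in setConf().
     */
    public static void configureIfPossible(OutputFormat<?, ?> format, Configuration conf) {
        if (format instanceof Configurable) {
            ((Configurable) format).setConf(conf);
        }
    }
}

In the patch Flavio describes, the equivalent check runs in open() and again before finalizeGlobal(), so both the writer and the output committer see the configuration.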
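The workaround of renaming TableOutputFormat to HBaseTableOutputFormat with Put as the value type can also be done by delegating to the original class instead of copying it, which sidesteps the TypeExtractor's complaint about the abstract Mutation class. This is a sketch of that alternative, not Flavio's exact code; only the class name HBaseTableOutputFormat comes from his mail:

import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A Put-typed facade over HBase's TableOutputFormat so that Flink's
 * TypeExtractor sees the concrete Put class instead of the abstract Mutation.
 */
public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put> implements Configurable {

    private final TableOutputFormat<KEY> delegate = new TableOutputFormat<KEY>();

    @Override
    public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // The delegate's writer accepts any Mutation, so narrowing to Put is safe.
        final RecordWriter<KEY, Mutation> writer = delegate.getRecordWriter(context);
        return new RecordWriter<KEY, Put>() {
            @Override
            public void write(KEY key, Put value) throws IOException, InterruptedException {
                writer.write(key, value);
            }

            @Override
            public void close(TaskAttemptContext ctx) throws IOException, InterruptedException {
                writer.close(ctx);
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        delegate.checkOutputSpecs(context);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return delegate.getOutputCommitter(context);
    }

    // Implementing Configurable and forwarding setConf() is what actually
    // initializes the delegate's table connection, which is exactly the step
    // the wrapper was skipping in Flavio's report.
    @Override
    public void setConf(Configuration conf) {
        delegate.setConf(conf);
    }

    @Override
    public Configuration getConf() {
        return delegate.getConf();
    }
}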