To make it work I had to clone the Flink repo, import the flink-java project, and modify HadoopOutputFormatBase so that both open() and finalizeGlobal() call

    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }

Otherwise the "mapred.output.dir" property was always null :(

On Fri, Mar 20, 2015 at 10:27 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Hi guys,
>
> I was trying to insert into an HBase table with Flink 0.8.1 and it seems
> not to be possible without creating a custom version of the HBase
> TableOutputFormat that specializes Mutation with Put.
> This is my code using the standard Flink APIs:
>
> myds.output(new HadoopOutputFormat<Text, Put>(new
> TableOutputFormat<Text>(), job));
>
> and this is the exception I get:
>
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and
> abstract classes are not valid types: class
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
> ....
>
> So I had to copy the TableOutputFormat, rename it to
> HBaseTableOutputFormat and change Mutation to Put as the TableOutputFormat
> type argument.
> However, the table field is not initialized because setConf is not called.
> Is this a bug in the HadoopOutputFormat wrapper, which does not check whether
> the outputFormat is an instance of Configurable and call setConf (as happens
> for the InputSplit)?
>
> Best,
> Flavio
>
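
The workaround described at the top of this message (calling setConf on the wrapped output format when it implements Configurable) can be sketched roughly as below. This is only an illustration under stated assumptions, not the actual Flink patch: Configuration and Configurable here are simplified stand-ins for the Hadoop types of the same name so the example compiles without Hadoop on the classpath, and TableLikeOutputFormat, OutputFormatWrapper, configureIfNeeded and the "example.output.table" key are hypothetical names invented for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigurableSketch {

    /** Simplified stand-in for org.apache.hadoop.conf.Configuration (assumption). */
    static class Configuration {
        private final Map<String, String> props = new HashMap<>();
        public void set(String key, String value) { props.put(key, value); }
        public String get(String key) { return props.get(key); }
    }

    /** Simplified stand-in for org.apache.hadoop.conf.Configurable (assumption). */
    interface Configurable {
        void setConf(Configuration conf);
        Configuration getConf();
    }

    /** Hypothetical format that only initializes its table name inside setConf. */
    static class TableLikeOutputFormat implements Configurable {
        private Configuration conf;
        private String table; // stays null until setConf is called

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
            this.table = conf.get("example.output.table"); // hypothetical key
        }

        @Override
        public Configuration getConf() { return conf; }

        public String getTable() { return table; }
    }

    /** Hypothetical wrapper playing the role of HadoopOutputFormatBase. */
    static class OutputFormatWrapper {
        private final Object mapreduceOutputFormat;
        private final Configuration configuration;

        OutputFormatWrapper(Object mapreduceOutputFormat, Configuration configuration) {
            this.mapreduceOutputFormat = mapreduceOutputFormat;
            this.configuration = configuration;
        }

        // The check from the message: pass the configuration on only if the
        // wrapped format declares that it wants one.
        private void configureIfNeeded() {
            if (this.mapreduceOutputFormat instanceof Configurable) {
                ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
            }
        }

        public void open() { configureIfNeeded(); }

        public void finalizeGlobal() { configureIfNeeded(); }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("example.output.table", "mytable");

        TableLikeOutputFormat format = new TableLikeOutputFormat();
        OutputFormatWrapper wrapper = new OutputFormatWrapper(format, conf);

        System.out.println(format.getTable()); // not configured yet
        wrapper.open();
        System.out.println(format.getTable()); // configured by open()
    }
}
```

The check is repeated in both open() and finalizeGlobal() because, as far as I can tell, the two can run on different instances of the wrapper, so neither can rely on the other having configured the format first.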