Which field? The Tuple2? I use it with Flink 0.8.1 without errors.

On Apr 3, 2015 2:27 AM, <fhue...@gmail.com> wrote:
If Put is not Serializable it cannot be serialized and shipped.

Is it possible to make that field transient and initialize Put in configure()?

From: Flavio Pompermaier
Sent: Friday, 3. April, 2015 01:42
To: dev@flink.apache.org

Now I made my fork (https://github.com/fpompermaier/flink), but when I run the application I get this error:

java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at java.util.ArrayList.writeObject(ArrayList.java:742)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286)
    at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:74)

I started from the WordCount example and my code is:

Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
job.getConfiguration().set("mapred.output.dir", "/tmp/test");

counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
    private final byte[] CF_SOME = Bytes.toBytes("test-column");
    private final byte[] Q_SOME = Bytes.toBytes("value");
    private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        reuse.f0 = new Text(t.f0);
        Put put = new Put(t.f0.getBytes());
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f1 = put;
        return reuse;
    }
}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

Do I have to register how to serialize Put somewhere?
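[Editorial note: a minimal sketch of Fabian's transient-field suggestion applied to the map function above. It assumes a RichMapFunction whose open() hook (the rich-function counterpart of configure(), run on the worker after deserialization) rebuilds the reuse tuple, so nothing HBase-specific has to survive Java serialization of the function object. Names are taken from the snippet above; this is an illustration, not code from the thread.]

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
    private final byte[] CF_SOME = Bytes.toBytes("test-column");
    private final byte[] Q_SOME = Bytes.toBytes("value");

    // transient: not shipped with the serialized function object
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) {
        // re-created on the worker, after the function has been deserialized
        reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        reuse.f0 = new Text(t.f0);
        Put put = new Put(t.f0.getBytes());
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f1 = put;
        return reuse;
    }
})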
On Wed, Apr 1, 2015 at 2:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Whatever works best for you. We can easily backport or forward-port the patch.

2015-04-01 14:12 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

Ok, I'd like to have this fix in the next release. Should I branch Flink 0.8.1 or 0.9, or which version?

On Wed, Apr 1, 2015 at 2:04 PM, Maximilian Michels <m...@apache.org> wrote:

Hi Flavio,

Thanks for looking into this problem. Actually, it's a bit difficult to discuss your changes here because of the formatting/syntax highlighting and the missing context of the classes. Usually, we do that in a pull request. Do you have a GitHub account? If so, push your changes to your forked Flink repository. GitHub will then offer to create a pull request for your modified branch.

Let's discuss your changes on GitHub.

Best,
Max

On Wed, Apr 1, 2015 at 1:44 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Any feedback about this?

On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi Flink devs,
this is my final report about the HBaseOutputFormat problem (with Flink 0.8.1), and I hope you can suggest the best way to make a PR:

1) The following code produces the error reported below (this should be fixed in 0.9, right?):

Job job = Job.getInstance();
myDataset.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
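[Editorial note: as an aside, a hypothetical illustration of how the type-extraction error in point 1 can be sidestepped on the user side. The TypeExtractor rejects the abstract Mutation, but a function declared against the concrete Put is accepted (handled as a generic type), provided the output format is also parameterized with Put, as the HBaseTableOutputFormat at the end of the mail is. The variable name wordCounts is illustrative, not from the thread.]

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

// DataSet<Tuple2<String, Integer>> wordCounts = ...;
DataSet<Tuple2<Text, Put>> puts = wordCounts
    .map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Put>>() {
        @Override
        public Tuple2<Text, Put> map(Tuple2<String, Integer> t) throws Exception {
            // Put is a concrete class, so the TypeExtractor can build a type for it
            Put put = new Put(Bytes.toBytes(t.f0));
            put.add(Bytes.toBytes("test-column"), Bytes.toBytes("value"), Bytes.toBytes(t.f1));
            return new Tuple2<Text, Put>(new Text(t.f0), put);
        }
    });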
2) So I created a custom HBaseTableOutputFormat (see the end of the mail; it is basically a copy of the HBase TableOutputFormat) that correctly sets the "mapred.output.dir" parameter required by the HadoopOutputFormatBase, so I can make it work:

Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
myDataset.output(outOF);

3) However, this still does not work unless you call setConf() on Configurable subclasses in the HadoopOutputFormatBase:

- in the public void finalizeGlobal(int parallelism) throws IOException method:

....
// added: propagate the configuration to Configurable output formats
if (this.mapreduceOutputFormat instanceof Configurable) {
    ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
}
this.fileOutputCommitter = new FileOutputCommitter(
    new Path(this.configuration.get("mapred.output.dir")), taskContext);
....

- in the public void open(int taskNumber, int numTasks) throws IOException method:

....
// added: propagate the configuration to Configurable output formats
if (this.mapreduceOutputFormat instanceof Configurable) {
    ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
}
try {
    this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
} catch (Exception e) {
    throw new RuntimeException(e);
}
....

4) Probably the modifications described in point 3 should be applied to both the mapreduce and mapred packages.

Thanks in advance,
Flavio
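[Editorial note: the pattern point 3 relies on, isolated as a small self-contained helper for reference. This is a hypothetical illustration of the idea, not the actual Flink HadoopOutputFormatBase code: Hadoop output formats that implement Configurable (such as HBase's TableOutputFormat) only receive their Configuration through setConf(), so the wrapper has to hand it over explicitly before the format is used, in both open() and finalizeGlobal().]

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputFormat;

public final class ConfigurableFormats {

    private ConfigurableFormats() {}

    /**
     * Passes the job Configuration to the wrapped OutputFormat if (and only if)
     * it implements Configurable. Plain OutputFormats are left untouched.
     */
    public static void setConfIfConfigurable(OutputFormat<?, ?> format, Configuration conf) {
        if (format instanceof Configurable) {
            ((Configurable) format).setConf(conf);
        }
    }
}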
-----------------------------------------------------------------------
this is the HBaseTableOutputFormat.java:
-----------------------------------------------------------------------

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
 *
 * @param <KEY> The type of the key. Ignored in this class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
// typed with the concrete Put instead of the abstract Mutation
public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put> implements Configurable {

  private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify a remote cluster when copying between HBase clusters
   * (the source is picked up from <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";

  /** Optional job parameter to specify the peer cluster's ZK client port. */
  public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";

  /** Optional specification of the rs class name of the peer cluster. */
  public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";

  /** Optional specification of the rs impl name of the peer cluster. */
  public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  private HTable table;

  /**
   * Writes the reducer output to an HBase table.
   *
   * @param <KEY> The type of the key.
   */
  protected static class TableRecordWriter<KEY> extends RecordWriter<KEY, Put> {

    /** The table to write to. */
    private HTable table;

    /**
     * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
     * @param table The table to write to.
     */
    public TableRecordWriter(HTable table) {
      this.table = table;
    }

    /**
     * Closes the writer, in this case flushing the table commits.
     *
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      table.close();
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
     */
    @Override
    public void write(KEY key, Put value) throws IOException {
      if (value instanceof Put) this.table.put(new Put((Put) value));
      // else if (value instanceof Delete) this.table.delete(new Delete((Delete) value));
      else throw new IOException("Pass a Delete or a Put");
    }
  }
  /**
   * Creates a new record writer.
   *
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableRecordWriter<KEY>(this.table);
  }

  /**
   * Checks if the output target exists.
   *
   * @param context The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    // TODO Check if the table exists?
  }

  /**
   * Returns the output committer.
   *
   * @param context The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    this.conf = HBaseConfiguration.create(otherConf);

    String tableName = this.conf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.length() <= 0) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = this.conf.get(QUORUM_ADDRESS);
    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
    String serverClass = this.conf.get(REGION_SERVER_CLASS);
    String serverImpl = this.conf.get(REGION_SERVER_IMPL);

    try {
      if (address != null) {
        ZKUtil.applyClusterKeyToConf(this.conf, address);
      }
      if (serverClass != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
      this.table = new HTable(this.conf, tableName);
      this.table.setAutoFlush(false, true);
      // make the table directory available as "mapred.output.dir" for HadoopOutputFormatBase
      String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), this.table.getName()).toString();
      this.conf.set("mapred.output.dir", outDir);
      otherConf.set("mapred.output.dir", outDir);
      LOG.info("Created table instance for " + tableName);
    } catch (IOException e) {
      LOG.error(e);
      throw new RuntimeException(e);
    }
  }
}