Which field? The Tuple2? I use it with Flink 0.8.1 without errors.
On Apr 3, 2015 2:27 AM, <fhue...@gmail.com> wrote:

> If Put is not Serializable it cannot be serialized and shipped.
>
> Is it possible to make that field transient and initialize Put in
> configure()?
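
A minimal sketch of that transient-field idea, assuming the field in question is the
reuse Tuple2 holding the Put (as asked above): a RichMapFunction whose open() - the
rich-function counterpart of configure() - recreates it on each parallel task. Class and
column names are illustrative only, imports omitted (RichMapFunction and Configuration
are from the Flink API):

    public static class WritePutsMapper
            extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

        // transient: not shipped with the serialized user function,
        // recreated locally in open() on every parallel task
        private transient Tuple2<Text, Mutation> reuse;

        @Override
        public void open(Configuration parameters) {
            reuse = new Tuple2<Text, Mutation>();
        }

        @Override
        public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
            reuse.f0 = new Text(t.f0);
            Put put = new Put(t.f0.getBytes());
            put.add(Bytes.toBytes("test-column"), Bytes.toBytes("value"), Bytes.toBytes(t.f1));
            reuse.f1 = put;
            return reuse;
        }
    }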
>
>
>
>
>
>
> From: Flavio Pompermaier
> Sent: Friday, 3. April, 2015 01:42
> To: dev@flink.apache.org
>
>
>
>
>
> Now I made my fork (https://github.com/fpompermaier/flink) but when I run
> the application I get this error:
>
>  java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at java.util.ArrayList.writeObject(ArrayList.java:742)
> at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286)
> at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:74)
>
> I started from the wordcount example and my code is:
>
>     Job job = Job.getInstance();
>     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
>     job.getConfiguration().set("mapred.output.dir", "/tmp/test");
>     counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
>
>         private final byte[] CF_SOME = Bytes.toBytes("test-column");
>         private final byte[] Q_SOME = Bytes.toBytes("value");
>         private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();
>
>         @Override
>         public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
>             reuse.f0 = new Text(t.f0);
>             Put put = new Put(t.f0.getBytes());
>             put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
>             reuse.f1 = put;
>             return reuse;
>         }
>     }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
>
> Do I have to register how to serialize Put somewhere?
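
For what it's worth, the NotSerializableException above can be reproduced outside of
Flink, because Put itself does not implement java.io.Serializable (that is exactly what
the exception reports). A tiny standalone check (class name made up for illustration):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutSerializationCheck {
        public static void main(String[] args) throws Exception {
            Put put = new Put(Bytes.toBytes("row-key"));
            put.add(Bytes.toBytes("test-column"), Bytes.toBytes("value"), Bytes.toBytes(42));
            // Java serialization of a Put is expected to fail with
            // java.io.NotSerializableException, as in the stack trace above.
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(put);
        }
    }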
>
> On Wed, Apr 1, 2015 at 2:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Whatever works best for you.
> > We can easily backport or forwardport the patch.
> >
> > 2015-04-01 14:12 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> >
> > > Ok..I'd like to have this fix in the next release. Should I branch Flink
> > > 0.8.1 or 0.9 or which version?
> > >
> > > On Wed, Apr 1, 2015 at 2:04 PM, Maximilian Michels <m...@apache.org> wrote:
> > >
> > > > Hi Flavio,
> > > >
> > > > Thanks for looking into this problem. Actually, it's a bit difficult to
> > > > discuss your changes here because of the formatting/syntax highlighting and
> > > > missing context of the classes. Usually, we do that in a pull request. Do
> > > > you have a GitHub account? If so, push your changes to your forked Flink
> > > > repository. GitHub will then offer you to create a pull request for your
> > > > modified branch.
> > > >
> > > > Let's discuss your changes on GitHub.
> > > >
> > > > Best,
> > > > Max
> > > >
> > > > On Wed, Apr 1, 2015 at 1:44 PM, Flavio Pompermaier <pomperma...@okkam.it>
> > > > wrote:
> > > >
> > > > > Any feedback about this?
> > > > >
> > > > > On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier <pomperma...@okkam.it>
> > > > > wrote:
> > > > >
> > > > > > Hi Flink devs,
> > > > > > this is my final report about the HBaseOutputFormat problem (with Flink
> > > > > > 0.8.1), and I hope you can suggest the best way to make a PR:
> > > > > >
> > > > > > 1) The following code produces the error reported below (this should be
> > > > > > fixed in 0.9, right?):
> > > > > >
> > > > > >       Job job = Job.getInstance();
> > > > > >       myDataset.output(new HadoopOutputFormat<Text, *Mutation*>(new *TableOutputFormat*<Text>(), job));
> > > > > >
> > > > > > org.apache.flink.api.common.functions.InvalidTypesException: Interfaces
> > > > > > and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
> > > > > > at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
> > > > > > at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
> > > > > >
> > > > > > 2) So I created a custom HBaseTableOutputFormat - *see at the end of the
> > > > > > mail* - (basically copied from the HBase TableOutputFormat) that correctly
> > > > > > sets the "mapred.output.dir" param required by the HadoopOutputFormatBase,
> > > > > > so I can make it work:
> > > > > >
> > > > > >       Job job = Job.getInstance();
> > > > > >       job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
> > > > > >       HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
> > > > > >       HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
> > > > > >       myDataset.output(outOF);
> > > > > >
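
For completeness, a sketch of a map that could feed this output format, adapted from
the word-count map in the newer message at the top of this thread; the only change is
producing the concrete Put type instead of Mutation, to match HadoopOutputFormat<Text,
Put> (column names are the same illustrative ones as above):

    DataSet<Tuple2<Text, Put>> puts = counts
        .map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Put>>() {
            @Override
            public Tuple2<Text, Put> map(Tuple2<String, Integer> t) throws Exception {
                Put put = new Put(t.f0.getBytes());
                put.add(Bytes.toBytes("test-column"), Bytes.toBytes("value"), Bytes.toBytes(t.f1));
                return new Tuple2<Text, Put>(new Text(t.f0), put);
            }
        });
    puts.output(outOF);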
> > > > > > 3) However, this still does not work unless you call setConf() on
> > > > > > Configurable subclasses in the HadoopOutputFormatBase:
> > > > > >
> > > > > > - in the *public void finalizeGlobal(int parallelism) throws IOException*
> > > > > > method:
> > > > > > ....
> > > > > >       *if (this.mapreduceOutputFormat instanceof Configurable) {*
> > > > > >       *    ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);*
> > > > > >       *}*
> > > > > >       this.fileOutputCommitter = new FileOutputCommitter(new
> > > > > >           Path(this.configuration.get("mapred.output.dir")), taskContext);
> > > > > > ....
> > > > > >
> > > > > > - in the *public void open(int taskNumber, int numTasks) throws IOException*
> > > > > > method:
> > > > > > ....
> > > > > >       *if (this.mapreduceOutputFormat instanceof Configurable) {*
> > > > > >       *    ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);*
> > > > > >       *}*
> > > > > >       try {
> > > > > >           this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
> > > > > >       } catch (Exception e) {
> > > > > >           throw new RuntimeException(e);
> > > > > >       }
> > > > > > ....
> > > > > >
> > > > > > 4) Probably the modifications made in point 3 should be applied both to
> > > > > > the mapreduce and the mapred packages.
> > > > > >
> > > > > > Thanks in advance,
> > > > > > Flavio
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > -----------------------------------------------------------------------
> > > > > > this is the HBaseTableOutputFormat.java:
> > > > > > -----------------------------------------------------------------------
> > > > > > import java.io.IOException;
> > > > > >
> > > > > > import org.apache.commons.logging.Log;
> > > > > > import org.apache.commons.logging.LogFactory;
> > > > > > import org.apache.hadoop.classification.InterfaceAudience;
> > > > > > import org.apache.hadoop.classification.InterfaceStability;
> > > > > > import org.apache.hadoop.conf.Configurable;
> > > > > > import org.apache.hadoop.conf.Configuration;
> > > > > > import org.apache.hadoop.hbase.HBaseConfiguration;
> > > > > > import org.apache.hadoop.hbase.HConstants;
> > > > > > import org.apache.hadoop.hbase.client.Delete;
> > > > > > import org.apache.hadoop.hbase.client.HTable;
> > > > > > import org.apache.hadoop.hbase.client.Put;
> > > > > > import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
> > > > > > import org.apache.hadoop.hbase.util.FSUtils;
> > > > > > import org.apache.hadoop.hbase.zookeeper.ZKUtil;
> > > > > > import org.apache.hadoop.mapreduce.JobContext;
> > > > > > import org.apache.hadoop.mapreduce.OutputCommitter;
> > > > > > import org.apache.hadoop.mapreduce.OutputFormat;
> > > > > > import org.apache.hadoop.mapreduce.RecordWriter;
> > > > > > import org.apache.hadoop.mapreduce.TaskAttemptContext;
> > > > > >
> > > > > > /**
> > > > > >  * Convert Map/Reduce output and write it to an HBase table. The KEY is
> > > > > >  * ignored while the output value <u>must</u> be either a {@link Put} or a
> > > > > >  * {@link Delete} instance.
> > > > > >  *
> > > > > >  * @param <KEY>  The type of the key. Ignored in this class.
> > > > > >  */
> > > > > > @InterfaceAudience.Public
> > > > > > @InterfaceStability.Stable
> > > > > > public class HBaseTableOutputFormat<KEY> *extends OutputFormat<KEY, Put>*
> > > > > > implements Configurable {
> > > > > >
> > > > > >   private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);
> > > > > >
> > > > > >   /** Job parameter that specifies the output table. */
> > > > > >   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
> > > > > >
> > > > > >   /**
> > > > > >    * Optional job parameter to specify a peer cluster.
> > > > > >    * Used specifying remote cluster when copying between hbase clusters
> > > > > >    * (the source is picked up from <code>hbase-site.xml</code>).
> > > > > >    * @see TableMapReduceUtil#initTableReducerJob(String, Class,
> > > > > >    *      org.apache.hadoop.mapreduce.Job, Class, String, String, String)
> > > > > >    */
> > > > > >   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
> > > > > >
> > > > > >   /** Optional job parameter to specify peer cluster's ZK client port */
> > > > > >   public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
> > > > > >
> > > > > >   /** Optional specification of the rs class name of the peer cluster */
> > > > > >   public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
> > > > > >
> > > > > >   /** Optional specification of the rs impl name of the peer cluster */
> > > > > >   public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
> > > > > >
> > > > > >   /** The configuration. */
> > > > > >   private Configuration conf = null;
> > > > > >
> > > > > >   private HTable table;
> > > > > >
> > > > > >   /**
> > > > > >    * Writes the reducer output to an HBase table.
> > > > > >    *
> > > > > >    * @param <KEY>  The type of the key.
> > > > > >    */
> > > > > >   protected static class TableRecordWriter<KEY>
> > > > > >   *extends RecordWriter<KEY, Put> *{
> > > > > >
> > > > > >     /** The table to write to. */
> > > > > >     private HTable table;
> > > > > >
> > > > > >     /**
> > > > > >      * Instantiate a TableRecordWriter with the HBase HClient for
> > > > > writing.
> > > > > >      *
> > > > > >      * @param table  The table to write to.
> > > > > >      */
> > > > > >     public TableRecordWriter(HTable table) {
> > > > > >       this.table = table;
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Closes the writer, in this case flush table commits.
> > > > > >      *
> > > > > >      * @param context  The context.
> > > > > >      * @throws IOException When closing the writer fails.
> > > > > >      * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
> > > > > >      */
> > > > > >     @Override
> > > > > >     public void close(TaskAttemptContext context)
> > > > > >     throws IOException {
> > > > > >       table.close();
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Writes a key/value pair into the table.
> > > > > >      *
> > > > > >      * @param key  The key.
> > > > > >      * @param value  The value.
> > > > > >      * @throws IOException When writing fails.
> > > > > >      * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
> > > > > >      */
> > > > > >     @Override
> > > > > >     *public void write(KEY key, Put value) throws IOException {*
> > > > > >     *  if (value instanceof Put) this.table.put(new Put((Put) value));*
> > > > > >     *//  else if (value instanceof Delete) this.table.delete(new Delete((Delete) value));*
> > > > > >     *  else throw new IOException("Pass a Delete or a Put");*
> > > > > >     *}*
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Creates a new record writer.
> > > > > >    *
> > > > > >    * @param context  The current task context.
> > > > > >    * @return The newly created writer instance.
> > > > > >    * @throws IOException When creating the writer fails.
> > > > > >    * @throws InterruptedException When the jobs is cancelled.
> > > > > >    * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public RecordWriter<KEY, *Put*> getRecordWriter(
> > > > > >     TaskAttemptContext context)
> > > > > >   throws IOException, InterruptedException {
> > > > > >     return new TableRecordWriter<KEY>(this.table);
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Checks if the output target exists.
> > > > > >    *
> > > > > >    * @param context  The current context.
> > > > > >    * @throws IOException When the check fails.
> > > > > >    * @throws InterruptedException When the job is aborted.
> > > > > >    * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public void checkOutputSpecs(JobContext context) throws
> > > IOException,
> > > > > >       InterruptedException {
> > > > > >     // TODO Check if the table exists?
> > > > > >
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Returns the output committer.
> > > > > >    *
> > > > > >    * @param context  The current context.
> > > > > >    * @return The committer.
> > > > > >    * @throws IOException When creating the committer fails.
> > > > > >    * @throws InterruptedException When the job is aborted.
> > > > > >    * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public OutputCommitter getOutputCommitter(TaskAttemptContext
> > > context)
> > > > > >   throws IOException, InterruptedException {
> > > > > >     return new TableOutputCommitter();
> > > > > >   }
> > > > > >
> > > > > >   public Configuration getConf() {
> > > > > >     return conf;
> > > > > >   }
> > > > > >
> > > > > >   @Override
> > > > > >   public void setConf(Configuration otherConf) {
> > > > > >     this.conf = HBaseConfiguration.create(otherConf);
> > > > > >
> > > > > >     String tableName = this.conf.get(OUTPUT_TABLE);
> > > > > >     if(tableName == null || tableName.length() <= 0) {
> > > > > >       throw new IllegalArgumentException("Must specify table name");
> > > > > >     }
> > > > > >
> > > > > >     String address = this.conf.get(QUORUM_ADDRESS);
> > > > > >     int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
> > > > > >     String serverClass = this.conf.get(REGION_SERVER_CLASS);
> > > > > >     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
> > > > > >
> > > > > >     try {
> > > > > >       if (address != null) {
> > > > > >         ZKUtil.applyClusterKeyToConf(this.conf, address);
> > > > > >       }
> > > > > >       if (serverClass != null) {
> > > > > >         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
> > > > > >       }
> > > > > >       if (zkClientPort != 0) {
> > > > > >         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
> > > > > >       }
> > > > > >       this.table = new HTable(this.conf, tableName);
> > > > > >       this.table.setAutoFlush(false, true);
> > > > > >     *  String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), this.table.getName()).toString();*
> > > > > > *      this.conf.set("mapred.output.dir", outDir);*
> > > > > > *      otherConf.set("mapred.output.dir", outDir);*
> > > > > >       LOG.info("Created table instance for "  + tableName);
> > > > > >     } catch(IOException e) {
> > > > > >       LOG.error(e);
> > > > > >       throw new RuntimeException(e);
> > > > > >     }
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
