
ASF GitHub Bot commented on FLINK-2055:

Github user delding commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,371 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.hbase;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.hadoop.hbase.client.Append;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Increment;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Put;
    +import java.util.ArrayList;
    +import java.util.List;
    + *  This class represents a list of {@link MutationAction}s you will take 
when writing
    + *  an input value of {@link HBaseSink} to a row in a HBase table.
    + *  Each {@link MutationAction} can create an HBase {@link Mutation} 
operation type
    + *  including {@link Put}, {@link Increment}, {@link Append} and {@link 
    + */
    +public class MutationActionList {
    +   private final List<MutationAction> actions;
    +   public MutationActionList() {
    +           this.actions = new ArrayList<>();
    +   }
    +   public List<MutationAction> getActions() {
    +           return this.actions;
    +   }
    +   /**
    +    * Create a new list of HBase {@link Mutation}s.
    +    *
    +    * @param rowKey row that the created {@link Mutation} list is applied 
    +    * @param writeToWAL enable WAL
    +    * @return a list of HBase {@link Mutation}s
    +    */
    +   public List<Mutation> newMutationList(byte[] rowKey, boolean 
writeToWAL) {
    +           List<Mutation> mutations = new ArrayList<>();
    +           Put put = null;
    +           Increment increment = null;
    +           Append append = null;
    +           Delete delete = null;
    +           boolean rowIsDeleted = false;
    +           for (MutationAction action : actions) {
    +                   switch (action.getType()) {
    +                           case PUT:
    +                                   if (put == null) {
    +                                           put = new Put(rowKey);
    +                                           mutations.add(put);
    +                                   }
    +                                   if (action.getTs() == -1) {
put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
    +                                   } else {
put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), 
    +                                   }
    +                                   break;
    +                           case INCREMENT:
    +                                   if (increment == null) {
    +                                           increment = new 
    +                                           mutations.add(increment);
    +                                   }
    +                                   increment.addColumn(action.getFamily(), 
action.getQualifier(), action.getIncrement());
    +                                   break;
    +                           case APPEND:
    +                                   if (append == null) {
    +                                           append = new Append(rowKey);
    +                                           mutations.add(append);
    +                                   }
    +                                   append.add(action.getFamily(), 
action.getQualifier(), action.getValue());
    +                                   break;
    +                           // If there are multiple DELETE_ROW actions, 
only the first one is served
    +                           case DELETE_ROW:
    +                                   if (!rowIsDeleted) {
    +                                           for (int i = 0; i < 
mutations.size(); ) {
    +                                                   if (mutations.get(i) 
instanceof Delete) {
    +                                                   } else {
    +                                                           i++;
    +                                                   }
    +                                           }
    +                                           delete = new Delete(rowKey, 
    +                                           mutations.add(delete);
    +                                           rowIsDeleted = true;
    +                                   }
    +                                   break;
    +                           case DELETE_FAMILY:
    +                                   if (!rowIsDeleted) {
    +                                           if (delete == null) {
    +                                                   delete = new 
    +                                                   mutations.add(delete);
    +                                           }
delete.addFamily(action.getFamily(), action.getTs());
    +                                   }
    +                                   break;
    +                           case DELETE_COLUMNS:
    +                                   if (!rowIsDeleted) {
    +                                           if (delete == null) {
    +                                                   delete = new 
    +                                                   mutations.add(delete);
    +                                           }
delete.addColumns(action.getFamily(), action.getQualifier(), action.getTs());
    +                                   }
    +                                   break;
    +                           case DELETE_COLUMN:
    +                                   if (!rowIsDeleted) {
    +                                           if (delete == null) {
    +                                                   delete = new 
    +                                                   mutations.add(delete);
    +                                           }
delete.addColumn(action.getFamily(), action.getQualifier(), action.getTs());
    +                                   }
    +                                   break;
    +                           default:
    +                                   throw new 
IllegalArgumentException("Cannot process such action type: " + 
    +                   }
    +           }
    +           Durability durability = writeToWAL ? Durability.SYNC_WAL : 
    +           for (Mutation mutation : mutations) {
    +                   mutation.setDurability(durability);
    +           }
    +           return mutations;
    +   }
    +   /**
    +    * Create a new list of HBase {@link Mutation}s enabling WAL as default.
    +    *
    +    * @param rowKey row that the created {@link Mutation} list is applied 
    +    * @return a list of HBase {@link Mutation}s
    +    */
    +   public List<Mutation> newMutationList(byte[] rowKey) {
    +           return newMutationList(rowKey, true);
    +   }
    +   /**
    +    * Add to {@link MutationActionList} a {@link MutationAction} of type 
{@link MutationAction.Type#PUT}. which will
    +    * create an HBase {@link Put} operation for a specified row with 
specified timestamp.
    +    *
    +     * @param family family name
    +    * @param qualifier column qualifier
    +    * @param value column value
    +    * @param timestamp version timestamp
    +    * @return this
    +    */
    +   public MutationActionList addPut(byte[] family, byte[] qualifier, 
byte[] value, long timestamp) {
    +           actions.add(new MutationAction(family, qualifier, value, 
timestamp, 0, MutationAction.Type.PUT));
    --- End diff --
    The design is to let users define the actions they want to perform on a 
single row given an input record and return a MutationActionList for it. 
HBaseSink will then call newMutationList only once per input and send the 
associated Mutations to an HBase table. What do you think?

> Implement Streaming HBaseSink
> -----------------------------
>                 Key: FLINK-2055
>                 URL: https://issues.apache.org/jira/browse/FLINK-2055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming, Streaming Connectors
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Hilmi Yildirim
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html

This message was sent by Atlassian JIRA

Reply via email to