[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503810#comment-15503810 ]

ASF GitHub Bot commented on FLINK-2055:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2332#discussion_r79422057
  
    --- Diff: flink-streaming-connectors/flink-connector-hbase/src/test/java/org/apache/flink/streaming/connectors/hbase/example/HBaseSinkExample.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.hbase.example;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.connectors.hbase.HBaseMapper;
    +import org.apache.flink.streaming.connectors.hbase.HBaseSink;
    +import org.apache.flink.streaming.connectors.hbase.MutationActions;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +
    +/**
    + * This is an example showing how to use the HBaseSink in the Streaming API.
    + *
    + * To run the example you need a local HBase database that has a table "flink-example" with a column family "cf".
    + * In the example, the HBase sink takes an input of type {@link Tuple3} and performs different operations based on the input.
    + * The first field of an input value is used as the row key, the second field is treated as an opcode that
    + * determines which type of HBase operation is performed, and the third field is the value to be written.
    + */
    +public class HBaseSinkExample {
    +
    +   private static final List<Tuple3<String, Integer, Integer>> dataSource = new ArrayList<>(100);
    +   private static final String TABLE_NAME = "flink-example";
    +   private static final String FAMILY = "cf";
    +   private static final String COLUMN1 = "c1";
    +   private static final String COLUMN2 = "c2";
    +   private static final String ROWKEY_PREFIX = "row-";
    +
    +   static {
    +           Random random = new Random();
    +           for (int i = 0; i < 99; i++) {
    +                   String rowKey = ROWKEY_PREFIX + (i % 20);
    +                   int opcode = random.nextInt(9);
    +                   int value = i;
    +                   dataSource.add(new Tuple3<>(rowKey, opcode, value));
    +           }
    +   }
    +
    +   public static void main(String[] args) throws Exception {
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(dataSource);
    +
    +           source.addSink(new HBaseSink<>(TABLE_NAME, new HBaseMapperExample(FAMILY, COLUMN1, COLUMN2)));
    +           env.execute();
    +   }
    +
    +   /**
    +    * This class implements {@link HBaseMapper}.
    +    */
    +   private static class HBaseMapperExample implements HBaseMapper<Tuple3<String, Integer, Integer>> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           private byte[] family;
    +           private byte[] col1;
    +           private byte[] col2;
    +
    +           public HBaseMapperExample(String family, String col1, String col2) {
    +                   this.family = Bytes.toBytes(family);
    +                   this.col1 = Bytes.toBytes(col1);
    +                   this.col2 = Bytes.toBytes(col2);
    +           }
    +
    +           @Override
    +           public byte[] rowKey(Tuple3<String, Integer, Integer> value) {
    +                   return Bytes.toBytes(value.f0);
    +           }
    +
    +           @Override
    +           public MutationActions actions(Tuple3<String, Integer, Integer> value) {
    --- End diff --
    
    I think this example is a bit too complicated. Can you change it to two or three different actions?


> Implement Streaming HBaseSink
> -----------------------------
>
>                 Key: FLINK-2055
>                 URL: https://issues.apache.org/jira/browse/FLINK-2055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming, Streaming Connectors
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
