LeonBein commented on a change in pull request #15109: URL: https://github.com/apache/flink/pull/15109#discussion_r595073653
##########
File path: flink-connectors/flink-connector-hbase/README.md
##########
@@ -0,0 +1,118 @@
+# Flink HBase Connector
+
+This module provides connectors that allow Flink to access [HBase](https://hbase.apache.org/) using [CDC](https://en.wikipedia.org/wiki/Change_data_capture).
+It supports the new Source and Sink API specified in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) and [FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API).
+
+## Installing HBase
+
+Follow the instructions from the [HBase Quick Start Guide](http://hbase.apache.org/book.html#quickstart) to install HBase.
+
+*Version Compatibility*: This module is compatible with Apache HBase *2.3.4*.
+
+## HBase Configuration
+
+Connecting to HBase always requires a `Configuration` instance.
+If there is an HBase gateway on the same host as the Flink gateway where the application is started, this can be obtained by invoking `HBaseConfiguration.create()` as in the examples below.
+If that's not the case, a configuration should be provided where the proper core-site, hdfs-site, and hbase-site are added as resources.
+
+## DataStream API
+
+### Reading data from HBase
+
+To receive data from HBase, the connector makes use of the internal replication mechanism of HBase.
+The connector registers at the HBase cluster as a *Replication Peer* and will receive all change events from HBase.
+
+For the replication to work, the HBase config needs to have replication enabled in the `hbase-site.xml` file.
+This needs to be done only once per cluster:
+```xml
+<configuration>
+  <property>
+    <name>hbase.replication</name>
+    <value>true</value>
+  </property>
+  ...
+</configuration>
+```
+All incoming events to Flink will be processed as an `HBaseSourceEvent`.
+You will need to specify a Deserializer which will transform each event from an `HBaseSourceEvent` to the desired DataStream type.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Configuration hbaseConfig = HBaseConfiguration.create();
+String tableName = "TestTable";
+
+HBaseSource<String> hbaseSource =
+    HBaseSource.<String>builder()
+        .setTableName(tableName)
+        .setSourceDeserializer(new HBaseStringDeserializer())
+        .setHBaseConfiguration(hbaseConfig)
+        .build();
+
+DataStream<String> stream = env.fromSource(
+    hbaseSource,
+    WatermarkStrategy.noWatermarks(),
+    "HBaseSource");
+// ...
+```
+
+The Deserializer is created as follows:
+
+```java
+static class HBaseStringDeserializer implements HBaseSourceDeserializer<String> {
+    @Override
+    public String deserialize(HBaseSourceEvent event) {
+        return new String(event.getPayload());

Review comment:
✅ in a840d3467d5eeff0c9a6f28a74aeddd72cfc6ab9