AHeise commented on a change in pull request #15109:
URL: https://github.com/apache/flink/pull/15109#discussion_r596023752



##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.sink.writer.HBaseWriter;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A Sink Connector for HBase. Please use an {@link HBaseSinkBuilder} to 
construct a {@link
+ * HBaseSink}. As HBase does not support transactions, this sink does not 
guarantee exactly-once,
+ * but at-least-once.
+ *
+ * <p>The following example shows how to create an HBaseSink that writes Long 
values to HBase.
+ *
+ * <pre>{@code
+ * HBaseSink<Long> hbaseSink =
+ *      HBaseSink.builder()
+ *          .setTableName(tableName)
+ *          .setSinkSerializer(new HBaseLongSerializer())
+ *          .setHBaseConfiguration(hbaseConfig)
+ *          .build();
+ * }</pre>
+ *
+ * <p>Here is an example for the Serializer:
+ *
+ * <pre>{@code
+ * static class HBaseLongSerializer implements HBaseSinkSerializer<Long> {
+ *     @Override
+ *     public HBaseEvent serialize(Long event) {
+ *         return HBaseEvent.putWith(                  // or deleteWith()
+ *                 event.toString(),                   // rowId
+ *                 "exampleColumnFamily",              // column family
+ *                 "exampleQualifier",                 // qualifier
+ *                 Bytes.toBytes(event.toString()));   // payload
+ *     }
+ * }
+ * }</pre>
+ *
+ * @see HBaseSinkBuilder HBaseSinkBuilder for more details on creation
+ * @see HBaseSinkSerializer HBaseSinkSerializer for more details on 
serialization
+ */
+public class HBaseSink<IN> implements Sink<IN, Void, Mutation, Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSink.class);
+
+    private final HBaseSinkSerializer<IN> sinkSerializer;
+    private final byte[] serializedHBaseConfig;
+    private final Configuration sinkConfiguration;
+
+    HBaseSink(
+            HBaseSinkSerializer<IN> sinkSerializer,
+            org.apache.hadoop.conf.Configuration hbaseConfiguration,
+            Configuration sinkConfiguration) {
+        this.sinkSerializer = sinkSerializer;
+        this.serializedHBaseConfig =
+                
HBaseConfigurationUtil.serializeConfiguration(hbaseConfiguration);
+        this.sinkConfiguration = sinkConfiguration;
+        LOG.debug("constructed sink");
+    }
+
+    public static <IN> HBaseSinkBuilder<IN> builder() {
+        return new HBaseSinkBuilder<>();
+    }
+
+    @Override
+    public SinkWriter<IN, Void, Mutation> createWriter(InitContext context, 
List<Mutation> states)
+            throws IOException {

Review comment:
       Please remove unused `throws` declarations (across the whole PR). We can re-add them when they are needed.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseEvent.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.connector.hbase.sink.HBaseSinkSerializer;
+import org.apache.flink.connector.hbase.source.HBaseSource;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceEvent;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The base HBaseEvent which needs to be created in the {@link 
HBaseSinkSerializer} to write data to
+ * HBase. The subclass {@link HBaseSourceEvent} contains additional 
information and is used by the
+ * {@link HBaseSource} to represent an incoming event from HBase.
+ */
+public class HBaseEvent {
+
+    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
+    private final String rowId;
+    private final String cf;
+    private final String qualifier;
+    private final byte[] payload;
+    private final Cell.Type type;
+
+    protected HBaseEvent(
+            Cell.Type type, String rowId, String cf, String qualifier, byte[] 
payload) {
+        this.rowId = rowId;
+        this.cf = cf;
+        this.qualifier = qualifier;
+        this.payload = payload;
+        this.type = type;
+    }
+
+    public static HBaseEvent deleteWith(String rowId, String cf, String 
qualifier) {
+        return new HBaseEvent(Cell.Type.Delete, rowId, cf, qualifier, null);
+    }
+
+    public static HBaseEvent putWith(String rowId, String cf, String 
qualifier, byte[] payload) {
+        return new HBaseEvent(Cell.Type.Put, rowId, cf, qualifier, payload);
+    }
+
+    @Override
+    public String toString() {
+        return type.name() + " " + rowId + " " + cf + " " + qualifier + " " + 
new String(payload);
+    }
+
+    public Cell.Type getType() {
+        return type;
+    }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+
+    public String getRowId() {
+        return rowId;
+    }
+
+    public String getCf() {
+        return cf;
+    }
+
+    public String getQualifier() {
+        return qualifier;
+    }
+
+    public byte[] getRowIdBytes() {
+        return rowId.getBytes(DEFAULT_CHARSET);
+    }
+
+    public byte[] getCfBytes() {

Review comment:
       Please rename `cf` -> `columnFamily`.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/split/HBaseSourceSplitState.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.split;
+
+import org.apache.flink.connector.hbase.HBaseEvent;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceEvent;
+
+/**
+ * Contains the state of an {@link HBaseSourceSplit}. It tracks the timestamp 
of the last emitted
+ * event to ensure no duplicates appear on recovery.
+ */
+public class HBaseSourceSplitState {

Review comment:
       Please add `@Internal` to all classes that are public (because Java has no equivalent of C#'s `internal` modifier) but are not supposed to be used by end users.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseEvent.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.connector.hbase.sink.HBaseSinkSerializer;
+import org.apache.flink.connector.hbase.source.HBaseSource;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceEvent;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The base HBaseEvent which needs to be created in the {@link 
HBaseSinkSerializer} to write data to
+ * HBase. The subclass {@link HBaseSourceEvent} contains additional 
information and is used by the
+ * {@link HBaseSource} to represent an incoming event from HBase.
+ */
+public class HBaseEvent {

Review comment:
       Please add `equals`/`hashCode` for easier testing.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseEvent.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.connector.hbase.sink.HBaseSinkSerializer;
+import org.apache.flink.connector.hbase.source.HBaseSource;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceEvent;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The base HBaseEvent which needs to be created in the {@link 
HBaseSinkSerializer} to write data to
+ * HBase. The subclass {@link HBaseSourceEvent} contains additional 
information and is used by the
+ * {@link HBaseSource} to represent an incoming event from HBase.
+ */
+public class HBaseEvent {
+
+    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
+    private final String rowId;
+    private final String cf;

Review comment:
       Please rename `cf` -> `columnFamily`.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkMutationSerializer.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.connector.hbase.sink.writer.HBaseWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * This class serializes {@link Mutation} and is used to serialize the state 
of the {@link
+ * HBaseWriter}.
+ */
+public class HBaseSinkMutationSerializer implements 
SimpleVersionedSerializer<Mutation> {

Review comment:
       Please remove the `public` modifier; this class does not need to be visible outside its package.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseSource.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.hbase.source.enumerator.HBaseSourceEnumeratorCheckpointSerializer;
+import org.apache.flink.connector.hbase.source.enumerator.HBaseSplitEnumerator;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceDeserializer;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceReader;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplit;
+import 
org.apache.flink.connector.hbase.source.split.HBaseSourceSplitSerializer;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * A source connector for HBase. Please use an {@link HBaseSourceBuilder} to 
construct an {@link
+ * HBaseSource}. The following example shows how to create a HBaseSource that 
reads String values
+ * from each cell.
+ *
+ * <pre>{@code
+ * HBaseSource<String> source =
+ *     HBaseSource.builder()
+ *         .setSourceDeserializer(new HBaseStringDeserializer())
+ *         .setTableName("test-table")
+ *         .setHBaseConfiguration(new HBaseTestClusterUtil().getConfig())
+ *         .build();
+ *
+ * static class HBaseStringDeserializer implements 
HBaseSourceDeserializer<String> {
+ *     @Override
+ *     public String deserialize(HBaseSourceEvent event) {
+ *         return new String(event.getPayload(), HBaseEvent.DEFAULT_CHARSET);
+ *     }
+ * }
+ * }</pre>
+ *
+ * @see HBaseSourceBuilder HBaseSourceBuilder for more details.
+ */
+public class HBaseSource<T> implements Source<T, HBaseSourceSplit, 
Collection<HBaseSourceSplit>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseSource.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final HBaseSourceDeserializer<T> sourceDeserializer;
+    private final byte[] serializedHBaseConfig;
+    private final Configuration sourceConfiguration;
+
+    HBaseSource(
+            HBaseSourceDeserializer<T> sourceDeserializer,
+            org.apache.hadoop.conf.Configuration hbaseConfiguration,
+            Configuration sourceConfiguration) {
+        this.serializedHBaseConfig =
+                
HBaseConfigurationUtil.serializeConfiguration(hbaseConfiguration);
+        this.sourceDeserializer = sourceDeserializer;
+        this.sourceConfiguration = sourceConfiguration;
+        LOG.debug("constructed source");
+    }
+
+    public static <IN> HBaseSourceBuilder<IN> builder() {
+        return new HBaseSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, HBaseSourceSplit> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        LOG.debug("creating reader");
+        return new HBaseSourceReader<>(
+                serializedHBaseConfig, sourceDeserializer, 
sourceConfiguration, readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<HBaseSourceSplit, Collection<HBaseSourceSplit>> 
restoreEnumerator(
+            SplitEnumeratorContext<HBaseSourceSplit> enumContext,
+            Collection<HBaseSourceSplit> checkpoint)
+            throws Exception {
+        LOG.debug("restoring enumerator");

Review comment:
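       Please include the number of restored elements in this log message, e.g. `LOG.debug("restoring enumerator with {} splits", checkpoint.size());`.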

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/reader/HBaseSourceSplitReader.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.reader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.hbase.source.hbaseendpoint.HBaseEndpoint;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplit;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+/** A {@link SplitReader} implementation for HBase. */
+public class HBaseSourceSplitReader implements SplitReader<HBaseSourceEvent, 
HBaseSourceSplit> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseSourceSplitReader.class);
+
+    private final Queue<HBaseSourceSplit> splits;
+    private final HBaseEndpoint hbaseEndpoint;
+
+    @Nullable private String currentSplitId;
+
+    public HBaseSourceSplitReader(byte[] serializedConfig, Configuration 
sourceConfiguration) {
+        LOG.debug("constructing Split Reader");
+        try {
+            this.hbaseEndpoint =
+                    new HBaseEndpoint(
+                            
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, null),
+                            sourceConfiguration);
+        } catch (Exception e) {
+            throw new RuntimeException("failed HBase consumer", e);
+        }
+        this.splits = new ArrayDeque<>();
+    }
+
+    @Override
+    public RecordsWithSplitIds<HBaseSourceEvent> fetch() throws IOException {
+        if (currentSplitId == null) {
+            HBaseSourceSplit nextSplit = splits.poll();
+            if (nextSplit != null) {
+                currentSplitId = nextSplit.splitId();
+            } else {
+                throw new IOException("No split remaining");
+            }
+        }
+        List<HBaseSourceEvent> records = hbaseEndpoint.getAll();
+        LOG.debug("{} records in the queue", records.size());
+        return new HBaseSplitRecords<>(currentSplitId, records.iterator());
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<HBaseSourceSplit> 
splitsChanges) {
+        LOG.debug("handle splits change {}", splitsChanges);
+        if (splitsChanges instanceof SplitsAddition) {
+            HBaseSourceSplit split = splitsChanges.splits().get(0);
+            try {
+                this.hbaseEndpoint.startReplication(split.getColumnFamilies());
+            } catch (Exception e) {
+                throw new RuntimeException("failed HBase consumer", e);
+            }
+            splits.addAll(splitsChanges.splits());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported splits change type "
+                            + splitsChanges.getClass().getSimpleName()
+                            + " in "
+                            + this.getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void wakeUp() {
+        LOG.debug("waking up HBaseEndpoint");
+        hbaseEndpoint.wakeup();
+    }
+
+    @Override
+    public void close() throws Exception {
+        hbaseEndpoint.close();
+    }
+
+    private static class HBaseSplitRecords<T> implements 
RecordsWithSplitIds<T> {
+        private Iterator<T> recordsForSplit;
+        private String splitId;
+
+        private HBaseSplitRecords(String splitId, Iterator<T> recordsForSplit) 
{

Review comment:
       Please make this constructor package-private; otherwise the compiler will generate a synthetic accessor constructor for it.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/reader/HBaseSourceEvent.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.reader;
+
+import org.apache.flink.connector.hbase.HBaseEvent;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.util.Arrays;
+
+/** The HBaseSourceEvent is used to represent incoming events from HBase. */
+public class HBaseSourceEvent extends HBaseEvent {
+
+    private final String table;
+    private final long timestamp;
+    /** Index of operation inside one wal entry. */
+    private final int index;
+
+    public HBaseSourceEvent(
+            Cell.Type type,
+            String rowId,
+            String table,
+            String cf,

Review comment:
       Please rename `cf` -> `columnFamily`.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/reader/HBaseSourceReader.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplit;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplitState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** The source reader for Hbase. */
+public class HBaseSourceReader<T>

Review comment:
       Please make this class package-private if possible.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/writer/HBaseWriter.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink.writer;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.HBaseEvent;
+import org.apache.flink.connector.hbase.sink.HBaseSinkOptions;
+import org.apache.flink.connector.hbase.sink.HBaseSinkSerializer;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+
+import 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.io.Closer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The HBaseWriter is responsible for writing incoming events to HBase.
+ *
+ * <p>Stored events will be flushed to HBase eiter if the {@link #queueLimit} 
is reached or if the
+ * {@link #maxLatencyMs} has elapsed.
+ */
+public class HBaseWriter<IN> implements SinkWriter<IN, Void, Mutation> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseWriter.class);
+
+    private final int queueLimit;
+    private final int maxLatencyMs;
+    private final HBaseSinkSerializer<IN> sinkSerializer;
+    private final ArrayBlockingQueue<Mutation> pendingMutations;
+    private final Connection connection;
+    private final Table table;
+    private volatile long lastFlushTimeStamp = 0;
+    private TimerTask batchSendTimer;
+
+    public HBaseWriter(
+            Sink.InitContext context,

Review comment:
       This `context` parameter is unused; please remove it.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/reader/HBaseSourceReader.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplit;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplitState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** The source reader for HBase. */
+public class HBaseSourceReader<T>
+        extends SingleThreadMultiplexSourceReaderBase<
+                HBaseSourceEvent, T, HBaseSourceSplit, HBaseSourceSplitState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseSourceReader.class);
+
+    public HBaseSourceReader(
+            byte[] serializedHBaseConfig,
+            HBaseSourceDeserializer<T> sourceDeserializer,
+            Configuration sourceConfiguration,
+            SourceReaderContext context) {
+        super(
+                () -> new HBaseSourceSplitReader(serializedHBaseConfig, 
sourceConfiguration),
+                new HBaseRecordEmitter<>(sourceDeserializer),
+                sourceConfiguration,
+                context);
+        LOG.debug("constructing Source Reader");

Review comment:
       Consider adding the configuration to this log message.

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseSource.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.hbase.source.enumerator.HBaseSourceEnumeratorCheckpointSerializer;
+import org.apache.flink.connector.hbase.source.enumerator.HBaseSplitEnumerator;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceDeserializer;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceReader;
+import org.apache.flink.connector.hbase.source.split.HBaseSourceSplit;
+import 
org.apache.flink.connector.hbase.source.split.HBaseSourceSplitSerializer;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * A source connector for HBase. Please use an {@link HBaseSourceBuilder} to 
construct an {@link
+ * HBaseSource}. The following example shows how to create a HBaseSource that 
reads String values
+ * from each cell.
+ *
+ * <pre>{@code
+ * HBaseSource<String> source =
+ *     HBaseSource.builder()
+ *         .setSourceDeserializer(new HBaseStringDeserializer())
+ *         .setTableName("test-table")
+ *         .setHBaseConfiguration(new HBaseTestClusterUtil().getConfig())
+ *         .build();
+ *
+ * static class HBaseStringDeserializer implements 
HBaseSourceDeserializer<String> {
+ *     @Override
+ *     public String deserialize(HBaseSourceEvent event) {
+ *         return new String(event.getPayload(), HBaseEvent.DEFAULT_CHARSET);
+ *     }
+ * }
+ * }</pre>
+ *
+ * @see HBaseSourceBuilder HBaseSourceBuilder for more details.
+ */
+public class HBaseSource<T> implements Source<T, HBaseSourceSplit, 
Collection<HBaseSourceSplit>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseSource.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final HBaseSourceDeserializer<T> sourceDeserializer;
+    private final byte[] serializedHBaseConfig;
+    private final Configuration sourceConfiguration;
+
+    HBaseSource(
+            HBaseSourceDeserializer<T> sourceDeserializer,
+            org.apache.hadoop.conf.Configuration hbaseConfiguration,
+            Configuration sourceConfiguration) {
+        this.serializedHBaseConfig =
+                
HBaseConfigurationUtil.serializeConfiguration(hbaseConfiguration);
+        this.sourceDeserializer = sourceDeserializer;
+        this.sourceConfiguration = sourceConfiguration;
+        LOG.debug("constructed source");

Review comment:
       Add the configuration to this log message. You could also override `#toString` and log `this` here (or, 
even better, in `builder#build`).

##########
File path: 
flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/reader/HBaseSourceEvent.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source.reader;
+
+import org.apache.flink.connector.hbase.HBaseEvent;
+
+import org.apache.hadoop.hbase.Cell;
+
+import java.util.Arrays;
+
+/** The HBaseSourceEvent is used to represent incoming events from HBase. */
+public class HBaseSourceEvent extends HBaseEvent {

Review comment:
       Please add `equals`/`hashCode` for easier testing. Make sure to use 
`getClass` and not `instanceof`, in the base class as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to