LeonBein commented on a change in pull request #15109:
URL: https://github.com/apache/flink/pull/15109#discussion_r592684019



##########
File path: 
flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/source/HBaseSourceITCase.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.hbase.source.reader.HBaseEvent;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceDeserializer;
+import org.apache.flink.connector.hbase.testutil.FailureSink;
+import org.apache.flink.connector.hbase.testutil.HBaseTestClusterUtil;
+import org.apache.flink.connector.hbase.testutil.Util;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.awaitSignalThrowOnFailure;
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.awaitSuccess;
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.cleanupFolder;
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.cleanupSignal;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.makeFolder;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.signal;
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.signalFailure;
+import static 
org.apache.flink.connector.hbase.testutil.FileSignal.signalSuccess;
+import static org.junit.Assert.assertArrayEquals;
+
+/** Tests the most basic use cases of the source with a mocked HBase system. */
+public class HBaseSourceITCase extends TestsWithTestHBaseCluster {
+
+    private DataStream<String> streamFromHBaseSource(
+            StreamExecutionEnvironment environment, String tableName)
+            throws ParserConfigurationException, SAXException, IOException {
+        return streamFromHBaseSource(environment, tableName, 1);
+    }
+
+    private DataStream<String> streamFromHBaseSource(
+            StreamExecutionEnvironment environment, String tableName, int 
parallelism)
+            throws ParserConfigurationException, SAXException, IOException {
+        HBaseStringDeserializationScheme deserializationScheme =
+                new HBaseStringDeserializationScheme();
+        HBaseSource<String> source =
+                new HBaseSource<>(deserializationScheme, tableName, 
cluster.getConfig());
+        environment.setParallelism(parallelism);
+        DataStream<String> stream =
+                environment.fromSource(
+                        source,
+                        WatermarkStrategy.noWatermarks(),
+                        "hbaseSourceITCase",
+                        deserializationScheme.getProducedType());
+        return stream;
+    }
+
+    private static <T> void expectFirstValuesToBe(
+            DataStream<T> stream, T[] expectedValues, String message) {
+
+        List<T> collectedValues = new ArrayList<>();
+        stream.flatMap(
+                new RichFlatMapFunction<T, Object>() {
+
+                    @Override
+                    public void flatMap(T value, Collector<Object> out) {
+                        System.out.println("Test collected: " + value);
+                        collectedValues.add(value);
+                        if (collectedValues.size() == expectedValues.length) {
+                            assertArrayEquals(message, expectedValues, 
collectedValues.toArray());
+                            throw new SuccessException();
+                        }
+                    }
+                });
+    }
+
+    private static void doAndWaitForSuccess(
+            StreamExecutionEnvironment env, Runnable action, int timeout) {
+        try {
+            JobClient jobClient = env.executeAsync();
+            MiniCluster miniCluster = Util.miniCluster((MiniClusterJobClient) 
jobClient);
+            Util.waitForClusterStart(miniCluster, true);
+
+            action.run();
+            jobClient.getJobExecutionResult().get(timeout, TimeUnit.SECONDS);
+            jobClient.cancel();
+            throw new RuntimeException("Waiting for the correct data timed 
out");
+        } catch (Exception exception) {
+            if (!causedBySuccess(exception)) {
+                throw new RuntimeException("Test failed", exception);
+            } else {
+                // Normal termination
+            }
+        }
+    }
+
+    @Before
+    public void makeSignalFolder() {
+        makeFolder();

Review comment:
       Commit f6702d0e6bd16454bceac0d1a90091db8cb20f51 at least moves the 
signal folder setup into a test rule. 
   Making the `FileSignal` class itself a rule would require making its 
methods non-static, and then the clean `signalSuccess()`-style syntax would be 
lost in favor of something like `signal.signalSuccess()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to