This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 1da9bd6ce4 [E2E][HBase]Refactor hbase e2e (#6859) 1da9bd6ce4 is described below commit 1da9bd6ce475a012a702a4a864d5307303ed0660 Author: TaoZex <45089228+tao...@users.noreply.github.com> AuthorDate: Tue Jun 4 09:57:09 2024 +0800 [E2E][HBase]Refactor hbase e2e (#6859) --- .../hbase/format/HBaseDeserializationFormat.java | 2 + .../connector-hbase-e2e/pom.xml | 14 ++- .../e2e/connector/hbase/HbaseCluster.java | 138 +++++++++++++++++++++ .../seatunnel/e2e/connector/hbase/HbaseIT.java | 109 ++++++---------- .../src/test/resources/fake-to-hbase-array.conf | 2 +- .../src/test/resources/fake-to-hbase.conf | 2 +- .../src/test/resources/hbase-to-assert.conf | 48 ++++--- 7 files changed, 220 insertions(+), 95 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java index 8d7a1bcbe1..578df7101c 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java @@ -58,7 +58,9 @@ public class HBaseDeserializationFormat { switch (typeInfo.getSqlType()) { case TINYINT: + return cell[0]; case SMALLINT: + return (short) ((cell[0] & 0xFF) << 8 | (cell[1] & 0xFF)); case INT: return Bytes.toInt(cell); case BOOLEAN: diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml index 93ba8de981..b9fdd76f6f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml @@ -26,14 +26,24 @@ <name>SeaTunnel : E2E : Connector V2 : Hbase</name> <dependencies> - + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-fake</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>connector-hbase</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-assert</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java new file mode 100644 index 0000000000..3b54c35592 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.apache.seatunnel.e2e.common.container.TestContainer.NETWORK; + +public class HbaseCluster { + + private static final Logger LOG = LoggerFactory.getLogger(HbaseCluster.class); + + private static final int ZOOKEEPER_PORT = 2181; + private static final int MASTER_PORT = 16000; + private static final int REGION_PORT = 16020; + private static final String HOST = "hbase_e2e"; + + private static final String DOCKER_NAME = "jcjabouille/hbase-standalone:2.4.9"; + private static final DockerImageName HBASE_DOCKER_IMAGE = DockerImageName.parse(DOCKER_NAME); + + private Connection connection; + private GenericContainer<?> hbaseContainer; + + public Connection startService() throws IOException { + String hostname = InetAddress.getLocalHost().getHostName(); + hbaseContainer = + new GenericContainer<>(HBASE_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(MASTER_PORT) + .withExposedPorts(REGION_PORT) + .withExposedPorts(ZOOKEEPER_PORT) + .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostname)) + .withEnv("HBASE_MASTER_PORT", String.valueOf(MASTER_PORT)) + .withEnv("HBASE_REGION_PORT", String.valueOf(REGION_PORT)) + .withEnv( + "HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT", + String.valueOf(ZOOKEEPER_PORT)) + .withEnv("HBASE_ZOOKEEPER_QUORUM", HOST) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_NAME))); + hbaseContainer.setPortBindings( + Arrays.asList( + String.format("%s:%s", MASTER_PORT, MASTER_PORT), + String.format("%s:%s", REGION_PORT, REGION_PORT), + String.format("%s:%s", ZOOKEEPER_PORT, ZOOKEEPER_PORT))); + Startables.deepStart(Stream.of(hbaseContainer)).join(); + LOG.info("HBase container started"); + + String zookeeperQuorum = getZookeeperQuorum(); + LOG.info("Successfully start hbase service, zookeeper quorum: {}", zookeeperQuorum); + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", zookeeperQuorum); + configuration.set("hbase.security.authentication", "simple"); + configuration.set("hbase.rpc.timeout", "10000"); + configuration.set("hbase.master.port", String.valueOf(MASTER_PORT)); + configuration.set("hbase.regionserver.port", String.valueOf(REGION_PORT)); + connection = ConnectionFactory.createConnection(configuration); + return connection; + } + + public void createTable(String tableName, List<String> list) throws IOException { + TableDescriptorBuilder tableDesc = + TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); + + List<ColumnFamilyDescriptor> colFamilyList = new ArrayList<>(); + for (String columnFamilys : list) { + ColumnFamilyDescriptorBuilder c = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamilys)); + colFamilyList.add(c.build()); + } + tableDesc.setColumnFamilies(colFamilyList); + Admin hbaseAdmin = connection.getAdmin(); + hbaseAdmin.createTable(tableDesc.build()); + } + + public void stopService() throws IOException { + if (Objects.nonNull(connection)) { + connection.close(); + } + if (Objects.nonNull(hbaseContainer)) { + hbaseContainer.close(); + } + hbaseContainer = null; + } + + public static String getZookeeperQuorum() { + String host = null; + try { + host = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + return String.format("%s:%s", host, ZOOKEEPER_PORT); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index e27e5c715e..d3cd57b326 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -19,88 +19,63 @@ package org.apache.seatunnel.e2e.connector.hbase; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.util.Bytes; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Objects; -import java.util.stream.Stream; @Slf4j -@Disabled( - "Hbase docker e2e case need user add mapping information of between container id and ip address in hosts file") +@DisabledOnContainer( + value = {}, + type = {EngineType.SEATUNNEL}, + disabledReason = "The hbase container authentication configuration is incorrect.") public class HbaseIT extends TestSuiteBase implements TestResource { - private static final String IMAGE = "harisekhon/hbase:latest"; - - private static final int PORT = 2181; - - private static final String HOST = "hbase-e2e"; - private static final String TABLE_NAME = "seatunnel_test"; private static final String FAMILY_NAME = "info"; - private final Configuration hbaseConfiguration = HBaseConfiguration.create(); - private Connection hbaseConnection; private Admin admin; private TableName table; - private GenericContainer<?> hbaseContainer; + private HbaseCluster hbaseCluster; @BeforeAll @Override public void startUp() throws Exception { - hbaseContainer = - new GenericContainer<>(DockerImageName.parse(IMAGE)) - .withNetwork(NETWORK) - .withNetworkAliases(HOST) - .withExposedPorts(PORT) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))) - .waitingFor( - new HostPortWaitStrategy() - .withStartupTimeout(Duration.ofMinutes(2))); - Startables.deepStart(Stream.of(hbaseContainer)).join(); - log.info("Hbase container started"); - this.initialize(); + hbaseCluster = new HbaseCluster(); + hbaseConnection = hbaseCluster.startService(); + // Create table for hbase sink test + log.info("initial"); + hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); + table = TableName.valueOf(TABLE_NAME); } @AfterAll @@ -109,59 +84,37 @@ public class HbaseIT extends TestSuiteBase implements TestResource { if (Objects.nonNull(admin)) { admin.close(); } - if (Objects.nonNull(hbaseConnection)) { - hbaseConnection.close(); - } - if (Objects.nonNull(hbaseContainer)) { - hbaseContainer.close(); - } - } - - private void initialize() throws IOException { - hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT); - hbaseConnection = ConnectionFactory.createConnection(hbaseConfiguration); - admin = hbaseConnection.getAdmin(); - table = TableName.valueOf(TABLE_NAME); - ColumnFamilyDescriptor familyDescriptor = - ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes()) - .setCompressionType(Compression.Algorithm.SNAPPY) - .setCompactionCompressionType(Compression.Algorithm.SNAPPY) - .build(); - TableDescriptor tableDescriptor = - TableDescriptorBuilder.newBuilder(table).setColumnFamily(familyDescriptor).build(); - admin.createTable(tableDescriptor); - log.info("Hbase table has been initialized"); + hbaseCluster.stopService(); } @TestTemplate public void testHbaseSink(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/fake-to-hbase.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + deleteData(table); + Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); - ArrayList<Result> results = new ArrayList<>(); ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList<Result> results = new ArrayList<>(); for (Result result : scanner) { results.add(result); } Assertions.assertEquals(results.size(), 5); - } - - @TestTemplate - public void testHbaseSource(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/hbase-to-assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + scanner.close(); + Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf"); + Assertions.assertEquals(0, sourceExecResult.getExitCode()); } @TestTemplate public void testHbaseSinkWithArray(TestContainer container) throws IOException, InterruptedException { + deleteData(table); Container.ExecResult execResult = container.executeJob("/fake-to-hbase-array.conf"); Assertions.assertEquals(0, execResult.getExitCode()); Table hbaseTable = hbaseConnection.getTable(table); Scan scan = new Scan(); - ArrayList<Result> results = new ArrayList<>(); ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList<Result> results = new ArrayList<>(); for (Result result : scanner) { String rowKey = Bytes.toString(result.getRow()); for (Cell cell : result.listCells()) { @@ -177,5 +130,17 @@ public class HbaseIT extends TestSuiteBase implements TestResource { results.add(result); } Assertions.assertEquals(results.size(), 3); + scanner.close(); + } + + private void deleteData(TableName table) throws IOException { + Table hbaseTable = hbaseConnection.getTable(table); + Scan scan = new Scan(); + ResultScanner scanner = hbaseTable.getScanner(scan); + // Delete the data generated by the test + for (Result result = scanner.next(); result != null; result = scanner.next()) { + Delete deleteRow = new Delete(result.getRow()); + hbaseTable.delete(deleteRow); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf index 5cf1896ea2..9da70ea80a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf @@ -49,7 +49,7 @@ source { sink { Hbase { - zookeeper_quorum = "hbase-e2e:2181" + zookeeper_quorum = "hbase_e2e:2181" table = "seatunnel_test" rowkey_column = ["name"] family_name { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf index f3e58ec008..be99bf43fe 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf @@ -40,7 +40,7 @@ source { sink { Hbase { - zookeeper_quorum = "hbase-e2e:2181" + zookeeper_quorum = "hbase_e2e:2181" table = "seatunnel_test" rowkey_column = ["name"] family_name { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf index f209875745..c8b750738d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf @@ -22,9 +22,9 @@ env { source { Hbase { - zookeeper_quorum = "hbase-e2e:2181" + zookeeper_quorum = "hbase_e2e:2181" table = "seatunnel_test" - query_columns=["rowkey", "cf1:col1", "cf1:col2", "cf2:col1", "cf2:col2"] + query_columns=["rowkey", "info:age", "info:c_double", "info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"] schema = { columns = [ { @@ -32,39 +32,49 @@ source { type = string }, { - name = "cf1:col1" - type = boolean + name = "info:age" + type = int }, { - name = "cf1:col2" + name = "info:c_double" type = double }, { - name = "cf2:col1" + name = "info:c_boolean" + type = boolean + }, + { + name = "info:c_bigint" type = bigint }, { - name = "cf2:col2" - type = int + name = "info:c_smallint" + type = smallint + }, + { + name = "info:c_tinyint" + type = tinyint + }, + { + name = "info:c_float" + type = float } - ] - } - result_table_name = hbase_source - } + ] + } + } } sink { Assert { - source_table_name = hbase_source rules { row_rules = [ { rule_type = MAX_ROW - rule_value = 10000 + rule_value = 5 }, { rule_type = MIN_ROW - rule_value = 10000 + rule_value = 5 } ], field_rules = [ @@ -78,7 +88,7 @@ sink { ] }, { - field_name = "cf1:col1" + field_name = "info:c_boolean" field_type = boolean field_value = [ { @@ -87,7 +97,7 @@ sink { ] }, { - field_name = "cf1:col2" + field_name = "info:c_double" field_type = double field_value = [ { @@ -96,7 +106,7 @@ sink { ] }, { - field_name = "cf2:col1" + field_name = "info:c_bigint" field_type = bigint field_value = [ { @@ -105,7 +115,7 @@ sink { ] }, { - field_name = "cf2:col2" + field_name = "info:age" field_type = int field_value = [ {