This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b978792cb1 [Feature][Connector-V2][Hbase] implement hbase catalog 
(#7516)
b978792cb1 is described below

commit b978792cb1a3ec5e49a953da92a62bee3b446a3d
Author: Jast <ad...@hadoop.wiki>
AuthorDate: Mon Sep 9 13:31:42 2024 +0800

    [Feature][Connector-V2][Hbase] implement hbase catalog (#7516)
---
 seatunnel-connectors-v2/connector-hbase/pom.xml    |   6 +
 .../seatunnel/hbase/catalog/HbaseCatalog.java      | 194 ++++++++++
 .../hbase/catalog/HbaseCatalogFactory.java         |  50 +++
 .../seatunnel/hbase/client/HbaseClient.java        | 393 +++++++++++++++++++++
 .../seatunnel/hbase/config/HbaseConfig.java        |  21 ++
 .../seatunnel/hbase/config/HbaseParameters.java    |  23 +-
 .../seatunnel/hbase/constant/HbaseIdentifier.java  |  22 ++
 .../hbase/exception/HbaseConnectorErrorCode.java   |  13 +-
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |  75 +++-
 .../seatunnel/hbase/sink/HbaseSinkFactory.java     |  25 +-
 .../seatunnel/hbase/sink/HbaseSinkWriter.java      |  65 ++--
 .../seatunnel/hbase/source/HbaseSource.java        |  10 +-
 .../seatunnel/hbase/source/HbaseSourceFactory.java |   5 +-
 .../seatunnel/hbase/source/HbaseSourceReader.java  |  36 +-
 .../hbase/source/HbaseSourceSplitEnumerator.java   |  29 +-
 .../hbase/state/HbaseAggregatedCommitInfo.java     |  22 ++
 .../seatunnel/hbase/state/HbaseCommitInfo.java     |  22 ++
 .../seatunnel/hbase/state/HbaseSinkState.java      |  22 ++
 .../seatunnel/hbase/utils/HbaseConnectionUtil.java |  48 ---
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 118 +++++++
 .../resources/fake_to_hbase_with_append_data.conf  |  52 +++
 .../fake_to_hbase_with_create_when_not_exists.conf |  51 +++
 .../resources/fake_to_hbase_with_drop_data.conf    |  52 +++
 .../fake_to_hbase_with_error_when_data_exists.conf |  52 +++
 .../fake_to_hbase_with_error_when_not_exists.conf  |  51 +++
 .../fake_to_hbase_with_recreate_schema.conf        |  51 +++
 26 files changed, 1339 insertions(+), 169 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-hbase/pom.xml 
b/seatunnel-connectors-v2/connector-hbase/pom.xml
index 663bdcfdd3..bda49ade0e 100644
--- a/seatunnel-connectors-v2/connector-hbase/pom.xml
+++ b/seatunnel-connectors-v2/connector-hbase/pom.xml
@@ -47,6 +47,12 @@
             <version>${hbase.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
new file mode 100644
index 0000000000..f6a4815073
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.util.ConfigUtil;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Hbase catalog implementation. */
+@Slf4j
+public class HbaseCatalog implements Catalog {
+
+    private final String catalogName;
+    private final String defaultDatabase;
+    private final HbaseParameters hbaseParameters;
+
+    private HbaseClient hbaseClient;
+
+    public HbaseCatalog(
+            String catalogName, String defaultDatabase, HbaseParameters 
hbaseParameters) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be 
null");
+        this.defaultDatabase = defaultDatabase;
+        this.hbaseParameters = checkNotNull(hbaseParameters, "Hbase Config 
cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try {
+            hbaseClient = HbaseClient.createInstance(hbaseParameters);
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed to open catalog 
%s", catalogName), e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        hbaseClient.close();
+    }
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return hbaseClient.databaseExists(databaseName);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return hbaseClient.listDatabases();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return hbaseClient.listTables(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath);
+        return hbaseClient.tableExists(tablePath.getTableName());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw new UnsupportedOperationException("Not implement");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        hbaseClient.createTable(
+                tablePath.getDatabaseName(),
+                tablePath.getTableName(),
+                hbaseParameters.getFamilyNames().values().stream()
+                        .filter(value -> !"all_columns".equals(value))
+                        .collect(Collectors.toList()),
+                ignoreIfExists);
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath);
+        if (!tableExists(tablePath) && !ignoreIfNotExists) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        hbaseClient.dropTable(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (databaseExists(tablePath.getDatabaseName()) && !ignoreIfExists) {
+            throw new DatabaseAlreadyExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+        hbaseClient.createNamespace(tablePath.getDatabaseName());
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName()) && 
!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+        hbaseClient.deleteNamespace(tablePath.getDatabaseName());
+    }
+
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+        if (!tableExists(tablePath) && !ignoreIfNotExists) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        hbaseClient.truncateTable(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        return hbaseClient.isExistsData(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    private Map<String, String> buildTableOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "hbase");
+        options.put("config", ConfigUtil.convertToJsonString(tablePath));
+        return options;
+    }
+
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            return new InfoPreviewResult("create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new InfoPreviewResult("delete index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new InfoPreviewResult("delete and create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new InfoPreviewResult("create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            return new InfoPreviewResult("delete index " + 
tablePath.getTableName());
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: 
" + actionType);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
new file mode 100644
index 0000000000..b9a3fc25fd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HbaseCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // Create an instance of HbaseCatalog, passing in the catalog name, 
namespace, and Hbase
+        // parameters
+        HbaseParameters hbaseParameters = 
HbaseParameters.buildWithConfig(options);
+        return new HbaseCatalog(catalogName, hbaseParameters.getNamespace(), 
hbaseParameters);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return HbaseIdentifier.IDENTIFIER_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
new file mode 100644
index 0000000000..aec64bf7cf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.client;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode.CONNECTION_FAILED_FOR_ADMIN;
+
+@Slf4j
+public class HbaseClient {
+
+    private final Connection connection;
+    private final Admin admin;
+    private final BufferedMutator hbaseMutator;
+    public static Configuration hbaseConfiguration;
+
+    /**
+     * Constructor for HbaseClient.
+     *
+     * @param connection Hbase connection
+     * @param hbaseParameters Hbase parameters
+     */
+    private HbaseClient(Connection connection, HbaseParameters 
hbaseParameters) {
+        this.connection = connection;
+        try {
+            this.admin = connection.getAdmin();
+
+            BufferedMutatorParams bufferedMutatorParams =
+                    new BufferedMutatorParams(
+                                    TableName.valueOf(
+                                            hbaseParameters.getNamespace(),
+                                            hbaseParameters.getTable()))
+                            
.pool(HTable.getDefaultExecutor(hbaseConfiguration))
+                            
.writeBufferSize(hbaseParameters.getWriteBufferSize());
+            hbaseMutator = 
connection.getBufferedMutator(bufferedMutatorParams);
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    CONNECTION_FAILED_FOR_ADMIN, 
CONNECTION_FAILED_FOR_ADMIN.getDescription(), e);
+        }
+    }
+
+    /**
+     * Create a new instance of HbaseClient.
+     *
+     * @param hbaseParameters Hbase parameters
+     * @return HbaseClient
+     */
+    public static HbaseClient createInstance(HbaseParameters hbaseParameters) {
+        return new HbaseClient(getHbaseConnection(hbaseParameters), 
hbaseParameters);
+    }
+
+    /**
+     * Get Hbase connection.
+     *
+     * @param hbaseParameters Hbase parameters
+     * @return Hbase connection
+     */
+    private static Connection getHbaseConnection(HbaseParameters 
hbaseParameters) {
+        hbaseConfiguration = HBaseConfiguration.create();
+        hbaseConfiguration.set("hbase.zookeeper.quorum", 
hbaseParameters.getZookeeperQuorum());
+        if (hbaseParameters.getHbaseExtraConfig() != null) {
+            
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+        }
+        try {
+            Connection connection = 
ConnectionFactory.createConnection(hbaseConfiguration);
+            return connection;
+        } catch (IOException e) {
+            String errorMsg = "Build Hbase connection failed.";
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg, e);
+        }
+    }
+
+    /**
+     * Check if a database exists.
+     *
+     * @param databaseName database name
+     * @return true if the database exists, false otherwise
+     */
+    public boolean databaseExists(String databaseName) {
+        try {
+            return Arrays.stream(admin.listNamespaceDescriptors())
+                    .anyMatch(descriptor -> 
descriptor.getName().equals(databaseName));
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+                    
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * List all databases.
+     *
+     * @return List of database names
+     */
+    public List<String> listDatabases() {
+        try {
+            return Arrays.stream(admin.listNamespaceDescriptors())
+                    .map(NamespaceDescriptor::getName)
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+                    
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * List all tables in a database.
+     *
+     * @param databaseName database name
+     * @return List of table names
+     */
+    public List<String> listTables(String databaseName) {
+        try {
+            return Arrays.stream(admin.listTableNamesByNamespace(databaseName))
+                    .map(tableName -> tableName.getNameAsString())
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+                    
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Check if a table exists.
+     *
+     * @param tableName table name
+     * @return true if the table exists, false otherwise
+     */
+    public boolean tableExists(String tableName) {
+        try {
+            return admin.tableExists(TableName.valueOf(tableName));
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+                    
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Create a table.
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     * @param columnFamilies column families
+     * @param ignoreIfExists ignore if the table already exists
+     */
+    public void createTable(
+            String databaseName,
+            String tableName,
+            List<String> columnFamilies,
+            boolean ignoreIfExists) {
+        try {
+            if (!databaseExists(databaseName)) {
+                
admin.createNamespace(NamespaceDescriptor.create(databaseName).build());
+            }
+            TableName table = TableName.valueOf(databaseName, tableName);
+            if (tableExists(table.getNameAsString())) {
+                log.info("Table {} already exists.", table.getNameAsString());
+                if (!ignoreIfExists) {
+                    throw new HbaseConnectorException(
+                            HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION,
+                            
HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION.getErrorMessage());
+                }
+                return;
+            }
+            TableDescriptorBuilder hbaseTableDescriptor = 
TableDescriptorBuilder.newBuilder(table);
+            columnFamilies.forEach(
+                    family ->
+                            hbaseTableDescriptor.setColumnFamily(
+                                    
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family))
+                                            .build()));
+            admin.createTable(hbaseTableDescriptor.build());
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION,
+                    
HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Drop a table.
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     */
+    public void dropTable(String databaseName, String tableName) {
+        try {
+            TableName table = TableName.valueOf(databaseName, tableName);
+            admin.disableTable(table);
+            admin.deleteTable(table);
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION,
+                    
HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Create a namespace.
+     *
+     * @param namespace namespace name
+     */
+    public void createNamespace(String namespace) {
+        try {
+            
admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION,
+                    
HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Drop a namespace.
+     *
+     * @param namespace namespace name
+     */
+    public void deleteNamespace(String namespace) {
+        try {
+            admin.deleteNamespace(namespace);
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION,
+                    
HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Truncate a table.
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     */
+    public void truncateTable(String databaseName, String tableName) {
+        try {
+            TableName table = TableName.valueOf(databaseName, tableName);
+            admin.disableTable(table);
+            admin.truncateTable(table, true);
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION,
+                    
HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /**
+     * Check if a table has data.
+     *
+     * @param databaseName database name
+     * @param tableName table name
+     * @return true if the table has data, false otherwise
+     */
+    public boolean isExistsData(String databaseName, String tableName) {
+        try {
+            Table table = connection.getTable(TableName.valueOf(databaseName, 
tableName));
+            Scan scan = new Scan();
+            scan.setCaching(1);
+            scan.setLimit(1);
+            try (ResultScanner scanner = table.getScanner(scan)) {
+                Result result = scanner.next();
+                return !result.isEmpty();
+            }
+        } catch (IOException e) {
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+                    
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+                    e);
+        }
+    }
+
+    /** Close Hbase connection. */
+    public void close() {
+        try {
+            if (hbaseMutator != null) {
+                hbaseMutator.flush();
+                hbaseMutator.close();
+            }
+            if (admin != null) {
+                admin.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (IOException e) {
+            log.error("Close Hbase connection failed.", e);
+        }
+    }
+
+    /**
+     * Mutate a Put.
+     *
+     * @param put Hbase put
+     * @throws IOException exception
+     */
+    public void mutate(Put put) throws IOException {
+        hbaseMutator.mutate(put);
+    }
+
+    /**
+     * Scan a table.
+     *
+     * @param split Hbase source split
+     * @param hbaseParameters Hbase parameters
+     * @param columnNames column names
+     * @return ResultScanner
+     * @throws IOException exception
+     */
+    public ResultScanner scan(
+            HbaseSourceSplit split, HbaseParameters hbaseParameters, 
List<String> columnNames)
+            throws IOException {
+        Scan scan = new Scan();
+        scan.withStartRow(split.getStartRow(), true);
+        scan.withStopRow(split.getEndRow(), true);
+        scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
+        scan.setCaching(hbaseParameters.getCaching());
+        scan.setBatch(hbaseParameters.getBatch());
+        for (String columnName : columnNames) {
+            String[] columnNameSplit = columnName.split(":");
+            scan.addColumn(Bytes.toBytes(columnNameSplit[0]), 
Bytes.toBytes(columnNameSplit[1]));
+        }
+        return this.connection
+                .getTable(TableName.valueOf(hbaseParameters.getTable()))
+                .getScanner(scan);
+    }
+
+    /**
+     * Get a RegionLocator.
+     *
+     * @param tableName table name
+     * @return RegionLocator
+     * @throws IOException exception
+     */
+    public RegionLocator getRegionLocator(String tableName) throws IOException 
{
+        return this.connection.getRegionLocator(TableName.valueOf(tableName));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 44a5640ffe..2921e1f91c 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -19,10 +19,17 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class HbaseConfig {
 
     private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
@@ -119,6 +126,20 @@ public class HbaseConfig {
                     .withDescription(
                             "Set the batch size to control the maximum number 
of cells returned each time, thereby controlling the amount of data returned by 
a single RPC call. The default value is -1.");
 
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, 
ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("data_save_mode");
+
     public enum NullMode {
         SKIP,
         EMPTY;
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 4d020700ad..66b4eb967b 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -51,6 +51,8 @@ public class HbaseParameters implements Serializable {
 
     private String zookeeperQuorum;
 
+    private String namespace;
+
     private String table;
 
     private List<String> rowkeyColumns;
@@ -83,13 +85,22 @@ public class HbaseParameters implements Serializable {
 
     public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
+        String table = config.get(TABLE);
+        int colonIndex = table.indexOf(':');
+        if (colonIndex != -1) {
+            String namespace = table.substring(0, colonIndex);
+            builder.namespace(namespace);
+            builder.table(table.substring(colonIndex + 1));
+        } else {
+            builder.table(table);
+            builder.namespace("default");
+        }
 
         // required parameters
         builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
         builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
         builder.familyNames(config.get(FAMILY_NAME));
 
-        builder.table(config.get(TABLE));
         builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
         builder.versionColumn(config.get(VERSION_COLUMN));
         String nullMode = String.valueOf(config.get(NULL_MODE));
@@ -108,7 +119,15 @@ public class HbaseParameters implements Serializable {
 
         // required parameters
         
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
-        builder.table(pluginConfig.getString(TABLE.key()));
+        String table = pluginConfig.getString(TABLE.key());
+        int colonIndex = table.indexOf(':');
+        if (colonIndex != -1) {
+            String namespace = table.substring(0, colonIndex);
+            builder.namespace(namespace);
+            builder.table(table.substring(colonIndex + 1));
+        } else {
+            builder.table(table);
+        }
 
         if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
             Config extraConfig = 
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
new file mode 100644
index 0000000000..3d84216d66
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.constant;
+
/**
 * Identifier constants shared by the Hbase connector's source, sink and catalog factories.
 *
 * <p>Pure constant holder: declared {@code final} with a private constructor so it
 * cannot be subclassed or instantiated.
 */
public final class HbaseIdentifier {

    /** Plugin/factory identifier registered for the Hbase connector. */
    public static final String IDENTIFIER_NAME = "Hbase";

    private HbaseIdentifier() {
        // utility class - no instances
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
index 5717c933b0..7f6a60f955 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
@@ -21,8 +21,17 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.exception;
 import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
 public enum HbaseConnectorErrorCode implements SeaTunnelErrorCode {
-    CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed");
-
+    CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed"),
+    CONNECTION_FAILED_FOR_ADMIN("Hbase-02", "Build Hbase Admin failed"),
+    DATABASE_QUERY_EXCEPTION("Hbase-03", "Hbase namespace query failed"),
+    TABLE_QUERY_EXCEPTION("Hbase-04", "Hbase table query failed"),
+    TABLE_CREATE_EXCEPTION("Hbase-05", "Hbase table create failed"),
+    TABLE_DELETE_EXCEPTION("Hbase-06", "Hbase table delete failed"),
+    TABLE_EXISTS_EXCEPTION("Hbase-07", "Hbase table exists failed"),
+    NAMESPACE_CREATE_EXCEPTION("Hbase-08", "Hbase namespace create failed"),
+    NAMESPACE_DELETE_EXCEPTION("Hbase-09", "Hbase namespace delete failed"),
+    TABLE_TRUNCATE_EXCEPTION("Hbase-10", "Hbase table truncate failed"),
+    ;
     private final String code;
     private final String description;
 
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 0c592dd65a..0a46b1baef 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -17,52 +17,97 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
-public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void>
-        implements SupportMultiTableSink {
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
-    private Config pluginConfig;
+public class HbaseSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow, HbaseSinkState, HbaseCommitInfo, 
HbaseAggregatedCommitInfo>,
+                SupportMultiTableSink,
+                SupportSaveMode {
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private ReadonlyConfig config;
+
+    private CatalogTable catalogTable;
 
-    private HbaseParameters hbaseParameters;
+    private final HbaseParameters hbaseParameters;
+
+    private SeaTunnelRowType seaTunnelRowType;
 
     private List<Integer> rowkeyColumnIndexes = new ArrayList<>();
 
     private int versionColumnIndex = -1;
 
+    public HbaseSink(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.hbaseParameters = HbaseParameters.buildWithConfig(config);
+        this.config = config;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        if (hbaseParameters.getVersionColumn() != null) {
+            this.versionColumnIndex = 
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
+        }
+    }
+
     @Override
     public String getPluginName() {
-        return HbaseSinkFactory.IDENTIFIER;
+        return HbaseIdentifier.IDENTIFIER_NAME;
     }
 
-    public HbaseSink(HbaseParameters hbaseParameters, CatalogTable 
catalogTable) {
-        this.hbaseParameters = hbaseParameters;
-        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+    @Override
+    public HbaseSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
             
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
         }
         if (hbaseParameters.getVersionColumn() != null) {
             this.versionColumnIndex = 
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
         }
+        return new HbaseSinkWriter(
+                seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, 
versionColumnIndex);
     }
 
     @Override
-    public HbaseSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
-        return new HbaseSinkWriter(
-                seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, 
versionColumnIndex);
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        CatalogFactory catalogFactory =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        getPluginName());
+        if (catalogFactory == null) {
+            return Optional.empty();
+        }
+        Catalog catalog = 
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+        SchemaSaveMode schemaSaveMode = 
config.get(HbaseConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
+        TablePath tablePath =
+                TablePath.of(hbaseParameters.getNamespace(), 
hbaseParameters.getTable());
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode, dataSaveMode, catalog, tablePath, 
null, null));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 1bbeb43f4e..0992b11d71 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -17,23 +17,25 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
@@ -47,29 +49,34 @@ public class HbaseSinkFactory implements TableSinkFactory {
 
     @Override
     public String factoryIdentifier() {
-        return IDENTIFIER;
+        return HbaseIdentifier.IDENTIFIER_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
+                .required(
+                        ZOOKEEPER_QUORUM,
+                        TABLE,
+                        ROWKEY_COLUMNS,
+                        FAMILY_NAME,
+                        SCHEMA_SAVE_MODE,
+                        DATA_SAVE_MODE)
                 .optional(
-                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA,
                         ROWKEY_DELIMITER,
                         VERSION_COLUMN,
                         NULL_MODE,
                         WAL_WRITE,
                         WRITE_BUFFER_SIZE,
                         ENCODING,
-                        HBASE_EXTRA_CONFIG)
+                        HBASE_EXTRA_CONFIG,
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        HbaseParameters hbaseParameters = 
HbaseParameters.buildWithConfig(context.getOptions());
-        CatalogTable catalogTable = context.getCatalogTable();
-        return () -> new HbaseSink(hbaseParameters, catalogTable);
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        return () -> new HbaseSink(readonlyConfig, context.getCatalogTable());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index e1e312d305..73ee19f936 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -17,52 +17,46 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
+import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
-        implements SupportMultiTableSinkWriter<Void> {
+public class HbaseSinkWriter
+        implements SinkWriter<SeaTunnelRow, HbaseCommitInfo, HbaseSinkState>,
+                SupportMultiTableSinkWriter<Void> {
 
     private static final String ALL_COLUMNS = "all_columns";
 
-    private final Configuration hbaseConfiguration = 
HBaseConfiguration.create();
-
-    private final Connection hbaseConnection;
-
-    private final BufferedMutator hbaseMutator;
+    private final HbaseClient hbaseClient;
 
     private final SeaTunnelRowType seaTunnelRowType;
 
     private final HbaseParameters hbaseParameters;
 
-    private final List<Integer> rowkeyColumnIndexes;
+    private List<Integer> rowkeyColumnIndexes;
 
-    private final int versionColumnIndex;
+    private int versionColumnIndex;
 
     private String defaultFamilyName = "value";
 
@@ -70,8 +64,7 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
             SeaTunnelRowType seaTunnelRowType,
             HbaseParameters hbaseParameters,
             List<Integer> rowkeyColumnIndexes,
-            int versionColumnIndex)
-            throws IOException {
+            int versionColumnIndex) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.hbaseParameters = hbaseParameters;
         this.rowkeyColumnIndexes = rowkeyColumnIndexes;
@@ -82,34 +75,27 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
                     hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, 
defaultFamilyName);
         }
 
-        // initialize hbase configuration
-        hbaseConfiguration.set("hbase.zookeeper.quorum", 
hbaseParameters.getZookeeperQuorum());
-        if (hbaseParameters.getHbaseExtraConfig() != null) {
-            
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
-        }
-        // initialize hbase connection
-        hbaseConnection = 
ConnectionFactory.createConnection(hbaseConfiguration);
-        // initialize hbase mutator
-        BufferedMutatorParams bufferedMutatorParams =
-                new 
BufferedMutatorParams(TableName.valueOf(hbaseParameters.getTable()))
-                        .pool(HTable.getDefaultExecutor(hbaseConfiguration))
-                        .writeBufferSize(hbaseParameters.getWriteBufferSize());
-        hbaseMutator = 
hbaseConnection.getBufferedMutator(bufferedMutatorParams);
+        this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
         Put put = convertRowToPut(element);
-        hbaseMutator.mutate(put);
+        hbaseClient.mutate(put);
     }
 
+    @Override
+    public Optional<HbaseCommitInfo> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {}
+
     @Override
     public void close() throws IOException {
-        if (hbaseMutator != null) {
-            hbaseMutator.close();
-        }
-        if (hbaseConnection != null) {
-            hbaseConnection.close();
+        if (hbaseClient != null) {
+            hbaseClient.close();
         }
     }
 
@@ -134,6 +120,7 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
                         .collect(Collectors.toList());
         for (Integer writeColumnIndex : writeColumnIndexes) {
             String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+            Map<String, String> configurationFamilyNames = 
hbaseParameters.getFamilyNames();
             String familyName =
                     hbaseParameters.getFamilyNames().getOrDefault(fieldName, 
defaultFamilyName);
             byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 3aca316151..1a597eea13 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -35,11 +35,9 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Lists;
 
 import java.util.List;
@@ -51,9 +49,6 @@ public class HbaseSource
         implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit, 
HbaseSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HbaseSource.class);
-    public static final String PLUGIN_NAME = "Hbase";
-    private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
     private HbaseParameters hbaseParameters;
 
@@ -61,11 +56,10 @@ public class HbaseSource
 
     @Override
     public String getPluginName() {
-        return PLUGIN_NAME;
+        return HbaseIdentifier.IDENTIFIER_NAME;
     }
 
     HbaseSource(Config pluginConfig) {
-        this.pluginConfig = pluginConfig;
         CheckResult result =
                 CheckConfigUtil.checkAllExists(pluginConfig, 
ZOOKEEPER_QUORUM.key(), TABLE.key());
         if (!result.isSuccess()) {
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 2de385dbd1..5e250337d7 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -26,18 +26,17 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory.IDENTIFIER;
-
 @AutoService(Factory.class)
 public class HbaseSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return IDENTIFIER;
+        return HbaseIdentifier.IDENTIFIER_NAME;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index 526ac826db..2f78fb280c 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -22,16 +22,12 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.format.HBaseDeserializationFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
 
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -55,13 +51,13 @@ public class HbaseSourceReader implements 
SourceReader<SeaTunnelRow, HbaseSource
 
     private final transient Map<String, byte[][]> namesMap;
 
-    private final SourceReader.Context context;
+    private final Context context;
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit = false;
+    private final HbaseClient hbaseClient;
 
     private HbaseParameters hbaseParameters;
     private final List<String> columnNames;
-    private final transient Connection connection;
 
     private HBaseDeserializationFormat hbaseDeserializationFormat =
             new HBaseDeserializationFormat();
@@ -85,8 +81,7 @@ public class HbaseSourceReader implements 
SourceReader<SeaTunnelRow, HbaseSource
                                 Preconditions.checkArgument(
                                         column.contains(":") && 
column.split(":").length == 2,
                                         "Invalid column names, it should be 
[ColumnFamily:Column] format"));
-
-        connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+        hbaseClient = HbaseClient.createInstance(hbaseParameters);
     }
 
     @Override
@@ -103,9 +98,9 @@ public class HbaseSourceReader implements 
SourceReader<SeaTunnelRow, HbaseSource
                 throw new IOException("Failed to close HBase Scanner.", e);
             }
         }
-        if (this.connection != null) {
+        if (this.hbaseClient != null) {
             try {
-                this.connection.close();
+                this.hbaseClient.close();
             } catch (Exception e) {
                 throw new IOException("Failed to close HBase connection.", e);
             }
@@ -119,23 +114,8 @@ public class HbaseSourceReader implements 
SourceReader<SeaTunnelRow, HbaseSource
             final HbaseSourceSplit split = sourceSplits.poll();
             if (Objects.nonNull(split)) {
                 // read logic
-                if (this.currentScanner == null) {
-                    Scan scan = new Scan();
-                    scan.withStartRow(split.getStartRow(), true);
-                    scan.withStopRow(split.getEndRow(), true);
-                    scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
-                    scan.setCaching(hbaseParameters.getCaching());
-                    scan.setBatch(hbaseParameters.getBatch());
-                    for (String columnName : this.columnNames) {
-                        String[] columnNameSplit = columnName.split(":");
-                        scan.addColumn(
-                                Bytes.toBytes(columnNameSplit[0]),
-                                Bytes.toBytes(columnNameSplit[1]));
-                    }
-                    this.currentScanner =
-                            this.connection
-                                    
.getTable(TableName.valueOf(hbaseParameters.getTable()))
-                                    .getScanner(scan);
+                if (currentScanner == null) {
+                    currentScanner = hbaseClient.scan(split, hbaseParameters, 
this.columnNames);
                 }
                 for (Result result : currentScanner) {
                     SeaTunnelRow seaTunnelRow =
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index 094128b174..f5508c9037 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -18,14 +18,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
 
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
 
 import lombok.extern.slf4j.Slf4j;
@@ -43,7 +39,6 @@ public class HbaseSourceSplitEnumerator
     /** Source split enumerator context */
     private final Context<HbaseSourceSplit> context;
 
-    private Config pluginConfig;
     /** The splits that has assigned */
     private final Set<HbaseSourceSplit> assignedSplit;
 
@@ -51,24 +46,29 @@ public class HbaseSourceSplitEnumerator
     private Set<HbaseSourceSplit> pendingSplit;
 
     private HbaseParameters hbaseParameters;
-    private Connection connection;
+
+    private HbaseClient hbaseClient;
 
     public HbaseSourceSplitEnumerator(
             Context<HbaseSourceSplit> context, HbaseParameters 
hbaseParameters) {
-        this.context = context;
-        this.hbaseParameters = hbaseParameters;
-        this.assignedSplit = new HashSet<>();
-        connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+        this(context, hbaseParameters, new HashSet<>());
     }
 
     public HbaseSourceSplitEnumerator(
             Context<HbaseSourceSplit> context,
             HbaseParameters hbaseParameters,
             HbaseSourceState sourceState) {
+        this(context, hbaseParameters, sourceState.getAssignedSplits());
+    }
+
+    private HbaseSourceSplitEnumerator(
+            Context<HbaseSourceSplit> context,
+            HbaseParameters hbaseParameters,
+            Set<HbaseSourceSplit> assignedSplit) {
         this.context = context;
         this.hbaseParameters = hbaseParameters;
-        this.assignedSplit = sourceState.getAssignedSplits();
-        connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+        this.assignedSplit = assignedSplit;
+        this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
     }
 
     @Override
@@ -157,8 +157,7 @@ public class HbaseSourceSplitEnumerator
         List<HbaseSourceSplit> splits = new ArrayList<>();
 
         try {
-            RegionLocator regionLocator =
-                    
connection.getRegionLocator(TableName.valueOf(hbaseParameters.getTable()));
+            RegionLocator regionLocator = 
hbaseClient.getRegionLocator(hbaseParameters.getTable());
             byte[][] startKeys = regionLocator.getStartKeys();
             byte[][] endKeys = regionLocator.getEndKeys();
             if (startKeys.length != endKeys.length) {
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
new file mode 100644
index 0000000000..c1996dc057
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
/**
 * Aggregated commit info for the Hbase sink.
 *
 * <p>The Hbase sink writes eagerly and has nothing to aggregate at commit time, so this is an
 * empty marker type required by the sink checkpoint contract.
 */
public class HbaseAggregatedCommitInfo implements Serializable {

    // Explicit serialVersionUID keeps serialized checkpoint state compatible if the
    // class later gains fields (Effective Java: declare it on every Serializable class).
    private static final long serialVersionUID = 1L;
}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
new file mode 100644
index 0000000000..39999ceddc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
/**
 * Per-checkpoint commit info for the Hbase sink.
 *
 * <p>Empty marker type: the writer flushes data directly, so no commit payload is carried.
 */
public class HbaseCommitInfo implements Serializable {

    // Explicit serialVersionUID protects checkpoint deserialization across class evolution.
    private static final long serialVersionUID = 1L;
}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
new file mode 100644
index 0000000000..6e1f068cf6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
/**
 * Writer state snapshot for the Hbase sink.
 *
 * <p>Empty marker type: the Hbase writer keeps no state that must survive a restart.
 */
public class HbaseSinkState implements Serializable {

    // Explicit serialVersionUID protects checkpoint deserialization across class evolution.
    private static final long serialVersionUID = 1L;
}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
deleted file mode 100644
index f006986e66..0000000000
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hbase.utils;
-
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-
-import java.io.IOException;
-
-public class HbaseConnectionUtil {
-    public static Connection getHbaseConnection(HbaseParameters 
hbaseParameters) {
-        Configuration hbaseConfiguration = HBaseConfiguration.create();
-        hbaseConfiguration.set("hbase.zookeeper.quorum", 
hbaseParameters.getZookeeperQuorum());
-        if (hbaseParameters.getHbaseExtraConfig() != null) {
-            
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
-        }
-        // initialize hbase connection
-        try {
-            Connection connection = 
ConnectionFactory.createConnection(hbaseConfiguration);
-            return connection;
-        } catch (IOException e) {
-            String errorMsg = "Build Hbase connection failed.";
-            throw new 
HbaseConnectorException(HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg);
-        }
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index fe736f965e..1957e1bd08 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import org.junit.jupiter.api.AfterAll;
@@ -47,6 +49,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.UUID;
 
 @Slf4j
 @DisabledOnContainer(
@@ -79,6 +82,7 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
     public void startUp() throws Exception {
         hbaseCluster = new HbaseCluster();
         hbaseConnection = hbaseCluster.startService();
+        admin = hbaseConnection.getAdmin();
         // Create table for hbase sink test
         log.info("initial");
         hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
@@ -112,6 +116,87 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHbaseSinkWithErrorWhenDataExists(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(table);
+        insertData(table);
+        Assertions.assertEquals(5, countData(table));
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
+        Assertions.assertEquals(1, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testHbaseSinkWithRecreateSchema(TestContainer container)
+            throws IOException, InterruptedException {
+        String tableName = "seatunnel_test_with_recreate_schema";
+        TableName table = TableName.valueOf(tableName);
+        dropTable(table);
+        hbaseCluster.createTable(tableName, Arrays.asList("test_rs"));
+        TableDescriptor descriptorBefore = 
hbaseConnection.getTable(table).getDescriptor();
+        String[] familiesBefore =
+                Arrays.stream(descriptorBefore.getColumnFamilies())
+                        .map(f -> f.getNameAsString())
+                        .toArray(String[]::new);
+        Assertions.assertTrue(Arrays.equals(familiesBefore, new String[] 
{"test_rs"}));
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_hbase_with_recreate_schema.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        TableDescriptor descriptorAfter = 
hbaseConnection.getTable(table).getDescriptor();
+        String[] familiesAfter =
+                Arrays.stream(descriptorAfter.getColumnFamilies())
+                        .map(f -> f.getNameAsString())
+                        .toArray(String[]::new);
+        Assertions.assertTrue(!Arrays.equals(familiesBefore, familiesAfter));
+    }
+
+    @TestTemplate
+    public void testHbaseSinkWithDropData(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(table);
+        insertData(table);
+        countData(table);
+        Assertions.assertEquals(5, countData(table));
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_hbase_with_drop_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(5, countData(table));
+    }
+
+    @TestTemplate
+    public void testHbaseSinkWithCreateWhenNotExists(TestContainer container)
+            throws IOException, InterruptedException {
+        TableName seatunnelTestWithCreateWhenNotExists =
+                
TableName.valueOf("seatunnel_test_with_create_when_not_exists");
+        dropTable(seatunnelTestWithCreateWhenNotExists);
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_hbase_with_create_when_not_exists.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(5, 
countData(seatunnelTestWithCreateWhenNotExists));
+    }
+
+    @TestTemplate
+    public void testHbaseSinkWithAppendData(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(table);
+        insertData(table);
+        countData(table);
+        Assertions.assertEquals(5, countData(table));
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_hbase_with_append_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(10, countData(table));
+    }
+
+    @TestTemplate
+    public void testHbaseSinkWithErrorWhenNotExists(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_hbase_with_error_when_not_exists.conf");
+        Assertions.assertEquals(1, execResult.getExitCode());
+    }
+
     @TestTemplate
     public void testHbaseSinkWithArray(TestContainer container)
             throws IOException, InterruptedException {
@@ -223,6 +308,13 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         scanner.close();
     }
 
+    private void dropTable(TableName tableName) throws IOException {
+        if (admin.tableExists(tableName)) {
+            admin.disableTable(tableName);
+            admin.deleteTable(tableName);
+        }
+    }
+
     private void deleteData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
@@ -234,6 +326,32 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         }
     }
 
+    private void insertData(TableName table) throws IOException {
+        Table hbaseTable = hbaseConnection.getTable(table);
+        for (int i = 0; i < 5; i++) {
+            String rowKey = "row" + UUID.randomUUID();
+            String value = "value" + i;
+            hbaseTable.put(
+                    new Put(Bytes.toBytes(rowKey))
+                            .addColumn(
+                                    Bytes.toBytes(FAMILY_NAME),
+                                    Bytes.toBytes("name"),
+                                    Bytes.toBytes(value)));
+        }
+    }
+
+    private int countData(TableName table) throws IOException {
+        Table hbaseTable = hbaseConnection.getTable(table);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        int count = 0;
+        for (Result result = scanner.next(); result != null; result = 
scanner.next()) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
     public ArrayList<Result> readData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
new file mode 100644
index 0000000000..0778d8cb36
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
new file mode 100644
index 0000000000..2132717082
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test_with_create_when_not_exists"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
new file mode 100644
index 0000000000..66b3981206
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    # NOTE(review): target the pre-populated table used by testHbaseSinkWithDropData,
+    # otherwise DROP_DATA is exercised against an unrelated empty table.
+    table = "seatunnel_test"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "DROP_DATA"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
new file mode 100644
index 0000000000..00e0485e3d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "ERROR_WHEN_DATA_EXISTS"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
new file mode 100644
index 0000000000..359b71b79f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test_with_error_when_not_exists"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
new file mode 100644
index 0000000000..c8a8c43d9c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema {
+      fields {
+        name = string
+        age = int
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_boolean = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test_with_recreate_schema"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+    schema_save_mode = "RECREATE_SCHEMA"
+  }
+}
\ No newline at end of file


Reply via email to