This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6a702b58f [Improve] hbase options (#8923)
b6a702b58f is described below

commit b6a702b58f0503faf2d127332c4386c075006073
Author: Jarvis <jar...@apache.org>
AuthorDate: Mon Mar 10 20:06:16 2025 +0800

    [Improve] hbase options (#8923)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 .../seatunnel/hbase/config/HbaseBaseOptions.java   | 41 ++++++++++
 .../seatunnel/hbase/config/HbaseParameters.java    | 95 +++++++++-------------
 .../{HbaseConfig.java => HbaseSinkOptions.java}    | 17 +---
 .../seatunnel/hbase/config/HbaseSourceOptions.java | 20 +++++
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |  6 +-
 .../seatunnel/hbase/sink/HbaseSinkFactory.java     | 43 ++++------
 .../seatunnel/hbase/source/HbaseSource.java        | 36 ++------
 .../seatunnel/hbase/source/HbaseSourceFactory.java | 12 ++-
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 11 +--
 10 files changed, 142 insertions(+), 141 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index d301086222..0df7f57c1e 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -183,12 +183,10 @@ public class ConnectorOptionCheckTest {
         whiteList.add("TDengineSourceOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("FakeSourceOptions");
-        whiteList.add("HbaseSinkOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("IoTDBSinkOptions");
         whiteList.add("EasysearchSourceOptions");
         whiteList.add("IcebergSourceOptions");
-        whiteList.add("HbaseSourceOptions");
         whiteList.add("PaimonSourceOptions");
         whiteList.add("IoTDBSourceOptions");
         whiteList.add("SlsSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
new file mode 100644
index 0000000000..46bb9cb0f6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class HbaseBaseOptions {
+
+    public static final Option<String> ZOOKEEPER_QUORUM =
+            Options.key("zookeeper_quorum")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hbase zookeeper quorum");
+
+    public static final Option<String> TABLE =
+            
Options.key("table").stringType().noDefaultValue().withDescription("Hbase table 
name");
+
+    public static final Option<List<String>> ROWKEY_COLUMNS =
+            Options.key("rowkey_column")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("Hbase rowkey column");
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 66b4eb967b..35d9fbfcbe 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 
 import lombok.Builder;
 import lombok.Getter;
@@ -29,22 +26,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
 @Builder
 @Getter
 public class HbaseParameters implements Serializable {
@@ -65,27 +46,32 @@ public class HbaseParameters implements Serializable {
 
     private Map<String, String> hbaseExtraConfig;
 
-    @Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue();
+    @Builder.Default private int caching = 
HbaseSinkOptions.HBASE_CACHING_CONFIG.defaultValue();
 
-    @Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue();
+    @Builder.Default private int batch = 
HbaseSinkOptions.HBASE_BATCH_CONFIG.defaultValue();
 
-    @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
+    @Builder.Default private Long ttl = 
HbaseSinkOptions.HBASE_TTL_CONFIG.defaultValue();
 
-    @Builder.Default private boolean cacheBlocks = 
HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
+    @Builder.Default
+    private boolean cacheBlocks = 
HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
 
-    @Builder.Default private String rowkeyDelimiter = 
ROWKEY_DELIMITER.defaultValue();
+    @Builder.Default
+    private String rowkeyDelimiter = 
HbaseSinkOptions.ROWKEY_DELIMITER.defaultValue();
 
-    @Builder.Default private HbaseConfig.NullMode nullMode = 
NULL_MODE.defaultValue();
+    @Builder.Default
+    private HbaseSinkOptions.NullMode nullMode = 
HbaseSinkOptions.NULL_MODE.defaultValue();
 
-    @Builder.Default private boolean walWrite = WAL_WRITE.defaultValue();
+    @Builder.Default private boolean walWrite = 
HbaseSinkOptions.WAL_WRITE.defaultValue();
 
-    @Builder.Default private int writeBufferSize = 
WRITE_BUFFER_SIZE.defaultValue();
+    @Builder.Default
+    private int writeBufferSize = 
HbaseSinkOptions.WRITE_BUFFER_SIZE.defaultValue();
 
-    @Builder.Default private HbaseConfig.EnCoding enCoding = 
ENCODING.defaultValue();
+    @Builder.Default
+    private HbaseSinkOptions.EnCoding enCoding = 
HbaseSinkOptions.ENCODING.defaultValue();
 
     public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
-        String table = config.get(TABLE);
+        String table = config.get(HbaseBaseOptions.TABLE);
         int colonIndex = table.indexOf(':');
         if (colonIndex != -1) {
             String namespace = table.substring(0, colonIndex);
@@ -97,29 +83,29 @@ public class HbaseParameters implements Serializable {
         }
 
         // required parameters
-        builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
-        builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
-        builder.familyNames(config.get(FAMILY_NAME));
-
-        builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
-        builder.versionColumn(config.get(VERSION_COLUMN));
-        String nullMode = String.valueOf(config.get(NULL_MODE));
-        builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
-        builder.walWrite(config.get(WAL_WRITE));
-        builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE));
-        String encoding = String.valueOf(config.get(ENCODING));
-        builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
-        builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
-        builder.ttl(config.get(HBASE_TTL_CONFIG));
+        builder.zookeeperQuorum(config.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
+        builder.rowkeyColumns(config.get(HbaseBaseOptions.ROWKEY_COLUMNS));
+        builder.familyNames(config.get(HbaseSinkOptions.FAMILY_NAME));
+
+        builder.rowkeyDelimiter(config.get(HbaseSinkOptions.ROWKEY_DELIMITER));
+        builder.versionColumn(config.get(HbaseSinkOptions.VERSION_COLUMN));
+        String nullMode = 
String.valueOf(config.get(HbaseSinkOptions.NULL_MODE));
+        
builder.nullMode(HbaseSinkOptions.NullMode.valueOf(nullMode.toUpperCase()));
+        builder.walWrite(config.get(HbaseSinkOptions.WAL_WRITE));
+        
builder.writeBufferSize(config.get(HbaseSinkOptions.WRITE_BUFFER_SIZE));
+        String encoding = 
String.valueOf(config.get(HbaseSinkOptions.ENCODING));
+        
builder.enCoding(HbaseSinkOptions.EnCoding.valueOf(encoding.toUpperCase()));
+        
builder.hbaseExtraConfig(config.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
+        builder.ttl(config.get(HbaseSinkOptions.HBASE_TTL_CONFIG));
         return builder.build();
     }
 
-    public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithSourceConfig(ReadonlyConfig 
pluginConfig) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
-        
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
-        String table = pluginConfig.getString(TABLE.key());
+        
builder.zookeeperQuorum(pluginConfig.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
+        String table = pluginConfig.get(HbaseBaseOptions.TABLE);
         int colonIndex = table.indexOf(':');
         if (colonIndex != -1) {
             String namespace = table.substring(0, colonIndex);
@@ -129,18 +115,17 @@ public class HbaseParameters implements Serializable {
             builder.table(table);
         }
 
-        if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
-            Config extraConfig = 
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
-            
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+        if 
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
+            
builder.hbaseExtraConfig(pluginConfig.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
         }
-        if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) {
-            builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key()));
+        if 
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHING_CONFIG).isPresent()) {
+            
builder.caching(pluginConfig.get(HbaseSinkOptions.HBASE_CACHING_CONFIG));
         }
-        if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) {
-            builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key()));
+        if 
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_BATCH_CONFIG).isPresent()) {
+            
builder.batch(pluginConfig.get(HbaseSinkOptions.HBASE_BATCH_CONFIG));
         }
-        if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) {
-            
builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key()));
+        if 
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent())
 {
+            
builder.cacheBlocks(pluginConfig.get(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG));
         }
         return builder.build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
rename to 
seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
index 2921e1f91c..7a520ee5ff 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
@@ -30,18 +30,7 @@ import static 
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
 import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
 import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
 
-public class HbaseConfig {
-
-    private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
-
-    public static final Option<String> ZOOKEEPER_QUORUM =
-            Options.key("zookeeper_quorum")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Hbase zookeeper quorum");
-
-    public static final Option<String> TABLE =
-            
Options.key("table").stringType().noDefaultValue().withDescription("Hbase table 
name");
+public class HbaseSinkOptions extends HbaseBaseOptions {
 
     public static final Option<List<String>> ROWKEY_COLUMNS =
             Options.key("rowkey_column")
@@ -49,6 +38,8 @@ public class HbaseConfig {
                     .noDefaultValue()
                     .withDescription("Hbase rowkey column");
 
+    private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
+
     public static final Option<String> ROWKEY_DELIMITER =
             Options.key("rowkey_delimiter")
                     .stringType()
@@ -149,6 +140,4 @@ public class HbaseConfig {
         UTF8,
         GBK;
     }
-
-    private HbaseConfig() {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
new file mode 100644
index 0000000000..e1f151054d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+public class HbaseSourceOptions extends HbaseBaseOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index e8d7b8b205..9cd37f9986 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -32,8 +32,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
@@ -102,8 +102,8 @@ public class HbaseSink
             return Optional.empty();
         }
         Catalog catalog = 
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
-        SchemaSaveMode schemaSaveMode = 
config.get(HbaseConfig.SCHEMA_SAVE_MODE);
-        DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
+        SchemaSaveMode schemaSaveMode = 
config.get(HbaseSinkOptions.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
config.get(HbaseSinkOptions.DATA_SAVE_MODE);
         TablePath tablePath =
                 TablePath.of(hbaseParameters.getNamespace(), 
hbaseParameters.getTable());
         return Optional.of(
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 9f3b2fdd5e..d265c4f431 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -24,29 +24,14 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
 @AutoService(Factory.class)
 public class HbaseSinkFactory implements TableSinkFactory {
 
-    public static final String IDENTIFIER = "Hbase";
-
     @Override
     public String factoryIdentifier() {
         return HbaseIdentifier.IDENTIFIER_NAME;
@@ -56,20 +41,20 @@ public class HbaseSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        ZOOKEEPER_QUORUM,
-                        TABLE,
-                        ROWKEY_COLUMNS,
-                        FAMILY_NAME,
-                        SCHEMA_SAVE_MODE,
-                        DATA_SAVE_MODE)
+                        HbaseSinkOptions.ZOOKEEPER_QUORUM,
+                        HbaseSinkOptions.TABLE,
+                        HbaseSinkOptions.ROWKEY_COLUMNS,
+                        HbaseSinkOptions.FAMILY_NAME,
+                        HbaseSinkOptions.SCHEMA_SAVE_MODE,
+                        HbaseSinkOptions.DATA_SAVE_MODE)
                 .optional(
-                        ROWKEY_DELIMITER,
-                        VERSION_COLUMN,
-                        NULL_MODE,
-                        WAL_WRITE,
-                        WRITE_BUFFER_SIZE,
-                        ENCODING,
-                        HBASE_EXTRA_CONFIG,
+                        HbaseSinkOptions.ROWKEY_DELIMITER,
+                        HbaseSinkOptions.VERSION_COLUMN,
+                        HbaseSinkOptions.NULL_MODE,
+                        HbaseSinkOptions.WAL_WRITE,
+                        HbaseSinkOptions.WRITE_BUFFER_SIZE,
+                        HbaseSinkOptions.ENCODING,
+                        HbaseSinkOptions.HBASE_EXTRA_CONFIG,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 1178878aa7..1ff95b4d97 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -19,9 +19,7 @@
 package org.apache.seatunnel.connectors.seatunnel.hbase.source;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -29,48 +27,27 @@ import 
org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
 
 import java.util.List;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
 public class HbaseSource
         implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit, 
HbaseSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
-    private SeaTunnelRowType seaTunnelRowType;
-    private HbaseParameters hbaseParameters;
-
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
+    private final HbaseParameters hbaseParameters;
 
     @Override
     public String getPluginName() {
         return HbaseIdentifier.IDENTIFIER_NAME;
     }
 
-    HbaseSource(Config pluginConfig) {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
ZOOKEEPER_QUORUM.key(), TABLE.key());
-        if (!result.isSuccess()) {
-            throw new HbaseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.hbaseParameters = 
HbaseParameters.buildWithSourceConfig(pluginConfig);
-        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
-        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+    HbaseSource(HbaseParameters hbaseParameters, CatalogTable catalogTable) {
+        this.hbaseParameters = hbaseParameters;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -86,7 +63,8 @@ public class HbaseSource
     @Override
     public SourceReader<SeaTunnelRow, HbaseSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new HbaseSourceReader(hbaseParameters, readerContext, 
seaTunnelRowType);
+        return new HbaseSourceReader(
+                hbaseParameters, readerContext, 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 5e250337d7..70dcdab41e 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -21,11 +21,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
 
 import com.google.auto.service.AutoService;
@@ -42,8 +44,8 @@ public class HbaseSourceFactory implements TableSourceFactory 
{
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HbaseConfig.ZOOKEEPER_QUORUM)
-                .required(HbaseConfig.TABLE)
+                .required(HbaseSourceOptions.ZOOKEEPER_QUORUM)
+                .required(HbaseSourceOptions.TABLE)
                 .build();
     }
 
@@ -57,6 +59,8 @@ public class HbaseSourceFactory implements TableSourceFactory 
{
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () ->
                 (SeaTunnelSource<T, SplitT, StateT>)
-                        new HbaseSource(context.getOptions().toConfig());
+                        new HbaseSource(
+                                
HbaseParameters.buildWithSourceConfig(context.getOptions()),
+                                
CatalogTableUtil.buildWithConfig(context.getOptions()));
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 5274c1a8c9..48f3e48eee 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -23,8 +23,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.connectors.seatunnel.hbase.catalog.HbaseCatalog;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -109,10 +110,10 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, 
Arrays.asList(FAMILY_NAME));
 
         Map<String, Object> config = new HashMap<>();
-        config.put(HbaseConfig.ZOOKEEPER_QUORUM.key(), 
hbaseCluster.getZookeeperQuorum());
-        config.put(HbaseConfig.ROWKEY_COLUMNS.key(), "id");
-        config.put(HbaseConfig.FAMILY_NAME.key(), Maps.of("all_columns", 
FAMILY_NAME));
-        config.put(HbaseConfig.TABLE.key(), TABLE_NAME);
+        config.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
hbaseCluster.getZookeeperQuorum());
+        config.put(HbaseBaseOptions.ROWKEY_COLUMNS.key(), "id");
+        config.put(HbaseSinkOptions.FAMILY_NAME.key(), Maps.of("all_columns", 
FAMILY_NAME));
+        config.put(HbaseBaseOptions.TABLE.key(), TABLE_NAME);
         // config.put(HbaseConfig.)
 
         catalog =

Reply via email to