This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b6a702b58f [Improve] hbase options (#8923) b6a702b58f is described below commit b6a702b58f0503faf2d127332c4386c075006073 Author: Jarvis <jar...@apache.org> AuthorDate: Mon Mar 10 20:06:16 2025 +0800 [Improve] hbase options (#8923) --- .../seatunnel/api/ConnectorOptionCheckTest.java | 2 - .../seatunnel/hbase/config/HbaseBaseOptions.java | 41 ++++++++++ .../seatunnel/hbase/config/HbaseParameters.java | 95 +++++++++------------- .../{HbaseConfig.java => HbaseSinkOptions.java} | 17 +--- .../seatunnel/hbase/config/HbaseSourceOptions.java | 20 +++++ .../connectors/seatunnel/hbase/sink/HbaseSink.java | 6 +- .../seatunnel/hbase/sink/HbaseSinkFactory.java | 43 ++++------ .../seatunnel/hbase/source/HbaseSource.java | 36 ++------ .../seatunnel/hbase/source/HbaseSourceFactory.java | 12 ++- .../seatunnel/e2e/connector/hbase/HbaseIT.java | 11 +-- 10 files changed, 142 insertions(+), 141 deletions(-) diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java index d301086222..0df7f57c1e 100644 --- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java @@ -183,12 +183,10 @@ public class ConnectorOptionCheckTest { whiteList.add("TDengineSourceOptions"); whiteList.add("PulsarSourceOptions"); whiteList.add("FakeSourceOptions"); - whiteList.add("HbaseSinkOptions"); whiteList.add("MongodbSinkOptions"); whiteList.add("IoTDBSinkOptions"); whiteList.add("EasysearchSourceOptions"); whiteList.add("IcebergSourceOptions"); - whiteList.add("HbaseSourceOptions"); whiteList.add("PaimonSourceOptions"); whiteList.add("IoTDBSourceOptions"); whiteList.add("SlsSourceOptions"); diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java new file mode 100644 index 0000000000..46bb9cb0f6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hbase.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.util.List; + +public class HbaseBaseOptions { + + public static final Option<String> ZOOKEEPER_QUORUM = + Options.key("zookeeper_quorum") + .stringType() + .noDefaultValue() + .withDescription("Hbase zookeeper quorum"); + + public static final Option<String> TABLE = + Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name"); + + public static final Option<List<String>> ROWKEY_COLUMNS = + Options.key("rowkey_column") + .listType() + .noDefaultValue() + .withDescription("Hbase rowkey column"); +} diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index 66b4eb967b..35d9fbfcbe 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -17,10 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.common.config.TypesafeConfigUtils; import lombok.Builder; import lombok.Getter; @@ -29,22 +26,6 @@ import java.io.Serializable; import java.util.List; import java.util.Map; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; - @Builder @Getter public class HbaseParameters implements Serializable { @@ -65,27 +46,32 @@ public class HbaseParameters implements Serializable { private Map<String, String> hbaseExtraConfig; - @Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue(); + @Builder.Default private int caching = HbaseSinkOptions.HBASE_CACHING_CONFIG.defaultValue(); - @Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue(); + @Builder.Default private int batch = HbaseSinkOptions.HBASE_BATCH_CONFIG.defaultValue(); - @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue(); + @Builder.Default private Long ttl = HbaseSinkOptions.HBASE_TTL_CONFIG.defaultValue(); - @Builder.Default private boolean cacheBlocks = HBASE_CACHE_BLOCKS_CONFIG.defaultValue(); + @Builder.Default + private boolean cacheBlocks = HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue(); - @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue(); + @Builder.Default + private String rowkeyDelimiter = HbaseSinkOptions.ROWKEY_DELIMITER.defaultValue(); - @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue(); + @Builder.Default + private HbaseSinkOptions.NullMode nullMode = HbaseSinkOptions.NULL_MODE.defaultValue(); - @Builder.Default private boolean walWrite = WAL_WRITE.defaultValue(); + @Builder.Default private boolean walWrite = HbaseSinkOptions.WAL_WRITE.defaultValue(); - @Builder.Default private int writeBufferSize = WRITE_BUFFER_SIZE.defaultValue(); + @Builder.Default + private int writeBufferSize = HbaseSinkOptions.WRITE_BUFFER_SIZE.defaultValue(); - @Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue(); + @Builder.Default + private HbaseSinkOptions.EnCoding enCoding = HbaseSinkOptions.ENCODING.defaultValue(); public static HbaseParameters buildWithConfig(ReadonlyConfig config) { HbaseParametersBuilder builder = HbaseParameters.builder(); - String table = config.get(TABLE); + String table = config.get(HbaseBaseOptions.TABLE); int colonIndex = table.indexOf(':'); if (colonIndex != -1) { String namespace = table.substring(0, colonIndex); @@ -97,29 +83,29 @@ public class HbaseParameters implements Serializable { } // required parameters - builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM)); - builder.rowkeyColumns(config.get(ROWKEY_COLUMNS)); - builder.familyNames(config.get(FAMILY_NAME)); - - builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER)); - builder.versionColumn(config.get(VERSION_COLUMN)); - String nullMode = String.valueOf(config.get(NULL_MODE)); - builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase())); - builder.walWrite(config.get(WAL_WRITE)); - builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE)); - String encoding = String.valueOf(config.get(ENCODING)); - builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); - builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG)); - builder.ttl(config.get(HBASE_TTL_CONFIG)); + builder.zookeeperQuorum(config.get(HbaseBaseOptions.ZOOKEEPER_QUORUM)); + builder.rowkeyColumns(config.get(HbaseBaseOptions.ROWKEY_COLUMNS)); + builder.familyNames(config.get(HbaseSinkOptions.FAMILY_NAME)); + + builder.rowkeyDelimiter(config.get(HbaseSinkOptions.ROWKEY_DELIMITER)); + builder.versionColumn(config.get(HbaseSinkOptions.VERSION_COLUMN)); + String nullMode = String.valueOf(config.get(HbaseSinkOptions.NULL_MODE)); + builder.nullMode(HbaseSinkOptions.NullMode.valueOf(nullMode.toUpperCase())); + builder.walWrite(config.get(HbaseSinkOptions.WAL_WRITE)); + builder.writeBufferSize(config.get(HbaseSinkOptions.WRITE_BUFFER_SIZE)); + String encoding = String.valueOf(config.get(HbaseSinkOptions.ENCODING)); + builder.enCoding(HbaseSinkOptions.EnCoding.valueOf(encoding.toUpperCase())); + builder.hbaseExtraConfig(config.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG)); + builder.ttl(config.get(HbaseSinkOptions.HBASE_TTL_CONFIG)); return builder.build(); } - public static HbaseParameters buildWithSourceConfig(Config pluginConfig) { + public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig) { HbaseParametersBuilder builder = HbaseParameters.builder(); // required parameters - builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key())); - String table = pluginConfig.getString(TABLE.key()); + builder.zookeeperQuorum(pluginConfig.get(HbaseBaseOptions.ZOOKEEPER_QUORUM)); + String table = pluginConfig.get(HbaseBaseOptions.TABLE); int colonIndex = table.indexOf(':'); if (colonIndex != -1) { String namespace = table.substring(0, colonIndex); @@ -129,18 +115,17 @@ public class HbaseParameters implements Serializable { builder.table(table); } - if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) { - Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key()); - builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig)); + if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) { + builder.hbaseExtraConfig(pluginConfig.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG)); } - if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) { - builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key())); + if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHING_CONFIG).isPresent()) { + builder.caching(pluginConfig.get(HbaseSinkOptions.HBASE_CACHING_CONFIG)); } - if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) { - builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key())); + if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_BATCH_CONFIG).isPresent()) { + builder.batch(pluginConfig.get(HbaseSinkOptions.HBASE_BATCH_CONFIG)); } - if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) { - builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key())); + if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent()) { + builder.cacheBlocks(pluginConfig.get(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG)); } return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java similarity index 93% rename from seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java rename to seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java index 2921e1f91c..7a520ee5ff 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java @@ -30,18 +30,7 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; -public class HbaseConfig { - - private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024; - - public static final Option<String> ZOOKEEPER_QUORUM = - Options.key("zookeeper_quorum") - .stringType() - .noDefaultValue() - .withDescription("Hbase zookeeper quorum"); - - public static final Option<String> TABLE = - Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name"); +public class HbaseSinkOptions extends HbaseBaseOptions { public static final Option<List<String>> ROWKEY_COLUMNS = Options.key("rowkey_column") @@ -49,6 +38,8 @@ public class HbaseConfig { .noDefaultValue() .withDescription("Hbase rowkey column"); + private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024; + public static final Option<String> ROWKEY_DELIMITER = Options.key("rowkey_delimiter") .stringType() @@ -149,6 +140,4 @@ public class HbaseConfig { UTF8, GBK; } - - private HbaseConfig() {} } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java new file mode 100644 index 0000000000..e1f151054d --- /dev/null +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hbase.config; + +public class HbaseSourceOptions extends HbaseBaseOptions {} diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index e8d7b8b205..9cd37f9986 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -32,8 +32,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions; import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo; @@ -102,8 +102,8 @@ public class HbaseSink return Optional.empty(); } Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config); - SchemaSaveMode schemaSaveMode = config.get(HbaseConfig.SCHEMA_SAVE_MODE); - DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE); + SchemaSaveMode schemaSaveMode = config.get(HbaseSinkOptions.SCHEMA_SAVE_MODE); + DataSaveMode dataSaveMode = config.get(HbaseSinkOptions.DATA_SAVE_MODE); TablePath tablePath = TablePath.of(hbaseParameters.getNamespace(), hbaseParameters.getTable()); return Optional.of( diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index 9f3b2fdd5e..d265c4f431 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -24,29 +24,14 @@ import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions; import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; - @AutoService(Factory.class) public class HbaseSinkFactory implements TableSinkFactory { - public static final String IDENTIFIER = "Hbase"; - @Override public String factoryIdentifier() { return HbaseIdentifier.IDENTIFIER_NAME; @@ -56,20 +41,20 @@ public class HbaseSinkFactory implements TableSinkFactory { public OptionRule optionRule() { return OptionRule.builder() .required( - ZOOKEEPER_QUORUM, - TABLE, - ROWKEY_COLUMNS, - FAMILY_NAME, - SCHEMA_SAVE_MODE, - DATA_SAVE_MODE) + HbaseSinkOptions.ZOOKEEPER_QUORUM, + HbaseSinkOptions.TABLE, + HbaseSinkOptions.ROWKEY_COLUMNS, + HbaseSinkOptions.FAMILY_NAME, + HbaseSinkOptions.SCHEMA_SAVE_MODE, + HbaseSinkOptions.DATA_SAVE_MODE) .optional( - ROWKEY_DELIMITER, - VERSION_COLUMN, - NULL_MODE, - WAL_WRITE, - WRITE_BUFFER_SIZE, - ENCODING, - HBASE_EXTRA_CONFIG, + HbaseSinkOptions.ROWKEY_DELIMITER, + HbaseSinkOptions.VERSION_COLUMN, + HbaseSinkOptions.NULL_MODE, + HbaseSinkOptions.WAL_WRITE, + HbaseSinkOptions.WRITE_BUFFER_SIZE, + HbaseSinkOptions.ENCODING, + HbaseSinkOptions.HBASE_EXTRA_CONFIG, SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java index 1178878aa7..1ff95b4d97 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java @@ -19,9 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.source; import org.apache.seatunnel.shade.com.google.common.collect.Lists; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; @@ -29,48 +27,27 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier; -import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; - public class HbaseSource implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit, HbaseSourceState>, SupportParallelism, SupportColumnProjection { - private SeaTunnelRowType seaTunnelRowType; - private HbaseParameters hbaseParameters; - - private CatalogTable catalogTable; + private final CatalogTable catalogTable; + private final HbaseParameters hbaseParameters; @Override public String getPluginName() { return HbaseIdentifier.IDENTIFIER_NAME; } - HbaseSource(Config pluginConfig) { - CheckResult result = - CheckConfigUtil.checkAllExists(pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key()); - if (!result.isSuccess()) { - throw new HbaseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - this.hbaseParameters = HbaseParameters.buildWithSourceConfig(pluginConfig); - this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); - this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + HbaseSource(HbaseParameters hbaseParameters, CatalogTable catalogTable) { + this.hbaseParameters = hbaseParameters; + this.catalogTable = catalogTable; } @Override @@ -86,7 +63,8 @@ public class HbaseSource @Override public SourceReader<SeaTunnelRow, HbaseSourceSplit> createReader( SourceReader.Context readerContext) throws Exception { - return new HbaseSourceReader(hbaseParameters, readerContext, seaTunnelRowType); + return new HbaseSourceReader( + hbaseParameters, readerContext, catalogTable.getSeaTunnelRowType()); } @Override diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java index 5e250337d7..70dcdab41e 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java @@ -21,11 +21,13 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.source; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSourceOptions; import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier; import com.google.auto.service.AutoService; @@ -42,8 +44,8 @@ public class HbaseSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HbaseConfig.ZOOKEEPER_QUORUM) - .required(HbaseConfig.TABLE) + .required(HbaseSourceOptions.ZOOKEEPER_QUORUM) + .required(HbaseSourceOptions.TABLE) .build(); } @@ -57,6 +59,8 @@ public class HbaseSourceFactory implements TableSourceFactory { TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { return () -> (SeaTunnelSource<T, SplitT, StateT>) - new HbaseSource(context.getOptions().toConfig()); + new HbaseSource( + HbaseParameters.buildWithSourceConfig(context.getOptions()), + CatalogTableUtil.buildWithConfig(context.getOptions())); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index 5274c1a8c9..48f3e48eee 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -23,8 +23,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.connectors.seatunnel.hbase.catalog.HbaseCatalog; -import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseBaseOptions; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.EngineType; @@ -109,10 +110,10 @@ public class HbaseIT extends TestSuiteBase implements TestResource { hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME)); Map<String, Object> config = new HashMap<>(); - config.put(HbaseConfig.ZOOKEEPER_QUORUM.key(), hbaseCluster.getZookeeperQuorum()); - config.put(HbaseConfig.ROWKEY_COLUMNS.key(), "id"); - config.put(HbaseConfig.FAMILY_NAME.key(), Maps.of("all_columns", FAMILY_NAME)); - config.put(HbaseConfig.TABLE.key(), TABLE_NAME); + config.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), hbaseCluster.getZookeeperQuorum()); + config.put(HbaseBaseOptions.ROWKEY_COLUMNS.key(), "id"); + config.put(HbaseSinkOptions.FAMILY_NAME.key(), Maps.of("all_columns", FAMILY_NAME)); + config.put(HbaseBaseOptions.TABLE.key(), TABLE_NAME); // config.put(HbaseConfig.) catalog =