This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 658643ae53 [Improve][Connector-V2] Improve the paimon source (#6887) 658643ae53 is described below commit 658643ae532d4ef1db5b4e591f7bb0b8b342f401 Author: dailai <dai...@chinatelecom.cn> AuthorDate: Tue Jun 11 18:07:51 2024 +0800 [Improve][Connector-V2] Improve the paimon source (#6887) --- docs/en/connector-v2/source/Paimon.md | 116 ++++++++- seatunnel-connectors-v2/connector-paimon/pom.xml | 5 + .../paimon/config/PaimonSourceConfig.java | 41 ++++ .../seatunnel/paimon/source/PaimonSource.java | 114 +++------ .../paimon/source/PaimonSourceFactory.java | 38 ++- .../paimon/source/PaimonSourceReader.java | 16 +- .../paimon/source/PaimonSourceSplitEnumerator.java | 16 +- .../converter/SqlToPaimonPredicateConverter.java | 265 +++++++++++++++++++++ .../seatunnel/paimon/utils/RowConverter.java | 18 +- .../SqlToPaimonPredicateConverterTest.java | 183 ++++++++++++++ .../seatunnel/paimon/utils/RowConverterTest.java | 2 +- .../e2e/connector/paimon/PaimonSinkCDCIT.java | 27 ++- .../e2e/connector/paimon/PaimonSinkHdfsIT.java | 11 +- .../resources/fake_to_paimon_with_full_type.conf | 91 +++++++ .../resources/paimon_to_assert_with_filter1.conf | 60 +++++ .../resources/paimon_to_assert_with_filter2.conf | 61 +++++ .../resources/paimon_to_assert_with_filter3.conf | 81 +++++++ .../resources/paimon_to_assert_with_filter4.conf | 71 ++++++ .../paimon_to_assert_with_hivecatalog.conf | 59 +++++ .../paimon_to_assert_with_timestampN.conf | 90 +++++++ .../read_from_paimon_with_hdfs_ha_to_assert.conf | 90 +++++++ 21 files changed, 1355 insertions(+), 100 deletions(-) diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index eb83e3bb42..e50ee0df9e 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -17,17 +17,30 @@ Read data from Apache Paimon. 
## Options -| name | type | required | default value | -|----------------|--------|----------|---------------| -| warehouse | String | Yes | - | -| database | String | Yes | - | -| table | String | Yes | - | -| hdfs_site_path | String | No | - | +| name | type | required | default value | +|-------------------------|--------|----------|---------------| +| warehouse | String | Yes | - | +| catalog_type | String | No | filesystem | +| catalog_uri | String | No | - | +| database | String | Yes | - | +| table | String | Yes | - | +| hdfs_site_path | String | No | - | +| query | String | No | - | +| paimon.hadoop.conf | Map | No | - | +| paimon.hadoop.conf-path | String | No | - | ### warehouse [string] Paimon warehouse path +### catalog_type [string] + +Catalog type of Paimon; supports filesystem and hive + +### catalog_uri [string] + +Catalog URI of Paimon; only required when catalog_type is hive + ### database [string] The database you want to access @@ -40,8 +53,39 @@ The table you want to access The file path of `hdfs-site.xml` +### query [string] + +The filter condition used when reading the table. For example: `select * from st_test where id > 100`. If not specified, all rows are read. +Currently, WHERE conditions only support <, <=, >, >=, =, !=, or, and, is null and is not null; other operators are not supported. +The Having, Group By and Order By clauses are currently unsupported because Paimon does not support them. +Projection and Limit will be supported in the future. + +Note: When the field in the WHERE condition is a string or boolean, its value must be enclosed in single quotes; otherwise an error will be reported.
`For example: name='abc' or tag='true'` +The field data types currently supported by WHERE conditions are as follows: + +* string +* boolean +* tinyint +* smallint +* int +* bigint +* float +* double +* date +* timestamp + +### paimon.hadoop.conf [map] + +Hadoop configuration properties, given as key-value pairs + +### paimon.hadoop.conf-path [string] + +The path from which to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files + ## Examples +### Simple example + ```hocon source { Paimon { @@ -52,6 +96,66 @@ source { } ``` +### Filter example + +```hocon +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'" + } +} +``` + +### Hadoop conf example + +```hocon +source { + Paimon { + catalog_name="seatunnel_test" + warehouse="hdfs:///tmp/paimon" + database="seatunnel_namespace1" + table="st_test" + query = "select * from st_test where pk_id is not null and pk_id < 3" + paimon.hadoop.conf = { + fs.defaultFS = "hdfs://nameservice1" + dfs.nameservices = "nameservice1" + dfs.ha.namenodes.nameservice1 = "nn1,nn2" + dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020" + dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020" + dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + dfs.client.use.datanode.hostname = "true" + } + } +} +``` + +### Hive catalog example + +```hocon +source { + Paimon { + catalog_name="seatunnel_test" + catalog_type="hive" + catalog_uri="thrift://hadoop04:9083" + warehouse="hdfs:///tmp/seatunnel" + database="seatunnel_test" + table="st_test3" + paimon.hadoop.conf = { + fs.defaultFS = "hdfs://nameservice1" + dfs.nameservices = "nameservice1" + dfs.ha.namenodes.nameservice1 = "nn1,nn2" + dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020" + dfs.namenode.rpc-address.nameservice1.nn2 = 
"hadoop04:8020" + dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + dfs.client.use.datanode.hostname = "true" + } + } +} +``` + ## Changelog ### next version diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml index 267e66dc0a..80934e68a2 100644 --- a/seatunnel-connectors-v2/connector-paimon/pom.xml +++ b/seatunnel-connectors-v2/connector-paimon/pom.xml @@ -90,6 +90,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.github.jsqlparser</groupId> + <artifactId>jsqlparser</artifactId> + <version>${jsqlparser.version}</version> + </dependency> </dependencies> diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java new file mode 100644 index 0000000000..b29542ba45 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Getter; + +@Getter +public class PaimonSourceConfig extends PaimonConfig { + + public static final Option<String> QUERY_SQL = + Options.key("query") + .stringType() + .noDefaultValue() + .withDescription("The query of paimon source"); + + private String query; + + public PaimonSourceConfig(ReadonlyConfig readonlyConfig) { + super(readonlyConfig); + this.query = readonlyConfig.get(QUERY_SQL); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index 658e1e45e5..358026b5d3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -17,45 +17,26 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import 
org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; -import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.Table; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter; -import java.util.HashMap; -import java.util.Map; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.Table; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE; +import java.util.Collections; +import java.util.List; /** Paimon connector source class. 
*/ -@AutoService(SeaTunnelSource.class) public class PaimonSource implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> { @@ -63,77 +44,57 @@ public class PaimonSource public static final String PLUGIN_NAME = "Paimon"; - private Config pluginConfig; + private ReadonlyConfig readonlyConfig; private SeaTunnelRowType seaTunnelRowType; - private Table table; + private Table paimonTable; + + private Predicate predicate; + + private CatalogTable catalogTable; + + public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) { + this.readonlyConfig = readonlyConfig; + PaimonSourceConfig paimonSourceConfig = new PaimonSourceConfig(readonlyConfig); + TablePath tablePath = + TablePath.of(paimonSourceConfig.getNamespace(), paimonSourceConfig.getTable()); + this.catalogTable = paimonCatalog.getTable(tablePath); + this.paimonTable = paimonCatalog.getPaimonTable(tablePath); + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + // TODO: We can use this to realize the column projection feature later + String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL); + this.predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + this.paimonTable.rowType(), filterSql); + } @Override public String getPluginName() { return PLUGIN_NAME; } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; - final CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key()); - if (!result.isSuccess()) { - throw new PaimonConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - // initialize paimon table - final String warehouse = pluginConfig.getString(WAREHOUSE.key()); - final String database = pluginConfig.getString(DATABASE.key()); - final String table = 
pluginConfig.getString(TABLE.key()); - final Map<String, String> optionsMap = new HashMap<>(); - optionsMap.put(WAREHOUSE.key(), warehouse); - final Options options = Options.fromMap(optionsMap); - final Configuration hadoopConf = new Configuration(); - if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) { - hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key()))); - } - final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf); - try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) { - Identifier identifier = Identifier.create(database, table); - this.table = catalog.getTable(identifier); - } catch (Exception e) { - String errorMsg = - String.format( - "Failed to get table [%s] from database [%s] on warehouse [%s]", - database, table, warehouse); - throw new PaimonConnectorException( - PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e); - } - // TODO: Support column projection - seaTunnelRowType = RowTypeConverter.convert(this.table.rowType()); - } - @Override public Boundedness getBoundedness() { return Boundedness.BOUNDED; } @Override - public SeaTunnelDataType<SeaTunnelRow> getProducedType() { - return seaTunnelRowType; + public List<CatalogTable> getProducedCatalogTables() { + return Collections.singletonList(catalogTable); } @Override public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader( SourceReader.Context readerContext) throws Exception { - return new PaimonSourceReader(readerContext, table, seaTunnelRowType); + + return new PaimonSourceReader(readerContext, paimonTable, seaTunnelRowType, predicate); } @Override public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> createEnumerator( SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext) throws Exception { - return new PaimonSourceSplitEnumerator(enumeratorContext, table); + return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, predicate); } @Override @@ -141,6 +102,7 @@ public class 
PaimonSource SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext, PaimonSourceState checkpointState) throws Exception { - return new PaimonSourceSplitEnumerator(enumeratorContext, table, checkpointState); + return new PaimonSourceSplitEnumerator( + enumeratorContext, paimonTable, checkpointState, predicate); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java index 91042be7dd..c11906dec4 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java @@ -17,14 +17,24 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogEnum; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogFactory; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig; import com.google.auto.service.AutoService; +import java.io.Serializable; 
+ @AutoService(Factory.class) public class PaimonSourceFactory implements TableSourceFactory { @@ -36,10 +46,15 @@ public class PaimonSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(PaimonConfig.WAREHOUSE) - .required(PaimonConfig.DATABASE) - .required(PaimonConfig.TABLE) - .optional(PaimonConfig.HDFS_SITE_PATH) + .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE) + .optional( + PaimonConfig.CATALOG_TYPE, + PaimonConfig.HDFS_SITE_PATH, + PaimonSourceConfig.QUERY_SQL, + PaimonConfig.HADOOP_CONF, + PaimonConfig.HADOOP_CONF_PATH) + .conditional( + PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI) .build(); } @@ -47,4 +62,19 @@ public class PaimonSourceFactory implements TableSourceFactory { public Class<? extends SeaTunnelSource> getSourceClass() { return PaimonSource.class; } + + @Override + public <T, SplitT extends SourceSplit, StateT extends Serializable> + TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + PaimonCatalogFactory paimonCatalogFactory = new PaimonCatalogFactory(); + try (PaimonCatalog paimonCatalog = + (PaimonCatalog) + paimonCatalogFactory.createCatalog(factoryIdentifier(), readonlyConfig)) { + paimonCatalog.open(); + return () -> + (SeaTunnelSource<T, SplitT, StateT>) + new PaimonSource(readonlyConfig, paimonCatalog); + } + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java index 139365c04d..6cd1d87c63 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java +++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java @@ -24,8 +24,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import lombok.extern.slf4j.Slf4j; @@ -46,11 +48,14 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour private final Table table; private final SeaTunnelRowType seaTunnelRowType; private volatile boolean noMoreSplit; + private final Predicate predicate; - public PaimonSourceReader(Context context, Table table, SeaTunnelRowType seaTunnelRowType) { + public PaimonSourceReader( + Context context, Table table, SeaTunnelRowType seaTunnelRowType, Predicate predicate) { this.context = context; this.table = table; this.seaTunnelRowType = seaTunnelRowType; + this.predicate = predicate; } @Override @@ -70,13 +75,18 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour if (Objects.nonNull(split)) { // read logic try (final RecordReader<InternalRow> reader = - table.newReadBuilder().newRead().createReader(split.getSplit())) { + table.newReadBuilder() + .withFilter(predicate) + .newRead() + .executeFilter() + .createReader(split.getSplit())) { final RecordReaderIterator<InternalRow> rowIterator = new RecordReaderIterator<>(reader); while (rowIterator.hasNext()) { final InternalRow row = rowIterator.next(); final SeaTunnelRow seaTunnelRow = - RowConverter.convert(row, seaTunnelRowType); + RowConverter.convert( + row, seaTunnelRowType, ((FileStoreTable) table).schema()); output.collect(seaTunnelRow); } } diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java index aab44b2260..f6c76dc895 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; @@ -48,17 +49,25 @@ public class PaimonSourceSplitEnumerator /** The table that wants to read */ private final Table table; - public PaimonSourceSplitEnumerator(Context<PaimonSourceSplit> context, Table table) { + private final Predicate predicate; + + public PaimonSourceSplitEnumerator( + Context<PaimonSourceSplit> context, Table table, Predicate predicate) { this.context = context; this.table = table; this.assignedSplit = new HashSet<>(); + this.predicate = predicate; } public PaimonSourceSplitEnumerator( - Context<PaimonSourceSplit> context, Table table, PaimonSourceState sourceState) { + Context<PaimonSourceSplit> context, + Table table, + PaimonSourceState sourceState, + Predicate predicate) { this.context = context; this.table = table; this.assignedSplit = sourceState.getAssignedSplits(); + this.predicate = predicate; } @Override @@ -146,7 +155,8 @@ public class PaimonSourceSplitEnumerator private Set<PaimonSourceSplit> getTableSplits() { final Set<PaimonSourceSplit> tableSplits = new HashSet<>(); // TODO Support columns projection - final List<Split> splits = 
table.newReadBuilder().newScan().plan().splits(); + final List<Split> splits = + table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split))); return tableSplits; } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java new file mode 100644 index 0000000000..7b076ba558 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source.converter; + +import org.apache.commons.lang3.StringUtils; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; + +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.expression.DateValue; +import net.sf.jsqlparser.expression.DoubleValue; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.HexValue; +import net.sf.jsqlparser.expression.LongValue; +import net.sf.jsqlparser.expression.Parenthesis; +import net.sf.jsqlparser.expression.StringValue; +import net.sf.jsqlparser.expression.TimeValue; +import net.sf.jsqlparser.expression.TimestampValue; +import net.sf.jsqlparser.expression.operators.conditional.AndExpression; +import net.sf.jsqlparser.expression.operators.conditional.OrExpression; +import net.sf.jsqlparser.expression.operators.relational.EqualsTo; +import net.sf.jsqlparser.expression.operators.relational.GreaterThan; +import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; +import net.sf.jsqlparser.expression.operators.relational.IsNullExpression; +import net.sf.jsqlparser.expression.operators.relational.MinorThan; +import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; +import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.schema.Column; +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.select.PlainSelect; +import net.sf.jsqlparser.statement.select.Select; +import 
net.sf.jsqlparser.statement.select.SelectBody; + +import java.math.BigDecimal; +import java.util.Objects; +import java.util.Optional; + +public class SqlToPaimonPredicateConverter { + + public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType, String query) { + try { + if (StringUtils.isBlank(query)) { + return null; + } + Statement statement = CCJSqlParserUtil.parse(query); + // Confirm that the SQL statement is a Select statement + if (!(statement instanceof Select)) { + throw new IllegalArgumentException("Only SELECT statements are supported."); + } + Select select = (Select) statement; + SelectBody selectBody = select.getSelectBody(); + if (!(selectBody instanceof PlainSelect)) { + throw new IllegalArgumentException("Only simple SELECT statements are supported."); + } + PlainSelect plainSelect = (PlainSelect) selectBody; + if (plainSelect.getHaving() != null + || plainSelect.getGroupBy() != null + || plainSelect.getOrderByElements() != null + || plainSelect.getLimit() != null) { + throw new IllegalArgumentException( + "Only SELECT statements with WHERE clause are supported. 
The Having, Group By, Order By, Limit clauses are currently unsupported."); + } + Expression whereExpression = plainSelect.getWhere(); + if (Objects.isNull(whereExpression)) { + return null; + } + PredicateBuilder builder = new PredicateBuilder(rowType); + return parseExpressionToPredicate(builder, rowType, whereExpression); + } catch (JSQLParserException e) { + throw new IllegalArgumentException("Error parsing SQL WHERE clause", e); + } + } + + private static Predicate parseExpressionToPredicate( + PredicateBuilder builder, RowType rowType, Expression expression) { + if (expression instanceof IsNullExpression) { + IsNullExpression isNullExpression = (IsNullExpression) expression; + Column column = (Column) isNullExpression.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + if (isNullExpression.isNot()) { + return builder.isNotNull(columnIndex); + } + return builder.isNull(columnIndex); + } else if (expression instanceof EqualsTo) { + EqualsTo equalsTo = (EqualsTo) expression; + Column column = (Column) equalsTo.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(equalsTo.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.equal(columnIndex, paimonDataValue); + } else if (expression instanceof GreaterThan) { + GreaterThan greaterThan = (GreaterThan) expression; + Column column = (Column) greaterThan.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(greaterThan.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.greaterThan(columnIndex, paimonDataValue); + } else if (expression instanceof GreaterThanEquals) { + GreaterThanEquals greaterThanEquals = 
(GreaterThanEquals) expression; + Column column = (Column) greaterThanEquals.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(greaterThanEquals.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.greaterOrEqual(columnIndex, paimonDataValue); + } else if (expression instanceof MinorThan) { + MinorThan minorThan = (MinorThan) expression; + Column column = (Column) minorThan.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(minorThan.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.lessThan(columnIndex, paimonDataValue); + } else if (expression instanceof MinorThanEquals) { + MinorThanEquals minorThanEquals = (MinorThanEquals) expression; + Column column = (Column) minorThanEquals.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(minorThanEquals.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.lessOrEqual(columnIndex, paimonDataValue); + } else if (expression instanceof NotEqualsTo) { + NotEqualsTo notEqualsTo = (NotEqualsTo) expression; + Column column = (Column) notEqualsTo.getLeftExpression(); + int columnIndex = getColumnIndex(builder, column); + Object jsqlParserDataTypeValue = + getJSQLParserDataTypeValue(notEqualsTo.getRightExpression()); + Object paimonDataValue = + convertValueByPaimonDataType( + rowType, column.getColumnName(), jsqlParserDataTypeValue); + return builder.notEqual(columnIndex, paimonDataValue); + } else if (expression instanceof AndExpression) { + 
AndExpression andExpression = (AndExpression) expression; + Predicate leftPredicate = + parseExpressionToPredicate(builder, rowType, andExpression.getLeftExpression()); + Predicate rightPredicate = + parseExpressionToPredicate( + builder, rowType, andExpression.getRightExpression()); + return PredicateBuilder.and(leftPredicate, rightPredicate); + } else if (expression instanceof OrExpression) { + OrExpression orExpression = (OrExpression) expression; + Predicate leftPredicate = + parseExpressionToPredicate(builder, rowType, orExpression.getLeftExpression()); + Predicate rightPredicate = + parseExpressionToPredicate(builder, rowType, orExpression.getRightExpression()); + return PredicateBuilder.or(leftPredicate, rightPredicate); + } else if (expression instanceof Parenthesis) { + Parenthesis parenthesis = (Parenthesis) expression; + return parseExpressionToPredicate(builder, rowType, parenthesis.getExpression()); + } + throw new IllegalArgumentException( + "Unsupported expression type: " + expression.getClass().getSimpleName()); + } + + private static Object convertValueByPaimonDataType( + RowType rowType, String columnName, Object jsqlParserDataTypeValue) { + Optional<DataField> theFiled = + rowType.getFields().stream() + .filter(field -> field.name().equalsIgnoreCase(columnName)) + .findFirst(); + String strValue = jsqlParserDataTypeValue.toString(); + if (theFiled.isPresent()) { + DataType dataType = theFiled.get().type(); + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return jsqlParserDataTypeValue; + case BOOLEAN: + return Boolean.parseBoolean(strValue); + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + return Decimal.fromBigDecimal( + new BigDecimal(strValue), + decimalType.getPrecision(), + decimalType.getScale()); + case TINYINT: + return Byte.parseByte(strValue); + case SMALLINT: + return Short.parseShort(strValue); + case INTEGER: + return Integer.parseInt(strValue); + case BIGINT: + return Long.parseLong(strValue); 
+ case FLOAT: + return Float.parseFloat(strValue); + case DOUBLE: + return Double.parseDouble(strValue); + case DATE: + return DateTimeUtils.toInternal( + org.apache.seatunnel.common.utils.DateUtils.parse(strValue)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return Timestamp.fromLocalDateTime( + org.apache.seatunnel.common.utils.DateTimeUtils.parse(strValue)); + default: + throw new IllegalArgumentException( + "Unsupported Paimon data type :" + dataType.getTypeRoot()); + } + } + throw new IllegalArgumentException( + String.format("The column named [%s] is not exists", columnName)); + } + + private static Object getJSQLParserDataTypeValue(Expression expression) { + if (expression instanceof LongValue) { + return ((LongValue) expression).getValue(); + } else if (expression instanceof StringValue || expression instanceof HexValue) { + return BinaryString.fromString(((StringValue) expression).getValue()); + } else if (expression instanceof DoubleValue) { + return ((DoubleValue) expression).getValue(); + } else if (expression instanceof DateValue) { + return ((DateValue) expression).getValue(); + } else if (expression instanceof TimeValue) { + return ((TimeValue) expression).getValue(); + } else if (expression instanceof TimestampValue) { + return ((TimestampValue) expression).getValue(); + } + throw new IllegalArgumentException("Unsupported expression value type: " + expression); + } + + private static int getColumnIndex(PredicateBuilder builder, Column column) { + int index = builder.indexOf(column.getColumnName()); + if (index == -1) { + throw new IllegalArgumentException( + String.format("The column named [%s] is not exists", column.getColumnName())); + } + return index; + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index 11d9b3b61a..f92d175c2a 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -55,6 +55,7 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** The converter for converting {@link InternalRow} and {@link SeaTunnelRow} */ public class RowConverter { @@ -238,7 +239,8 @@ public class RowConverter { * @param seaTunnelRowType SeaTunnel row type * @return SeaTunnel row */ - public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunnelRowType) { + public static SeaTunnelRow convert( + InternalRow rowData, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) { Object[] objects = new Object[seaTunnelRowType.getTotalFields()]; for (int i = 0; i < objects.length; i++) { // judge the field is or not equals null @@ -289,9 +291,15 @@ public class RowConverter { objects[i] = DateTimeUtils.toLocalDate(dateInt); break; case TIMESTAMP: - // Now SeaTunnel not supported assigned the timezone for timestamp, - // so we use the default precision 6 - Timestamp timestamp = rowData.getTimestamp(i, 6); + int precision = TimestampType.DEFAULT_PRECISION; + Optional<DataField> precisionOptional = + tableSchema.fields().stream() + .filter(dataField -> dataField.name().equals(fieldName)) + .findFirst(); + if (precisionOptional.isPresent()) { + precision = ((TimestampType) precisionOptional.get().type()).getPrecision(); + } + Timestamp timestamp = rowData.getTimestamp(i, precision); objects[i] = timestamp.toLocalDateTime(); break; case ARRAY: @@ -320,7 +328,7 @@ public class RowConverter { SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i); InternalRow row = rowData.getRow(i, ((SeaTunnelRowType) rowType).getTotalFields()); - objects[i] = convert(row, (SeaTunnelRowType) rowType); + objects[i] = convert(row, (SeaTunnelRowType) rowType, tableSchema); break; default: throw CommonError.unsupportedDataType( diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java new file mode 100644 index 0000000000..d04b76c09b --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source.converter; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.DateTimeUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class SqlToPaimonPredicateConverterTest { + + private RowType rowType; + + @BeforeEach + public void setUp() { + rowType = + new RowType( + Arrays.asList( + new DataField(0, "char_col", new CharType()), + new DataField(1, "varchar_col", new VarCharType()), + new DataField(2, "boolean_col", new BooleanType()), + new DataField(3, "binary_col", new VarBinaryType()), + new DataField(4, "decimal_col", new DecimalType(10, 2)), + new DataField(5, "tinyint_col", new TinyIntType()), + new DataField(6, "smallint_col", new SmallIntType()), + new DataField(7, "int_col", new IntType()), + new DataField(8, "bigint_col", new BigIntType()), + new DataField(9, "float_col", new FloatType()), + 
new DataField(10, "double_col", new DoubleType()), + new DataField(11, "date_col", new DateType()), + new DataField(12, "timestamp_col", new TimestampType()))); + } + + @Test + public void testConvertSqlWhereToPaimonPredicate() { + String query = + "SELECT * FROM table WHERE " + + "char_col = 'a' AND " + + "varchar_col = 'test' AND " + + "boolean_col = 'true' AND " + + "decimal_col = 123.45 AND " + + "tinyint_col = 1 AND " + + "smallint_col = 2 AND " + + "int_col = 3 AND " + + "bigint_col = 4 AND " + + "float_col = 5.5 AND " + + "double_col = 6.6 AND " + + "date_col = '2022-01-01' AND " + + "timestamp_col = '2022-01-01T12:00:00.123'"; + + Predicate predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + + assertNotNull(predicate); + + PredicateBuilder builder = new PredicateBuilder(rowType); + + // Validate each part of the predicate + Predicate expectedPredicate = + PredicateBuilder.and( + builder.equal(0, "a"), + builder.equal(1, "test"), + builder.equal(2, true), + builder.equal(4, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)), + builder.equal(5, (byte) 1), + builder.equal(6, (short) 2), + builder.equal(7, 3), + builder.equal(8, 4L), + builder.equal(9, 5.5f), + builder.equal(10, 6.6d), + builder.equal(11, DateTimeUtils.toInternal(LocalDate.parse("2022-01-01"))), + builder.equal( + 12, + Timestamp.fromLocalDateTime( + LocalDateTime.parse("2022-01-01T12:00:00.123")))); + + assertEquals(expectedPredicate.toString(), predicate.toString()); + } + + @Test + public void testConvertSqlWhereToPaimonPredicateWithIsNull() { + String query = "SELECT * FROM table WHERE char_col IS NULL"; + + Predicate predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + + assertNotNull(predicate); + + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate expectedPredicate = builder.isNull(0); + + assertEquals(expectedPredicate.toString(), predicate.toString()); + } + + @Test + public 
void testConvertSqlWhereToPaimonPredicateWithIsNotNull() { + String query = "SELECT * FROM table WHERE char_col IS NOT NULL"; + + Predicate predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + + assertNotNull(predicate); + + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate expectedPredicate = builder.isNotNull(0); + + assertEquals(expectedPredicate.toString(), predicate.toString()); + } + + @Test + public void testConvertSqlWhereToPaimonPredicateWithAnd() { + String query = "SELECT * FROM table WHERE int_col > 3 AND double_col < 6.6"; + + Predicate predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + + assertNotNull(predicate); + + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate expectedPredicate = + PredicateBuilder.and(builder.greaterThan(7, 3), builder.lessThan(10, 6.6d)); + + assertEquals(expectedPredicate.toString(), predicate.toString()); + } + + @Test + public void testConvertSqlWhereToPaimonPredicateWithOr() { + String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 6.6"; + + Predicate predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + + assertNotNull(predicate); + + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate expectedPredicate = + PredicateBuilder.or(builder.greaterThan(7, 3), builder.lessThan(10, 6.6d)); + + assertEquals(expectedPredicate.toString(), predicate.toString()); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java index fd0bf33381..ebde744d03 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java +++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java @@ -235,7 +235,7 @@ public class RowConverterTest { @Test public void paimonToSeaTunnel() { - SeaTunnelRow convert = RowConverter.convert(internalRow, seaTunnelRowType); + SeaTunnelRow convert = RowConverter.convert(internalRow, seaTunnelRowType, tableSchema); Assertions.assertEquals(convert, seaTunnelRow); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 05fa3db4b9..90b9a63cdd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -341,7 +341,8 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testFakeCDCSinkPaimonWithTimestampN(TestContainer container) throws Exception { + public void testFakeCDCSinkPaimonWithTimestampNAndRead(TestContainer container) + throws Exception { Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case7.conf"); Assertions.assertEquals(0, execResult.getExitCode()); @@ -404,6 +405,10 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { "2024-03-10T10:00:00.123456789"); } }); + + Container.ExecResult readResult = + container.executeJob("/paimon_to_assert_with_timestampN.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); } @TestTemplate @@ -457,6 +462,26 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { }); } + @TestTemplate + public void 
testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer container) + throws Exception { + Container.ExecResult writeResult = + container.executeJob("/fake_to_paimon_with_full_type.conf"); + Assertions.assertEquals(0, writeResult.getExitCode()); + Container.ExecResult readResult = + container.executeJob("/paimon_to_assert_with_filter1.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); + Container.ExecResult readResult2 = + container.executeJob("/paimon_to_assert_with_filter2.conf"); + Assertions.assertEquals(0, readResult2.getExitCode()); + Container.ExecResult readResult3 = + container.executeJob("/paimon_to_assert_with_filter3.conf"); + Assertions.assertEquals(0, readResult3.getExitCode()); + Container.ExecResult readResult4 = + container.executeJob("/paimon_to_assert_with_filter4.conf"); + Assertions.assertEquals(0, readResult4.getExitCode()); + } + protected final ContainerExtendedFactory containerExtendedFactory = container -> { FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java index 5d2ce86c2b..259bc0128a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java @@ -148,10 +148,15 @@ public class PaimonSinkHdfsIT extends TestSuiteBase { } }); }); + + Container.ExecResult readResult = + container.executeJob("/read_from_paimon_with_hdfs_ha_to_assert.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); } @TestTemplate - public void testFakeCDCSinkPaimonWithHiveCatalog(TestContainer container) throws 
Exception { + public void testFakeCDCSinkPaimonWithHiveCatalogAndRead(TestContainer container) + throws Exception { Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf"); Assertions.assertEquals(0, execResult.getExitCode()); @@ -194,5 +199,9 @@ public class PaimonSinkHdfsIT extends TestSuiteBase { } }); }); + + Container.ExecResult readResult = + container.executeJob("/paimon_to_assert_with_hivecatalog.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf new file mode 100644 index 0000000000..df53bde66e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + primaryKey { + name = "c_tinyint" + columnNames = [c_tinyint] + } + } + rows = [ + { + kind = INSERT + fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 563873951, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"] + } + { + kind = INSERT + fields = [{"a": "c"}, [102], "c_string1", false, 118, 15988, 563873952, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"] + } + { + kind = INSERT + fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"] + } + { + kind = INSERT + fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58"] + } + { + kind = INSERT + fields = [{"a": "b"}, [101], "c_string1", true, 120, 15987, 563873952, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"] + } + { + kind = UPDATE_BEFORE + fields = [{"a": "c"}, [102], "c_string2", false, 116, 15987, 563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"] + } + { + kind = UPDATE_AFTER + fields = [{"a": "e"}, [103], "c_string3", true, 116, 15989, 563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-27", 
"2023-04-27T23:20:58"] + } + { + kind = DELETE + fields = [{"a": "f"}, [104], "c_string4", true, 120, 15987, 563873952, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-28", "2023-04-28T23:20:58"] + } + ] + result_table_name = "fake" + } +} + +sink { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf new file mode 100644 index 0000000000..0a1b88d1d3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + query = "select * from st_test where c_string is not null" + result_table_name = paimon_source + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 3 + } + { + rule_type = MIN_ROW + rule_value = 3 + } + ] + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf new file mode 100644 index 0000000000..c516e1382b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + query = "select * from st_test where c_string='c_string2'" + result_table_name = paimon_source + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + } + { + rule_type = MIN_ROW + rule_value = 1 + } + ] + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "c_string2" + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf new file mode 100644 index 0000000000..b30dbee56f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987" + result_table_name = paimon_source + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + } + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + field_rules = [ + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + equals_to = "true" + } + ] + } + { + field_name = c_tinyint + field_type = tinyint + field_value = [ + { + rule_type = MIN + rule_value = 116 + } + ] + } + { + field_name = c_smallint + field_type = smallint + field_value = [ + { + rule_type = NOT_NULL + equals_to = 15987 + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf new file mode 100644 index 0000000000..7f0872114c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + query = "select * from st_test where c_date > '2023-04-21' and c_timestamp='2023-04-27 23:20:58'" + result_table_name = paimon_source + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + } + { + rule_type = MIN_ROW + rule_value = 1 + } + ] + field_rules = [ + { + field_name = c_date + field_type = date + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2023-04-27" + } + ] + } + { + field_name = c_timestamp + field_type = timestamp + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2023-04-27T23:20:58" + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf new file mode 100644 index 0000000000..8b7077916e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + catalog_name="seatunnel_test" + catalog_type="hive" + catalog_uri="thrift://hadoop04:9083" + warehouse="hdfs:///tmp/seatunnel" + database="seatunnel_test" + table="st_test3" + paimon.hadoop.conf = { + fs.defaultFS = "hdfs://nameservice1" + dfs.nameservices = "nameservice1" + dfs.ha.namenodes.nameservice1 = "nn1,nn2" + dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020" + dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020" + dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + dfs.client.use.datanode.hostname = "true" + } + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + } + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf new file mode 100644 index 0000000000..63c489364b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or 
more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "seatunnel_namespace7" + table = "st_test" + result_table_name = paimon_source + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + } + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + field_rules = [ + { + field_name = one_time + field_type = timestamp + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2024-03-10T10:00:12" + } + ] + } + { + field_name = two_time + field_type = timestamp + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2024-03-10T10:00:00.123" + } + ] + } + { + field_name = three_time + field_type = timestamp + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2024-03-10T10:00:00.123456" + } + ] + } + { + field_name = four_time + field_type = timestamp + field_value = [ + { + rule_type = NOT_NULL + equals_to = "2024-03-10T10:00:00.123456789" + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf new file mode 100644 index 0000000000..29dc2be324 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + catalog_name="seatunnel_test" + warehouse="hdfs:///tmp/paimon" + database="seatunnel_namespace1" + table="st_test" + query = "select * from st_test where pk_id is not null and pk_id < 3" + paimon.hadoop.conf = { + fs.defaultFS = "hdfs://nameservice1" + dfs.nameservices = "nameservice1" + dfs.ha.namenodes.nameservice1 = "nn1,nn2" + dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020" + dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020" + dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + dfs.client.use.datanode.hostname = "true" + } + } +} + +sink { + Assert { + source_table_name = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + } + { + rule_type = MIN_ROW + rule_value = 1 + } + ] + field_rules = [ + { + field_name = pk_id + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + equals_to = 1 + } + ] + } + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "A_1" + } + ] + } + { + field_name = score + field_type = int + field_value = [ + { + rule_type = NOT_NULL + equals_to = 100 + } + ] + } + ] + } + } +}