This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 6c1577267f [Paimon]support projection for paimon source (#6343) 6c1577267f is described below commit 6c1577267ff828f4cf024d4edba6cf6ec09cd073 Author: TaoZex <45089228+tao...@users.noreply.github.com> AuthorDate: Fri Jun 14 22:02:31 2024 +0800 [Paimon]support projection for paimon source (#6343) --- docs/en/connector-v2/source/Paimon.md | 3 +- .../seatunnel/paimon/catalog/PaimonCatalog.java | 32 +++++++ .../seatunnel/paimon/config/PaimonConfig.java | 6 -- .../seatunnel/paimon/source/PaimonSource.java | 47 ++++++--- .../paimon/source/PaimonSourceReader.java | 9 +- .../paimon/source/PaimonSourceSplitEnumerator.java | 20 +++- .../converter/SqlToPaimonPredicateConverter.java | 105 +++++++++++++++------ .../seatunnel/paimon/utils/RowTypeConverter.java | 16 +++- ...rterTest.java => SqlToPaimonConverterTest.java} | 57 +++++++++-- .../paimon/utils/RowTypeConverterTest.java | 16 +++- .../seatunnel/e2e/connector/paimon/PaimonIT.java | 3 + ...ssert.conf => paimon_projection_to_assert.conf} | 16 ++-- .../src/test/resources/paimon_to_assert.conf | 6 ++ 13 files changed, 266 insertions(+), 70 deletions(-) diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index e50ee0df9e..32155abde0 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -104,7 +104,7 @@ source { warehouse = "/tmp/paimon" database = "full_type" table = "st_test" - query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'" + query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'" } } ``` @@ -161,4 +161,5 @@ source { ### next version - Add Paimon Source Connector +- Support projection for Paimon Source diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index 1e40090805..2c9fcd6f82 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -45,6 +45,10 @@ import lombok.extern.slf4j.Slf4j; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; @Slf4j public class PaimonCatalog implements Catalog, PaimonTable { @@ -124,6 +128,16 @@ public class PaimonCatalog implements Catalog, PaimonTable { } } + public CatalogTable getTableWithProjection(TablePath tablePath, int[] projectionIndex) + throws CatalogException, TableNotExistException { + try { + FileStoreTable paimonFileStoreTableTable = (FileStoreTable) getPaimonTable(tablePath); + return toCatalogTable(paimonFileStoreTableTable, tablePath, projectionIndex); + } catch (Exception e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + @Override public Table getPaimonTable(TablePath tablePath) throws CatalogException, TableNotExistException { @@ -181,8 +195,26 @@ public class PaimonCatalog implements Catalog, PaimonTable { private CatalogTable toCatalogTable( FileStoreTable paimonFileStoreTableTable, TablePath tablePath) { + return toCatalogTable(paimonFileStoreTableTable, tablePath, null); + } + + private CatalogTable toCatalogTable( + FileStoreTable paimonFileStoreTableTable, TablePath tablePath, int[] projectionIndex) { org.apache.paimon.schema.TableSchema schema = paimonFileStoreTableTable.schema(); List<DataField> dataFields = schema.fields(); + if (!Objects.isNull(projectionIndex)) { + Map<Integer, DataField> indexMap = + IntStream.range(0, dataFields.size()) + .boxed() + .collect(Collectors.toMap(i -> i, dataFields::get)); + + dataFields = + java.util.Arrays.stream(projectionIndex) + .distinct() + .filter(indexMap::containsKey) + .mapToObj(indexMap::get) + .collect(Collectors.toList()); + } TableSchema.Builder builder = TableSchema.builder(); dataFields.forEach( dataField -> { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java index caa5f1e72c..d394455d2f 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java @@ -86,12 +86,6 @@ public class PaimonConfig implements Serializable { .noDefaultValue() .withDescription("The table you intend to access"); - public static final Option<List<String>> READ_COLUMNS = - Options.key("read_columns") - .listType() - .noDefaultValue() - .withDescription("The read columns of the flink table store"); - @Deprecated public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path") diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index 358026b5d3..d0a0c4a793 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -29,12 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig; import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.Table; +import org.apache.paimon.types.RowType; + +import net.sf.jsqlparser.statement.select.PlainSelect; import java.util.Collections; import java.util.List; +import java.util.Objects; + +import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex; +import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; /** Paimon connector source class. */ public class PaimonSource @@ -52,6 +60,8 @@ public class PaimonSource private Predicate predicate; + private int[] projectionIndex; + private CatalogTable catalogTable; public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) { @@ -61,12 +71,22 @@ public class PaimonSource TablePath.of(paimonSourceConfig.getNamespace(), paimonSourceConfig.getTable()); this.catalogTable = paimonCatalog.getTable(tablePath); this.paimonTable = paimonCatalog.getPaimonTable(tablePath); - this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); - // TODO: We can use this to realize the column projection feature later + String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL); - this.predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( - this.paimonTable.rowType(), filterSql); + PlainSelect plainSelect = convertToPlainSelect(filterSql); + RowType paimonRowType = this.paimonTable.rowType(); + String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]); + if (!Objects.isNull(plainSelect)) { + this.projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect); + if (!Objects.isNull(projectionIndex)) { + this.catalogTable = + paimonCatalog.getTableWithProjection(tablePath, projectionIndex); + } + this.predicate = + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + paimonRowType, plainSelect); + } + seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex); } @Override @@ -75,26 +95,27 @@ public class PaimonSource } @Override - public Boundedness getBoundedness() { - return Boundedness.BOUNDED; + public List<CatalogTable> getProducedCatalogTables() { + return Collections.singletonList(catalogTable); } @Override - public List<CatalogTable> getProducedCatalogTables() { - return Collections.singletonList(catalogTable); + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; } @Override public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader( SourceReader.Context readerContext) throws Exception { - - return new PaimonSourceReader(readerContext, paimonTable, seaTunnelRowType, predicate); + return new PaimonSourceReader( + readerContext, paimonTable, seaTunnelRowType, predicate, projectionIndex); } @Override public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> createEnumerator( SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext) throws Exception { - return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, predicate); + return new PaimonSourceSplitEnumerator( + enumeratorContext, paimonTable, predicate, projectionIndex); } @Override @@ -103,6 +124,6 @@ public class PaimonSource PaimonSourceState checkpointState) throws Exception { return new PaimonSourceSplitEnumerator( - enumeratorContext, paimonTable, checkpointState, predicate); + enumeratorContext, paimonTable, checkpointState, predicate, projectionIndex); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java index 6cd1d87c63..3cfa5ee8b9 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java @@ -49,13 +49,19 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour private final SeaTunnelRowType seaTunnelRowType; private volatile boolean noMoreSplit; private final Predicate predicate; + private int[] projection; public PaimonSourceReader( - Context context, Table table, SeaTunnelRowType seaTunnelRowType, Predicate predicate) { + Context context, + Table table, + SeaTunnelRowType seaTunnelRowType, + Predicate predicate, + int[] projection) { this.context = context; this.table = table; this.seaTunnelRowType = seaTunnelRowType; this.predicate = predicate; + this.projection = projection; } @Override @@ -76,6 +82,7 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour // read logic try (final RecordReader<InternalRow> reader = table.newReadBuilder() + .withProjection(projection) .withFilter(predicate) .newRead() .executeFilter() diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java index f6c76dc895..7b0f14c3ab 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java @@ -51,23 +51,31 @@ public class PaimonSourceSplitEnumerator private final Predicate predicate; + private int[] projection; + public PaimonSourceSplitEnumerator( - Context<PaimonSourceSplit> context, Table table, Predicate predicate) { + Context<PaimonSourceSplit> context, + Table table, + Predicate predicate, + int[] projection) { this.context = context; this.table = table; this.assignedSplit = new HashSet<>(); this.predicate = predicate; + this.projection = projection; } public PaimonSourceSplitEnumerator( Context<PaimonSourceSplit> context, Table table, PaimonSourceState sourceState, - Predicate predicate) { + Predicate predicate, + int[] projection) { this.context = context; this.table = table; this.assignedSplit = sourceState.getAssignedSplits(); this.predicate = predicate; + this.projection = projection; } @Override @@ -154,9 +162,13 @@ public class PaimonSourceSplitEnumerator /** Get all splits of table */ private Set<PaimonSourceSplit> getTableSplits() { final Set<PaimonSourceSplit> tableSplits = new HashSet<>(); - // TODO Support columns projection final List<Split> splits = - table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); + table.newReadBuilder() + .withProjection(projection) + .withFilter(predicate) + .newScan() + .plan() + .splits(); splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split))); return tableSplits; } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java index 7b076ba558..212bfd6e8b 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -51,48 +51,97 @@ import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.schema.Column; import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.select.AllColumns; import net.sf.jsqlparser.statement.select.PlainSelect; import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectBody; +import net.sf.jsqlparser.statement.select.SelectExpressionItem; +import net.sf.jsqlparser.statement.select.SelectItem; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.IntStream; public class SqlToPaimonPredicateConverter { - public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType, String query) { + public static PlainSelect convertToPlainSelect(String query) { + if (StringUtils.isBlank(query)) { + return null; + } + Statement statement = null; try { - if (StringUtils.isBlank(query)) { - return null; - } - Statement statement = CCJSqlParserUtil.parse(query); - // Confirm that the SQL statement is a Select statement - if (!(statement instanceof Select)) { - throw new IllegalArgumentException("Only SELECT statements are supported."); - } - Select select = (Select) statement; - SelectBody selectBody = select.getSelectBody(); - if (!(selectBody instanceof PlainSelect)) { - throw new IllegalArgumentException("Only simple SELECT statements are supported."); - } - PlainSelect plainSelect = (PlainSelect) selectBody; - if (plainSelect.getHaving() != null - || plainSelect.getGroupBy() != null - || plainSelect.getOrderByElements() != null - || plainSelect.getLimit() != null) { - throw new IllegalArgumentException( - "Only SELECT statements with WHERE clause are supported. The Having, Group By, Order By, Limit clauses are currently unsupported."); - } - Expression whereExpression = plainSelect.getWhere(); - if (Objects.isNull(whereExpression)) { + statement = CCJSqlParserUtil.parse(query); + } catch (JSQLParserException e) { + throw new IllegalArgumentException("Error parsing SQL.", e); + } + // Confirm that the SQL statement is a Select statement + if (!(statement instanceof Select)) { + throw new IllegalArgumentException("Only SELECT statements are supported."); + } + Select select = (Select) statement; + SelectBody selectBody = select.getSelectBody(); + if (!(selectBody instanceof PlainSelect)) { + throw new IllegalArgumentException("Only simple SELECT statements are supported."); + } + PlainSelect plainSelect = (PlainSelect) selectBody; + if (plainSelect.getHaving() != null + || plainSelect.getGroupBy() != null + || plainSelect.getOrderByElements() != null + || plainSelect.getLimit() != null) { + throw new IllegalArgumentException( + "Only SELECT statements with WHERE clause are supported. The Having, Group By, Order By, Limit clauses are currently unsupported."); + } + return plainSelect; + } + + public static int[] convertSqlSelectToPaimonProjectionIndex( + String[] fieldNames, PlainSelect plainSelect) { + int[] projectionIndex = null; + List<SelectItem> selectItems = plainSelect.getSelectItems(); + + List<String> columnNames = new ArrayList<>(); + for (SelectItem selectItem : selectItems) { + if (selectItem instanceof AllColumns) { return null; + } else if (selectItem instanceof SelectExpressionItem) { + SelectExpressionItem selectExpressionItem = (SelectExpressionItem) selectItem; + String columnName = selectExpressionItem.getExpression().toString(); + columnNames.add(columnName); + } else { + throw new IllegalArgumentException("Error encountered parsing query fields."); } - PredicateBuilder builder = new PredicateBuilder(rowType); - return parseExpressionToPredicate(builder, rowType, whereExpression); - } catch (JSQLParserException e) { - throw new IllegalArgumentException("Error parsing SQL WHERE clause", e); } + + String[] columnNamesArray = columnNames.toArray(new String[0]); + projectionIndex = + IntStream.range(0, columnNamesArray.length) + .map( + i -> { + String fieldName = columnNamesArray[i]; + int index = Arrays.asList(fieldNames).indexOf(fieldName); + if (index == -1) { + throw new IllegalArgumentException( + "column " + fieldName + " does not exist."); + } + return index; + }) + .toArray(); + + return projectionIndex; + } + + public static Predicate convertSqlWhereToPaimonPredicate( + RowType rowType, PlainSelect plainSelect) { + Expression whereExpression = plainSelect.getWhere(); + if (Objects.isNull(whereExpression)) { + return null; + } + PredicateBuilder builder = new PredicateBuilder(rowType); + return parseExpressionToPredicate(builder, rowType, whereExpression); } private static Predicate parseExpressionToPredicate( diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java index b250fd21e9..150c8a138a 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java @@ -56,6 +56,7 @@ import org.apache.paimon.types.VarCharType; import lombok.extern.slf4j.Slf4j; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -73,12 +74,25 @@ public class RowTypeConverter { * @param rowType Paimon row type * @return SeaTunnel row type {@link SeaTunnelRowType} */ - public static SeaTunnelRowType convert(RowType rowType) { + public static SeaTunnelRowType convert(RowType rowType, int[] projectionIndex) { String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); SeaTunnelDataType<?>[] dataTypes = rowType.getFields().stream() .map(field -> field.type().accept(PaimonToSeaTunnelTypeVisitor.INSTANCE)) .toArray(SeaTunnelDataType<?>[]::new); + if (projectionIndex != null) { + String[] projectionFieldNames = + Arrays.stream(projectionIndex) + .filter(index -> index >= 0 && index < fieldNames.length) + .mapToObj(index -> fieldNames[index]) + .toArray(String[]::new); + SeaTunnelDataType<?>[] projectionDataTypes = + Arrays.stream(projectionIndex) + .filter(index -> index >= 0 && index < fieldNames.length) + .mapToObj(index -> dataTypes[index]) + .toArray(SeaTunnelDataType<?>[]::new); + return new SeaTunnelRowType(projectionFieldNames, projectionDataTypes); + } return new SeaTunnelRowType(fieldNames, dataTypes); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java similarity index 77% rename from seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java rename to seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java index d04b76c09b..d8e819b505 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java @@ -41,18 +41,25 @@ import org.apache.paimon.utils.DateTimeUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import net.sf.jsqlparser.statement.select.PlainSelect; + import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Arrays; +import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; -public class SqlToPaimonPredicateConverterTest { +public class SqlToPaimonConverterTest { private RowType rowType; + private String[] fieldNames; + @BeforeEach public void setUp() { rowType = @@ -71,6 +78,8 @@ public class SqlToPaimonPredicateConverterTest { new DataField(10, "double_col", new DoubleType()), new DataField(11, "date_col", new DateType()), new DataField(12, "timestamp_col", new TimestampType()))); + + fieldNames = rowType.getFieldNames().toArray(new String[0]); } @Test @@ -90,8 +99,10 @@ public class SqlToPaimonPredicateConverterTest { + "date_col = '2022-01-01' AND " + "timestamp_col = '2022-01-01T12:00:00.123'"; + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + rowType, plainSelect); assertNotNull(predicate); @@ -123,8 +134,10 @@ public class SqlToPaimonPredicateConverterTest { public void testConvertSqlWhereToPaimonPredicateWithIsNull() { String query = "SELECT * FROM table WHERE char_col IS NULL"; + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + rowType, plainSelect); assertNotNull(predicate); @@ -138,8 +151,10 @@ public class SqlToPaimonPredicateConverterTest { public void testConvertSqlWhereToPaimonPredicateWithIsNotNull() { String query = "SELECT * FROM table WHERE char_col IS NOT NULL"; + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + rowType, plainSelect); assertNotNull(predicate); @@ -153,8 +168,10 @@ public class SqlToPaimonPredicateConverterTest { public void testConvertSqlWhereToPaimonPredicateWithAnd() { String query = "SELECT * FROM table WHERE int_col > 3 AND double_col < 6.6"; + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + rowType, plainSelect); assertNotNull(predicate); @@ -169,8 +186,10 @@ public class SqlToPaimonPredicateConverterTest { public void testConvertSqlWhereToPaimonPredicateWithOr() { String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 6.6"; + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = - SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query); + SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( + rowType, plainSelect); assertNotNull(predicate); @@ -180,4 +199,30 @@ public class SqlToPaimonPredicateConverterTest { assertEquals(expectedPredicate.toString(), predicate.toString()); } + + @Test + public void testConvertSqlSelectToPaimonProjectionArrayWithALL() { + String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 6.6"; + + PlainSelect plainSelect = convertToPlainSelect(query); + int[] projectionIndex = + SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex( + fieldNames, plainSelect); + + assertNull(projectionIndex); + } + + @Test + public void testConvertSqlSelectToPaimonProjectionArrayWithStar() { + String query = + "SELECT decimal_col, int_col, char_col, timestamp_col, boolean_col FROM table WHERE int_col > 3 OR double_col < 6.6"; + + PlainSelect plainSelect = convertToPlainSelect(query); + int[] projectionIndex = + SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex( + fieldNames, plainSelect); + + int[] expectedProjectionIndex = {4, 7, 0, 12, 2}; + assertArrayEquals(projectionIndex, expectedProjectionIndex); + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java index e075efde15..580bdfde87 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java @@ -47,6 +47,7 @@ public class RowTypeConverterTest { private SeaTunnelRowType seaTunnelRowType; + private SeaTunnelRowType seaTunnelProjectionRowType; private RowType rowType; private BasicTypeDefine<DataType> typeDefine; @@ -128,6 +129,12 @@ public class RowTypeConverterTest { new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), ArrayType.STRING_ARRAY_TYPE }); + + seaTunnelProjectionRowType = + new SeaTunnelRowType( + new String[] {"c_string", "c_int"}, + new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + rowType = DataTypes.ROW( new DataField(0, "c_tinyint", DataTypes.TINYINT()), @@ -188,10 +195,17 @@ public class RowTypeConverterTest { @Test public void paimonRowTypeToSeaTunnel() { - SeaTunnelRowType convert = RowTypeConverter.convert(rowType); + SeaTunnelRowType convert = RowTypeConverter.convert(rowType, null); Assertions.assertEquals(convert, seaTunnelRowType); } + @Test + public void paimonToSeaTunnelWithProjection() { + int[] projection = {7, 2}; + SeaTunnelRowType convert = RowTypeConverter.convert(rowType, projection); + Assertions.assertEquals(convert, seaTunnelProjectionRowType); + } + @Test public void seaTunnelToPaimon() { RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java index c8b5ed80ee..a24a375c31 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java @@ -55,5 +55,8 @@ public class PaimonIT extends TestSuiteBase { Assertions.assertEquals(0, textWriteResult.getExitCode()); Container.ExecResult readResult = container.executeJob("/paimon_to_assert.conf"); Assertions.assertEquals(0, readResult.getExitCode()); + Container.ExecResult readProjectionResult = + container.executeJob("/paimon_projection_to_assert.conf"); + Assertions.assertEquals(0, readProjectionResult.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf similarity index 90% copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf index b9ecb4283f..6b67aa70c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf @@ -33,6 +33,7 @@ source { database = "default" table = "st_test" result_table_name = paimon_source + query = "select c_string, c_boolean from st_test where c_string is not null" } } @@ -40,6 +41,12 @@ sink { Assert { source_table_name = paimon_source rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100000 + } + ], row_rules = [ { rule_type = MAX_ROW @@ -64,15 +71,6 @@ sink { rule_type = NOT_NULL } ] - }, - { - field_name = c_double - field_type = double - field_value = [ - { - rule_type = NOT_NULL - } - ] } ] } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf index b9ecb4283f..f4f6cbcaf2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf @@ -40,6 +40,12 @@ sink { Assert { source_table_name = paimon_source rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100000 + } + ], row_rules = [ { rule_type = MAX_ROW