This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6c1577267f [Paimon]support projection for paimon source (#6343)
6c1577267f is described below

commit 6c1577267ff828f4cf024d4edba6cf6ec09cd073
Author: TaoZex <45089228+tao...@users.noreply.github.com>
AuthorDate: Fri Jun 14 22:02:31 2024 +0800

    [Paimon]support projection for paimon source (#6343)
---
 docs/en/connector-v2/source/Paimon.md              |   3 +-
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  32 +++++++
 .../seatunnel/paimon/config/PaimonConfig.java      |   6 --
 .../seatunnel/paimon/source/PaimonSource.java      |  47 ++++++---
 .../paimon/source/PaimonSourceReader.java          |   9 +-
 .../paimon/source/PaimonSourceSplitEnumerator.java |  20 +++-
 .../converter/SqlToPaimonPredicateConverter.java   | 105 +++++++++++++++------
 .../seatunnel/paimon/utils/RowTypeConverter.java   |  16 +++-
 ...rterTest.java => SqlToPaimonConverterTest.java} |  57 +++++++++--
 .../paimon/utils/RowTypeConverterTest.java         |  16 +++-
 .../seatunnel/e2e/connector/paimon/PaimonIT.java   |   3 +
 ...ssert.conf => paimon_projection_to_assert.conf} |  16 ++--
 .../src/test/resources/paimon_to_assert.conf       |   6 ++
 13 files changed, 266 insertions(+), 70 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index e50ee0df9e..32155abde0 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -104,7 +104,7 @@ source {
     warehouse = "/tmp/paimon"
     database = "full_type"
     table = "st_test"
-    query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 
and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
+    query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true' 
and c_tinyint > 116 and c_smallint = 15987 or 
c_decimal='2924137191386439303744.39292213'"
   }
 }
 ```
@@ -161,4 +161,5 @@ source {
 ### next version
 
 - Add Paimon Source Connector
+- Support projection for Paimon Source
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 1e40090805..2c9fcd6f82 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -45,6 +45,10 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 @Slf4j
 public class PaimonCatalog implements Catalog, PaimonTable {
@@ -124,6 +128,16 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
         }
     }
 
+    public CatalogTable getTableWithProjection(TablePath tablePath, int[] 
projectionIndex)
+            throws CatalogException, TableNotExistException {
+        try {
+            FileStoreTable paimonFileStoreTableTable = (FileStoreTable) 
getPaimonTable(tablePath);
+            return toCatalogTable(paimonFileStoreTableTable, tablePath, 
projectionIndex);
+        } catch (Exception e) {
+            throw new TableNotExistException(this.catalogName, tablePath);
+        }
+    }
+
     @Override
     public Table getPaimonTable(TablePath tablePath)
             throws CatalogException, TableNotExistException {
@@ -181,8 +195,26 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
 
     private CatalogTable toCatalogTable(
             FileStoreTable paimonFileStoreTableTable, TablePath tablePath) {
+        return toCatalogTable(paimonFileStoreTableTable, tablePath, null);
+    }
+
+    private CatalogTable toCatalogTable(
+            FileStoreTable paimonFileStoreTableTable, TablePath tablePath, 
int[] projectionIndex) {
         org.apache.paimon.schema.TableSchema schema = 
paimonFileStoreTableTable.schema();
         List<DataField> dataFields = schema.fields();
+        if (!Objects.isNull(projectionIndex)) {
+            Map<Integer, DataField> indexMap =
+                    IntStream.range(0, dataFields.size())
+                            .boxed()
+                            .collect(Collectors.toMap(i -> i, 
dataFields::get));
+
+            dataFields =
+                    java.util.Arrays.stream(projectionIndex)
+                            .distinct()
+                            .filter(indexMap::containsKey)
+                            .mapToObj(indexMap::get)
+                            .collect(Collectors.toList());
+        }
         TableSchema.Builder builder = TableSchema.builder();
         dataFields.forEach(
                 dataField -> {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index caa5f1e72c..d394455d2f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -86,12 +86,6 @@ public class PaimonConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("The table you intend to access");
 
-    public static final Option<List<String>> READ_COLUMNS =
-            Options.key("read_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("The read columns of the flink table 
store");
-
     @Deprecated
     public static final Option<String> HDFS_SITE_PATH =
             Options.key("hdfs_site_path")
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index 358026b5d3..d0a0c4a793 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -29,12 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
 
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import net.sf.jsqlparser.statement.select.PlainSelect;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex;
+import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
 
 /** Paimon connector source class. */
 public class PaimonSource
@@ -52,6 +60,8 @@ public class PaimonSource
 
     private Predicate predicate;
 
+    private int[] projectionIndex;
+
     private CatalogTable catalogTable;
 
     public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog 
paimonCatalog) {
@@ -61,12 +71,22 @@ public class PaimonSource
                 TablePath.of(paimonSourceConfig.getNamespace(), 
paimonSourceConfig.getTable());
         this.catalogTable = paimonCatalog.getTable(tablePath);
         this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
-        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
-        // TODO: We can use this to realize the column projection feature later
+
         String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL);
-        this.predicate =
-                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
-                        this.paimonTable.rowType(), filterSql);
+        PlainSelect plainSelect = convertToPlainSelect(filterSql);
+        RowType paimonRowType = this.paimonTable.rowType();
+        String[] filedNames = paimonRowType.getFieldNames().toArray(new 
String[0]);
+        if (!Objects.isNull(plainSelect)) {
+            this.projectionIndex = 
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
+            if (!Objects.isNull(projectionIndex)) {
+                this.catalogTable =
+                        paimonCatalog.getTableWithProjection(tablePath, 
projectionIndex);
+            }
+            this.predicate =
+                    
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                            paimonRowType, plainSelect);
+        }
+        seaTunnelRowType = RowTypeConverter.convert(paimonRowType, 
projectionIndex);
     }
 
     @Override
@@ -75,26 +95,27 @@ public class PaimonSource
     }
 
     @Override
-    public Boundedness getBoundedness() {
-        return Boundedness.BOUNDED;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
-    public List<CatalogTable> getProducedCatalogTables() {
-        return Collections.singletonList(catalogTable);
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
     }
 
     @Override
     public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-
-        return new PaimonSourceReader(readerContext, paimonTable, 
seaTunnelRowType, predicate);
+        return new PaimonSourceReader(
+                readerContext, paimonTable, seaTunnelRowType, predicate, 
projectionIndex);
     }
 
     @Override
     public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<PaimonSourceSplit> 
enumeratorContext) throws Exception {
-        return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, 
predicate);
+        return new PaimonSourceSplitEnumerator(
+                enumeratorContext, paimonTable, predicate, projectionIndex);
     }
 
     @Override
@@ -103,6 +124,6 @@ public class PaimonSource
             PaimonSourceState checkpointState)
             throws Exception {
         return new PaimonSourceSplitEnumerator(
-                enumeratorContext, paimonTable, checkpointState, predicate);
+                enumeratorContext, paimonTable, checkpointState, predicate, 
projectionIndex);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 6cd1d87c63..3cfa5ee8b9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -49,13 +49,19 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit;
     private final Predicate predicate;
+    private int[] projection;
 
     public PaimonSourceReader(
-            Context context, Table table, SeaTunnelRowType seaTunnelRowType, 
Predicate predicate) {
+            Context context,
+            Table table,
+            SeaTunnelRowType seaTunnelRowType,
+            Predicate predicate,
+            int[] projection) {
         this.context = context;
         this.table = table;
         this.seaTunnelRowType = seaTunnelRowType;
         this.predicate = predicate;
+        this.projection = projection;
     }
 
     @Override
@@ -76,6 +82,7 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
                 // read logic
                 try (final RecordReader<InternalRow> reader =
                         table.newReadBuilder()
+                                .withProjection(projection)
                                 .withFilter(predicate)
                                 .newRead()
                                 .executeFilter()
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
index f6c76dc895..7b0f14c3ab 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
@@ -51,23 +51,31 @@ public class PaimonSourceSplitEnumerator
 
     private final Predicate predicate;
 
+    private int[] projection;
+
     public PaimonSourceSplitEnumerator(
-            Context<PaimonSourceSplit> context, Table table, Predicate 
predicate) {
+            Context<PaimonSourceSplit> context,
+            Table table,
+            Predicate predicate,
+            int[] projection) {
         this.context = context;
         this.table = table;
         this.assignedSplit = new HashSet<>();
         this.predicate = predicate;
+        this.projection = projection;
     }
 
     public PaimonSourceSplitEnumerator(
             Context<PaimonSourceSplit> context,
             Table table,
             PaimonSourceState sourceState,
-            Predicate predicate) {
+            Predicate predicate,
+            int[] projection) {
         this.context = context;
         this.table = table;
         this.assignedSplit = sourceState.getAssignedSplits();
         this.predicate = predicate;
+        this.projection = projection;
     }
 
     @Override
@@ -154,9 +162,13 @@ public class PaimonSourceSplitEnumerator
     /** Get all splits of table */
     private Set<PaimonSourceSplit> getTableSplits() {
         final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
-        // TODO Support columns projection
         final List<Split> splits =
-                
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+                table.newReadBuilder()
+                        .withProjection(projection)
+                        .withFilter(predicate)
+                        .newScan()
+                        .plan()
+                        .splits();
         splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
         return tableSplits;
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 7b076ba558..212bfd6e8b 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -51,48 +51,97 @@ import 
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Column;
 import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.select.AllColumns;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectBody;
+import net.sf.jsqlparser.statement.select.SelectExpressionItem;
+import net.sf.jsqlparser.statement.select.SelectItem;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.IntStream;
 
 public class SqlToPaimonPredicateConverter {
 
-    public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType, 
String query) {
+    public static PlainSelect convertToPlainSelect(String query) {
+        if (StringUtils.isBlank(query)) {
+            return null;
+        }
+        Statement statement = null;
         try {
-            if (StringUtils.isBlank(query)) {
-                return null;
-            }
-            Statement statement = CCJSqlParserUtil.parse(query);
-            // Confirm that the SQL statement is a Select statement
-            if (!(statement instanceof Select)) {
-                throw new IllegalArgumentException("Only SELECT statements are 
supported.");
-            }
-            Select select = (Select) statement;
-            SelectBody selectBody = select.getSelectBody();
-            if (!(selectBody instanceof PlainSelect)) {
-                throw new IllegalArgumentException("Only simple SELECT 
statements are supported.");
-            }
-            PlainSelect plainSelect = (PlainSelect) selectBody;
-            if (plainSelect.getHaving() != null
-                    || plainSelect.getGroupBy() != null
-                    || plainSelect.getOrderByElements() != null
-                    || plainSelect.getLimit() != null) {
-                throw new IllegalArgumentException(
-                        "Only SELECT statements with WHERE clause are 
supported. The Having, Group By, Order By, Limit clauses are currently 
unsupported.");
-            }
-            Expression whereExpression = plainSelect.getWhere();
-            if (Objects.isNull(whereExpression)) {
+            statement = CCJSqlParserUtil.parse(query);
+        } catch (JSQLParserException e) {
+            throw new IllegalArgumentException("Error parsing SQL.", e);
+        }
+        // Confirm that the SQL statement is a Select statement
+        if (!(statement instanceof Select)) {
+            throw new IllegalArgumentException("Only SELECT statements are 
supported.");
+        }
+        Select select = (Select) statement;
+        SelectBody selectBody = select.getSelectBody();
+        if (!(selectBody instanceof PlainSelect)) {
+            throw new IllegalArgumentException("Only simple SELECT statements 
are supported.");
+        }
+        PlainSelect plainSelect = (PlainSelect) selectBody;
+        if (plainSelect.getHaving() != null
+                || plainSelect.getGroupBy() != null
+                || plainSelect.getOrderByElements() != null
+                || plainSelect.getLimit() != null) {
+            throw new IllegalArgumentException(
+                    "Only SELECT statements with WHERE clause are supported. 
The Having, Group By, Order By, Limit clauses are currently unsupported.");
+        }
+        return plainSelect;
+    }
+
+    public static int[] convertSqlSelectToPaimonProjectionIndex(
+            String[] fieldNames, PlainSelect plainSelect) {
+        int[] projectionIndex = null;
+        List<SelectItem> selectItems = plainSelect.getSelectItems();
+
+        List<String> columnNames = new ArrayList<>();
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
                 return null;
+            } else if (selectItem instanceof SelectExpressionItem) {
+                SelectExpressionItem selectExpressionItem = 
(SelectExpressionItem) selectItem;
+                String columnName = 
selectExpressionItem.getExpression().toString();
+                columnNames.add(columnName);
+            } else {
+                throw new IllegalArgumentException("Error encountered parsing 
query fields.");
             }
-            PredicateBuilder builder = new PredicateBuilder(rowType);
-            return parseExpressionToPredicate(builder, rowType, 
whereExpression);
-        } catch (JSQLParserException e) {
-            throw new IllegalArgumentException("Error parsing SQL WHERE 
clause", e);
         }
+
+        String[] columnNamesArray = columnNames.toArray(new String[0]);
+        projectionIndex =
+                IntStream.range(0, columnNamesArray.length)
+                        .map(
+                                i -> {
+                                    String fieldName = columnNamesArray[i];
+                                    int index = 
Arrays.asList(fieldNames).indexOf(fieldName);
+                                    if (index == -1) {
+                                        throw new IllegalArgumentException(
+                                                "column " + fieldName + " does 
not exist.");
+                                    }
+                                    return index;
+                                })
+                        .toArray();
+
+        return projectionIndex;
+    }
+
+    public static Predicate convertSqlWhereToPaimonPredicate(
+            RowType rowType, PlainSelect plainSelect) {
+        Expression whereExpression = plainSelect.getWhere();
+        if (Objects.isNull(whereExpression)) {
+            return null;
+        }
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        return parseExpressionToPredicate(builder, rowType, whereExpression);
     }
 
     private static Predicate parseExpressionToPredicate(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index b250fd21e9..150c8a138a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -56,6 +56,7 @@ import org.apache.paimon.types.VarCharType;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -73,12 +74,25 @@ public class RowTypeConverter {
      * @param rowType Paimon row type
      * @return SeaTunnel row type {@link SeaTunnelRowType}
      */
-    public static SeaTunnelRowType convert(RowType rowType) {
+    public static SeaTunnelRowType convert(RowType rowType, int[] 
projectionIndex) {
         String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
         SeaTunnelDataType<?>[] dataTypes =
                 rowType.getFields().stream()
                         .map(field -> 
field.type().accept(PaimonToSeaTunnelTypeVisitor.INSTANCE))
                         .toArray(SeaTunnelDataType<?>[]::new);
+        if (projectionIndex != null) {
+            String[] projectionFieldNames =
+                    Arrays.stream(projectionIndex)
+                            .filter(index -> index >= 0 && index < 
fieldNames.length)
+                            .mapToObj(index -> fieldNames[index])
+                            .toArray(String[]::new);
+            SeaTunnelDataType<?>[] projectionDataTypes =
+                    Arrays.stream(projectionIndex)
+                            .filter(index -> index >= 0 && index < 
fieldNames.length)
+                            .mapToObj(index -> dataTypes[index])
+                            .toArray(SeaTunnelDataType<?>[]::new);
+            return new SeaTunnelRowType(projectionFieldNames, 
projectionDataTypes);
+        }
         return new SeaTunnelRowType(fieldNames, dataTypes);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
similarity index 77%
rename from 
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
rename to 
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index d04b76c09b..d8e819b505 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -41,18 +41,25 @@ import org.apache.paimon.utils.DateTimeUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import net.sf.jsqlparser.statement.select.PlainSelect;
+
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class SqlToPaimonPredicateConverterTest {
+public class SqlToPaimonConverterTest {
 
     private RowType rowType;
 
+    private String[] fieldNames;
+
     @BeforeEach
     public void setUp() {
         rowType =
@@ -71,6 +78,8 @@ public class SqlToPaimonPredicateConverterTest {
                                 new DataField(10, "double_col", new 
DoubleType()),
                                 new DataField(11, "date_col", new DateType()),
                                 new DataField(12, "timestamp_col", new 
TimestampType())));
+
+        fieldNames = rowType.getFieldNames().toArray(new String[0]);
     }
 
     @Test
@@ -90,8 +99,10 @@ public class SqlToPaimonPredicateConverterTest {
                         + "date_col = '2022-01-01' AND "
                         + "timestamp_col = '2022-01-01T12:00:00.123'";
 
+        PlainSelect plainSelect = convertToPlainSelect(query);
         Predicate predicate =
-                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
 
         assertNotNull(predicate);
 
@@ -123,8 +134,10 @@ public class SqlToPaimonPredicateConverterTest {
     public void testConvertSqlWhereToPaimonPredicateWithIsNull() {
         String query = "SELECT * FROM table WHERE char_col IS NULL";
 
+        PlainSelect plainSelect = convertToPlainSelect(query);
         Predicate predicate =
-                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
 
         assertNotNull(predicate);
 
@@ -138,8 +151,10 @@ public class SqlToPaimonPredicateConverterTest {
     public void testConvertSqlWhereToPaimonPredicateWithIsNotNull() {
         String query = "SELECT * FROM table WHERE char_col IS NOT NULL";
 
+        PlainSelect plainSelect = convertToPlainSelect(query);
         Predicate predicate =
-                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
 
         assertNotNull(predicate);
 
@@ -153,8 +168,10 @@ public class SqlToPaimonPredicateConverterTest {
     public void testConvertSqlWhereToPaimonPredicateWithAnd() {
         String query = "SELECT * FROM table WHERE int_col > 3 AND double_col < 
6.6";
 
+        PlainSelect plainSelect = convertToPlainSelect(query);
         Predicate predicate =
-                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
 
         assertNotNull(predicate);
 
@@ -169,8 +186,10 @@ public class SqlToPaimonPredicateConverterTest {
     public void testConvertSqlWhereToPaimonPredicateWithOr() {
         String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 
6.6";
 
+        PlainSelect plainSelect = convertToPlainSelect(query);
         Predicate predicate =
-                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
 
         assertNotNull(predicate);
 
@@ -180,4 +199,30 @@ public class SqlToPaimonPredicateConverterTest {
 
         assertEquals(expectedPredicate.toString(), predicate.toString());
     }
+
+    @Test
+    public void testConvertSqlSelectToPaimonProjectionArrayWithALL() {
+        String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 
6.6";
+
+        PlainSelect plainSelect = convertToPlainSelect(query);
+        int[] projectionIndex =
+                
SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex(
+                        fieldNames, plainSelect);
+
+        assertNull(projectionIndex);
+    }
+
+    @Test
+    public void testConvertSqlSelectToPaimonProjectionArrayWithStar() {
+        String query =
+                "SELECT decimal_col, int_col, char_col, timestamp_col, 
boolean_col FROM table WHERE int_col > 3 OR double_col < 6.6";
+
+        PlainSelect plainSelect = convertToPlainSelect(query);
+        int[] projectionIndex =
+                
SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex(
+                        fieldNames, plainSelect);
+
+        int[] expectedProjectionIndex = {4, 7, 0, 12, 2};
+        assertArrayEquals(projectionIndex, expectedProjectionIndex);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index e075efde15..580bdfde87 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -47,6 +47,7 @@ public class RowTypeConverterTest {
 
     private SeaTunnelRowType seaTunnelRowType;
 
+    private SeaTunnelRowType seaTunnelProjectionRowType;
     private RowType rowType;
 
     private BasicTypeDefine<DataType> typeDefine;
@@ -128,6 +129,12 @@ public class RowTypeConverterTest {
                             new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
                             ArrayType.STRING_ARRAY_TYPE
                         });
+
+        seaTunnelProjectionRowType =
+                new SeaTunnelRowType(
+                        new String[] {"c_string", "c_int"},
+                        new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
+
         rowType =
                 DataTypes.ROW(
                         new DataField(0, "c_tinyint", DataTypes.TINYINT()),
@@ -188,10 +195,17 @@ public class RowTypeConverterTest {
 
     @Test
     public void paimonRowTypeToSeaTunnel() {
-        SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
+        SeaTunnelRowType convert = RowTypeConverter.convert(rowType, null);
         Assertions.assertEquals(convert, seaTunnelRowType);
     }
 
+    @Test
+    public void paimonToSeaTunnelWithProjection() {
+        int[] projection = {7, 2};
+        SeaTunnelRowType convert = RowTypeConverter.convert(rowType, projection);
+        Assertions.assertEquals(convert, seaTunnelProjectionRowType);
+    }
+
     @Test
     public void seaTunnelToPaimon() {
         RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index c8b5ed80ee..a24a375c31 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -55,5 +55,8 @@ public class PaimonIT extends TestSuiteBase {
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Container.ExecResult readResult = container.executeJob("/paimon_to_assert.conf");
         Assertions.assertEquals(0, readResult.getExitCode());
+        Container.ExecResult readProjectionResult =
+                container.executeJob("/paimon_projection_to_assert.conf");
+        Assertions.assertEquals(0, readProjectionResult.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
similarity index 90%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
index b9ecb4283f..6b67aa70c6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
@@ -33,6 +33,7 @@ source {
     database = "default"
     table = "st_test"
     result_table_name = paimon_source
+    query = "select c_string, c_boolean from st_test where c_string is not null"
   }
 }
 
@@ -40,6 +41,12 @@ sink {
   Assert {
     source_table_name = paimon_source
     rules {
+    row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100000
+        }
+      ],
       row_rules = [
         {
           rule_type = MAX_ROW
@@ -64,15 +71,6 @@ sink {
               rule_type = NOT_NULL
             }
           ]
-        },
-        {
-          field_name = c_double
-          field_type = double
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
         }
       ]
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index b9ecb4283f..f4f6cbcaf2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -40,6 +40,12 @@ sink {
   Assert {
     source_table_name = paimon_source
     rules {
+    row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 100000
+        }
+      ],
       row_rules = [
         {
           rule_type = MAX_ROW


Reply via email to