This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new efe53f30 [Improve] add partial limit push down (#553)
efe53f30 is described below
commit efe53f304274dc4a6cea3c52b6a18e1b3e48bce4
Author: wudi <[email protected]>
AuthorDate: Wed Feb 12 10:32:54 2025 +0800
[Improve] add partial limit push down (#553)
---
.../apache/doris/flink/cfg/DorisReadOptions.java | 27 ++++++++++++++++++----
.../source/reader/DorisFlightValueReader.java | 5 ++++
.../flink/source/reader/DorisValueReader.java | 6 +++++
.../doris/flink/table/DorisDynamicTableSource.java | 10 +++++++-
4 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 22a77b83..1889ace5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -39,6 +39,8 @@ public class DorisReadOptions implements Serializable {
private boolean useOldApi;
private boolean useFlightSql;
private Integer flightSqlPort;
+ // for flink sql limit push down
+ private Long rowLimit;
public DorisReadOptions(
String readFields,
@@ -54,7 +56,8 @@ public class DorisReadOptions implements Serializable {
Boolean deserializeArrowAsync,
boolean useOldApi,
boolean useFlightSql,
- Integer flightSqlPort) {
+ Integer flightSqlPort,
+ Long rowLimit) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
@@ -69,6 +72,7 @@ public class DorisReadOptions implements Serializable {
this.useOldApi = useOldApi;
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
+ this.rowLimit = rowLimit;
}
public String getReadFields() {
@@ -135,6 +139,14 @@ public class DorisReadOptions implements Serializable {
return flightSqlPort;
}
+ public Long getRowLimit() {
+ return rowLimit;
+ }
+
+ public void setRowLimit(Long rowLimit) {
+ this.rowLimit = rowLimit;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -165,7 +177,8 @@ public class DorisReadOptions implements Serializable {
&& Objects.equals(deserializeQueueSize,
that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync,
that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
- && Objects.equals(flightSqlPort, that.flightSqlPort);
+ && Objects.equals(flightSqlPort, that.flightSqlPort)
+ && Objects.equals(rowLimit, that.rowLimit);
}
@Override
@@ -184,7 +197,8 @@ public class DorisReadOptions implements Serializable {
deserializeArrowAsync,
useOldApi,
useFlightSql,
- flightSqlPort);
+ flightSqlPort,
+ rowLimit);
}
public DorisReadOptions copy() {
@@ -202,7 +216,8 @@ public class DorisReadOptions implements Serializable {
deserializeArrowAsync,
useOldApi,
useFlightSql,
- flightSqlPort);
+ flightSqlPort,
+ rowLimit);
}
/** Builder of {@link DorisReadOptions}. */
@@ -227,6 +242,7 @@ public class DorisReadOptions implements Serializable {
private Boolean useOldApi = false;
private Boolean useFlightSql =
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort;
+ private Long rowLimit;
/**
* Sets the readFields for doris table to push down projection, such
as name,age.
@@ -406,7 +422,8 @@ public class DorisReadOptions implements Serializable {
deserializeArrowAsync,
useOldApi,
useFlightSql,
- flightSqlPort);
+ flightSqlPort,
+ rowLimit);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
index fbf05050..0234be0f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
@@ -132,6 +132,11 @@ public class DorisFlightValueReader extends ValueReader implements AutoCloseable
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " WHERE " + readOptions.getFilterQuery();
}
+
+ if (readOptions.getRowLimit() != null) {
+ sql += " LIMIT " + readOptions.getRowLimit();
+ }
+
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 2db4f798..e55bb7ca 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -74,6 +74,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
protected String contextId;
protected Schema schema;
protected boolean asyncThreadStarted;
+ private long readRowCount = 0L;
public DorisValueReader(
PartitionDefinition partition, DorisOptions options,
DorisReadOptions readOptions) {
@@ -210,6 +211,10 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
* @return true if hax next value
*/
public boolean hasNext() {
+ if (readOptions.getRowLimit() != null && readRowCount >=
readOptions.getRowLimit()) {
+ return false;
+ }
+
boolean hasNext = false;
if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
// support deserialize Arrow to RowBatch asynchronously
@@ -275,6 +280,7 @@ public class DorisValueReader extends ValueReader implements AutoCloseable {
LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
+ readRowCount++;
return rowBatch.next();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index a68cf189..e4f27d8b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -58,7 +59,8 @@ public final class DorisDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsFilterPushDown,
- SupportsProjectionPushDown {
+ SupportsProjectionPushDown,
+ SupportsLimitPushDown {
private static final Logger LOG =
LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
@@ -256,4 +258,10 @@ public final class DorisDynamicTableSource
resolvedFilterQuery,
physicalRowDataType);
}
+
+ @Override
+ public void applyLimit(long limit) {
+ // partial limit push down to reduce the amount of data scanned
+ readOptions.setRowLimit(limit);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]