nsivabalan commented on code in PR #18076:
URL: https://github.com/apache/hudi/pull/18076#discussion_r2928370428


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -109,37 +109,34 @@ public CloudObjectsSelectorCommon(TypedProperties 
properties) {
   }
 
   /**
-   * Return a function that extracts filepaths from a list of Rows.
+   * Return a function that extracts filepath from a single Row without using 
any intermediate list.
    * Here Row is assumed to have the schema [bucket_name, 
filepath_relative_to_bucket, object_size]
    * @param storageUrlSchemePrefix    Eg: s3:// or gs://. The 
storage-provider-specific prefix to use within the URL.
    * @param storageConf               storage configuration.
-   * @param checkIfExists             check if each file exists, before adding 
it to the returned list
-   * @return
+   * @param checkIfExists             check if each file exists, before 
returning
+   * @return FlatMapFunction that returns 0 or 1 CloudObjectMetadata per row
    */
-  public static MapPartitionsFunction<Row, CloudObjectMetadata> 
getCloudObjectMetadataPerPartition(
+  public static FlatMapFunction<Row, CloudObjectMetadata> 
getCloudObjectMetadataPerPartition(

Review Comment:
   why do we need flatMap?
   wouldn't just `map` suffice? 
   
   is there any resource open/close we are doing? I don't see anything. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java:
##########
@@ -101,19 +104,20 @@ public Pair<Option<Dataset<Row>>, Checkpoint> 
fetchPartitionedSource(
     log.info("Adding filter string to Dataset: {}", filter);
     Dataset<Row> filteredSourceData = 
queryInfoDatasetPair.getRight().filter(filter);
 
+    long rowLimit = props.getLong(SOURCE_MAX_ROWS_PER_SYNC.key(), 
SOURCE_MAX_ROWS_PER_SYNC.defaultValue());
     log.info("Adjusting end checkpoint:{} based on sourceLimit :{}", 
queryInfo.getEndInstant(), sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset 
=
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
+            filteredSourceData, sourceLimit, rowLimit, queryInfo, 
cloudObjectIncrCheckpoint);

Review Comment:
   can we name this `numFilesLimit`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -243,9 +240,9 @@ public static String generateFilter(Type type,
    * @param cloudObjectMetadataDF a Dataset that contains metadata of S3/GCS 
objects. Assumed to be a persisted form
    *                              of a Cloud Storage SQS/PubSub Notification 
event.
    * @param checkIfExists         Check if each file exists, before returning 
its full path
-   * @return A {@link List} of {@link CloudObjectMetadata} containing file 
info.
+   * @return A {@link Dataset} of {@link CloudObjectMetadata} containing file 
info.
    */
-  public static List<CloudObjectMetadata> getObjectMetadata(
+  public static Dataset<CloudObjectMetadata> getObjectMetadata(

Review Comment:
   we need to ensure that we cache the return value. 
   so that callers do not retrigger the dag repeatedly 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -299,12 +300,13 @@ public static Pair<CloudObjectIncrCheckpoint, 
Option<Dataset<Row>>> filterAndGen
       }
     }
 
-    // Limit based on sourceLimit
+    // Limit based on sourceLimit and rowLimit
     WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()), 
col(queryInfo.getKeyColumn()));
     // Add the 'cumulativeSize' column with running sum of 'limitColumn'
     Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
         sum(col(queryInfo.getLimitColumn())).over(windowSpec));
     Dataset<Row> collectedRows = 
aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+    collectedRows = collectedRows.limit((int) Math.min(rowLimit, 
Integer.MAX_VALUE));

Review Comment:
   Not sure if this will give us the gains we are looking for. 
   while processing entries from S3 metadata table only, we should apply the 
limits. 
   
   looks like in your fix, we are reading the contents from Metadata table and 
then adding a limit clause on top of that. 
   Can we retain the list as before and then trim the list of files before we 
call `loadAsDataset` in CloudObjectsSelectorCommon. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -166,6 +166,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.1")
       .withDocumentation("specify this value in bytes, to coalesce partitions 
of source dataset not greater than specified limit");
 
+  public static final ConfigProperty<Long> SOURCE_MAX_ROWS_PER_SYNC = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.max.rows.per.sync")
+      .defaultValue(10_000_000L)
+      .markAdvanced()
+      .withDocumentation("Maximum number of rows to process per sync round 
when using source limit based batching. "

Review Comment:
   infact, we can fix the config key only
   `STREAMER_CONFIG_PREFIX + "source.cloud.data.max.files.per.sync`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java:
##########
@@ -101,19 +104,20 @@ public Pair<Option<Dataset<Row>>, Checkpoint> 
fetchPartitionedSource(
     log.info("Adding filter string to Dataset: {}", filter);
     Dataset<Row> filteredSourceData = 
queryInfoDatasetPair.getRight().filter(filter);
 
+    long rowLimit = props.getLong(SOURCE_MAX_ROWS_PER_SYNC.key(), 
SOURCE_MAX_ROWS_PER_SYNC.defaultValue());
     log.info("Adjusting end checkpoint:{} based on sourceLimit :{}", 
queryInfo.getEndInstant(), sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset 
=
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
+            filteredSourceData, sourceLimit, rowLimit, queryInfo, 
cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
       log.info("Empty source, returning endpoint:{}", 
checkPointAndDataset.getLeft());
       return Pair.of(Option.empty(), new 
StreamerCheckpointV1(checkPointAndDataset.getLeft().toString()));
     }
     log.info("Adjusted end checkpoint :{}", checkPointAndDataset.getLeft());
 
     boolean checkIfFileExists = getBooleanWithAltKeys(props, 
ENABLE_EXISTS_CHECK);
-    List<CloudObjectMetadata> cloudObjectMetadata = 
CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, 
checkPointAndDataset.getRight().get(), checkIfFileExists, props);
-    log.info("Total number of files to process :{}", 
cloudObjectMetadata.size());
+    Dataset<CloudObjectMetadata> cloudObjectMetadataDS = 
CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, 
checkPointAndDataset.getRight().get(), checkIfFileExists, props);
+    log.info("Total number of files to process :{}", 
cloudObjectMetadataDS.count());

Review Comment:
   we need to cache the `cloudObjectMetadataDS`. if not, we might keep 
retriggering the dag repeatedly. 
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -166,6 +166,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.1")
       .withDocumentation("specify this value in bytes, to coalesce partitions 
of source dataset not greater than specified limit");
 
+  public static final ConfigProperty<Long> SOURCE_MAX_ROWS_PER_SYNC = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.max.rows.per.sync")
+      .defaultValue(10_000_000L)
+      .markAdvanced()
+      .withDocumentation("Maximum number of rows to process per sync round 
when using source limit based batching. "

Review Comment:
   can we also say `files` in addition to `rows`. `rows` is bit ambiguous. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to