nsivabalan commented on code in PR #18076:
URL: https://github.com/apache/hudi/pull/18076#discussion_r2928370428
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -109,37 +109,34 @@ public CloudObjectsSelectorCommon(TypedProperties properties) {
   }
   /**
-   * Return a function that extracts filepaths from a list of Rows.
+   * Return a function that extracts filepath from a single Row without using any intermediate list.
    * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket, object_size]
    * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL.
    * @param storageConf storage configuration.
-   * @param checkIfExists check if each file exists, before adding it to the returned list
-   * @return
+   * @param checkIfExists check if each file exists, before returning
+   * @return FlatMapFunction that returns 0 or 1 CloudObjectMetadata per row
    */
-  public static MapPartitionsFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(
+  public static FlatMapFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(
Review Comment:
Why do we need flatMap? Wouldn't just `map` suffice?
Is there any resource open/close we are doing? I don't see anything.
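The trade-off behind the question can be sketched with plain Java streams (Spark is not assumed here; `FileRow` and both helpers are hypothetical stand-ins): `flatMap` lets a row that fails an existence check be dropped by emitting zero elements, which a plain `map` cannot do without a sentinel value and a second filtering pass.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapVsMap {
    // Hypothetical stand-in for a cloud-object row: path plus an "exists" flag.
    record FileRow(String path, boolean exists) {}

    // map must emit exactly one element per input, so missing files need a
    // sentinel (Optional/null) that a second pass has to strip out.
    static List<String> withMap(List<FileRow> rows) {
        return rows.stream()
            .map(r -> r.exists() ? Optional.of(r.path()) : Optional.<String>empty())
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    // flatMap can emit 0 or 1 elements per input directly -- the shape a
    // Spark FlatMapFunction<Row, CloudObjectMetadata> gives the PR.
    static List<String> withFlatMap(List<FileRow> rows) {
        return rows.stream()
            .flatMap(r -> r.exists() ? Stream.of(r.path()) : Stream.empty())
            .collect(Collectors.toList());
    }
}
```

If the function truly emits exactly one element per row, `map` is indeed the simpler choice; `flatMap` only earns its keep when rows can be filtered out in the same pass.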
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java:
##########
@@ -101,19 +104,20 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchPartitionedSource(
     log.info("Adding filter string to Dataset: {}", filter);
     Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter);
+    long rowLimit = props.getLong(SOURCE_MAX_ROWS_PER_SYNC.key(), SOURCE_MAX_ROWS_PER_SYNC.defaultValue());
     log.info("Adjusting end checkpoint:{} based on sourceLimit :{}", queryInfo.getEndInstant(), sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+            filteredSourceData, sourceLimit, rowLimit, queryInfo, cloudObjectIncrCheckpoint);
Review Comment:
Can we name this `numFilesLimit`?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -243,9 +240,9 @@ public static String generateFilter(Type type,
    * @param cloudObjectMetadataDF a Dataset that contains metadata of S3/GCS objects. Assumed to be a persisted form
    *                              of a Cloud Storage SQS/PubSub Notification event.
    * @param checkIfExists Check if each file exists, before returning its full path
-   * @return A {@link List} of {@link CloudObjectMetadata} containing file info.
+   * @return A {@link Dataset} of {@link CloudObjectMetadata} containing file info.
    */
-  public static List<CloudObjectMetadata> getObjectMetadata(
+  public static Dataset<CloudObjectMetadata> getObjectMetadata(
Review Comment:
We need to ensure that we cache the return value, so that callers do not retrigger the DAG repeatedly.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -299,12 +300,13 @@ public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGen
       }
     }
-    // Limit based on sourceLimit
+    // Limit based on sourceLimit and rowLimit
     WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()), col(queryInfo.getKeyColumn()));
     // Add the 'cumulativeSize' column with running sum of 'limitColumn'
     Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME, sum(col(queryInfo.getLimitColumn())).over(windowSpec));
     Dataset<Row> collectedRows = aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+    collectedRows = collectedRows.limit((int) Math.min(rowLimit, Integer.MAX_VALUE));
Review Comment:
Not sure if this will give us the gains we are looking for.
We should apply the limits while processing entries from the S3 metadata table itself.
It looks like in this fix, we are reading the contents from the metadata table and then adding a limit clause on top of that.
Can we retain the list as before, and trim the list of files before we call `loadAsDataset` in CloudObjectsSelectorCommon?
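The trimming suggested above can be sketched in plain Java (names like `numFilesLimit` and `trimToLimit` are hypothetical, not from the PR): clamp the long-typed config limit safely to an `int`, then cut the already-collected file list before any Dataset is built from it.

```java
import java.util.List;

public class FileLimitTrim {
    // Clamp a long-typed config value to int so it can feed APIs like
    // Dataset.limit(int) or List.subList(int, int) without overflow.
    static int clampToInt(long limit) {
        return (int) Math.min(limit, Integer.MAX_VALUE);
    }

    // Reviewer's suggestion sketched: trim the collected file list before
    // anything like loadAsDataset is called (helper names hypothetical).
    static <T> List<T> trimToLimit(List<T> files, long numFilesLimit) {
        int n = clampToInt(numFilesLimit);
        return files.size() <= n ? files : files.subList(0, n);
    }
}
```

Trimming the driver-side list is cheap; the concern in the comment is that a `limit()` layered on top of the metadata-table read may still scan more than it returns.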
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -166,6 +166,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.1")
       .withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit");
+  public static final ConfigProperty<Long> SOURCE_MAX_ROWS_PER_SYNC = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.max.rows.per.sync")
+      .defaultValue(10_000_000L)
+      .markAdvanced()
+      .withDocumentation("Maximum number of rows to process per sync round when using source limit based batching. "
Review Comment:
In fact, we can just fix the config key:
`STREAMER_CONFIG_PREFIX + "source.cloud.data.max.files.per.sync"`
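For illustration only, the suggested rename might look like this (the constant name `SOURCE_MAX_FILES_PER_SYNC` and the doc string are assumptions, not from the PR; other fields carried over unchanged):

```java
// Hypothetical sketch of the renamed config property:
public static final ConfigProperty<Long> SOURCE_MAX_FILES_PER_SYNC = ConfigProperty
    .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.max.files.per.sync")
    .defaultValue(10_000_000L)
    .markAdvanced()
    .withDocumentation("Maximum number of files to process per sync round "
        + "when using source-limit based batching.");
```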
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java:
##########
@@ -101,19 +104,20 @@ public Pair<Option<Dataset<Row>>, Checkpoint> fetchPartitionedSource(
     log.info("Adding filter string to Dataset: {}", filter);
     Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter);
+    long rowLimit = props.getLong(SOURCE_MAX_ROWS_PER_SYNC.key(), SOURCE_MAX_ROWS_PER_SYNC.defaultValue());
     log.info("Adjusting end checkpoint:{} based on sourceLimit :{}", queryInfo.getEndInstant(), sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+            filteredSourceData, sourceLimit, rowLimit, queryInfo, cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
       log.info("Empty source, returning endpoint:{}", checkPointAndDataset.getLeft());
       return Pair.of(Option.empty(), new StreamerCheckpointV1(checkPointAndDataset.getLeft().toString()));
     }
     log.info("Adjusted end checkpoint :{}", checkPointAndDataset.getLeft());
     boolean checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
-    List<CloudObjectMetadata> cloudObjectMetadata = CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, checkPointAndDataset.getRight().get(), checkIfFileExists, props);
-    log.info("Total number of files to process :{}", cloudObjectMetadata.size());
+    Dataset<CloudObjectMetadata> cloudObjectMetadataDS = CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, checkPointAndDataset.getRight().get(), checkIfFileExists, props);
+    log.info("Total number of files to process :{}", cloudObjectMetadataDS.count());
Review Comment:
We need to cache `cloudObjectMetadataDS`; if not, we might keep retriggering the DAG repeatedly.
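Spark's `Dataset.cache()` (or `persist()`) marks the plan for reuse, so the `count()` above and any downstream load of the same Dataset do not each recompute the source. The compute-once contract can be illustrated with a memoizing supplier in plain Java (an analogy under stated assumptions, not the Spark API itself):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CacheOnce {
    // Wrap an expensive computation so it runs at most once -- the same
    // contract Dataset.cache() gives to repeated actions like count().
    static <T> Supplier<T> memoize(Supplier<T> expensive) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = expensive.get();
                    computed = true;
                }
                return value;
            }
        };
    }
}
```

In the PR itself, this would presumably amount to calling `cloudObjectMetadataDS.cache()` before the `count()`, ideally paired with an `unpersist()` once the batch has been loaded.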
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -166,6 +166,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.1")
       .withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit");
+  public static final ConfigProperty<Long> SOURCE_MAX_ROWS_PER_SYNC = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.max.rows.per.sync")
+      .defaultValue(10_000_000L)
+      .markAdvanced()
+      .withDocumentation("Maximum number of rows to process per sync round when using source limit based batching. "
Review Comment:
Can we also say `files` in addition to `rows`? `rows` is a bit ambiguous.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]