cshuo commented on code in PR #13592:
URL: https://github.com/apache/hudi/pull/13592#discussion_r2250246770
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -292,4 +341,25 @@ public String getIssuedOffset() {
return issuedOffset;
}
+ private static class SplitState implements Serializable {
+ private int totalSplits;
+ private List<MergeOnReadInputSplit> remainingSplits;
Review Comment:
Can we make these two fields immutable by declaring them `final`?
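A minimal sketch of what that could look like. `DemoSplit` and `ImmutableSplitState` are hypothetical stand-ins (not the classes in this PR) so the snippet compiles on its own, and the constructor and getters are assumptions, since the diff only shows the two fields. It also assumes the serializer used for the state can populate `final` fields, which should be verified.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for MergeOnReadInputSplit so the sketch is self-contained.
class DemoSplit implements Serializable {
  private static final long serialVersionUID = 1L;
  private final int splitNum;

  DemoSplit(int splitNum) {
    this.splitNum = splitNum;
  }

  int getSplitNum() {
    return splitNum;
  }
}

// Sketch of a state holder whose fields are assigned once in the constructor.
class ImmutableSplitState implements Serializable {
  private static final long serialVersionUID = 1L;

  private final int totalSplits;
  private final List<DemoSplit> remainingSplits;

  ImmutableSplitState(int totalSplits, List<DemoSplit> remainingSplits) {
    this.totalSplits = totalSplits;
    // Defensive copy: later changes to the caller's list do not leak into the state.
    this.remainingSplits = new ArrayList<>(remainingSplits);
  }

  int getTotalSplits() {
    return totalSplits;
  }

  List<DemoSplit> getRemainingSplits() {
    // Read-only view, so callers cannot mutate the snapshot.
    return Collections.unmodifiableList(remainingSplits);
  }
}
```

With this shape, a new `ImmutableSplitState` would be created for each snapshot instead of mutating an existing one.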
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -178,6 +200,14 @@ public void initializeState(FunctionInitializationContext context) throws Except
getClass().getSimpleName(), issuedInstant, issuedOffset,
conf.get(FlinkOptions.TABLE_NAME), path);
}
}
+ Iterator<SplitState> inputSplitsIterable = inputSplitsState.get().iterator();
+ if (inputSplitsIterable.hasNext()) {
+ SplitState splitState = inputSplitsIterable.next();
+ for (MergeOnReadInputSplit split : splitState.getRemainingSplits()) {
Review Comment:
You can use `remainingSplits.addAll(splitState.getRemainingSplits());` here
instead of the loop. For an `ArrayList`, `addAll` copies the elements in bulk
via `System.arraycopy` rather than adding them one by one.
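For illustration, a small self-contained comparison of the two approaches. `RestoreSplitsDemo` and its methods are hypothetical, and `String` stands in for `MergeOnReadInputSplit`. Both variants produce the same list, but the bulk call is shorter and lets `ArrayList` grow its backing array once.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreSplitsDemo {

  // Element-by-element restore, equivalent to the for-loop in the diff.
  static List<String> restoreWithLoop(List<String> restored) {
    List<String> remaining = new ArrayList<>();
    for (String split : restored) {
      remaining.add(split);
    }
    return remaining;
  }

  // Bulk restore: ArrayList.addAll copies the source elements in one step.
  static List<String> restoreWithAddAll(List<String> restored) {
    List<String> remaining = new ArrayList<>();
    remaining.addAll(restored);
    return remaining;
  }
}
```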
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -131,6 +147,7 @@ public StreamReadMonitoringFunction(
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.remainingSplits = new ArrayList<>();
Review Comment:
Since `remainingSplits` is now initialized in `initializeState`, the field can be declared `transient`.
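A minimal sketch of the pattern, assuming the function object is shipped with plain Java serialization. `MonitorDemo` is a hypothetical class, not the one in this PR, and its no-argument `initializeState()` only mimics the real method. The transient field is skipped during serialization and rebuilt in the init method, so nothing is lost by excluding it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class MonitorDemo implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String path;
  // Not serialized: it is always re-created in initializeState().
  private transient List<String> remainingSplits;

  MonitorDemo(String path) {
    this.path = path;
  }

  // Mimics the lifecycle hook that runs on the task side before processing starts.
  void initializeState() {
    this.remainingSplits = new ArrayList<>();
  }

  String getPath() {
    return path;
  }

  List<String> getRemainingSplits() {
    return remainingSplits;
  }

  // Serializes and deserializes the object in memory to imitate shipping it to a task.
  static MonitorDemo roundTrip(MonitorDemo in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream oin =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (MonitorDemo) oin.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

After `roundTrip`, `getRemainingSplits()` returns `null` until `initializeState()` is called again, which is why the field must be assigned there and not only at declaration or in the constructor.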