Myasuka commented on code in PR #22767:
URL: https://github.com/apache/flink/pull/22767#discussion_r1264367897
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java:
##########
@@ -146,7 +146,18 @@ public final class FactoryUtil {
             ConfigOptions.key("scan.watermark.alignment.update-interval")
                     .durationType()
                     .defaultValue(Duration.ofMillis(1000))
-                    .withDescription("update interval to align watermark.");
+                    .withDescription("Update interval to align watermark.");
+
+    public static final ConfigOption<Duration> SOURCE_IDLE_TIMEOUT =
+            ConfigOptions.key("scan.watermark.idle-timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a source do not receive any elements for the timeout time, "
+                                    + "it will be marked as temporarily idle. This allows downstream "
+                                    + "tasks to advance their watermarks without the need to wait for "
+                                    + "watermarks from this source while it is idle. "
+                                    + "Default value is 0, which means detecting source idleness is not enabled.");

Review Comment:
   This option is declared with `noDefaultValue()`, so the description should not claim that the default value is 0.

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java:
##########
@@ -117,8 +126,33 @@ protected FlinkLogicalTableSourceScan getNewScan(
             idleTimeoutMillis = -1L;
         }

+        Optional<RelHint> optionsHintOptional =
+                scan.getHints().stream()
+                        .filter(
+                                relHint ->
+                                        relHint.hintName.equalsIgnoreCase(
+                                                FlinkHints.HINT_NAME_OPTIONS))
+                        .findFirst();
+        Configuration hintOptions = new Configuration();
+        if (optionsHintOptional.isPresent()) {
+            RelHint optionsHint = optionsHintOptional.get();
+            hintOptions = Configuration.fromMap(optionsHint.kvOptions);
+        }
+        Configuration tableOptions = new Configuration();
+        RelOptTable table = scan.getTable();
+        if (table instanceof TableSourceTable) {
+            Map<String, String> tableConfigs =
+                    ((TableSourceTable) table)
+                            .contextResolvedTable()
+                            .getResolvedTable()
+                            .getOptions();
+            tableOptions = Configuration.fromMap(tableConfigs);
+        }

Review Comment:
   We can also improve the code to avoid creating unnecessary objects via
   ~~~java
   Configuration tableOptions =
           table instanceof TableSourceTable
                   ? Configuration.fromMap(
                           ((TableSourceTable) table)
                                   .contextResolvedTable()
                                   .getResolvedTable()
                                   .getOptions())
                   : new Configuration();
   ~~~

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -104,11 +116,15 @@ public String getDigests(SourceAbilityContext context) {
                         watermarkExpr,
                         JavaScalaConversionUtil.toScala(
                                 context.getSourceRowType().getFieldNames()));
+        String digest = String.format("watermark=[%s]", expressionStr);
         if (idleTimeoutMillis > 0) {
-            return String.format(
-                    "watermark=[%s], idletimeout=[%d]", expressionStr, idleTimeoutMillis);
+            digest = String.format("%s, idletimeout=[%d]", digest, idleTimeoutMillis);
+        }
+        if (watermarkParams != null) {
+            WatermarkEmitStrategy emitStrategy = watermarkParams.getEmitStrategy();
+            digest = String.format("%s, watermarkEmitStrategy=[%s]", digest, emitStrategy);
         }
-        return String.format("watermark=[%s]", expressionStr);
+        return digest;

Review Comment:
   We can use `StringBuilder` to avoid the expensive repeated `String.format` concatenation on `digest`; it could give a 10x performance improvement.
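
For illustration, the `StringBuilder` suggestion could look roughly like the sketch below. This is a standalone, hypothetical helper (`DigestSketch.buildDigest` is not part of Flink or of this PR), and it takes the emit strategy as a plain `String` so that it compiles without Flink on the classpath. The intent is only to show the append pattern producing the same digest layout as the `String.format` chain in the hunk above; the actual speedup has not been measured here.

```java
/** Hypothetical stand-in for the digest logic in the hunk above; not Flink code. */
final class DigestSketch {

    /**
     * Builds the digest by appending to one StringBuilder instead of
     * re-formatting the whole string with String.format at every step.
     *
     * @param expressionStr rendered watermark expression
     * @param idleTimeoutMillis idle timeout in milliseconds; values <= 0 are omitted
     * @param emitStrategy emit strategy name, or null when no watermark params are set
     *     (assumption: the real code would pass the WatermarkEmitStrategy enum value)
     */
    static String buildDigest(String expressionStr, long idleTimeoutMillis, String emitStrategy) {
        StringBuilder digest = new StringBuilder();
        digest.append("watermark=[").append(expressionStr).append(']');
        if (idleTimeoutMillis > 0) {
            digest.append(", idletimeout=[").append(idleTimeoutMillis).append(']');
        }
        if (emitStrategy != null) {
            digest.append(", watermarkEmitStrategy=[").append(emitStrategy).append(']');
        }
        return digest.toString();
    }

    public static void main(String[] args) {
        // Placeholder inputs, chosen only to exercise each branch.
        System.out.println(buildDigest("ts", 1000L, "ON_PERIODIC"));
        System.out.println(buildDigest("ts", -1L, null));
    }
}
```

With only two or three appends per call the gain may be modest, so whether this is worth the change probably depends on how often `getDigests` is invoked during planning.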
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java:
##########
@@ -117,8 +126,33 @@ protected FlinkLogicalTableSourceScan getNewScan(
             idleTimeoutMillis = -1L;
         }

+        Optional<RelHint> optionsHintOptional =
+                scan.getHints().stream()
+                        .filter(
+                                relHint ->
+                                        relHint.hintName.equalsIgnoreCase(
+                                                FlinkHints.HINT_NAME_OPTIONS))
+                        .findFirst();
+        Configuration hintOptions = new Configuration();
+        if (optionsHintOptional.isPresent()) {
+            RelHint optionsHint = optionsHintOptional.get();
+            hintOptions = Configuration.fromMap(optionsHint.kvOptions);
+        }

Review Comment:
   I think we can easily avoid creating unnecessary objects via
   ~~~java
   Configuration hintOptions =
           optionsHintOptional
                   .map(relHint -> Configuration.fromMap(relHint.kvOptions))
                   .orElseGet(Configuration::new);
   ~~~

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org