wuchong commented on a change in pull request #13392:
URL: https://github.com/apache/flink/pull/13392#discussion_r490117998

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##########
@@ -198,9 +198,10 @@ object WindowEmitStrategy {
   // It is a experimental config, will may be removed later.
   @Experimental
-  val TABLE_EXEC_EMIT_LATE_FIRE_DELAY: ConfigOption[String] =
+  val TABLE_EXEC_EMIT_LATE_FIRE_DELAY: ConfigOption[Duration] =
     key("table.exec.emit.late-fire.delay")
-    .noDefaultValue
+    .durationType()
+    .defaultValue(Duration.ofMillis(-1))

Review comment:
We can use `.noDefaultValue()` to keep the same behavior as before.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -60,9 +60,10 @@
   // ------------------------------------------------------------------------
   @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
-  public static final ConfigOption<String> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
+  public static final ConfigOption<Duration> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
     key("table.exec.source.idle-timeout")
-      .defaultValue("-1 ms")
+      .durationType()
+      .defaultValue(Duration.ofMillis(0))
       .withDescription("When a source do not receive any elements for the timeout time, " +
         "it will be marked as temporarily idle. This allows downstream " +
         "tasks to advance their watermarks without the need to wait for " +

Review comment:
The default value is 0ms now. It would be better to mention the meaning of default value, otherwise it's confusing.

> Default value is 0, which means detecting source idleness is not enabled.

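To make the "0 means disabled" reading of the idle-timeout default concrete, here is a minimal stand-alone sketch. It does not use Flink at all: `IdleTimeoutSketch` and `isIdleDetectionEnabled` are hypothetical names invented for illustration, and the only assumption taken from the comment above is that a zero timeout means source idleness detection is off.

```java
import java.time.Duration;

// Stand-alone sketch: models "0 means disabled" for an idle-timeout setting.
// IdleTimeoutSketch and isIdleDetectionEnabled are hypothetical, not Flink API.
final class IdleTimeoutSketch {

    // Mirrors the default shown in the diff: Duration.ofMillis(0).
    static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(0);

    // Treat idleness detection as enabled only for a strictly positive timeout.
    static boolean isIdleDetectionEnabled(Duration idleTimeout) {
        return !idleTimeout.isZero() && !idleTimeout.isNegative();
    }

    public static void main(String[] args) {
        System.out.println(isIdleDetectionEnabled(DEFAULT_IDLE_TIMEOUT));
        System.out.println(isIdleDetectionEnabled(Duration.ofSeconds(5)));
    }
}
```

Spelling out the meaning of the default in the option description, as suggested, saves readers from having to infer a rule like this from the code.
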
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
##########
@@ -61,8 +60,8 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
     val miniBatchInterval: MiniBatchInterval = if (config.getConfiguration.getBoolean(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) {
-      val miniBatchLatency = getMillisecondFromConfigDuration(config,
-        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
+      val miniBatchLatency =config.getConfiguration.get(

Review comment:
```suggestion
      val miniBatchLatency = config.getConfiguration.get(
```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##########
@@ -179,9 +178,10 @@ object WindowEmitStrategy {
   // It is a experimental config, will may be removed later.
   @Experimental
-  val TABLE_EXEC_EMIT_EARLY_FIRE_DELAY: ConfigOption[String] =
+  val TABLE_EXEC_EMIT_EARLY_FIRE_DELAY: ConfigOption[Duration] =
     key("table.exec.emit.early-fire.delay")
-    .noDefaultValue
+    .durationType()
+    .defaultValue(Duration.ofMillis(-1))

Review comment:
It's not allowed to use `-1ms` for Duration type. We can use `.noDefaultValue()` to keep the same behavior as before.
You can use `tableConfig.getConfiguration.getOptional(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY).isPresent` to check whether it is set.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
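
The "no default value plus presence check" pattern from the early-fire and late-fire comments above can be sketched without Flink. In this illustration a plain `Map` stands in for the configuration object, and `EmitDelaySketch`, `set`, and `isEarlyFireDelaySet` are hypothetical names. Only the idea of calling `getOptional(...).isPresent` comes from the review itself.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-alone sketch of an option with no default value and a presence check.
// EmitDelaySketch is hypothetical; the Map is a stand-in for a real configuration.
final class EmitDelaySketch {

    static final String EARLY_FIRE_DELAY_KEY = "table.exec.emit.early-fire.delay";

    private final Map<String, Duration> settings = new HashMap<>();

    void set(String key, Duration value) {
        settings.put(key, value);
    }

    // Plays the role of getOptional(...): empty when the key was never set.
    Optional<Duration> getOptional(String key) {
        return Optional.ofNullable(settings.get(key));
    }

    // No sentinel such as -1 ms is needed: absence itself signals "not set".
    boolean isEarlyFireDelaySet() {
        return getOptional(EARLY_FIRE_DELAY_KEY).isPresent();
    }
}
```

With this shape, callers branch on presence rather than comparing against a magic negative duration, which is the behavior the old `noDefaultValue` declaration provided.
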