zhuzhurk commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1448756961
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ########## @@ -2562,19 +2505,26 @@ private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> tra "No operators defined in streaming topology. Cannot execute."); } + // Synchronize the cached file to config option PipelineOptions.CACHED_FILES + if (!getCachedFiles().isEmpty()) { Review Comment: Why is this needed? Looks to me it is already set to configuration once updated in `registerCachedFile()`. ########## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ########## @@ -1227,7 +1227,13 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) { RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy); configuration .getOptional(RestartStrategyOptions.RESTART_STRATEGY) - .ifPresent(s -> this.setRestartStrategy(configuration)); + .ifPresent( Review Comment: I guess the line above `RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);` is no longer needed? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java: ########## @@ -476,13 +470,6 @@ private void testBufferTimeout(Configuration config, StreamExecutionEnvironment config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms"); env.configure(config, this.getClass().getClassLoader()); assertThat(env.getBufferTimeout()).isZero(); - - assertThatThrownBy( - () -> { - config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms"); - env.configure(config, this.getClass().getClassLoader()); Review Comment: I guess this verification is still needed, but with the new change it should verify that an exception will be thrown if buffer timeout is enabled but the value is `-1ms`. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ########## @@ -145,9 +140,11 @@ public class StreamGraph implements Pipeline { private boolean autoParallelismEnabled; public StreamGraph( + Configuration jobConfiguration, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, SavepointRestoreSettings savepointRestoreSettings) { + this.jobConfiguration = checkNotNull(jobConfiguration); Review Comment: It's better to create a new `Configuration` to avoid being affected by modifications to the original `Configuration` in the env. ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java: ########## @@ -82,7 +83,8 @@ public void setUp() { @Test public void testImmediateCheckpointing() throws Exception { env.setRestartStrategy(RestartStrategies.noRestart()); - env.enableCheckpointing(Long.MAX_VALUE - 1); + env.enableCheckpointing( + Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK */).toMillis()); Review Comment: What's this change for? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java: ########## @@ -460,12 +460,6 @@ void testBufferTimeoutDisabled() { env.configure(config, this.getClass().getClassLoader()); assertThat(env.getBufferTimeout()) .isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT); - - // Setting execution.buffer-timeout's to -1ms will not take effect. - config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms"); - env.configure(config, this.getClass().getClassLoader()); - assertThat(env.getBufferTimeout()) - .isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT); Review Comment: I guess this verification is still needed to verify that `BUFFER_TIMEOUT` will not be used if buffer timeout is disabled. 
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java: ########## @@ -222,7 +223,7 @@ public void testGettingEnvironmentWithConfiguration() { assertThat(env.getParallelism(), equalTo(10)); assertThat(env.getConfig().getAutoWatermarkInterval(), equalTo(100L)); - assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class)); + assertThat(env.getStateBackend(), nullValue()); Review Comment: I would remove this verification and the setting of state.backend. ########## flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java: ########## @@ -108,11 +108,15 @@ StreamOperatorStateContext build(Logger logger) throws IOException { StateBackend stateBackend; try { + Configuration jobConfig = environment.getJobConfiguration(); + jobConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false); + Configuration clusterConfig = new Configuration(configuration); + clusterConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false); Review Comment: IIUC, the cluster config can currently be true, but we always set it to false here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ########## @@ -792,7 +792,11 @@ public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Tim try { Checkpoints.disposeSavepoint( - savepointPath, configuration, classLoader, log); + savepointPath, + new Configuration(), Review Comment: Why use an empty job configuration here? 
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java: ########## @@ -72,7 +73,7 @@ public void testLoadingStateBackendFromConfiguration() { configuration, Thread.currentThread().getContextClassLoader()); StateBackend actualStateBackend = envFromConfiguration.getStateBackend(); - assertThat(actualStateBackend, instanceOf(MemoryStateBackend.class)); + assertThat(actualStateBackend, nullValue()); Review Comment: This test becomes meaningless if it is changed like this. I would remove it as long as there is a test to cover the case that the state backend set via `StreamExecutionEnvironment#configure` can be properly created at runtime. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java: ########## @@ -1016,6 +1016,10 @@ public void configure(ReadableConfig configuration) { configuration .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY) .ifPresent(this::setCheckpointStorage); + // reset checkpoint storage for backward compatibility + configuration + .getOptional(CheckpointingOptions.CHECKPOINT_STORAGE) + .ifPresent(ignored -> this.storage = null); Review Comment: It's a bit weird here to set and reset the checkpointStorage back and forth. Maybe do not invoke `setCheckpointStorage` but directly do `configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,...)`? 
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java: ########## @@ -520,9 +518,7 @@ void testConfigureCheckpointStorage() { StreamExecutionEnvironment.getExecutionEnvironment(configuration); CheckpointStorage storage = env.getCheckpointConfig().getCheckpointStorage(); - assertThat(storage).isInstanceOf(JobManagerCheckpointStorage.class); - assertThat(((JobManagerCheckpointStorage) storage).getCheckpointPath()) - .isEqualTo(new Path(path)); + assertThat(storage).isNull(); Review Comment: This test becomes nonsense. I would remove it entirely as long as there is a test to cover the case that the checkpoint storage set in StreamExecutionEnvironment can be properly created at runtime. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org