zhuzhurk commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1448756961


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -2562,19 +2505,26 @@ private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> tra
                     "No operators defined in streaming topology. Cannot execute.");
         }
 
+        // Synchronize the cached file to config option PipelineOptions.CACHED_FILES
+        if (!getCachedFiles().isEmpty()) {

Review Comment:
   Why is this needed? It looks to me like the cached files are already written
   to the configuration whenever they are updated in `registerCachedFile()`.



##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1227,7 +1227,13 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
         RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
         configuration
                 .getOptional(RestartStrategyOptions.RESTART_STRATEGY)
-                .ifPresent(s -> this.setRestartStrategy(configuration));
+                .ifPresent(

Review Comment:
   I guess the line above,
   `RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);`,
   is no longer needed?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -476,13 +470,6 @@ private void testBufferTimeout(Configuration config, StreamExecutionEnvironment
         config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
         env.configure(config, this.getClass().getClassLoader());
         assertThat(env.getBufferTimeout()).isZero();
-
-        assertThatThrownBy(
-                        () -> {
-                            config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
-                            env.configure(config, this.getClass().getClassLoader());

Review Comment:
   I guess this verification is still needed, but with the new change it should
   verify that an exception is thrown when buffer timeout is enabled but the
   value is `-1ms`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -145,9 +140,11 @@ public class StreamGraph implements Pipeline {
     private boolean autoParallelismEnabled;
 
     public StreamGraph(
+            Configuration jobConfiguration,
             ExecutionConfig executionConfig,
             CheckpointConfig checkpointConfig,
             SavepointRestoreSettings savepointRestoreSettings) {
+        this.jobConfiguration = checkNotNull(jobConfiguration);

Review Comment:
   It's better to create a new `Configuration` here, so that the graph is not
   affected by later modifications to the original `Configuration` in the env.
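
   A minimal sketch of the defensive copy suggested above. `SimpleConfig` and
   `GraphSketch` are hypothetical stand-ins, not Flink classes; the sketch only
   assumes that the configuration type offers a copy constructor, as the
   `new Configuration(configuration)` call elsewhere in this PR suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a mutable configuration object (not Flink's Configuration). */
final class SimpleConfig {
    private final Map<String, String> values = new HashMap<>();

    SimpleConfig() {}

    /** Copy constructor: the new instance owns an independent map. */
    SimpleConfig(SimpleConfig other) {
        values.putAll(other.values);
    }

    void set(String key, String value) {
        values.put(key, value);
    }

    String get(String key) {
        return values.get(key);
    }
}

/** Hypothetical graph holder that snapshots the configuration it receives. */
final class GraphSketch {
    private final SimpleConfig jobConfiguration;

    GraphSketch(SimpleConfig jobConfiguration) {
        // Defensive copy: later changes to the caller's object do not leak in.
        this.jobConfiguration = new SimpleConfig(Objects.requireNonNull(jobConfiguration));
    }

    String get(String key) {
        return jobConfiguration.get(key);
    }
}

final class DefensiveCopyDemo {
    public static void main(String[] args) {
        SimpleConfig envConfig = new SimpleConfig();
        envConfig.set("parallelism", "4");

        GraphSketch graph = new GraphSketch(envConfig);
        envConfig.set("parallelism", "8"); // mutate the original afterwards

        System.out.println(graph.get("parallelism")); // prints 4
    }
}
```

   Without the copy, the second `set` call would also change what the graph
   sees, which is the coupling the comment wants to avoid.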



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##########
@@ -82,7 +83,8 @@ public void setUp() {
     @Test
     public void testImmediateCheckpointing() throws Exception {
         env.setRestartStrategy(RestartStrategies.noRestart());
-        env.enableCheckpointing(Long.MAX_VALUE - 1);
+        env.enableCheckpointing(
+                Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK */).toMillis());

Review Comment:
   What's this change for?
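
   For reference, the arithmetic behind the two values can be checked with the
   JDK alone. The sketch below assumes, based only on the inline comment
   "max allowed by FLINK", that the interval must be representable as a `long`
   number of nanoseconds; whether Flink actually enforces this is not confirmed
   here. `IntervalBoundsDemo` and its methods are hypothetical names.

```java
import java.time.Duration;

final class IntervalBoundsDemo {
    /** Largest millisecond count obtained from a Duration of Long.MAX_VALUE nanoseconds. */
    static long maxMillisRepresentableAsNanos() {
        return Duration.ofNanos(Long.MAX_VALUE).toMillis();
    }

    /** Returns true if the millisecond interval converts to nanoseconds without overflow. */
    static boolean fitsInNanos(long millis) {
        try {
            Duration.ofMillis(millis).toNanos(); // throws ArithmeticException on overflow
            return true;
        } catch (ArithmeticException overflow) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxMillisRepresentableAsNanos()); // prints 9223372036854
        System.out.println(fitsInNanos(maxMillisRepresentableAsNanos())); // prints true
        System.out.println(fitsInNanos(Long.MAX_VALUE - 1)); // prints false
    }
}
```

   Under that assumption, the old value `Long.MAX_VALUE - 1` (in milliseconds)
   overflows when converted to nanoseconds, while the new value does not.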



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -460,12 +460,6 @@ void testBufferTimeoutDisabled() {
         env.configure(config, this.getClass().getClassLoader());
         assertThat(env.getBufferTimeout())
                 .isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
-
-        // Setting execution.buffer-timeout's to -1ms will not take effect.
-        config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
-        env.configure(config, this.getClass().getClassLoader());
-        assertThat(env.getBufferTimeout())
-                .isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);

Review Comment:
   I guess this verification is still needed to verify that `BUFFER_TIMEOUT` 
will not be used if buffer timeout is disabled.
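
   The behavior that the two buffer-timeout comments ask the tests to cover can
   be sketched as follows. This is not Flink's implementation:
   `BufferTimeoutSketch`, `resolve`, and the `-1` sentinel value are assumptions
   made for illustration only.

```java
import java.time.Duration;

final class BufferTimeoutSketch {
    /** Sentinel meaning "buffer timeout disabled"; the value is an assumption of this sketch. */
    static final long DISABLED = -1L;

    /**
     * Resolves the effective buffer timeout in milliseconds.
     * When disabled, the configured value is ignored entirely.
     * When enabled, a negative configured value is rejected.
     */
    static long resolve(boolean enabled, Duration configured) {
        if (!enabled) {
            return DISABLED; // configured value is not consulted
        }
        if (configured.isNegative()) {
            throw new IllegalArgumentException(
                    "Buffer timeout must not be negative when enabled: " + configured);
        }
        return configured.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, Duration.ofMillis(-1))); // prints -1
        System.out.println(resolve(true, Duration.ZERO)); // prints 0
    }
}
```

   A test in this spirit would assert both branches: the disabled case returns
   the sentinel regardless of the configured value, and the enabled case throws
   for `-1ms`.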



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java:
##########
@@ -222,7 +223,7 @@ public void testGettingEnvironmentWithConfiguration() {
 
         assertThat(env.getParallelism(), equalTo(10));
         assertThat(env.getConfig().getAutoWatermarkInterval(), equalTo(100L));
-        assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class));
+        assertThat(env.getStateBackend(), nullValue());

Review Comment:
   I would remove this verification and the setting of state.backend.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/StreamOperatorContextBuilder.java:
##########
@@ -108,11 +108,15 @@ StreamOperatorStateContext build(Logger logger) throws IOException {
 
         StateBackend stateBackend;
         try {
+            Configuration jobConfig = environment.getJobConfiguration();
+            jobConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+            Configuration clusterConfig = new Configuration(configuration);
+            clusterConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);

Review Comment:
   IIUC, this option can currently be set to true in the cluster config, but we
   always set it to false here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -792,7 +792,11 @@ public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Tim
 
                     try {
                         Checkpoints.disposeSavepoint(
-                                savepointPath, configuration, classLoader, log);
+                                savepointPath,
+                                new Configuration(),

Review Comment:
   Why is an empty job configuration used here?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java:
##########
@@ -72,7 +73,7 @@ public void testLoadingStateBackendFromConfiguration() {
                 configuration, Thread.currentThread().getContextClassLoader());
 
         StateBackend actualStateBackend = envFromConfiguration.getStateBackend();
-        assertThat(actualStateBackend, instanceOf(MemoryStateBackend.class));
+        assertThat(actualStateBackend, nullValue());

Review Comment:
   This test becomes meaningless if it is changed like this. I would remove it,
   as long as there is a test covering the case that a state backend set via
   `StreamExecutionEnvironment#configure` can be properly created at runtime.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -1016,6 +1016,10 @@ public void configure(ReadableConfig configuration) {
         configuration
                 .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
                 .ifPresent(this::setCheckpointStorage);
+        // reset checkpoint storage for backward compatibility
+        configuration
+                .getOptional(CheckpointingOptions.CHECKPOINT_STORAGE)
+                .ifPresent(ignored -> this.storage = null);

Review Comment:
   It's a bit odd to set and then reset the checkpoint storage back and forth
   here. Maybe do not invoke `setCheckpointStorage`, and instead directly call
   `configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,...)`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##########
@@ -520,9 +518,7 @@ void testConfigureCheckpointStorage() {
                 StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         CheckpointStorage storage = env.getCheckpointConfig().getCheckpointStorage();
-        assertThat(storage).isInstanceOf(JobManagerCheckpointStorage.class);
-        assertThat(((JobManagerCheckpointStorage) storage).getCheckpointPath())
-                .isEqualTo(new Path(path));
+        assertThat(storage).isNull();

Review Comment:
   This test becomes meaningless. I would remove it entirely, as long as there
   is a test covering the case that a checkpoint storage set in
   `StreamExecutionEnvironment` can be properly created at runtime.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.