zhuzhurk commented on code in PR #25256: URL: https://github.com/apache/flink/pull/25256#discussion_r1758217831
########## flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst: ########## @@ -132,4 +130,4 @@ SlotSharingGroup :toctree: api/ SlotSharingGroup - MemorySize Review Comment: What's changed here? ########## flink-runtime/src/main/java/org/apache/flink/streaming/util/RestartStrategyUtils.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.time.Duration; + +/** Utility class for configuring restart strategies. */ +public class RestartStrategyUtils { Review Comment: Is it possible to make it a test util? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java: ########## @@ -54,77 +46,23 @@ private RestartBackoffTimeStrategyFactoryLoader() {} * NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory} * </ol> * - * @param jobRestartStrategyConfiguration restart configuration given within the job graph + * @param jobConfiguration job configuration * @param clusterConfiguration cluster(server-side) configuration * @param isCheckpointingEnabled if checkpointing is enabled for the job * @return new version restart strategy factory */ public static RestartBackoffTimeStrategy.Factory createRestartBackoffTimeStrategyFactory( - final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration, final Configuration jobConfiguration, final Configuration clusterConfiguration, final boolean isCheckpointingEnabled) { - checkNotNull(jobRestartStrategyConfiguration); checkNotNull(jobConfiguration); checkNotNull(clusterConfiguration); - return getJobRestartStrategyFactory(jobRestartStrategyConfiguration) + return getRestartStrategyFactoryFromConfig(jobConfiguration) .orElse( - getRestartStrategyFactoryFromConfig(jobConfiguration) - .orElse( - (getRestartStrategyFactoryFromConfig(clusterConfiguration) - .orElse( - getDefaultRestartStrategyFactory( - isCheckpointingEnabled))))); - } - - private static Optional<RestartBackoffTimeStrategy.Factory> getJobRestartStrategyFactory( - final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { - - if (restartStrategyConfiguration instanceof NoRestartStrategyConfiguration) { - return Optional.of( - NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.INSTANCE); - } else if (restartStrategyConfiguration instanceof FixedDelayRestartStrategyConfiguration) { - final FixedDelayRestartStrategyConfiguration fixedDelayConfig = - (FixedDelayRestartStrategyConfiguration) restartStrategyConfiguration; - - return 
Optional.of( - new FixedDelayRestartBackoffTimeStrategy - .FixedDelayRestartBackoffTimeStrategyFactory( - fixedDelayConfig.getRestartAttempts(), - fixedDelayConfig.getDurationBetweenAttempts().toMillis())); - } else if (restartStrategyConfiguration - instanceof FailureRateRestartStrategyConfiguration) { - final FailureRateRestartStrategyConfiguration failureRateConfig = - (FailureRateRestartStrategyConfiguration) restartStrategyConfiguration; - - return Optional.of( - new FailureRateRestartBackoffTimeStrategy - .FailureRateRestartBackoffTimeStrategyFactory( - failureRateConfig.getMaxFailureRate(), - failureRateConfig.getFailureIntervalDuration().toMillis(), - failureRateConfig.getDurationBetweenAttempts().toMillis())); - } else if (restartStrategyConfiguration instanceof FallbackRestartStrategyConfiguration) { - return Optional.empty(); - } else if (restartStrategyConfiguration - instanceof ExponentialDelayRestartStrategyConfiguration) { - final ExponentialDelayRestartStrategyConfiguration exponentialDelayConfig = - (ExponentialDelayRestartStrategyConfiguration) restartStrategyConfiguration; - return Optional.of( - new ExponentialDelayRestartBackoffTimeStrategy - .ExponentialDelayRestartBackoffTimeStrategyFactory( - exponentialDelayConfig.getInitialBackoffDuration().toMillis(), - exponentialDelayConfig.getMaxBackoffDuration().toMillis(), - exponentialDelayConfig.getBackoffMultiplier(), - exponentialDelayConfig.getResetBackoffDurationThreshold().toMillis(), - exponentialDelayConfig.getJitterFactor(), - RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS - .defaultValue())); - } else { - throw new IllegalArgumentException( - "Unknown restart strategy configuration " + restartStrategyConfiguration + "."); - } + (getRestartStrategyFactoryFromConfig(clusterConfiguration) Review Comment: extra quote? 
########## flink-runtime/src/main/java/org/apache/flink/streaming/util/RestartStrategyUtils.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.time.Duration; + +/** Utility class for configuring restart strategies. */ +public class RestartStrategyUtils { + + /** + * Disables the restart strategy for the given StreamExecutionEnvironment. + * + * @param env the StreamExecutionEnvironment to configure + */ + public static void configureNoRestartStrategy(StreamExecutionEnvironment env) { + env.configure(new Configuration().set(RestartStrategyOptions.RESTART_STRATEGY, "none")); + } + + /** + * Disables the restart strategy for the given JobGraph. 
+ * + * @param jobGraph the JobGraph to configure + */ + public static void configureNoRestartStrategy(JobGraph jobGraph) { + jobGraph.getJobConfiguration().set(RestartStrategyOptions.RESTART_STRATEGY, "none"); + } + + /** + * Sets a fixed-delay restart strategy for the given StreamExecutionEnvironment. + * + * @param env the StreamExecutionEnvironment to configure + * @param restartAttempts the number of restart attempts + * @param delayBetweenAttempts the delay between restart attempts in milliseconds + */ + public static void configureFixedDelayRestartStrategy( + StreamExecutionEnvironment env, int restartAttempts, long delayBetweenAttempts) { + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + configuration.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, restartAttempts); + configuration.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, + Duration.ofMillis(delayBetweenAttempts)); + + env.configure(configuration); + } + + /** + * Sets a fixed-delay restart strategy for the given StreamExecutionEnvironment. + * + * @param env the StreamExecutionEnvironment to configure + * @param restartAttempts the number of restart attempts + * @param delayBetweenAttempts the delay between restart attempts + */ + public static void configureFixedDelayRestartStrategy( + StreamExecutionEnvironment env, int restartAttempts, Duration delayBetweenAttempts) { + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + configuration.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, restartAttempts); + configuration.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, delayBetweenAttempts); + + env.configure(configuration); + } + + /** + * Sets a fixed-delay restart strategy for the given JobGraph. 
+ * + * @param jobGraph the JobGraph to configure + * @param restartAttempts the number of restart attempts + * @param delayBetweenAttempts the delay between restart attempts in milliseconds + */ + public static void configureFixedDelayRestartStrategy( + JobGraph jobGraph, int restartAttempts, long delayBetweenAttempts) { + Configuration configuration = jobGraph.getJobConfiguration(); Review Comment: This method can reuse the method `configureFixedDelayRestartStrategy(JobGraph, int, Duration)`. ########## flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java: ########## @@ -104,4 +105,80 @@ public long getPeriodicMaterializeIntervalMillis() { public Map<String, String> getGlobalJobParameters() { return globalJobParameters; } + + private String getRestartStrategyDescription(Configuration configuration) { Review Comment: Can we introduce an internal util class to host this new code, and add some tests for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org