wuchong commented on a change in pull request #15197: URL: https://github.com/apache/flink/pull/15197#discussion_r594266993
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) { terminal.writer() .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); } else { - properties.entrySet().stream() - .map((e) -> e.getKey() + "=" + e.getValue()) - .sorted() - .forEach((p) -> terminal.writer().println(p)); + List<String> prettyEntries = ConfigurationUtils.getPropertiesInPretty(properties); + prettyEntries.forEach(entry -> terminal.writer().println(entry)); } } // set a property else { + String key = cmdCall.operands[0].trim(); + String value = cmdCall.operands[1].trim(); try { - executor.setSessionProperty( - sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + executor.setSessionProperty(sessionId, key, value); } catch (SqlExecutionException e) { printExecutionException(e); return; } - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); + if (ConfigurationUtils.isDeprecatedKey(key)) { + terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY)); Review comment: Should we also show the suggested key? For example: `The specified key 'execution.mode' is deprecated, please use 'execution.runtime-mode' instead.`. ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** Test {@link ConfigurationUtils}. */ +public class ConfigurationUtilsTest { + + private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + + @Test + public void testSetYamlKey() { + Configuration configuration = new Configuration(); + ConfigurationUtils.setKeyToConfiguration(configuration, "execution.type", "batch"); + Assert.assertEquals("batch", configuration.getString("execution.type", null)); + Assert.assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); Review comment: I think it would be better to convert Configuration into Map, and assert on the Map. This can avoid using the deprecated methods (`getString`) and avoid the effect of default values. 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) { terminal.writer() .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); } else { - properties.entrySet().stream() - .map((e) -> e.getKey() + "=" + e.getValue()) - .sorted() - .forEach((p) -> terminal.writer().println(p)); + List<String> prettyEntries = ConfigurationUtils.getPropertiesInPretty(properties); + prettyEntries.forEach(entry -> terminal.writer().println(entry)); } } // set a property else { + String key = cmdCall.operands[0].trim(); + String value = cmdCall.operands[1].trim(); try { - executor.setSessionProperty( - sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + executor.setSessionProperty(sessionId, key, value); } catch (SqlExecutionException e) { printExecutionException(e); return; } - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); + if (ConfigurationUtils.isDeprecatedKey(key)) { + terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY)); + return; + } else if (ConfigurationUtils.isYamlKey(key)) { Review comment: `The specified key 'execution.mode' is deprecated, please use 'execution.runtime-mode' instead.`. 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java ########## @@ -112,15 +104,17 @@ public TableEnvironment getTableEnvironment() { // ------------------------------------------------------------------------------------------------------------------ private TableEnvironment createTableEnvironment() { - EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings(); + // check the value of TABLE_PLANNER and RUNTIME_MODE + EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig); TableConfig config = new TableConfig(); config.addConfiguration(flinkConfig); - // Override the value in configuration. - // TODO: use `table.planner` and `execution.runtime-mode` to configure the TableEnvironment - // in FLINK-21462. - config.addConfiguration(settings.toConfiguration()); + if (flinkConfig.get(ExecutionOptions.RUNTIME_MODE).equals(RuntimeExecutionMode.BATCH) + && flinkConfig.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.OLD)) { Review comment: We can use methods on `EnvironmentSettings` to check the runtime mode and planner. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Options used in sql client. */ +public class SqlClientOptions { + private SqlClientOptions() {} + + public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS = + ConfigOptions.key("sql-client.execution.max-table-result.rows") + .intType() + .defaultValue(1000_000) + .withDescription( + "The number of rows to cache when in the table mode. If the number of rows exceeds the " + + "specified value, it retries the row in the FIFO style."); + + public static final ConfigOption<ResultMode> EXECUTION_RESULT_MODE = + ConfigOptions.key("sql-client.execution.result-mode") + .enumType(ResultMode.class) + .defaultValue(ResultMode.TABLE) + .withDescription( + "Determine the mode when display the query result. The available values are ['table', 'tableau', 'changelog']."); Review comment: Please explain the difference between the enums. You can take the [docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#running-sql-queries) as an example. 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) { terminal.writer() .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); } else { - properties.entrySet().stream() - .map((e) -> e.getKey() + "=" + e.getValue()) - .sorted() - .forEach((p) -> terminal.writer().println(p)); + List<String> prettyEntries = ConfigurationUtils.getPropertiesInPretty(properties); + prettyEntries.forEach(entry -> terminal.writer().println(entry)); } } // set a property else { + String key = cmdCall.operands[0].trim(); + String value = cmdCall.operands[1].trim(); try { - executor.setSessionProperty( - sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + executor.setSessionProperty(sessionId, key, value); } catch (SqlExecutionException e) { printExecutionException(e); return; } - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); + if (ConfigurationUtils.isDeprecatedKey(key)) { + terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY)); + return; + } else if (ConfigurationUtils.isYamlKey(key)) { Review comment: Users don't understand what a YAML key is. What's the difference between a YAML key and a deprecated key? ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java ########## @@ -44,25 +49,23 @@ public ResultStore() { * Creates a result. Might start threads or opens sockets so every created result must be * closed. 
*/ - public DynamicResult createResult(Environment env, TableResult tableResult) { - if (env.getExecution().inStreamingMode()) { - if (env.getExecution().isChangelogMode() || env.getExecution().isTableauMode()) { - return new ChangelogCollectResult(tableResult); - } else { - return new MaterializedCollectStreamResult( - tableResult, env.getExecution().getMaxTableResultRows()); - } - } else { - // Batch Execution - if (env.getExecution().isTableMode()) { - return new MaterializedCollectBatchResult( - tableResult, env.getExecution().getMaxTableResultRows()); - } else if (env.getExecution().isTableauMode()) { + public DynamicResult createResult(ReadableConfig config, TableResult tableResult) { + switch (config.get(EXECUTION_RESULT_MODE)) { + case CHANGELOG: Review comment: batch doesn't support `CHANGELOG` mode. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java ########## @@ -234,4 +229,15 @@ public SessionState( this.functionCatalog = functionCatalog; } } + + // -------------------------------------------------------------------------------------------- + + private void resetSessionConfigurationToValue(Configuration newValue) { Review comment: ```suggestion private void resetSessionConfigurationToDefault(Configuration defaultConf) { ``` ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ########## @@ -138,12 +142,7 @@ protected ExecutionContext getExecutionContext(String sessionId) throws SqlExecu @Override public Map<String, String> getSessionProperties(String sessionId) throws SqlExecutionException { - final Environment env = getSessionContext(sessionId).getSessionEnvironment(); - final Map<String, String> properties = new HashMap<>(); - properties.putAll(env.getExecution().asTopLevelMap()); - properties.putAll(env.getDeployment().asTopLevelMap()); - properties.putAll(env.getConfiguration().asMap()); - return 
properties; + return ((Configuration) getSessionContext(sessionId).getReadableConfig()).toMap(); Review comment: We can add a method `Map<String, String> getConfigMap()` in `SessionContext` to avoid the type cast. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/LegacyTableEnvironmentInitializer.java ########## @@ -64,11 +64,8 @@ import static org.apache.flink.table.api.Expressions.$; -/** - * Utils to initialize {@link TableEnvironment} from {@link Environment}. - * - * <p>It will be removed when sql-client.yaml is deprecated. - */ +/** Utils to initialize {@link TableEnvironment} from {@link Environment}. */ Review comment: ```suggestion /** * Utils to initialize {@link TableEnvironment} from {@link Environment}. * * @deprecated This will be dropped in Flink 1.4 with dropping support of {@code sql-client.yaml} * configuration file. */ ``` Add a `@deprecated` annotation to every deprecated file. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Options used in sql client. */ +public class SqlClientOptions { Review comment: Please add a `SQL Client Options` section in this page and auto-generate the option table : https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/ ########## File path: flink-table/flink-sql-client/src/test/resources/sql/set.q ########## @@ -42,20 +42,11 @@ CREATE TABLE hive_table ( [INFO] Table has been created. !info -# list the configured configuration -set; -table.sql-dialect=hive -!ok - Review comment: Why remove this? ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ########## @@ -444,6 +444,7 @@ public void testStreamQueryExecutionSink() throws Exception { replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath); replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); replaceVars.put("$VAR_MAX_ROWS", "100"); + replaceVars.put("$VAR_RESULT_MODE", "table"); Review comment: Why need this? 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) { terminal.writer() .println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); } else { - properties.entrySet().stream() - .map((e) -> e.getKey() + "=" + e.getValue()) - .sorted() - .forEach((p) -> terminal.writer().println(p)); + List<String> prettyEntries = ConfigurationUtils.getPropertiesInPretty(properties); + prettyEntries.forEach(entry -> terminal.writer().println(entry)); } } // set a property else { + String key = cmdCall.operands[0].trim(); + String value = cmdCall.operands[1].trim(); try { - executor.setSessionProperty( - sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + executor.setSessionProperty(sessionId, key, value); } catch (SqlExecutionException e) { printExecutionException(e); return; } - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); + if (ConfigurationUtils.isDeprecatedKey(key)) { + terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY)); Review comment: `The specified key is not supported anymore.`. ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java ########## @@ -122,12 +120,7 @@ public void testSetAndResetOption() throws Exception { .getConfig() .getConfiguration() .getString(TABLE_SQL_DIALECT)); - Assert.assertNull( - sessionContext - .getSessionEnvironment() - .getConfiguration() - .asMap() - .get("pipeline.name")); + Assert.assertNull(sessionContext.getReadableConfig().get(NAME)); Review comment: Why not use ``` sessionContext .getExecutionContext() .getTableEnvironment() .getConfig() .getConfiguration() ``` ? ########## File path: flink-table/flink-sql-client/src/test/resources/sql/set.q ########## @@ -42,20 +42,11 @@ CREATE TABLE hive_table ( [INFO] Table has been created. 
!info -# list the configured configuration -set; -table.sql-dialect=hive -!ok - # reset the configuration reset; [INFO] All session properties have been set to their default values. !info -set; -[INFO] Result was empty. -!info Review comment: Why remove this? ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** Test {@link ConfigurationUtils}. */ +public class ConfigurationUtilsTest { + + private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + + @Test + public void testSetYamlKey() { + Configuration configuration = new Configuration(); + ConfigurationUtils.setKeyToConfiguration(configuration, "execution.type", "batch"); + Assert.assertEquals("batch", configuration.getString("execution.type", null)); Review comment: Remove all the `Assert.` prefix. ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** Test {@link ConfigurationUtils}. 
*/ +public class ConfigurationUtilsTest { + + private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + + @Test + public void testSetYamlKey() { + Configuration configuration = new Configuration(); + ConfigurationUtils.setKeyToConfiguration(configuration, "execution.type", "batch"); + Assert.assertEquals("batch", configuration.getString("execution.type", null)); + Assert.assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); + } + + @Test + public void testSetConfigOption() { + Configuration configuration = new Configuration(); + ConfigurationUtils.setKeyToConfiguration(configuration, RUNTIME_MODE.key(), "batch"); + Assert.assertNull(configuration.getString("execution.type", null)); + Assert.assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); + } + + @Test + public void testModifyYamlKeyWhenSetConfigOptionOnly() { + Configuration configuration = new Configuration(); + // set config option + ConfigurationUtils.setKeyToConfiguration(configuration, RUNTIME_MODE.key(), "batch"); + // modify yaml key + ConfigurationUtils.setKeyToConfiguration(configuration, "execution.type", "streaming"); + + Assert.assertEquals(RuntimeExecutionMode.STREAMING, configuration.get(RUNTIME_MODE)); + Assert.assertEquals("streaming", configuration.getString("execution.type", null)); + } + + @Test + public void testModifyConfigOptionWhenSetYamlKey() { + Configuration configuration = new Configuration(); + // set yaml key + ConfigurationUtils.setKeyToConfiguration(configuration, "execution.type", "streaming"); + // modify config option + ConfigurationUtils.setKeyToConfiguration(configuration, RUNTIME_MODE.key(), "batch"); + + Assert.assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); + Assert.assertEquals("batch", configuration.getString("execution.type", null)); + } + + @Test + public void testExecutionEntryToConfigOption() throws Exception { + final Environment env = getEnvironment(); + + 
Configuration configuration = + ConfigurationUtils.convertExecutionEntryToConfiguration(env.getExecution()); + + Assert.assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); + Assert.assertEquals(ResultMode.TABLE, configuration.get(EXECUTION_RESULT_MODE)); + Assert.assertEquals(100, configuration.getInteger(EXECUTION_MAX_TABLE_RESULT_ROWS)); + Assert.assertEquals("failure-rate", configuration.getString(RESTART_STRATEGY)); + + List<String> items = ConfigurationUtils.getPropertiesInPretty(configuration.toMap()); + List<String> expectedItems = + Arrays.asList( + "[DEPRECATED]execution.max-parallelism=16", + "[DEPRECATED]execution.max-table-result-rows=100", + "[DEPRECATED]execution.min-idle-state-retention=1000", + "[DEPRECATED]execution.parallelism=1", + "[DEPRECATED]execution.periodic-watermarks-interval=99", + "[DEPRECATED]execution.planner=old", + "[DEPRECATED]execution.restart-strategy.delay=1000", + "[DEPRECATED]execution.restart-strategy.failure-rate-interval=99000", + "[DEPRECATED]execution.restart-strategy.max-failures-per-interval=10", + "[DEPRECATED]execution.restart-strategy.type=failure-rate", + "[DEPRECATED]execution.result-mode=table", + "[DEPRECATED]execution.type=batch", Review comment: Would be better to put deprecated keys at the end, and add a space before the deprecated key, e.g. `[DEPRECATED] execution.type=batch`. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.table.client.config.entries.ExecutionEntry; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; +import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION; +import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER; +import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** + * Convert {@link ExecutionEntry} to {@link Configuration}. + * + * <ul> + * The rule is simple. + * <li>If the key is from {@link ExecutionEntry}, set the key and corresponding {@link + * ConfigOption} at the same time. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is not set, set + * the value for {@link ConfigOption} only. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is set, set the + * value for both. + * </ul> + * + * <p>When read all entries from the {@link Configuration}, read the key from {@link ExecutionEntry} + * first and then read the {@link ConfigOption}. + * + * <p>When YAML is removed from the project, it should also remove this helper class. 
+ */ +@Deprecated +public class ConfigurationUtils { + + static Map<String, String> entryToConfigOptions = new HashMap<>(); + static Map<String, String> configOptionToEntries = new HashMap<>(); + static Set<String> deprecatedEntries = new HashSet<>(); + + static { + // EnvironmentSettings + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TYPE), + RUNTIME_MODE.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PLANNER), + TABLE_PLANNER.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL), + AUTO_WATERMARK_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MIN_STATE_RETENTION), + IDLE_STATE_RETENTION.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PARALLELISM), + DEFAULT_PARALLELISM.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_PARALLELISM), + MAX_PARALLELISM.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESULT_MODE), + EXECUTION_RESULT_MODE.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS), + EXECUTION_MAX_TABLE_RESULT_ROWS.key()); + // restart strategy + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE), + RESTART_STRATEGY.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY), + RESTART_STRATEGY_FIXED_DELAY_DELAY.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS), + RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key()); + entryToConfigOptions.put( + 
String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL), + RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL), + RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL), + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key()); + } + + static { + configOptionToEntries.put( + RUNTIME_MODE.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TYPE)); + configOptionToEntries.put( + TABLE_PLANNER.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PLANNER)); + configOptionToEntries.put( + AUTO_WATERMARK_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL)); + configOptionToEntries.put( + IDLE_STATE_RETENTION.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MIN_STATE_RETENTION)); + configOptionToEntries.put( + DEFAULT_PARALLELISM.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PARALLELISM)); + configOptionToEntries.put( + MAX_PARALLELISM.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_PARALLELISM)); + configOptionToEntries.put( + EXECUTION_RESULT_MODE.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESULT_MODE)); + configOptionToEntries.put( + EXECUTION_MAX_TABLE_RESULT_ROWS.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS)); + // restart strategy + configOptionToEntries.put( + RESTART_STRATEGY.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE)); + configOptionToEntries.put( + 
RESTART_STRATEGY_FIXED_DELAY_DELAY.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY)); + configOptionToEntries.put( + RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS)); + configOptionToEntries.put( + RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)); + configOptionToEntries.put( + RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL)); + configOptionToEntries.put( + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)); + } + + static { + deprecatedEntries.add( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TIME_CHARACTERISTIC)); + deprecatedEntries.add( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_STATE_RETENTION)); + deprecatedEntries.add( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_CURRENT_CATALOG)); + deprecatedEntries.add( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_CURRENT_DATABASE)); + } + + // -------------------------------------------------------------------------------------------- + + public static boolean isDeprecatedKey(String key) { + return deprecatedEntries.contains(key); + } + + public static boolean isYamlKey(String key) { + return deprecatedEntries.contains(key) || entryToConfigOptions.containsKey(key); Review comment: `entryToConfigOptions.containsKey(key)`. 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.table.client.config.entries.ExecutionEntry; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; +import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION; +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER; +import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** + * Convert {@link ExecutionEntry} to {@link Configuration}. + * + * <ul> + * The rule is simple. + * <li>If the key is from {@link ExecutionEntry}, set the key and corresponding {@link + * ConfigOption} at the same time. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is not set, set + * the value for {@link ConfigOption} only. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is set, set the + * value for both. + * </ul> + * + * <p>When read all entries from the {@link Configuration}, read the key from {@link ExecutionEntry} + * first and then read the {@link ConfigOption}. + * + * <p>When YAML is removed from the project, it should also remove this helper class. + */ +@Deprecated +public class ConfigurationUtils { Review comment: Could we merge this class and `ConfigUtils`? It's quite confusing that there are so many config utils. Maybe we can call it `YamlConfigUtils` to be more specific.
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.table.client.config.entries.ExecutionEntry; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL; +import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY; +import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS; +import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION; +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER; +import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS; +import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE; + +/** + * Convert {@link ExecutionEntry} to {@link Configuration}. + * + * <ul> + * The rule is simple. + * <li>If the key is from {@link ExecutionEntry}, set the key and corresponding {@link + * ConfigOption} at the same time. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is not set, set + * the value for {@link ConfigOption} only. + * <li>If the key is {@link ConfigOption} and corresponding {@link ExecutionEntry} is set, set the + * value for both. + * </ul> + * + * <p>When read all entries from the {@link Configuration}, read the key from {@link ExecutionEntry} + * first and then read the {@link ConfigOption}. + * + * <p>When YAML is removed from the project, it should also remove this helper class. 
+ */ +@Deprecated +public class ConfigurationUtils { + + static Map<String, String> entryToConfigOptions = new HashMap<>(); + static Map<String, String> configOptionToEntries = new HashMap<>(); + static Set<String> deprecatedEntries = new HashSet<>(); + + static { + // EnvironmentSettings + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TYPE), + RUNTIME_MODE.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PLANNER), + TABLE_PLANNER.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL), + AUTO_WATERMARK_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MIN_STATE_RETENTION), + IDLE_STATE_RETENTION.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PARALLELISM), + DEFAULT_PARALLELISM.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_PARALLELISM), + MAX_PARALLELISM.key()); + entryToConfigOptions.put( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESULT_MODE), + EXECUTION_RESULT_MODE.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS), + EXECUTION_MAX_TABLE_RESULT_ROWS.key()); + // restart strategy + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE), + RESTART_STRATEGY.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY), + RESTART_STRATEGY_FIXED_DELAY_DELAY.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS), + RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key()); + entryToConfigOptions.put( + 
String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL), + RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL), + RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key()); + entryToConfigOptions.put( + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL), + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key()); + } + + static { + configOptionToEntries.put( + RUNTIME_MODE.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TYPE)); + configOptionToEntries.put( + TABLE_PLANNER.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PLANNER)); + configOptionToEntries.put( + AUTO_WATERMARK_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL)); + configOptionToEntries.put( + IDLE_STATE_RETENTION.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MIN_STATE_RETENTION)); + configOptionToEntries.put( + DEFAULT_PARALLELISM.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_PARALLELISM)); + configOptionToEntries.put( + MAX_PARALLELISM.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_PARALLELISM)); + configOptionToEntries.put( + EXECUTION_RESULT_MODE.key(), + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESULT_MODE)); + configOptionToEntries.put( + EXECUTION_MAX_TABLE_RESULT_ROWS.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS)); + // restart strategy + configOptionToEntries.put( + RESTART_STRATEGY.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE)); + configOptionToEntries.put( + 
RESTART_STRATEGY_FIXED_DELAY_DELAY.key(), + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY)); + configOptionToEntries.put( + RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS)); + configOptionToEntries.put( + RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)); + configOptionToEntries.put( + RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL)); + configOptionToEntries.put( + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key(), + String.format( + "%s.%s", + EXECUTION_ENTRY, + ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)); + } + + static { + deprecatedEntries.add( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_TIME_CHARACTERISTIC)); + deprecatedEntries.add( + String.format( + "%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_MAX_STATE_RETENTION)); + deprecatedEntries.add( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_CURRENT_CATALOG)); + deprecatedEntries.add( + String.format("%s.%s", EXECUTION_ENTRY, ExecutionEntry.EXECUTION_CURRENT_DATABASE)); + } + + // -------------------------------------------------------------------------------------------- + + public static boolean isDeprecatedKey(String key) { + return deprecatedEntries.contains(key); + } + + public static boolean isYamlKey(String key) { + return deprecatedEntries.contains(key) || entryToConfigOptions.containsKey(key); + } + + // -------------------------------------------------------------------------------------------- + + public static void setKeyToConfiguration( + Configuration configuration, String key, String value) { + // ignore deprecated key 
+ if (isDeprecatedKey(key)) { + return; + } + if (entryToConfigOptions.containsKey(key)) { + configuration.setString(key, value); + configuration.setString(entryToConfigOptions.get(key), value); + } else { + if (configOptionToEntries.containsKey(key) + && configuration.containsKey(configOptionToEntries.get(key))) { + configuration.setString(configOptionToEntries.get(key), value); + } + configuration.setString(key, value); + } Review comment:
```
configuration.setString(key, value);
if (ENTRY_TO_CONFIG_OPTION.containsKey(key)) {
    // old key => set new key
    configuration.setString(ENTRY_TO_CONFIG_OPTION.get(key), value);
} else if (CONFIG_OPTION_TO_ENTRY.containsKey(key)
        && configuration.containsKey(CONFIG_OPTION_TO_ENTRY.get(key))) {
    // new key && old key exist => set old key
    configuration.setString(CONFIG_OPTION_TO_ENTRY.get(key), value);
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org