wuchong commented on a change in pull request #15197:
URL: https://github.com/apache/flink/pull/15197#discussion_r594266993



##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) {
                 terminal.writer()
                         
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
             } else {
-                properties.entrySet().stream()
-                        .map((e) -> e.getKey() + "=" + e.getValue())
-                        .sorted()
-                        .forEach((p) -> terminal.writer().println(p));
+                List<String> prettyEntries = 
ConfigurationUtils.getPropertiesInPretty(properties);
+                prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
             }
         }
         // set a property
         else {
+            String key = cmdCall.operands[0].trim();
+            String value = cmdCall.operands[1].trim();
             try {
-                executor.setSessionProperty(
-                        sessionId, cmdCall.operands[0], 
cmdCall.operands[1].trim());
+                executor.setSessionProperty(sessionId, key, value);
             } catch (SqlExecutionException e) {
                 printExecutionException(e);
                 return;
             }
-            
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
+            if (ConfigurationUtils.isDeprecatedKey(key)) {
+                
terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY));

Review comment:
       Should we also show the suggested key? For example: `The specified key 
'execution.mode' is deprecated, please use 'execution.runtime-mode' instead.`.

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/** Test {@link ConfigurationUtils}. */
+public class ConfigurationUtilsTest {
+
+    private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+
+    @Test
+    public void testSetYamlKey() {
+        Configuration configuration = new Configuration();
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
"execution.type", "batch");
+        Assert.assertEquals("batch", configuration.getString("execution.type", 
null));
+        Assert.assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));

Review comment:
       I think it would be better to convert the Configuration into a Map and 
assert on the Map. This avoids using the deprecated methods (`getString`) and 
avoids the effect of default values. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) {
                 terminal.writer()
                         
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
             } else {
-                properties.entrySet().stream()
-                        .map((e) -> e.getKey() + "=" + e.getValue())
-                        .sorted()
-                        .forEach((p) -> terminal.writer().println(p));
+                List<String> prettyEntries = 
ConfigurationUtils.getPropertiesInPretty(properties);
+                prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
             }
         }
         // set a property
         else {
+            String key = cmdCall.operands[0].trim();
+            String value = cmdCall.operands[1].trim();
             try {
-                executor.setSessionProperty(
-                        sessionId, cmdCall.operands[0], 
cmdCall.operands[1].trim());
+                executor.setSessionProperty(sessionId, key, value);
             } catch (SqlExecutionException e) {
                 printExecutionException(e);
                 return;
             }
-            
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
+            if (ConfigurationUtils.isDeprecatedKey(key)) {
+                
terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY));
+                return;
+            } else if (ConfigurationUtils.isYamlKey(key)) {

Review comment:
       `The specified key 'execution.mode' is deprecated, please use 
'execution.runtime-mode' instead.`.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
##########
@@ -112,15 +104,17 @@ public TableEnvironment getTableEnvironment() {
     // 
------------------------------------------------------------------------------------------------------------------
 
     private TableEnvironment createTableEnvironment() {
-        EnvironmentSettings settings = 
environment.getExecution().getEnvironmentSettings();
+        // check the value of TABLE_PLANNER and RUNTIME_MODE
+        EnvironmentSettings settings = 
EnvironmentSettings.fromConfiguration(flinkConfig);
         TableConfig config = new TableConfig();
         config.addConfiguration(flinkConfig);
-        // Override the value in configuration.
-        // TODO: use `table.planner` and `execution.runtime-mode` to configure 
the TableEnvironment
-        // in FLINK-21462.
-        config.addConfiguration(settings.toConfiguration());
+        if 
(flinkConfig.get(ExecutionOptions.RUNTIME_MODE).equals(RuntimeExecutionMode.BATCH)
+                && 
flinkConfig.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.OLD)) {

Review comment:
       We can use methods on `EnvironmentSettings` to check the runtime mode 
and planner. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options used in sql client. */
+public class SqlClientOptions {
+    private SqlClientOptions() {}
+
+    public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS =
+            ConfigOptions.key("sql-client.execution.max-table-result.rows")
+                    .intType()
+                    .defaultValue(1000_000)
+                    .withDescription(
+                            "The number of rows to cache when in the table 
mode. If the number of rows exceeds the "
+                                    + "specified value, it retries the row in 
the FIFO style.");
+
+    public static final ConfigOption<ResultMode> EXECUTION_RESULT_MODE =
+            ConfigOptions.key("sql-client.execution.result-mode")
+                    .enumType(ResultMode.class)
+                    .defaultValue(ResultMode.TABLE)
+                    .withDescription(
+                            "Determine the mode when display the query result. 
The available values are ['table', 'tableau', 'changelog'].");

Review comment:
       Please explain the difference between the enums. You can take the 
[docs](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#running-sql-queries)
 as an example.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) {
                 terminal.writer()
                         
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
             } else {
-                properties.entrySet().stream()
-                        .map((e) -> e.getKey() + "=" + e.getValue())
-                        .sorted()
-                        .forEach((p) -> terminal.writer().println(p));
+                List<String> prettyEntries = 
ConfigurationUtils.getPropertiesInPretty(properties);
+                prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
             }
         }
         // set a property
         else {
+            String key = cmdCall.operands[0].trim();
+            String value = cmdCall.operands[1].trim();
             try {
-                executor.setSessionProperty(
-                        sessionId, cmdCall.operands[0], 
cmdCall.operands[1].trim());
+                executor.setSessionProperty(sessionId, key, value);
             } catch (SqlExecutionException e) {
                 printExecutionException(e);
                 return;
             }
-            
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
+            if (ConfigurationUtils.isDeprecatedKey(key)) {
+                
terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY));
+                return;
+            } else if (ConfigurationUtils.isYamlKey(key)) {

Review comment:
       Users don't understand what a YAML key is. What's the difference between 
a YAML key and a deprecated key?

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
##########
@@ -44,25 +49,23 @@ public ResultStore() {
      * Creates a result. Might start threads or opens sockets so every created 
result must be
      * closed.
      */
-    public DynamicResult createResult(Environment env, TableResult 
tableResult) {
-        if (env.getExecution().inStreamingMode()) {
-            if (env.getExecution().isChangelogMode() || 
env.getExecution().isTableauMode()) {
-                return new ChangelogCollectResult(tableResult);
-            } else {
-                return new MaterializedCollectStreamResult(
-                        tableResult, 
env.getExecution().getMaxTableResultRows());
-            }
-        } else {
-            // Batch Execution
-            if (env.getExecution().isTableMode()) {
-                return new MaterializedCollectBatchResult(
-                        tableResult, 
env.getExecution().getMaxTableResultRows());
-            } else if (env.getExecution().isTableauMode()) {
+    public DynamicResult createResult(ReadableConfig config, TableResult 
tableResult) {
+        switch (config.get(EXECUTION_RESULT_MODE)) {
+            case CHANGELOG:

Review comment:
       batch doesn't support `CHANGELOG` mode. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
##########
@@ -234,4 +229,15 @@ public SessionState(
             this.functionCatalog = functionCatalog;
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private void resetSessionConfigurationToValue(Configuration newValue) {

Review comment:
       ```suggestion
       private void resetSessionConfigurationToDefault(Configuration 
defaultConf) {
   ```
   
   

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
##########
@@ -138,12 +142,7 @@ protected ExecutionContext getExecutionContext(String 
sessionId) throws SqlExecu
 
     @Override
     public Map<String, String> getSessionProperties(String sessionId) throws 
SqlExecutionException {
-        final Environment env = 
getSessionContext(sessionId).getSessionEnvironment();
-        final Map<String, String> properties = new HashMap<>();
-        properties.putAll(env.getExecution().asTopLevelMap());
-        properties.putAll(env.getDeployment().asTopLevelMap());
-        properties.putAll(env.getConfiguration().asMap());
-        return properties;
+        return ((Configuration) 
getSessionContext(sessionId).getReadableConfig()).toMap();

Review comment:
       We can add a method `Map<String, String> getConfigMap()` in 
`SessionContext` to avoid type cast. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/LegacyTableEnvironmentInitializer.java
##########
@@ -64,11 +64,8 @@
 
 import static org.apache.flink.table.api.Expressions.$;
 
-/**
- * Utils to initialize {@link TableEnvironment} from {@link Environment}.
- *
- * <p>It will be removed when sql-client.yaml is deprecated.
- */
+/** Utils to initialize {@link TableEnvironment} from {@link Environment}. */

Review comment:
       ```suggestion
   /**
    * Utils to initialize {@link TableEnvironment} from {@link Environment}.
    *
    * @deprecated This will be dropped in Flink 1.4 with dropping support of 
{@code sql-client.yaml}
    *     configuration file.
    */
   ```
   
   Add a `@deprecated` annotation to every deprecated file. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options used in sql client. */
+public class SqlClientOptions {

Review comment:
       Please add a `SQL Client Options` section to this page and auto-generate 
the option table: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -42,20 +42,11 @@ CREATE TABLE hive_table (
 [INFO] Table has been created.
 !info
 
-# list the configured configuration
-set;
-table.sql-dialect=hive
-!ok
-

Review comment:
       Why remove this?

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -444,6 +444,7 @@ public void testStreamQueryExecutionSink() throws Exception 
{
         replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
         replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
         replaceVars.put("$VAR_MAX_ROWS", "100");
+        replaceVars.put("$VAR_RESULT_MODE", "table");

Review comment:
       Why is this needed?

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -453,22 +457,27 @@ private void callSet(SqlCommandCall cmdCall) {
                 terminal.writer()
                         
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
             } else {
-                properties.entrySet().stream()
-                        .map((e) -> e.getKey() + "=" + e.getValue())
-                        .sorted()
-                        .forEach((p) -> terminal.writer().println(p));
+                List<String> prettyEntries = 
ConfigurationUtils.getPropertiesInPretty(properties);
+                prettyEntries.forEach(entry -> 
terminal.writer().println(entry));
             }
         }
         // set a property
         else {
+            String key = cmdCall.operands[0].trim();
+            String value = cmdCall.operands[1].trim();
             try {
-                executor.setSessionProperty(
-                        sessionId, cmdCall.operands[0], 
cmdCall.operands[1].trim());
+                executor.setSessionProperty(sessionId, key, value);
             } catch (SqlExecutionException e) {
                 printExecutionException(e);
                 return;
             }
-            
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
+            if (ConfigurationUtils.isDeprecatedKey(key)) {
+                
terminal.writer().println(CliStrings.messageWarning(MESSAGE_SET_DEPRECATED_KEY));

Review comment:
       `The specified key is not supported anymore.`.

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -122,12 +120,7 @@ public void testSetAndResetOption() throws Exception {
                         .getConfig()
                         .getConfiguration()
                         .getString(TABLE_SQL_DIALECT));
-        Assert.assertNull(
-                sessionContext
-                        .getSessionEnvironment()
-                        .getConfiguration()
-                        .asMap()
-                        .get("pipeline.name"));
+        Assert.assertNull(sessionContext.getReadableConfig().get(NAME));

Review comment:
       Why not use 
   
   ```
   sessionContext
                           .getExecutionContext()
                           .getTableEnvironment()
                           .getConfig()
                           .getConfiguration()
   ```
   ?

##########
File path: flink-table/flink-sql-client/src/test/resources/sql/set.q
##########
@@ -42,20 +42,11 @@ CREATE TABLE hive_table (
 [INFO] Table has been created.
 !info
 
-# list the configured configuration
-set;
-table.sql-dialect=hive
-!ok
-
 # reset the configuration
 reset;
 [INFO] All session properties have been set to their default values.
 !info
 
-set;
-[INFO] Result was empty.
-!info

Review comment:
       Why remove this?

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/** Test {@link ConfigurationUtils}. */
+public class ConfigurationUtilsTest {
+
+    private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+
+    @Test
+    public void testSetYamlKey() {
+        Configuration configuration = new Configuration();
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
"execution.type", "batch");
+        Assert.assertEquals("batch", configuration.getString("execution.type", 
null));

Review comment:
       Remove all the `Assert.` prefixes. 

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/config/ConfigurationUtilsTest.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/** Test {@link ConfigurationUtils}. */
+public class ConfigurationUtilsTest {
+
+    private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+
+    @Test
+    public void testSetYamlKey() {
+        Configuration configuration = new Configuration();
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
"execution.type", "batch");
+        Assert.assertEquals("batch", configuration.getString("execution.type", 
null));
+        Assert.assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));
+    }
+
+    @Test
+    public void testSetConfigOption() {
+        Configuration configuration = new Configuration();
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
RUNTIME_MODE.key(), "batch");
+        Assert.assertNull(configuration.getString("execution.type", null));
+        Assert.assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));
+    }
+
+    @Test
+    public void testModifyYamlKeyWhenSetConfigOptionOnly() {
+        Configuration configuration = new Configuration();
+        // set config option
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
RUNTIME_MODE.key(), "batch");
+        // modify yaml key
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
"execution.type", "streaming");
+
+        Assert.assertEquals(RuntimeExecutionMode.STREAMING, 
configuration.get(RUNTIME_MODE));
+        Assert.assertEquals("streaming", 
configuration.getString("execution.type", null));
+    }
+
+    @Test
+    public void testModifyConfigOptionWhenSetYamlKey() {
+        Configuration configuration = new Configuration();
+        // set yaml key
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
"execution.type", "streaming");
+        // modify config option
+        ConfigurationUtils.setKeyToConfiguration(configuration, 
RUNTIME_MODE.key(), "batch");
+
+        Assert.assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));
+        Assert.assertEquals("batch", configuration.getString("execution.type", 
null));
+    }
+
+    @Test
+    public void testExecutionEntryToConfigOption() throws Exception {
+        final Environment env = getEnvironment();
+
+        Configuration configuration =
+                
ConfigurationUtils.convertExecutionEntryToConfiguration(env.getExecution());
+
+        Assert.assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));
+        Assert.assertEquals(ResultMode.TABLE, 
configuration.get(EXECUTION_RESULT_MODE));
+        Assert.assertEquals(100, 
configuration.getInteger(EXECUTION_MAX_TABLE_RESULT_ROWS));
+        Assert.assertEquals("failure-rate", 
configuration.getString(RESTART_STRATEGY));
+
+        List<String> items = 
ConfigurationUtils.getPropertiesInPretty(configuration.toMap());
+        List<String> expectedItems =
+                Arrays.asList(
+                        "[DEPRECATED]execution.max-parallelism=16",
+                        "[DEPRECATED]execution.max-table-result-rows=100",
+                        "[DEPRECATED]execution.min-idle-state-retention=1000",
+                        "[DEPRECATED]execution.parallelism=1",
+                        
"[DEPRECATED]execution.periodic-watermarks-interval=99",
+                        "[DEPRECATED]execution.planner=old",
+                        "[DEPRECATED]execution.restart-strategy.delay=1000",
+                        
"[DEPRECATED]execution.restart-strategy.failure-rate-interval=99000",
+                        
"[DEPRECATED]execution.restart-strategy.max-failures-per-interval=10",
+                        
"[DEPRECATED]execution.restart-strategy.type=failure-rate",
+                        "[DEPRECATED]execution.result-mode=table",
+                        "[DEPRECATED]execution.type=batch",

Review comment:
       Would be better to put deprecated keys at the end, and add a space 
before the deprecated key, e.g. `[DEPRECATED] execution.type=batch`.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL;
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;
+import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/**
+ * Convert {@link ExecutionEntry} to {@link Configuration}.
+ *
+ * <ul>
+ *   The rule is simple.
+ *   <li>If the key is from {@link ExecutionEntry}, set the key and 
corresponding {@link
+ *       ConfigOption} at the same time.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is not set, set
+ *       the value for {@link ConfigOption} only.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is set, set the
+ *       value for both.
+ * </ul>
+ *
+ * <p>When read all entries from the {@link Configuration}, read the key from 
{@link ExecutionEntry}
+ * first and then read the {@link ConfigOption}.
+ *
+ * <p>When YAML is removed from the project, it should also remove this helper 
class.
+ */
+@Deprecated
+public class ConfigurationUtils {
+
+    static Map<String, String> entryToConfigOptions = new HashMap<>();
+    static Map<String, String> configOptionToEntries = new HashMap<>();
+    static Set<String> deprecatedEntries = new HashSet<>();
+
+    static {
+        // EnvironmentSettings
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TYPE),
+                RUNTIME_MODE.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PLANNER),
+                TABLE_PLANNER.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL),
+                AUTO_WATERMARK_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MIN_STATE_RETENTION),
+                IDLE_STATE_RETENTION.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PARALLELISM),
+                DEFAULT_PARALLELISM.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_PARALLELISM),
+                MAX_PARALLELISM.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESULT_MODE),
+                EXECUTION_RESULT_MODE.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS),
+                EXECUTION_MAX_TABLE_RESULT_ROWS.key());
+        // restart strategy
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE),
+                RESTART_STRATEGY.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY),
+                RESTART_STRATEGY_FIXED_DELAY_DELAY.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS),
+                RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL),
+                RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL),
+                RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL),
+                
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key());
+    }
+
+    static {
+        configOptionToEntries.put(
+                RUNTIME_MODE.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TYPE));
+        configOptionToEntries.put(
+                TABLE_PLANNER.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PLANNER));
+        configOptionToEntries.put(
+                AUTO_WATERMARK_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL));
+        configOptionToEntries.put(
+                IDLE_STATE_RETENTION.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MIN_STATE_RETENTION));
+        configOptionToEntries.put(
+                DEFAULT_PARALLELISM.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PARALLELISM));
+        configOptionToEntries.put(
+                MAX_PARALLELISM.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_PARALLELISM));
+        configOptionToEntries.put(
+                EXECUTION_RESULT_MODE.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESULT_MODE));
+        configOptionToEntries.put(
+                EXECUTION_MAX_TABLE_RESULT_ROWS.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS));
+        // restart strategy
+        configOptionToEntries.put(
+                RESTART_STRATEGY.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FIXED_DELAY_DELAY.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL));
+        configOptionToEntries.put(
+                
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL));
+    }
+
+    static {
+        deprecatedEntries.add(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TIME_CHARACTERISTIC));
+        deprecatedEntries.add(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_STATE_RETENTION));
+        deprecatedEntries.add(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_CURRENT_CATALOG));
+        deprecatedEntries.add(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_CURRENT_DATABASE));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    public static boolean isDeprecatedKey(String key) {
+        return deprecatedEntries.contains(key);
+    }
+
+    public static boolean isYamlKey(String key) {
+        return deprecatedEntries.contains(key) || 
entryToConfigOptions.containsKey(key);

Review comment:
       `entryToConfigOptions.containsKey(key)`.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL;
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;
+import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/**
+ * Convert {@link ExecutionEntry} to {@link Configuration}.
+ *
+ * <ul>
+ *   The rule is simple.
+ *   <li>If the key is from {@link ExecutionEntry}, set the key and 
corresponding {@link
+ *       ConfigOption} at the same time.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is not set, set
+ *       the value for {@link ConfigOption} only.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is set, set the
+ *       value for both.
+ * </ul>
+ *
+ * <p>When read all entries from the {@link Configuration}, read the key from 
{@link ExecutionEntry}
+ * first and then read the {@link ConfigOption}.
+ *
+ * <p>When YAML is removed from the project, it should also remove this helper 
class.
+ */
+@Deprecated
+public class ConfigurationUtils {

Review comment:
       Could we merge this class with `ConfigUtils`? It's quite confusing that 
there are so many config utils. Maybe we could call it `YamlConfigUtils` to be 
more specific. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigurationUtils.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.configuration.PipelineOptions.AUTO_WATERMARK_INTERVAL;
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS;
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.IDLE_STATE_RETENTION;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;
+import static org.apache.flink.table.client.config.Environment.EXECUTION_ENTRY;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
+import static 
org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
+
+/**
+ * Convert {@link ExecutionEntry} to {@link Configuration}.
+ *
+ * <ul>
+ *   The rule is simple.
+ *   <li>If the key is from {@link ExecutionEntry}, set the key and 
corresponding {@link
+ *       ConfigOption} at the same time.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is not set, set
+ *       the value for {@link ConfigOption} only.
+ *   <li>If the key is {@link ConfigOption} and corresponding {@link 
ExecutionEntry} is set, set the
+ *       value for both.
+ * </ul>
+ *
+ * <p>When read all entries from the {@link Configuration}, read the key from 
{@link ExecutionEntry}
+ * first and then read the {@link ConfigOption}.
+ *
+ * <p>When YAML is removed from the project, it should also remove this helper 
class.
+ */
+@Deprecated
+public class ConfigurationUtils {
+
+    static Map<String, String> entryToConfigOptions = new HashMap<>();
+    static Map<String, String> configOptionToEntries = new HashMap<>();
+    static Set<String> deprecatedEntries = new HashSet<>();
+
+    static {
+        // EnvironmentSettings
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TYPE),
+                RUNTIME_MODE.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PLANNER),
+                TABLE_PLANNER.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL),
+                AUTO_WATERMARK_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MIN_STATE_RETENTION),
+                IDLE_STATE_RETENTION.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PARALLELISM),
+                DEFAULT_PARALLELISM.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_PARALLELISM),
+                MAX_PARALLELISM.key());
+        entryToConfigOptions.put(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESULT_MODE),
+                EXECUTION_RESULT_MODE.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS),
+                EXECUTION_MAX_TABLE_RESULT_ROWS.key());
+        // restart strategy
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE),
+                RESTART_STRATEGY.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY),
+                RESTART_STRATEGY_FIXED_DELAY_DELAY.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS),
+                RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL),
+                RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL),
+                RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key());
+        entryToConfigOptions.put(
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL),
+                
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key());
+    }
+
+    static {
+        configOptionToEntries.put(
+                RUNTIME_MODE.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TYPE));
+        configOptionToEntries.put(
+                TABLE_PLANNER.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PLANNER));
+        configOptionToEntries.put(
+                AUTO_WATERMARK_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PERIODIC_WATERMARKS_INTERVAL));
+        configOptionToEntries.put(
+                IDLE_STATE_RETENTION.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MIN_STATE_RETENTION));
+        configOptionToEntries.put(
+                DEFAULT_PARALLELISM.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_PARALLELISM));
+        configOptionToEntries.put(
+                MAX_PARALLELISM.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_PARALLELISM));
+        configOptionToEntries.put(
+                EXECUTION_RESULT_MODE.key(),
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESULT_MODE));
+        configOptionToEntries.put(
+                EXECUTION_MAX_TABLE_RESULT_ROWS.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_TABLE_RESULT_ROWS));
+        // restart strategy
+        configOptionToEntries.put(
+                RESTART_STRATEGY.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_TYPE));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FIXED_DELAY_DELAY.key(),
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_DELAY));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_RESTART_STRATEGY_ATTEMPTS));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL));
+        configOptionToEntries.put(
+                RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL));
+        configOptionToEntries.put(
+                
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY.key(),
+                String.format(
+                        "%s.%s",
+                        EXECUTION_ENTRY,
+                        
ExecutionEntry.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL));
+    }
+
+    static {
+        deprecatedEntries.add(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_TIME_CHARACTERISTIC));
+        deprecatedEntries.add(
+                String.format(
+                        "%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_MAX_STATE_RETENTION));
+        deprecatedEntries.add(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_CURRENT_CATALOG));
+        deprecatedEntries.add(
+                String.format("%s.%s", EXECUTION_ENTRY, 
ExecutionEntry.EXECUTION_CURRENT_DATABASE));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    public static boolean isDeprecatedKey(String key) {
+        return deprecatedEntries.contains(key);
+    }
+
+    public static boolean isYamlKey(String key) {
+        return deprecatedEntries.contains(key) || 
entryToConfigOptions.containsKey(key);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    public static void setKeyToConfiguration(
+            Configuration configuration, String key, String value) {
+        // ignore deprecated key
+        if (isDeprecatedKey(key)) {
+            return;
+        }
+        if (entryToConfigOptions.containsKey(key)) {
+            configuration.setString(key, value);
+            configuration.setString(entryToConfigOptions.get(key), value);
+        } else {
+            if (configOptionToEntries.containsKey(key)
+                    && 
configuration.containsKey(configOptionToEntries.get(key))) {
+                configuration.setString(configOptionToEntries.get(key), value);
+            }
+            configuration.setString(key, value);
+        }

Review comment:
       ```
           configuration.setString(key, value);
   
           if (ENTRY_TO_CONFIG_OPTION.containsKey(key)) {
               // old key => set new key
               configuration.setString(ENTRY_TO_CONFIG_OPTION.get(key), value);
           } else if (CONFIG_OPTION_TO_ENTRY.containsKey(key)
                   && 
configuration.containsKey(CONFIG_OPTION_TO_ENTRY.get(key))) {
               // new key && old key exist => set old key
               configuration.setString(CONFIG_OPTION_TO_ENTRY.get(key), value);
           }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to