zhuzhurk commented on code in PR #23852:
URL: https://github.com/apache/flink/pull/23852#discussion_r1444574392
########## flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java: ##########
@@ -40,7 +46,7 @@ class GlobalConfigurationTest {

     @Test
     void testConfigurationYAML() {

Review Comment:
   Maybe rename it to `testConfigurationWithLegacyYAML`



########## flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java: ##########
@@ -257,7 +371,19 @@ public static boolean isSensitive(String key) {
         return false;
     }

+    public static String getFlinkConfFilename() {
+        if (isStandardYaml()) {
+            return FLINK_CONF_FILENAME;
+        } else {
+            return LEGACY_FLINK_CONF_FILENAME;
+        }
+    }
+
     public static boolean isStandardYaml() {
         return standardYaml;
     }
+
+    public static void setStandardYaml(boolean standardYaml) {

Review Comment:
   Should be marked as `@VisibleForTesting`.



########## flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java: ##########
@@ -654,7 +653,13 @@ void testGetEnvironmentVariablesErroneous() {
     }

     @Test
-    void testWriteConfigurationAndReload() throws IOException {
+    void testWriteConfigurationAndReload() throws Exception {
+        testWriteConfigurationAndReloadInternal(false);
+        testWriteConfigurationAndReloadInternal(true);
+    }
+
+    private void testWriteConfigurationAndReloadInternal(boolean standardYaml) throws IOException {
+        GlobalConfiguration.setStandardYaml(standardYaml);

Review Comment:
   With this kind of change, the tests can become unstable when run concurrently.



########## flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java: ##########
@@ -112,25 +112,26 @@ public static String[] parseLocalStateDirectories(Configuration configuration) {
     }

     /**
-     * Parses a string as a map of strings. The expected format of the map is:
+     * Parses a string as a map of strings. The expected format of the map which parsed by FLINK

Review Comment:
   which parsed -> to be parsed



########## flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java: ##########
@@ -241,6 +263,98 @@ private static Configuration loadYAMLResource(File file) {
         return config;
     }

+    /**
+     * Flattens a nested configuration map to be only one level deep.
+     *
+     * <p>Nested keys are concatinated using the {@code KEY_SEPARATOR} character. So that:
+     *
+     * <pre>
+     * keyA:
+     *   keyB:
+     *     keyC: "hello"
+     *     keyD: "world"
+     * </pre>
+     *
+     * <p>becomes:
+     *
+     * <pre>
+     * keyA.keyB.keyC: "hello"
+     * keyA.keyB.keyD: "world"
+     * </pre>
+     *
+     * @param config an arbitrarily nested config map
+     * @param keyPrefix The string to prefix the keys in the current config level
+     * @return A flattened, 1 level deep map
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
+        final Map<String, Object> flattenedMap = new HashMap<>();
+
+        if (config == null) {

Review Comment:
   Is this parameter nullable?



########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ##########
@@ -422,7 +422,10 @@ public void appendConfiguration(Configuration config) throws IOException {
                 .map(entry -> entry.getKey() + ": " + entry.getValue())
                 .collect(Collectors.toList());

-        Files.write(conf.resolve("flink-conf.yaml"), configurationLines);
+        // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we
+        // need to use the legacy flink conf file 'flink-conf.yaml' here.

Review Comment:
   IIUC, we can do this before Flink 2.0, after we have reworked the E2E tests?
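A minimal, self-contained sketch of the flattening idea described in the flatten(...) javadoc quoted above (GlobalConfiguration.java hunk). It assumes "." as the key separator; the class and method below are illustrative only and are not the PR's actual implementation:

    import java.util.HashMap;
    import java.util.Map;

    public class FlattenSketch {

        // Assumption: nested keys are joined with "." as in the javadoc example above.
        private static final String KEY_SEPARATOR = ".";

        @SuppressWarnings("unchecked")
        static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
            final Map<String, Object> flattened = new HashMap<>();
            for (Map.Entry<String, Object> entry : config.entrySet()) {
                final String key =
                        keyPrefix.isEmpty()
                                ? entry.getKey()
                                : keyPrefix + KEY_SEPARATOR + entry.getKey();
                final Object value = entry.getValue();
                if (value instanceof Map) {
                    // Recurse into nested maps, extending the key prefix.
                    flattened.putAll(flatten((Map<String, Object>) value, key));
                } else {
                    // Scalars (and lists) are stored under the concatenated key.
                    flattened.put(key, value);
                }
            }
            return flattened;
        }

        public static void main(String[] args) {
            // keyA: { keyB: { keyC: "hello", keyD: "world" } }  (Java 9+ Map.of for brevity)
            Map<String, Object> nested =
                    Map.of("keyA", Map.of("keyB", Map.of("keyC", "hello", "keyD", "world")));
            // Prints {keyA.keyB.keyC=hello, keyA.keyB.keyD=world} (iteration order may vary)
            System.out.println(flatten(nested, ""));
        }
    }

Note that this sketch assumes a non-null map; regarding the nullability question above, a caller passing a null section would need either a guard or an empty map.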
########## flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java: ##########
@@ -112,15 +171,37 @@ void testInvalidConfiguration() {
     }

     @Test
-    // We allow malformed YAML files
+    // We allow malformed YAML files if loaded legacy flink conf
     void testInvalidYamlFile() throws IOException {
-        final File confFile = new File(tmpDir.getPath(), GlobalConfiguration.FLINK_CONF_FILENAME);
+        final File confFile =
+                new File(tmpDir.getPath(), GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);

         try (PrintWriter pw = new PrintWriter(confFile)) {
             pw.append("invalid");
         }

         assertThat(GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath())).isNotNull();
+        // Clear the standard yaml flag to avoid has impact to other cases.
+        GlobalConfiguration.setStandardYaml(true);
+    }
+
+    @Test
+    // We do not allow malformed YAML files if loaded standard yaml
+    void testInvalidYamlFile_StandardYaml() throws IOException {

Review Comment:
   `_` should be avoided in method names



########## flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java: ##########
@@ -112,15 +171,37 @@ void testInvalidConfiguration() {
     }

     @Test
-    // We allow malformed YAML files
+    // We allow malformed YAML files if loaded legacy flink conf
     void testInvalidYamlFile() throws IOException {

Review Comment:
   Maybe rename it to `testInvalidLegacyYamlFile`



########## flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java: ##########
@@ -85,11 +91,64 @@ void testConfigurationYAML() {
             assertThat(conf.getString("mykey8", "null")).isEqualTo("null");
             assertThat(conf.getString("mykey9", null)).isEqualTo("myvalue10");
         } finally {
+            // Clear the standard yaml flag to avoid has impact to other cases.

Review Comment:
   avoid has impact -> avoid impact



########## flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java: ##########
@@ -112,25 +112,26 @@ public static String[] parseLocalStateDirectories(Configuration configuration) {
     }

     /**
-     * Parses a string as a map of strings. The expected format of the map is:
+     * Parses a string as a map of strings. The expected format of the map which parsed by FLINK
+     * parser is:
      *
      * <pre>
      * key1:value1,key2:value2
      * </pre>
      *
+     * <p>The expected format of the map which parsed by standard YAML parser is:

Review Comment:
   which parsed -> to be parsed



########## flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java: ##########
@@ -654,7 +653,13 @@ void testGetEnvironmentVariablesErroneous() {
     }

     @Test
-    void testWriteConfigurationAndReload() throws IOException {
+    void testWriteConfigurationAndReload() throws Exception {
+        testWriteConfigurationAndReloadInternal(false);
+        testWriteConfigurationAndReloadInternal(true);

Review Comment:
   It's better to have two test methods to verify them separately.



########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java: ##########
@@ -136,7 +136,7 @@ void testUpdateWithCustomColumns() {
                 "CREATE TABLE t (a int, b string, c double) WITH"
                         + " ("
                         + "'connector' = 'test-update-delete', "
-                        + "'required-columns-for-update' = 'b;c', "
+                        + "'required-columns-for-update' = '[b, c]', "

Review Comment:
   The incompatibility with existing SQL can be a problem. I think we need to find a way that allows users to use the standard YAML config file without changing their existing SQL jobs.
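Relating to the two BootstrapToolsTest comments above (the setStandardYaml flag being shared static state, and preferring two separate test methods), here is a rough JUnit 5 sketch of how the two YAML modes could be verified in dedicated methods while restoring the flag afterwards. It only narrows the window of shared static state rather than making concurrent runs safe; the class name, method names, and config contents are illustrative, while setStandardYaml/isStandardYaml/getFlinkConfFilename are taken from the diff quoted earlier:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Collections;

    import static org.assertj.core.api.Assertions.assertThat;

    class WriteConfigurationAndReloadSketchTest {

        @TempDir Path tmpDir;

        @Test
        void testWriteConfigurationAndReloadWithLegacyYaml() throws Exception {
            writeAndReload(false);
        }

        @Test
        void testWriteConfigurationAndReloadWithStandardYaml() throws Exception {
            writeAndReload(true);
        }

        private void writeAndReload(boolean standardYaml) throws Exception {
            final boolean previous = GlobalConfiguration.isStandardYaml();
            GlobalConfiguration.setStandardYaml(standardYaml);
            try {
                // "key: value" is valid for both the legacy parser and a standard YAML parser.
                final File confFile =
                        new File(tmpDir.toFile(), GlobalConfiguration.getFlinkConfFilename());
                Files.write(confFile.toPath(), Collections.singletonList("mykey: myvalue"));

                final Configuration loaded =
                        GlobalConfiguration.loadConfiguration(tmpDir.toString());
                assertThat(loaded.getString("mykey", null)).isEqualTo("myvalue");
            } finally {
                // Restore the flag so later tests in the same JVM see the previous mode.
                GlobalConfiguration.setStandardYaml(previous);
            }
        }
    }

A JUnit 5 @ParameterizedTest over both booleans would be an alternative, but separate methods keep the two YAML modes visible as distinct cases in test reports, which matches the reviewer's suggestion.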
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org