twalthr commented on a change in pull request #18980: URL: https://github.com/apache/flink/pull/18980#discussion_r820476699
########## File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java ########## @@ -185,6 +184,7 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro @Override public JobClient executeAsync(StreamGraph streamGraph) throws Exception { validateAllowedExecution(); + checkNotAllowedConfigurations(); Review comment: Remove this from the PR. It is unrelated and has been fixed already by Fabian. ########## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java ########## @@ -94,7 +94,9 @@ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { return create( executionEnvironment, - EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration())); + EnvironmentSettings.newInstance() + .withConfiguration((Configuration) executionEnvironment.getConfiguration()) Review comment: Is this correct? It means that the root configuration ends up in the `TableConfig` non-root-config part. This should just be a default instance, no? Maybe an internal method that accepts `null` is also fine. ########## File path: flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala ########## @@ -292,6 +294,12 @@ class StreamTableEnvironmentImpl ( object StreamTableEnvironmentImpl { + def create( + executionEnvironment: StreamExecutionEnvironment, + settings: EnvironmentSettings): StreamTableEnvironmentImpl = + create(executionEnvironment, settings, TableConfig.getDefault) + + @VisibleForTesting Review comment: shouldn't we drop this method as well? Why does it take `TableConfig`? 
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java ########## @@ -99,8 +98,8 @@ new DynamicTableSinkSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(), Review comment: Use the constants from `CatalogManagerMocks`? ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala ########## @@ -49,8 +49,8 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( override def before(): Unit = { super.before() val setting = EnvironmentSettings.newInstance().inStreamingMode().build() - val config = new TestTableConfig - this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) +// val config = new TestTableConfig Review comment: Remove this commented-out line? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ########## @@ -311,6 +311,7 @@ public ExecutionConfig getConfig() { */ public StreamExecutionEnvironment setParallelism(int parallelism) { config.setParallelism(parallelism); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); Review comment: Remove this change for now. We should perform these kinds of changes consistently for all Core/PipelineOptions. 
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java ########## @@ -40,6 +40,24 @@ public class TableConfigOptions { private TableConfigOptions() {} + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<String> TABLE_CATALOG_NAME = + key("table.catalog-name") Review comment: we should be a bit more specific here: `table.builtin-catalog-name` and `table.builtin-database-name` ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala ########## @@ -121,7 +124,7 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { elements.filter(e => !e.isInstanceOf[Watermark]).toList } - class TestTableConfig extends TableConfig { + class TestTableConfig() extends TableConfig { Review comment: unnecessary change? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java ########## @@ -145,39 +135,41 @@ public Configuration toConfiguration() { * TableEnvironment}. */ public String getBuiltInCatalogName() { - return builtInCatalogName; + return configuration.get(TABLE_CATALOG_NAME); } /** * Gets the specified name of the default database in the initial catalog to be created when * instantiating a {@link TableEnvironment}. */ public String getBuiltInDatabaseName() { - return builtInDatabaseName; + return configuration.get(TABLE_DATABASE_NAME); } /** Tells if the {@link TableEnvironment} should work in a batch or streaming mode. */ public boolean isStreamingMode() { - return isStreamingMode; + return configuration.get(RUNTIME_MODE) == STREAMING; } /** A builder for {@link EnvironmentSettings}. 
*/ @PublicEvolving public static class Builder { - private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG; - private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE; - private boolean isStreamingMode = true; + private final Configuration configuration = new Configuration(); + + public Builder() { + configuration.set(RUNTIME_MODE, STREAMING); Review comment: Let's not set the mode here, but determine it lazily. Otherwise, merging with the root config could be problematic, given the comment I made on `StreamTableEnvironment.create`. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java ########## @@ -30,9 +30,11 @@ /** Mock implementations of {@link CatalogManager} for testing purposes. */ public final class CatalogManagerMocks { - public static final String DEFAULT_CATALOG = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; + public static final String DEFAULT_CATALOG = Review comment: nit: you could also use these constants in the Hive tests above if available. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java ########## @@ -111,32 +104,29 @@ public static Builder newInstance() { return new Builder(); } - /** Creates an instance of {@link EnvironmentSettings} from configuration. */ + /** + * Creates an instance of {@link EnvironmentSettings} from configuration. + * + * @deprecated use {@link Builder#newInstance()}{@code Review comment: nit: link to `withConfiguration` directly or use a full code example instead of a mixture of linking and code. 
########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala ########## @@ -132,7 +135,8 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { override def getMaxIdleStateRetentionTime: Long = maxIdleStateRetentionTime override def setIdleStateRetentionTime(minTime: Time, maxTime: Time): Unit = { - super.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds)) + val duration = Duration.ofMillis(minTime.toMilliseconds) Review comment: unnecessary change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org