twalthr commented on a change in pull request #18980:
URL: https://github.com/apache/flink/pull/18980#discussion_r820476699



##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -185,6 +184,7 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
     @Override
     public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
         validateAllowedExecution();
+        checkNotAllowedConfigurations();

Review comment:
      Remove this from the PR. It is unrelated and has already been fixed by Fabian.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
##########
@@ -94,7 +94,9 @@
     static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
         return create(
                 executionEnvironment,
-                EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration()));
+                EnvironmentSettings.newInstance()
+                        .withConfiguration((Configuration) executionEnvironment.getConfiguration())

Review comment:
      Is this correct? It means that the root configuration ends up in the `TableConfig` non-root-config part. This should just be a default instance, no? Maybe an internal method that accepts `null` is also fine.
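
To make the layering concern concrete, here is a toy sketch using plain Java maps as stand-ins for Flink's `Configuration`/`EnvironmentSettings`. Nothing below is the real API; it only illustrates where the keys end up under each approach:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the configuration layering; names are illustrative only.
public class SettingsLayeringSketch {

    // Mirrors the diff: the root configuration is copied into the settings layer.
    static Map<String, String> settingsFrom(Map<String, String> rootConfig) {
        return new HashMap<>(rootConfig);
    }

    // The suggested alternative: a plain default instance; the root config
    // stays in its own layer and is merged later by the table environment.
    static Map<String, String> defaultSettings() {
        return new HashMap<>();
    }

    public static void main(String[] args) {
        Map<String, String> root = new HashMap<>();
        root.put("parallelism.default", "4");
        // With the diff's approach, a root key leaks into the settings layer:
        System.out.println(settingsFrom(root).containsKey("parallelism.default"));
        // With a default instance, the settings layer stays empty:
        System.out.println(defaultSettings().isEmpty());
    }
}
```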

##########
File path: flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
##########
@@ -292,6 +294,12 @@ class StreamTableEnvironmentImpl (
 
 object StreamTableEnvironmentImpl {
 
+  def create(
+              executionEnvironment: StreamExecutionEnvironment,
+              settings: EnvironmentSettings): StreamTableEnvironmentImpl =
+    create(executionEnvironment, settings, TableConfig.getDefault)
+
+  @VisibleForTesting

Review comment:
       shouldn't we drop this method as well? Why does it take `TableConfig`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
##########
@@ -99,8 +98,8 @@
                 new DynamicTableSinkSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),

Review comment:
       use the constants from `CatalogManagerMocks`?

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
##########
@@ -49,8 +49,8 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
   override def before(): Unit = {
     super.before()
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
-    val config = new TestTableConfig
-    this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+//    val config = new TestTableConfig

Review comment:
       remove?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -311,6 +311,7 @@ public ExecutionConfig getConfig() {
      */
     public StreamExecutionEnvironment setParallelism(int parallelism) {
         config.setParallelism(parallelism);
+        configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);

Review comment:
      Remove this change for now. We should apply this kind of change consistently for all Core/PipelineOptions.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -40,6 +40,24 @@
 public class TableConfigOptions {
     private TableConfigOptions() {}
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<String> TABLE_CATALOG_NAME =
+            key("table.catalog-name")

Review comment:
      We should be a bit more specific here: `table.builtin-catalog-name` and `table.builtin-database-name`.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
##########
@@ -121,7 +124,7 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
     elements.filter(e => !e.isInstanceOf[Watermark]).toList
   }
 
-  class TestTableConfig extends TableConfig {
+  class TestTableConfig() extends TableConfig {

Review comment:
       unnecessary change?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
##########
@@ -145,39 +135,41 @@ public Configuration toConfiguration() {
      * TableEnvironment}.
      */
     public String getBuiltInCatalogName() {
-        return builtInCatalogName;
+        return configuration.get(TABLE_CATALOG_NAME);
     }
 
     /**
     * Gets the specified name of the default database in the initial catalog to be created when
      * instantiating a {@link TableEnvironment}.
      */
     public String getBuiltInDatabaseName() {
-        return builtInDatabaseName;
+        return configuration.get(TABLE_DATABASE_NAME);
     }
 
    /** Tells if the {@link TableEnvironment} should work in a batch or streaming mode. */
     public boolean isStreamingMode() {
-        return isStreamingMode;
+        return configuration.get(RUNTIME_MODE) == STREAMING;
     }
 
     /** A builder for {@link EnvironmentSettings}. */
     @PublicEvolving
     public static class Builder {
 
-        private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
-        private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
-        private boolean isStreamingMode = true;
+        private final Configuration configuration = new Configuration();
+
+        public Builder() {
+            configuration.set(RUNTIME_MODE, STREAMING);

Review comment:
      Let's not set the mode here, but determine it lazily. Otherwise, merging with the root config could be problematic, per my comment in `StreamTableEnvironment.create`.
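
A rough sketch of the lazy resolution being suggested, using a plain map as a stand-in for Flink's `Configuration` (illustrative only; the real class and `RUNTIME_MODE` handling differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LazyModeSketch {
    // Stand-in for Flink's Configuration: the key is only present if set explicitly.
    static final String RUNTIME_MODE = "execution.runtime-mode";
    final Map<String, String> configuration = new HashMap<>();

    boolean isStreamingMode() {
        // Fall back to STREAMING only at read time, so a Builder that never
        // sets the key can still be merged cleanly with a root config that does.
        return Optional.ofNullable(configuration.get(RUNTIME_MODE))
                .orElse("STREAMING")
                .equals("STREAMING");
    }

    public static void main(String[] args) {
        LazyModeSketch settings = new LazyModeSketch();
        System.out.println(settings.isStreamingMode()); // no explicit mode: streaming by default
        settings.configuration.put(RUNTIME_MODE, "BATCH");
        System.out.println(settings.isStreamingMode());
    }
}
```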

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
##########
@@ -30,9 +30,11 @@
 /** Mock implementations of {@link CatalogManager} for testing purposes. */
 public final class CatalogManagerMocks {
 
-    public static final String DEFAULT_CATALOG = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
+    public static final String DEFAULT_CATALOG =

Review comment:
    nit: you could also use these constants in the Hive tests above if available.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
##########
@@ -111,32 +104,29 @@ public static Builder newInstance() {
         return new Builder();
     }
 
-    /** Creates an instance of {@link EnvironmentSettings} from configuration. */
+    /**
+     * Creates an instance of {@link EnvironmentSettings} from configuration.
+     *
+     * @deprecated use {@link Builder#newInstance()}{@code

Review comment:
      nit: link to `withConfiguration` directly or use a full code example instead of a mixture of linking and code.
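
For instance, the deprecation note could carry one self-contained snippet instead of mixed links and code (a sketch only; exact wording is up to the author):

```java
/**
 * Creates an instance of {@link EnvironmentSettings} from configuration.
 *
 * @deprecated Use:
 * <pre>{@code
 * EnvironmentSettings.newInstance()
 *         .withConfiguration(configuration)
 *         .build();
 * }</pre>
 */
```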

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
##########
@@ -132,7 +135,8 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
     override def getMaxIdleStateRetentionTime: Long = maxIdleStateRetentionTime
 
    override def setIdleStateRetentionTime(minTime: Time, maxTime: Time): Unit = {
-      super.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds))
+      val duration = Duration.ofMillis(minTime.toMilliseconds)

Review comment:
       unnecessary change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

