wuchong commented on a change in pull request #15006:
URL: https://github.com/apache/flink/pull/15006#discussion_r592119863



##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/DefaultContext.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.DeploymentEntry;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.FunctionDescriptorValidator;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Context describing default environment, command line options, flink config, 
etc.
+ *
+ * <p>When the {@link Executor} execute `reset` commands, the session can 
restore from the "default"
+ * context.
+ */
+public class DefaultContext {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultContext.class);
+
+    private final Environment defaultEnv;
+    private final List<URL> dependencies;
+    private final Configuration flinkConfig;
+
+    public DefaultContext(
+            Environment defaultEnv,
+            List<URL> dependencies,
+            Configuration flinkConfig,
+            List<CustomCommandLine> commandLines) {
+        this.defaultEnv = defaultEnv;
+        this.dependencies = dependencies;
+        this.flinkConfig = flinkConfig;
+        Options commandLineOptions = collectCommandLineOptions(commandLines);
+
+        // initialize default file system
+        FileSystem.initialize(
+                flinkConfig, 
PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+        // add python dependencies
+        if (containsPythonFunction(defaultEnv)) {
+            addPythonDependency();
+        }
+
+        // put environment entry into Configuration
+        initConfigurationFromEnvironment();
+        try {
+            CommandLine deploymentCommandLine =
+                    createCommandLine(defaultEnv.getDeployment(), 
commandLineOptions);
+            flinkConfig.addAll(
+                    createExecutionConfig(
+                            deploymentCommandLine, commandLineOptions, 
commandLines, dependencies));
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not load available CLI with Environment Deployment 
entry.", e);
+        }
+    }
+
+    public Configuration getFlinkConfig() {
+        return flinkConfig;
+    }
+
+    public Environment getDefaultEnv() {
+        return defaultEnv;
+    }
+
+    public List<URL> getDependencies() {
+        return dependencies;
+    }
+
+    private Options collectCommandLineOptions(List<CustomCommandLine> 
commandLines) {
+        final Options customOptions = new Options();
+        for (CustomCommandLine customCommandLine : commandLines) {
+            customCommandLine.addGeneralOptions(customOptions);
+            customCommandLine.addRunOptions(customOptions);
+        }
+        return CliFrontendParser.mergeOptions(
+                CliFrontendParser.getRunCommandOptions(), customOptions);
+    }
+
+    private void initConfigurationFromEnvironment() {
+        defaultEnv.getConfiguration().asMap().forEach(flinkConfig::setString);
+        ExecutionEntry execution = defaultEnv.getExecution();
+        flinkConfig.set(
+                ExecutionConfigOptions.IDLE_STATE_RETENTION,
+                Duration.ofMillis(execution.getMinStateRetention()));
+
+        if (execution.getParallelism().isPresent()) {
+            flinkConfig.set(CoreOptions.DEFAULT_PARALLELISM, 
execution.getParallelism().get());
+        }
+        flinkConfig.set(PipelineOptions.MAX_PARALLELISM, 
execution.getMaxParallelism());
+        flinkConfig.set(
+                StreamPipelineOptions.TIME_CHARACTERISTIC, 
execution.getTimeCharacteristic());
+        if (execution.getTimeCharacteristic() == TimeCharacteristic.EventTime) 
{
+            flinkConfig.set(
+                    PipelineOptions.AUTO_WATERMARK_INTERVAL,
+                    
Duration.ofMillis(execution.getPeriodicWatermarksInterval()));
+        }
+
+        setRestartStrategy();
+    }
+
+    private void setRestartStrategy() {
+        RestartStrategies.RestartStrategyConfiguration restartStrategy =
+                defaultEnv.getExecution().getRestartStrategy();
+        if (restartStrategy instanceof 
RestartStrategies.NoRestartStrategyConfiguration) {
+            flinkConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+        } else if (restartStrategy
+                instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration) {
+            flinkConfig.set(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+            RestartStrategies.FixedDelayRestartStrategyConfiguration 
fixedDelay =
+                    
((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy);
+            flinkConfig.set(
+                    
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                    fixedDelay.getRestartAttempts());
+            flinkConfig.set(
+                    RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                    Duration.ofMillis(
+                            
fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds()));
+        } else if (restartStrategy
+                instanceof 
RestartStrategies.FailureRateRestartStrategyConfiguration) {
+            flinkConfig.set(RestartStrategyOptions.RESTART_STRATEGY, 
"failure-rate");
+            RestartStrategies.FailureRateRestartStrategyConfiguration 
failureRate =
+                    
(RestartStrategies.FailureRateRestartStrategyConfiguration) restartStrategy;
+            flinkConfig.set(
+                    
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
+                    failureRate.getMaxFailureRate());
+            flinkConfig.set(
+                    
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
+                    
Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds()));
+            flinkConfig.set(
+                    RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
+                    Duration.ofMillis(
+                            
failureRate.getDelayBetweenAttemptsInterval().toMilliseconds()));
+        } else if (restartStrategy
+                instanceof 
RestartStrategies.FallbackRestartStrategyConfiguration) {
+            // default is FallbackRestartStrategyConfiguration
+            // see ExecutionConfig.restartStrategyConfiguration
+            flinkConfig.removeConfig(RestartStrategyOptions.RESTART_STRATEGY);
+        }
+    }
+
+    private static Configuration createExecutionConfig(
+            CommandLine commandLine,
+            Options commandLineOptions,
+            List<CustomCommandLine> availableCommandLines,
+            List<URL> dependencies)
+            throws FlinkException {
+        LOG.debug("Available commandline options: {}", commandLineOptions);
+        List<String> options =
+                Stream.of(commandLine.getOptions())
+                        .map(o -> o.getOpt() + "=" + o.getValue())
+                        .collect(Collectors.toList());
+        LOG.debug(
+                "Instantiated commandline args: {}, options: {}",
+                commandLine.getArgList(),
+                options);
+
+        final CustomCommandLine activeCommandLine =
+                findActiveCommandLine(availableCommandLines, commandLine);
+        LOG.debug(
+                "Available commandlines: {}, active commandline: {}",
+                availableCommandLines,
+                activeCommandLine);
+
+        Configuration executionConfig = 
activeCommandLine.toConfiguration(commandLine);
+
+        try {
+            final ProgramOptions programOptions = 
ProgramOptions.create(commandLine);
+            final ExecutionConfigAccessor executionConfigAccessor =
+                    ExecutionConfigAccessor.fromProgramOptions(programOptions, 
dependencies);
+            executionConfigAccessor.applyToConfiguration(executionConfig);
+        } catch (CliArgsException e) {
+            throw new SqlExecutionException("Invalid deployment run options.", 
e);
+        }
+
+        LOG.info("Executor config: {}", executionConfig);
+        return executionConfig;
+    }
+
+    private static CustomCommandLine findActiveCommandLine(
+            List<CustomCommandLine> availableCommandLines, CommandLine 
commandLine) {
+        for (CustomCommandLine cli : availableCommandLines) {
+            if (cli.isActive(commandLine)) {
+                return cli;
+            }
+        }
+        throw new SqlExecutionException("Could not find a matching 
deployment.");
+    }
+
+    private static CommandLine createCommandLine(
+            DeploymentEntry deployment, Options commandLineOptions) {
+        try {
+            return deployment.getCommandLine(commandLineOptions);
+        } catch (Exception e) {
+            throw new SqlExecutionException("Invalid deployment options.", e);
+        }
+    }
+
+    private boolean containsPythonFunction(Environment environment) {
+        return environment.getFunctions().values().stream()
+                .anyMatch(
+                        f ->
+                                
FunctionDescriptorValidator.FROM_VALUE_PYTHON.equals(
+                                        f.getDescriptor()
+                                                .toProperties()
+                                                
.get(FunctionDescriptorValidator.FROM)));
+    }
+
+    private void addPythonDependency() {
+        try {
+            URL location =
+                    Class.forName(
+                                    
"org.apache.flink.python.PythonFunctionRunner",
+                                    false,
+                                    
Thread.currentThread().getContextClassLoader())
+                            .getProtectionDomain()
+                            .getCodeSource()
+                            .getLocation();
+            if (Paths.get(location.toURI()).toFile().isFile()) {
+                this.dependencies.add(location);
+            }
+        } catch (URISyntaxException | ClassNotFoundException e) {
+            throw new SqlExecutionException(
+                    "Python UDF detected but flink-python jar not found. "
+                            + "If you starts SQL-Client via `sql-client.sh`, 
please add the flink-python jar "
+                            + "via `-j` command option manually.",
+                    e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DefaultContext)) {
+            return false;
+        }
+        DefaultContext context = (DefaultContext) o;
+        return Objects.equals(defaultEnv, context.defaultEnv)
+                && Objects.equals(dependencies, context.dependencies)
+                && Objects.equals(flinkConfig, context.flinkConfig);
+    }
+
+    @Override
+    public int hashCode() {

Review comment:
       Do we need the `hashCode` and `equals` for `DefaultContext`?

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+
+/** Test {@link SessionContext}. */
+public class SessionContextTest {
+
+    private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+
+    @Test
+    public void testSetAndResetYamlKey() throws Exception {
+        SessionContext sessionContext = createSessionContext();
+        sessionContext.set("execution.max-table-result-rows", "100000");
+        Assert.assertEquals(
+                "100000",
+                sessionContext
+                        .getSessionEnvironment()
+                        .getExecution()
+                        .asMap()
+                        .get("max-table-result-rows"));
+
+        sessionContext.reset();
+
+        Assert.assertEquals(
+                "100",
+                sessionContext
+                        .getSessionEnvironment()
+                        .getExecution()
+                        .asMap()
+                        .get("max-table-result-rows"));
+    }
+
+    @Test
+    public void testSetAndResetOption() throws Exception {
+        SessionContext sessionContext = createSessionContext();
+        // table config option
+        sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
+        // runtime config option and has default value
+        sessionContext.set(DEFAULT_PARALLELISM.key(), "128");
+        // runtime config option and doesn't have default value
+        sessionContext.set(NAME.key(), "test");
+        Assert.assertEquals(
+                "hive",
+                sessionContext
+                        .getSessionEnvironment()
+                        .getConfiguration()
+                        .asMap()
+                        .get(TABLE_SQL_DIALECT.key()));
+        Assert.assertEquals(
+                "hive",
+                sessionContext
+                        .getExecutionContext()
+                        .getTableEnvironment()
+                        .getConfig()
+                        .getConfiguration()
+                        .getString(TABLE_SQL_DIALECT));
+        Assert.assertEquals(
+                128,
+                sessionContext
+                        .getExecutionContext()
+                        .getTableEnvironment()
+                        .getConfig()
+                        .getConfiguration()
+                        .getInteger(DEFAULT_PARALLELISM));
+        Assert.assertEquals(
+                "test",
+                sessionContext
+                        .getExecutionContext()
+                        .getTableEnvironment()
+                        .getConfig()
+                        .getConfiguration()
+                        .getString(NAME));
+
+        sessionContext.reset();

Review comment:
       Could you try to reset configuration in `flink-conf.yaml` (i.e. the 
`Configuration` in `DefaultContext`)? According to the design, the 
configuration can't be reset. 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
##########
@@ -44,8 +44,8 @@ class WindowAggregateTest extends TableTestBase {
        |  d DECIMAL(10, 3),
        |  e BIGINT,
        |  rowtime TIMESTAMP(3),
-       |  proctime as PROCTIME(),
-       |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
+       |  ts as rowtime,

Review comment:
       revert changes. 

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
##########
@@ -32,13 +32,13 @@
     void start() throws SqlExecutionException;
 
     /**
-     * Open a new session by using the given {@link SessionContext}.
+     * Open a new session by using the given session id.
      *
-     * @param session context to create new session.
-     * @return session identifier to track the session.
+     * @param sessionId session identifier.
+     * @return used session identifier to track the session.
      * @throws SqlExecutionException if any error happen
      */
-    String openSession(SessionContext session) throws SqlExecutionException;
+    String openSession(String sessionId) throws SqlExecutionException;

Review comment:
       Currently, we call it in `SqlClient` like this: 
`executor.openSession(options.getSessionId())`, but `options.getSessionId()` 
might be `null`. I suggest either adding a `@Nullable` annotation to the parameter, 
or constructing a non-null `sessionId` in `SqlClient` and passing it to 
`openSession(..)`.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import 
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
+import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import java.lang.reflect.Method;
+import java.net.URLClassLoader;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
+
+/**
+ * Context for executing table programs. This class caches everything that can 
be cached across
+ * multiple queries as long as the session context does not change. This must 
be thread-safe as it
+ * might be reused across different query submissions.
+ */
+public class ExecutionContext {
+
+    // TODO: merge the ExecutionContext into the SessionContext.
+    // Members that should be reused in the same session.
+    private final Environment environment;
+    private final Configuration flinkConfig;
+    private final SessionState sessionState;
+    private final URLClassLoader classLoader;
+
+    private final TableEnvironment tableEnv;
+
+    public ExecutionContext(
+            Environment environment,
+            Configuration flinkConfig,
+            URLClassLoader classLoader,
+            SessionState sessionState) {
+        this.environment = environment;
+        this.flinkConfig = flinkConfig;
+        this.sessionState = sessionState;
+        this.classLoader = classLoader;
+
+        this.tableEnv = createTableEnvironment();
+    }
+
+    /**
+     * Create a new {@link ExecutionContext}.
+     *
+     * <p>It just copies from the {@link ExecutionContext} and rebuild a new 
{@link
+     * TableEnvironment}. But it still needs {@link Environment} because 
{@link Environment} doesn't
+     * allow modification.
+     *
+     * <p>When FLINK-21462 finishes, the constructor only uses {@link 
ExecutionContext} as input.
+     */
+    public ExecutionContext(Environment environment, ExecutionContext context) 
{
+        this.environment = environment;
+        this.flinkConfig = context.flinkConfig;
+        this.sessionState = context.sessionState;
+        this.classLoader = context.classLoader;
+        // create a new table env
+        this.tableEnv = createTableEnvironment();
+    }
+
+    /**
+     * Executes the given supplier using the execution context's classloader 
as thread classloader.
+     */
+    public <R> R wrapClassLoader(Supplier<R> supplier) {
+        try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {
+            return supplier.get();
+        }
+    }
+
+    public TableEnvironment getTableEnvironment() {
+        return tableEnv;
+    }
+
+    // 
------------------------------------------------------------------------------------------------------------------
+    // Helper to create Table Environment
+    // 
------------------------------------------------------------------------------------------------------------------
+
+    private TableEnvironment createTableEnvironment() {
+        EnvironmentSettings settings = 
environment.getExecution().getEnvironmentSettings();
+        TableConfig config = new TableConfig();
+        config.addConfiguration(flinkConfig);
+        // Override the value in configuration.
+        // TODO: use `table.planner` and `execution.runtime-mode` to configure 
the TableEnvironment
+        config.addConfiguration(settings.toConfiguration());
+
+        if (environment.getExecution().isStreamingPlanner()) {

Review comment:
       It seems the newly introduced `table.planner` and `execution.mode` options 
don't take effect here. Currently, the implementation only depends on the SQL 
Client configuration. 
   
   I think we should use `table.planner` and `execution.mode` to decide how to 
create the TableEnv. 
   
   Besides, please add tests for this.

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
##########
@@ -199,7 +197,6 @@ private void verifyUpdateSubmission(
             } else {
                 assertTrue(client.submitUpdate(statement));
                 assertEquals(statement, mockExecutor.receivedStatement);
-                assertEquals(context, mockExecutor.receivedContext);

Review comment:
       `receivedContext` in `MockExecutor` is never used, so it can be removed. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to