LadyForest commented on code in PR #21752:
URL: https://github.com/apache/flink/pull/21752#discussion_r1089891027
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DefaultContextUtils.java:
##########
@@ -69,6 +69,15 @@ public static DefaultContext buildDefaultContext(CliOptions options) {
         return new DefaultContext(configuration, dependencies);
     }
 
+    public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions options) {

Review Comment:
   `options` not used?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java:
##########
@@ -285,35 +268,18 @@ public static CliOptions parseGatewayModeClient(String[] args) {
         try {
             DefaultParser parser = new DefaultParser();
             CommandLine line = parser.parse(GATEWAY_MODE_CLIENT_OPTIONS, args, true);
-            return new CliOptions(
+            return new CliOptions.GatewayCliOptions(
                     line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                     checkSessionId(line),
-                    null,
-                    null,
-                    checkUrls(line, CliOptionsParser.OPTION_JAR),
-                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
+                    checkUrl(line, CliOptionsParser.OPTION_FILE),
                     line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
                     line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
-                    getPythonConfiguration(line));
-        } catch (ParseException e) {
-            throw new SqlClientException(e.getMessage());
-        }
-    }
-
-    public static CliOptions parseGatewayModeGateway(String[] args) {
-        try {
-            DefaultParser parser = new DefaultParser();
-            CommandLine line = parser.parse(GATEWAY_MODE_GATEWAY_OPTIONS, args, true);
-            return new CliOptions(
-                    line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
-                    null,
-                    null,
-                    null,
-                    checkUrls(line, CliOptionsParser.OPTION_JAR),
-                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
-                    null,
-                    null,
-                    getPythonConfiguration(line));
+                    line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())
+                            ? NetUtils.parseHostPortAddress(
+                                    line.getOptionValue(
+                                            CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt()))

Review Comment:
   Just curious about what a `null` endpoint means.
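For context on the `null` endpoint question, here is a minimal sketch of one way to make a missing address explicit at parse time instead of passing `null` through; the class and method names below are hypothetical and not part of this PR:

```java
import java.net.InetSocketAddress;
import java.util.Optional;

/** Hypothetical sketch: resolve the gateway endpoint once, as an Optional, at parse time. */
final class EndpointResolver {

    /** Returns the parsed "host:port" address, or empty when the -e/--endpoint flag was absent. */
    static Optional<InetSocketAddress> resolve(String hostPortOrNull) {
        if (hostPortOrNull == null || hostPortOrNull.isEmpty()) {
            return Optional.empty();
        }
        String[] parts = hostPortOrNull.split(":", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected <host>:<port> but got: " + hostPortOrNull);
        }
        return Optional.of(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])));
    }

    public static void main(String[] args) {
        // Absent flag: callers must decide what "no endpoint" means instead of handling a raw null.
        System.out.println(resolve(null));             // Optional.empty
        System.out.println(resolve("localhost:8083")); // Optional[...:8083]
    }
}
```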
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -56,38 +57,52 @@
  * <p>- In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This
  * allows for submitting jobs without having to start an additional component.
  *
- * <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the
- * gateway and allows for managing queries via console.
- *
- * <p>For debugging in an IDE you can execute the main method of this class using: "--defaults
- * /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
- *
- * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
+ * <p>- In gateway mode, the SQL CLI client connects to the REST API of the gateway and allows for
+ * managing queries via console.
  */
 public class SqlClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
 
-    private final boolean isEmbedded;
+    private final boolean isGatewayMode;
     private final CliOptions options;
     private final Supplier<Terminal> terminalFactory;
 
     public static final String MODE_EMBEDDED = "embedded";
     public static final String MODE_GATEWAY = "gateway";
+    public static final String MODE_NONE = "";

Review Comment:
   I think it's better to have all kinds of `MODE_XX` as `Enum` instead of `String`
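For illustration, a rough sketch of the enum-based alternative the comment suggests; the enum name and the parsing helper are hypothetical, not part of this PR:

```java
/** Hypothetical sketch of modelling the client mode as an enum instead of plain strings. */
public enum ClientMode {
    EMBEDDED("embedded"),
    GATEWAY("gateway"),
    NONE("");

    private final String value;

    ClientMode(String value) {
        this.value = value;
    }

    /** Resolves the first CLI argument to a mode, defaulting to NONE when it is absent. */
    public static ClientMode fromArg(String arg) {
        if (arg == null) {
            return NONE;
        }
        for (ClientMode mode : values()) {
            if (mode.value.equalsIgnoreCase(arg)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown SQL Client mode: " + arg);
    }
}
```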
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -265,9 +289,13 @@ public void close() {
 
     private static class EmbeddedShutdownThread extends Thread {

Review Comment:
   If `embedded` and `gateway` share the shutdown hook logic, we'd better rename `EmbeddedShutdownThread` to `ShutdownThread`
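If the hook does end up shared, a minimal sketch of what the renamed class might look like; the `AutoCloseable` field and constructor are assumptions for illustration, not the PR's code:

```java
/** Hypothetical shared shutdown hook used by both embedded and gateway mode. */
final class ShutdownThread extends Thread {

    private final AutoCloseable executor;

    ShutdownThread(AutoCloseable executor) {
        this.executor = executor;
    }

    @Override
    public void run() {
        // Close whichever executor was started, embedded or gateway.
        try {
            executor.close();
        } catch (Exception e) {
            System.err.println("Failed to close the executor on shutdown: " + e.getMessage());
        }
    }
}
```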
##########
flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out:
##########
@@ -0,0 +1,175 @@
+./sql-client [MODE] [OPTIONS]

Review Comment:
   What I've got

```text
./bin/sql-client.sh -h
usage:
 -f,--file <script file>                   Script file that should be executed. In this mode, the client will not open an interactive terminal.
 -h,--help                                 Show the help message with descriptions of all options.
 -hist,--history <History file path>       The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
 -i,--init <initialization file>           Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
 -j,--jar <JAR file>                       A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
 -l,--library <JAR directory>              A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
 -pyarch,--pyArchives <arg>                Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
 -pyclientexec,--pyClientExecutable <arg>  The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs.
 -pyexec,--pyExecutable <arg>              Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.7+, Apache Beam (version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
 -pyfs,--pyFiles <pythonFiles>             Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (',') could be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
 -pyreq,--pyRequirements <arg>             Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
 -s,--session <session identifier>         The identifier for a session. 'default' is the default identifier.
 -u,--update <SQL update statement>        Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table. Please use option -f to submit update statement.

./sql-client [MODE] [OPTIONS]

The following options are available:

Mode "embedded" (default) submits Flink jobs from the local machine.

  Syntax: [embedded] [OPTIONS]
  "embedded" mode options:
     -f,--file <script file>                   Script file that should be executed. In this mode, the client will not open an interactive terminal.
     -h,--help                                 Show the help message with descriptions of all options.
     -hist,--history <History file path>       The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
     -i,--init <initialization file>           Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
     -j,--jar <JAR file>                       A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
     -l,--library <JAR directory>              A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
     -pyarch,--pyArchives <arg>                Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
     -pyclientexec,--pyClientExecutable <arg>  The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs.
     -pyexec,--pyExecutable <arg>              Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.7+, Apache Beam (version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
     -pyfs,--pyFiles <pythonFiles>             Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (',') could be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>             Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
     -s,--session <session identifier>         The identifier for a session. 'default' is the default identifier.
     -u,--update <SQL update statement>        Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table. Please use option -f to submit update statement.

Mode "gateway" mode connects to the SQL gateway for submission.

  Syntax: gateway [OPTIONS]
  "gateway" mode options:
     -e,--endpoint <SQL Gateway address>       The address of the remote SQL Gateway to connect.
     -f,--file <script file>                   Script file that should be executed. In this mode, the client will not open an interactive terminal.
     -h,--help                                 Show the help message with descriptions of all options.
     -hist,--history <History file path>       The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
     -i,--init <initialization file>           Script file that used to init the session context. If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.
     -s,--session <session identifier>         The identifier for a session. 'default' is the default identifier.
     -u,--update <SQL update statement>        Deprecated Experimental (for testing only!) feature: Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table. Please use option -f to submit update statement.
```



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -56,38 +57,52 @@
  * <p>- In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This
  * allows for submitting jobs without having to start an additional component.
  *
- * <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the
- * gateway and allows for managing queries via console.
- *
- * <p>For debugging in an IDE you can execute the main method of this class using: "--defaults
- * /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
- *
- * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
+ * <p>- In gateway mode, the SQL CLI client connects to the REST API of the gateway and allows for
+ * managing queries via console.
  */
 public class SqlClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
 
-    private final boolean isEmbedded;
+    private final boolean isGatewayMode;
     private final CliOptions options;
     private final Supplier<Terminal> terminalFactory;
 
     public static final String MODE_EMBEDDED = "embedded";
     public static final String MODE_GATEWAY = "gateway";
+    public static final String MODE_NONE = "";
 
-    public SqlClient(boolean isEmbedded, CliOptions options, Supplier<Terminal> terminalFactory) {
-        this.isEmbedded = isEmbedded;
+    public SqlClient(
+            boolean isGatewayMode, CliOptions options, Supplier<Terminal> terminalFactory) {
+        this.isGatewayMode = isGatewayMode;
         this.options = options;
         this.terminalFactory = terminalFactory;
     }
 
     private void start() {
-        if (isEmbedded) {
-            DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
+        if (isGatewayMode) {
+            CliOptions.GatewayCliOptions gatewayCliOptions = (CliOptions.GatewayCliOptions) options;
+            try (ExecutorImpl executor =
+                    new ExecutorImpl(
+                            DefaultContextUtils.buildDefaultContext(gatewayCliOptions),
+                            gatewayCliOptions
+                                    .getGatewayAddress()
+                                    .orElseThrow(
+                                            () ->
+                                                    new SqlClientException(
+                                                            "Please specify the address of the SQL Gateway with command line option"
+                                                                    + " '-e,--endpoint <SQL Gateway address>' in the gateway mode.")))) {

Review Comment:
   If the exception is thrown here, why might `endpoint` be null at `CliOptionsParser#L282`?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org