dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409259043
########## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java ########## @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm return process; } + + static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) { + pythonProcess.destroy(); + try { + pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupt while waiting for the python process to stop.", e); + } + if (pythonProcess.isAlive()) { + pythonProcess.destroyForcibly(); + } + } + + private static int findFreePort() throws IOException { + ServerSocket socket = new ServerSocket(0); + int port = socket.getLocalPort(); + socket.close(); + return port; + } + + /** + * Creates a GatewayServer run in a daemon thread. + * + * @return The created GatewayServer + */ + static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException { + CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>(); + Thread thread = new Thread(() -> { + int freePort; + try { + freePort = findFreePort(); + } catch (IOException e) { + throw new RuntimeException("Could not find a free port for Py4jCallbackClient."); + } + GatewayServer server = new GatewayServer.GatewayServerBuilder() + .gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort))) + .javaPort(0) + .build(); + gatewayServer.complete(server); + server.start(true); + }); + thread.setName("py4j-gateway"); + thread.setDaemon(true); + thread.start(); + thread.join(); + return gatewayServer.get(); + } + + static Process launchPy4jPythonClient( + GatewayServer gatewayServer, + ReadableConfig config, + List<String> commands, + String entryPointScript, + String tmpDir) throws IOException { + PythonEnvUtils.PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment( + config, entryPointScript, tmpDir); + // set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process. + pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort())); + // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process. + pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort())); + // start the python process. + return PythonEnvUtils.startPythonProcess(pythonEnv, commands); + } + + static Process launchPy4jPythonClient( Review comment: remove this method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services