dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409959968
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
                return process;
        }
+
+       /**
+        * Py4J supports both Java to Python RPC and Python to Java RPC. The GatewayServer object is
+        * the entry point of Java to Python RPC. Since the Py4J Python client will be launched
+        * only once, the GatewayServer object needs to be reused.
+        */
+       private static GatewayServer gatewayServer = null;
+
+       /**
+        * Creates a GatewayServer run in a daemon thread.
+        *
+        * @return The created GatewayServer
+        */
+       static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+               if (gatewayServer != null) {
+                       return gatewayServer;
+               }
+               CompletableFuture<GatewayServer> gatewayServerFuture = new CompletableFuture<>();
+               Thread thread = new Thread(() -> {
+                       int freePort = NetUtils.getAvailablePort();
+                       GatewayServer server = new GatewayServer.GatewayServerBuilder()
+                               .gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+                               .javaPort(0)
+                               .build();
+                       gatewayServerFuture.complete(server);
+                       server.start(true);
+               });
+               thread.setName("py4j-gateway");
+               thread.setDaemon(true);
+               thread.start();
+               thread.join();
+               gatewayServer = gatewayServerFuture.get();
+               return gatewayServer;
+       }
+
+       static Process launchPy4jPythonClient(
+                       GatewayServer gatewayServer,
+                       ReadableConfig config,
+                       List<String> commands,
+                       String entryPointScript,
+                       String tmpDir) throws IOException {
+               PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment(
+                       config, entryPointScript, tmpDir);
+               // set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process.
+               pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+               // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process.
+               pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort()));
+               // start the python process.
+               return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+       }
+
+       static GatewayServer getGatewayServer() {
+               return gatewayServer;
+       }
+
+       static void removeGatewayServer() {
 
 Review comment:
   Could we place all the gateway server methods, e.g. startGatewayServer/getGatewayServer, together?
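
   A rough sketch of the grouping, only to illustrate the layout. `StubGatewayServer` and `GatewayServerHolder` are hypothetical stand-ins (not Py4J or Flink classes) so the snippet compiles on its own. The body of `removeGatewayServer` is not visible in this hunk, so clearing the cached field there is an assumption, and the `synchronized` modifiers are likewise an addition that the PR does not contain.

```java
/** Hypothetical stand-in for py4j.GatewayServer, used only to keep the sketch self-contained. */
final class StubGatewayServer {
    private boolean running = true;

    void shutdown() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }
}

/** Hypothetical holder showing the three gateway server methods placed next to each other. */
final class GatewayServerHolder {
    // Cached instance, reused because the Python client is launched only once.
    private static StubGatewayServer gatewayServer = null;

    private GatewayServerHolder() {
    }

    /** Returns the cached server, creating it on first use. */
    static synchronized StubGatewayServer startGatewayServer() {
        if (gatewayServer == null) {
            gatewayServer = new StubGatewayServer();
        }
        return gatewayServer;
    }

    /** Returns the cached server, or null if none has been started. */
    static synchronized StubGatewayServer getGatewayServer() {
        return gatewayServer;
    }

    /** Assumed behaviour: shuts the cached server down and clears the field. */
    static synchronized void removeGatewayServer() {
        if (gatewayServer != null) {
            gatewayServer.shutdown();
            gatewayServer = null;
        }
    }
}
```

   With start/get/remove adjacent, the lifecycle of the shared `gatewayServer` field can be read in one place, and `launchPy4jPythonClient` could then follow as a separate block.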
