WeiZhong94 commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409535114
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##########
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> comm
 
                return process;
        }
+
+       static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) {
+               pythonProcess.destroy();
+               try {
+                       pythonProcess.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
+               } catch (InterruptedException e) {
+                       throw new RuntimeException("Interrupt while waiting for the python process to stop.", e);
+               }
+               if (pythonProcess.isAlive()) {
+                       pythonProcess.destroyForcibly();
+               }
+       }
+
+       private static int findFreePort() throws IOException {
+               ServerSocket socket = new ServerSocket(0);
+               int port = socket.getLocalPort();
+               socket.close();
+               return port;
+       }
+
+       /**
+        * Creates a GatewayServer run in a daemon thread.
+        *
+        * @return The created GatewayServer
+        */
+       static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
+               CompletableFuture<GatewayServer> gatewayServer = new CompletableFuture<>();
+               Thread thread = new Thread(() -> {
+                       int freePort;
+                       try {
+                               freePort = findFreePort();
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not find a free port for Py4jCallbackClient.");
+                       }
+                       GatewayServer server = new GatewayServer.GatewayServerBuilder()
+                               .gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(freePort)))
+                               .javaPort(0)
 
 Review comment:
   Because `GatewayServer` uses the loopback address by default.
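
   For illustration only, here is a plain-JDK sketch of what a loopback bind on port 0 looks like. It does not use Py4J: the class `LoopbackBindSketch` and the method `bindLoopbackEphemeral` are hypothetical names, and it is an assumption that `GatewayServer` ends up doing something equivalent when only `.javaPort(0)` is set.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical helper, not part of Flink or Py4J.
public class LoopbackBindSketch {

    // Binds a server socket to the loopback address. Port 0 asks the OS
    // to choose a free ephemeral port; 50 is the accept backlog.
    static ServerSocket bindLoopbackEphemeral() throws IOException {
        return new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bindLoopbackEphemeral()) {
            // The socket only accepts connections from the local machine.
            System.out.println("loopback: " + socket.getInetAddress().isLoopbackAddress());
            System.out.println("port: " + socket.getLocalPort());
        }
    }
}
```

   A socket bound this way is not reachable from other hosts, so no explicit address setting is needed to keep the gateway local.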
