shuiqiangchen commented on a change in pull request #13322:
URL: https://github.com/apache/flink/pull/13322#discussion_r491760113



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+               ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+               Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+               jarFilePath.setAccessible(true);
+               // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+               // path when retrieving the packaged program in the job manager container.
+               String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()

Review comment:
       Here we have two options for PyFlink jobs:
   1. Detect whether it is a PyFlink job and, if so, skip the jarFile path validation;
   2. Set a dummy jar file path on the client side and replace it with the actual path in the JM.

   The former seems better.
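
   The following sketch shows roughly how the two options compared above might look. Everything in it is hypothetical and is not the Flink API: `validateJarFilePath` and `resolveOnJobManager` are illustrative helpers, and the driver class name is an assumed constant. Option 1 returns early when the entry point is the Python driver. Option 2 lets the client set a `local:///opt/flink/opt/` placeholder, which the JobManager side then swaps for the real jar path.

```java
import java.io.File;
import java.io.FileNotFoundException;

class JarPathOptionsSketch {

    // Assumption: the entry-point class used for PyFlink jobs (hypothetical constant for this sketch).
    static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

    // Assumption: the placeholder prefix the client writes (mirrors the string in the diff).
    static final String PLACEHOLDER_PREFIX = "local:///opt/flink/opt/";

    // Option 1 (hypothetical helper): skip the jar file check entirely for PyFlink jobs.
    static void validateJarFilePath(String entryPointClass, String jarFilePath)
            throws FileNotFoundException {
        if (PYTHON_DRIVER_CLASS_NAME.equals(entryPointClass)) {
            return; // PyFlink job: the Python jar is resolved later, on the JobManager side
        }
        if (jarFilePath == null || !new File(jarFilePath).isFile()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFilePath);
        }
    }

    // Option 2 (hypothetical helper): replace the client-side placeholder with the real path.
    static String resolveOnJobManager(String clientSidePath, String actualPythonJarPath) {
        return clientSidePath.startsWith(PLACEHOLDER_PREFIX) ? actualPythonJarPath : clientSidePath;
    }

    public static void main(String[] args) throws FileNotFoundException {
        validateJarFilePath(PYTHON_DRIVER_CLASS_NAME, null); // no exception: validation skipped
        System.out.println(resolveOnJobManager(
                PLACEHOLDER_PREFIX + "flink-python.jar", "/path/to/flink-python.jar"));
        // prints /path/to/flink-python.jar
    }
}
```

   Option 1 keeps the special case in one place, the validation step. Option 2 spreads it across the client and the JM, and it depends on both sides agreeing on the placeholder format.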

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+               ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+               Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+               jarFilePath.setAccessible(true);
+               // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+               // path when retrieving the packaged program in the job manager container.
+               String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+                       .getPath());
+               jarFilePath.set(pythonProgramOptions, pythonJarPath);
+               return pythonProgramOptions;
+       }
+
+       public static void configurePythonExecution(Configuration configuration,
+                       PackagedProgram packagedProgram) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+
+                       final Options commandOptions = CliFrontendParser.getRunCommandOptions();
+                       final CommandLine commandLine = CliFrontendParser.parse(commandOptions, packagedProgram.getArguments(),
+                               true);
+                       final ProgramOptions programOptions = createPythonProgramOptions(commandLine);
+
+                       //Extract real program args by eliminating the PyFlink dependency options
+                       String[] programArgs = programOptions.extractProgramArgs(commandLine);
+                       //Set the real program args to the packaged program
+                       Field argsField = packagedProgram.getClass().getDeclaredField("args");
+                       argsField.setAccessible(true);
+                       argsField.set(packagedProgram, programArgs);
+
+                       //PyFlink dependency configurations are set in the pythonConfiguration when constructing the program option,
+                       // we need to get the python configuration and merge with the execution configuration.
+                       Field pythonConfiguration = programOptions.getClass().getDeclaredField("pythonConfiguration");

Review comment:
       This change is in the `flink-clients` module, which should not depend on the `flink-python` module, so I chose to set the field via reflection.
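
   The sketch below is a self-contained illustration of the reflection pattern in this hunk. It uses hypothetical stand-ins, not the real classes: `BaseOptions` plays the role of `ProgramOptions`, `PythonOptions` plays the Python subclass, and a plain `Map` replaces Flink's `Configuration`. It also shows why the diff calls `getSuperclass()` for `jarFilePath`. `Class.getDeclaredField` only returns fields declared on that exact class, not inherited ones. By contrast, `pythonConfiguration` is declared on the runtime class itself and can be looked up directly.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class ReflectionSketch {

    // Hypothetical stand-in for ProgramOptions (the base class in the clients module).
    static class BaseOptions {
        private String jarFilePath = "unset";
    }

    // Hypothetical stand-in for the Python subclass, whose type the caller does not reference.
    static class PythonOptions extends BaseOptions {
        private final Map<String, String> pythonConfiguration = new HashMap<>();

        PythonOptions() {
            pythonConfiguration.put("python.files", "/tmp/deps.py");
        }
    }

    // Sets a private field declared on the superclass of the target's runtime type.
    static void setInheritedField(Object target, String name, Object value)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = target.getClass().getSuperclass().getDeclaredField(name);
        field.setAccessible(true);
        field.set(target, value);
    }

    // Reads a private Map field declared on the runtime class and merges it into `into`.
    @SuppressWarnings("unchecked")
    static void mergeDeclaredMap(Object source, String name, Map<String, String> into)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = source.getClass().getDeclaredField(name);
        field.setAccessible(true);
        into.putAll((Map<String, String>) field.get(source));
    }

    public static void main(String[] args) throws Exception {
        Object options = new PythonOptions(); // seen only as Object by the caller
        setInheritedField(options, "jarFilePath", "local:///opt/flink/opt/flink-python.jar");

        Map<String, String> executionConfig = new HashMap<>();
        mergeDeclaredMap(options, "pythonConfiguration", executionConfig);
        System.out.println(executionConfig); // {python.files=/tmp/deps.py}
    }
}
```

   The trade-off is that the field names become unchecked strings. A rename in the other module only surfaces at runtime as a `NoSuchFieldException`.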

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -111,6 +112,20 @@ private ClassPathPackagedProgramRetriever(
        @Override
        public PackagedProgram getPackagedProgram() throws FlinkException {
                try {
+
+                       // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", set the fixed
+                       // jobClassName and jarFile path.
+                       if (PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments)){
+                               String pythonJobClassName = PackagedProgramUtils.PYTHON_DRIVER_CLASS_NAME;
+                               File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath());

Review comment:
       Currently, we have no plans to support running PyFlink on YARN in application mode.
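
   The argument check described in the code comment of this hunk might be sketched as follows. `isPythonJob` is a hypothetical helper and not `PackagedProgramUtils.isPython`. It assumes the option spellings named in that comment (`-py`/`--python`, `-pym`/`--pyModule`) and deliberately ignores `--option=value` forms.

```java
import java.util.Arrays;
import java.util.List;

class PythonJobDetectionSketch {

    // Option spellings taken from the code comment in the diff; treated as assumptions here.
    private static final List<String> PYTHON_ENTRY_OPTIONS =
            Arrays.asList("-py", "--python", "-pym", "--pyModule");

    // Returns true if any program argument is exactly one of the Python entry-point options.
    static boolean isPythonJob(String[] programArguments) {
        if (programArguments == null) {
            return false;
        }
        for (String arg : programArguments) {
            if (PYTHON_ENTRY_OPTIONS.contains(arg)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPythonJob(new String[] {"--python", "word_count.py"})); // true
        System.out.println(isPythonJob(new String[] {"--input", "/tmp/in"}));        // false
    }
}
```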

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

