Hi Sumeet,

There is probably an issue with uploading the archive while submitting the job. The commands and API usage look good to me. Dian, could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Thank you, Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option
> on the command line, the job never gets submitted and is stuck forever. And
> when I try to do this programmatically by calling add_python_archive(), the
> job gets submitted but fails because the target directory is not found on the
> UDF node. Flink is deployed on a K8S cluster in my case and port 8081 is
> forwarded to localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081 --python my_job.py
> --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm accessing the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command line, the call looks like:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was
> thinking that might interfere with the overall Flink command line options,
> but even after removing them I'm not able to submit the job anymore. However,
> if I don't use the --pyArchives option and manually transfer the schema file
> to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or other DFS and
>> access from TMs and re-upload when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
>> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>> <sumeet.malho...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I'm using UDTFs in PyFlink that depend upon a few resource files (JSON
>> > schema files actually). The path of these files can be passed into the UDTF,
>> > but essentially this path needs to exist on the Task Manager node where
>> > the task executes. What's the best way to upload these resource files? As
>> > of now, my custom Flink image creates a fixed path with the required
>> > resource files, but I'd like it to be run time configurable.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
>> > stream_execution_environment.add_python_file() - Recommended to upload
>> > files (.py etc) but doesn't let me configure the final path on the target
>> > node. The files are added to PYTHONPATH, but it needs the UDTF function to
>> > look up this file. I'd like to pass the file location into the UDTF
>> > instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more
>> > generic, in the sense that it allows a target directory to be specified.
>> > The documentation doesn't say anything about the contents of the archive,
>> > so I'm guessing it could be any type of file. Is this what is needed for
>> > my use case?
>> >
>> > Or is there any other recommended way to upload non-Python
>> > dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet
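
One thing that may be worth checking for the "target directory is not found" failure is the layout inside schema.zip. Below is a minimal sketch, assuming (as the archives documentation linked in [1] describes) that the archive is extracted into a directory named after the target dir inside the Python worker's working directory. It uses only the standard library and does not call Flink. The helpers `build_schema_archive` and `extract_like_worker`, and the body of `read_schema`, are hypothetical stand-ins for illustration. It does not address the stuck submission seen with --pyArchives.

```python
import json
import os
import tempfile
import zipfile


def build_schema_archive(schema_files, archive_path):
    """Write each schema file at the root of a zip archive."""
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in schema_files:
            # arcname drops the local directories, so the file sits at the
            # zip root rather than under e.g. "src/my_schema.json".
            zf.write(path, arcname=os.path.basename(path))
    return archive_path


def extract_like_worker(archive_path, working_dir, target_dir):
    """Simulate extraction into <working_dir>/<target_dir>.

    Assumption: this mirrors what happens on the UDF node after
    env.add_python_archive('schema.zip', 'schema').
    """
    dest = os.path.join(working_dir, target_dir)
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(dest)
    return dest


def read_schema(relative_path, working_dir="."):
    """Hypothetical stand-in for the read_schema() helper in the thread."""
    with open(os.path.join(working_dir, relative_path)) as f:
        return json.load(f)


def demo():
    """Build schema.zip, extract it as the worker would, read the schema."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "src")
        os.makedirs(src)
        schema_path = os.path.join(src, "my_schema.json")
        with open(schema_path, "w") as f:
            json.dump({"type": "object"}, f)

        archive = build_schema_archive(
            [schema_path], os.path.join(tmp, "schema.zip")
        )

        worker_dir = os.path.join(tmp, "worker")
        os.makedirs(worker_dir)
        extract_like_worker(archive, worker_dir, "schema")

        # Same relative path the UDF in the thread uses.
        return read_schema("schema/my_schema.json", working_dir=worker_dir)


if __name__ == "__main__":
    print(demo())
```

If the zip were instead built with a top-level folder (for example by zipping a directory named schema), the file would end up at schema/schema/my_schema.json under the same assumption, and the relative path used in the UDF would not resolve.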