I would suggest starting from an established working example and gradually
changing it to fit your project structure, while making sure it continues
to work.

The short answer is that Dataflow will pick up only what is specified in
the pipeline options.

Whether or not your package uses a .toml file is not essential. You can
install it inside the custom container image, supply a package
distribution (such as an sdist or a multi-platform wheel) via
--extra_package, or, if it has sources and a setup.py file, use the
--setup_file pipeline option.
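
As an illustration, here is a minimal sketch of the last two approaches;
the project, region, bucket, and file paths below are hypothetical
placeholders, not values from your setup:

from apache_beam.options.pipeline_options import PipelineOptions

# A sketch only: every identifier below is a placeholder.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    # Build and install the package from its sources on the workers:
    setup_file='./setup.py',
    # Or stage a prebuilt sdist/wheel instead:
    # extra_packages=['dist/mypackage-0.1.0.tar.gz'],
)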

On Thu, Oct 17, 2024 at 9:45 PM Sofia’s World <[email protected]> wrote:

> Hello Valentyn
>   I have never used a .toml file (perhaps I am behind the times).
> Could you explain how Dataflow will pick up the .toml?
> I am currently using the same setup as the pipeline project, but I am NOT
> using a .toml, and I am getting problems as my main class cannot see my
> equivalent of 'mypackage///'
> Kind regards
>  Marco
>
> On Thu, Oct 17, 2024 at 5:13 PM Valentyn Tymofieiev via user <
> [email protected]> wrote:
>
>> See also:
>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies/
>>
>> On Wed, Oct 16, 2024 at 4:50 PM XQ Hu via user <[email protected]>
>> wrote:
>>
>>> It is fine to put that import inside the process method. I think
>>> Dataflow complains about this because your template launcher image does
>>> not install `psycopg2`.
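>>>
>>> For reference, a minimal sketch of that deferred-import pattern, reusing
>>> the ReadDb class from the message below (the connection details are the
>>> ones from that snippet; this is illustrative, not the only possible fix):
>>>
>>> import apache_beam as beam
>>>
>>> class ReadDb(beam.DoFn):
>>>     def __init__(self, user, password, host):
>>>         self.user = user
>>>         self.password = password
>>>         self.host = host
>>>
>>>     def process(self, element):
>>>         # Deferred import: resolved on the worker at run time, so
>>>         # psycopg2 only needs to be installed where the DoFn executes,
>>>         # not in the template launcher environment.
>>>         import psycopg2
>>>         conn = psycopg2.connect(
>>>             host=self.host,
>>>             user=self.user,
>>>             password=self.password,
>>>             database='chassis_trusted_data',
>>>             port=5432)
>>>         yield 'a'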
>>>
>>> On Wed, Oct 16, 2024 at 6:08 PM Henry Tremblay via user <
>>> [email protected]> wrote:
>>>
>>>> Not exactly an Apache Beam question, but I notice that if I run Apache
>>>> Beam on Dataflow using a flex template, I have import problems.
>>>>
>>>> For example, the following code will fail because it can’t find
>>>> psycopg2:
>>>>
>>>>  1 import psycopg2
>>>>
>>>> 49 class ReadDb(beam.DoFn):
>>>> 50
>>>> 51     def __init__(self, user, password, host):
>>>> 52         self.user = user
>>>> 53         self.password = password
>>>> 54         self.host = host
>>>> 55
>>>> 56     def process(self, element):
>>>> 57
>>>> 58         conn = psycopg2.connect(
>>>> 59                 host=self.host,
>>>> 60                 user=self.user,
>>>> 61                 password=self.password,
>>>> 62                 database='chassis_trusted_data',
>>>> 63                 port=5432)
>>>> 64
>>>> 65         yield 'a'
>>>>
>>>> I actually need to import psycopg2 in the process method (line 57).
>>>>
>>>> I know I can use
>>>>
>>>> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>>>>
>>>> but this causes pickling problems and defeats the purpose of building
>>>> a Docker image.
>>>>
>>>
