Can you try passing `extra_packages` instead of `extra_package` when
passing pipeline options as a dict?
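
For example, something along these lines (a rough sketch reusing the
variable names from your snippet, and assuming the dict is fed into
PipelineOptions as keyword arguments; note that extra_packages takes a
list of paths rather than a single string):

    from apache_beam.options.pipeline_options import PipelineOptions

    options_dict = {
        # ... the rest of your options stay the same ...
        'setup_file': setup_file_path,
        # extra_packages (plural) expects a list of local package paths
        'extra_packages': [uplight_telemetry_tar_file_path],
    }
    pipeline_options = PipelineOptions(**options_dict)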

On Tue, Dec 19, 2023 at 12:26 PM Sumit Desai via user <user@beam.apache.org>
wrote:

> Hi all,
> I have created a Dataflow pipeline in batch mode using Apache beam Python
> SDK. I am using one non-public dependency 'uplight-telemetry'. I have
> specified it using parameter extra_package while creating pipeline_options
> object. However, the pipeline loading is failing with an error *No module
> named 'uplight_telemetry'*.
> The code to create pipeline_options is as follows:
>
> def __create_pipeline_options_dataflow(job_name):
>     # Set up the Dataflow runner options
>     gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>     current_dir = os.path.dirname(os.path.abspath(__file__))
>     print("current_dir=", current_dir)
>     setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
>     print("Set-up file path=", setup_file_path)
>     #TODO:Move file to proper location
>     uplight_telemetry_tar_file_path = os.path.join(current_dir, '..', '..', '..', 'non-public-dependencies', 'uplight-telemetry-1.0.0.tar.gz')
>     # TODO:Move to environmental variables
>     pipeline_options = {
>         'project': gcp_project_id,
>         'region': "us-east1",
>         'job_name': job_name,  # Provide a unique job name
>         'temp_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>         'staging_location': f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>         'runner': 'DataflowRunner',
>         'save_main_session': True,
>         'service_account_email': os.environ.get(SERVICE_ACCOUNT),
>         # 'network': f'projects/{gcp_project_id}/global/networks/default',
>         'subnetwork': os.environ.get(SUBNETWORK_URL),
>         'setup_file': setup_file_path,
>         'extra_package': uplight_telemetry_tar_file_path
>         # 'template_location': 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
>     }
>     print("Pipeline created for job-name", job_name)
>     logger.debug(f"pipeline_options created as {pipeline_options}")
>     return pipeline_options
>
> Why is it not trying to install this package from extra_package?
>
