[ https://issues.apache.org/jira/browse/BEAM-14014?focusedWorklogId=769344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-769344 ]
ASF GitHub Bot logged work on BEAM-14014:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 23:00
            Start Date: 11/May/22 23:00
    Worklog Time Spent: 10m
      Work Description:
tvalentyn commented on code in PR #17244:
URL: https://github.com/apache/beam/pull/17244#discussion_r870706037


##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   Any concerns with setting `pipeline_options=None`?



##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
   GCS_PREFIX = 'gs://'
 
+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)

Review Comment:
   Another possibility is to initialize the global credential here, like:

   _ = auth.get_service_credentials(pipeline_options)



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials({}),

Review Comment:
   Let's pass `None` (or make `None` the default value).

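
To illustrate the `None`-default suggestion in the comments above, here is a minimal, self-contained sketch of how an optional `pipeline_options` argument could be resolved. It is not the code from the PR: `resolve_impersonation_target` and `FakeGoogleCloudOptions` are hypothetical names introduced only for this example, and the stand-in class replaces Beam's real `pipeline_options.view_as(GoogleCloudOptions)` call so the snippet runs without Beam installed.

```python
from collections.abc import Mapping


class FakeGoogleCloudOptions:
  """Hypothetical stand-in for an options object; not a Beam class."""
  def __init__(self, impersonate_service_account=None):
    self.impersonate_service_account = impersonate_service_account


def resolve_impersonation_target(pipeline_options=None):
  """Returns the service account to impersonate, or None.

  Accepts None (the default), a dict-like mapping, or an options object
  exposing an `impersonate_service_account` attribute.
  """
  # None and empty containers both mean "no impersonation requested".
  if not pipeline_options:
    return None
  # Mapping.get returns None for a missing key instead of raising.
  if isinstance(pipeline_options, Mapping):
    return pipeline_options.get('impersonate_service_account')
  # Fall back to attribute access for options-like objects.
  return getattr(pipeline_options, 'impersonate_service_account', None)
```

With a default of `None`, call sites that have no options at hand can call the function with no argument instead of passing an empty dict.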

##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -160,3 +167,26 @@ def _get_service_credentials():
             'Connecting anonymously.', e)
         return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if not pipeline_options:
+      return credentials
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    else:
+      impersonate_service_account = pipeline_options.get(

Review Comment:
   When would this param not be a PipelineOptions? If this value is somehow a dictionary, this call will crash with a key error.



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1475,8 +1475,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
     # If table schema did not define a project we default to executing project.
     if self.project_id is None and hasattr(sink, 'pipeline_options'):
+      self._pipeline_options = sink.pipeline_options

Review Comment:
   Do we need this?



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   You can also mention in the docstring how pipeline options are used.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 769344)
    Time Spent: 5.5h  (was: 5h 20m)

> Support impersonation credentials in Dataflow runner.
> -----------------------------------------------------
>
>                 Key: BEAM-14014
>                 URL: https://issues.apache.org/jira/browse/BEAM-14014
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Valentyn Tymofieiev
>            Assignee: Ryan Thompson
>            Priority: P2
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)