[ https://issues.apache.org/jira/browse/BEAM-14014?focusedWorklogId=769344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-769344 ]

ASF GitHub Bot logged work on BEAM-14014:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 23:00
            Start Date: 11/May/22 23:00
    Worklog Time Spent: 10m 
      Work Description: tvalentyn commented on code in PR #17244:
URL: https://github.com/apache/beam/pull/17244#discussion_r870706037


##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   Any concerns with setting `pipeline_options=None` as the default?



##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chunk size in batch operations
   GCS_PREFIX = 'gs://'
 
+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)

Review Comment:
   Another possibility is to initialize the global credential here, like:
   
   _ = auth.get_service_credentials(pipeline_options)



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials({}),

Review Comment:
   Let's pass `None` here (or make `None` the default value).



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -160,3 +167,26 @@ def _get_service_credentials():
           'Connecting anonymously.',
           e)
       return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if not pipeline_options:
+      return credentials
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    else:
+      impersonate_service_account = pipeline_options.get(

Review Comment:
   When would this param be not PipelineOptions?
   If this value is somehow a dictionary, this call will crash with a key error.
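One way to address the question above is to make the two accepted input types explicit and read the mapping with `.get()`. `.get()` returns `None` for a missing key instead of raising `KeyError`. This is a sketch under stated assumptions: `PipelineOptionsStub` and `GoogleCloudOptionsStub` are hypothetical stand-ins that model only `view_as()` and the `impersonate_service_account` attribute, and `resolve_impersonation_target` is a hypothetical helper, not code from the PR.

```python
from typing import Any, Mapping, Optional, Union


class GoogleCloudOptionsStub:
    """Hypothetical stand-in for the GoogleCloudOptions view."""

    def __init__(self, impersonate_service_account: Optional[str] = None):
        self.impersonate_service_account = impersonate_service_account


class PipelineOptionsStub:
    """Hypothetical stand-in for PipelineOptions; only view_as() is modeled."""

    def __init__(self, impersonate_service_account: Optional[str] = None):
        self._gcp_view = GoogleCloudOptionsStub(impersonate_service_account)

    def view_as(self, view_cls: type) -> GoogleCloudOptionsStub:
        # The stub supports only the Google Cloud view.
        assert view_cls is GoogleCloudOptionsStub
        return self._gcp_view


def resolve_impersonation_target(
        pipeline_options: Union[None, PipelineOptionsStub, Mapping[str, Any]]
) -> Optional[str]:
    """Returns the service account to impersonate, or None."""
    if not pipeline_options:
        # None or an empty mapping: nothing to impersonate.
        return None
    if isinstance(pipeline_options, PipelineOptionsStub):
        gcp_options = pipeline_options.view_as(GoogleCloudOptionsStub)
        return gcp_options.impersonate_service_account
    # Mapping path: .get() returns None for a missing key instead of
    # raising KeyError the way subscripting would.
    return pipeline_options.get('impersonate_service_account')
```

A stricter alternative is to accept only `PipelineOptions` and have callers holding a dictionary convert it first, for example with `PipelineOptions.from_dictionary`.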



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1475,8 +1475,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
 
     # If table schema did not define a project we default to executing project.
     if self.project_id is None and hasattr(sink, 'pipeline_options'):
+      self._pipeline_options = sink.pipeline_options

Review Comment:
   Do we need this?



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   You can also mention in the docstring how pipeline options are used.
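The review comments on `get_service_credentials` suggest a `None` default and a docstring that explains how the options are used. A minimal sketch combining both follows. It assumes credentials can be represented as a plain string. `_default_credentials` is a hypothetical stand-in for the default credential lookup. Reading `impersonate_service_account` directly with `getattr` is a simplification of going through the Google Cloud options view.

```python
from typing import Any, Optional


def _default_credentials() -> str:
    """Hypothetical stand-in for the default credential lookup."""
    return 'default-credentials'


def get_service_credentials(pipeline_options: Optional[Any] = None) -> str:
    """Returns credentials for calls to Google Cloud APIs.

    Args:
      pipeline_options: Optional pipeline options. If they set
        ``impersonate_service_account``, the default credentials are used
        to impersonate that service account. If ``None`` (the default) or
        unset, the default credentials are returned unchanged, so call
        sites that have no options can simply omit the argument.

    Returns:
      The credentials to use (a plain string in this sketch).
    """
    credentials = _default_credentials()
    # Simplified lookup; getattr(None, ..., None) also covers the None case.
    account = getattr(pipeline_options, 'impersonate_service_account', None)
    if not account:
        return credentials
    # Hypothetical: real code would build impersonated credentials here.
    return '%s impersonating %s' % (credentials, account)
```

With the default in place, existing zero-argument callers such as the `BigQueryWrapper` constructor would not need to pass `{}` or `None` at all.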





Issue Time Tracking
-------------------

    Worklog Id:     (was: 769344)
    Time Spent: 5.5h  (was: 5h 20m)

> Support impersonation credentials in Dataflow runner.
> -----------------------------------------------------
>
>                 Key: BEAM-14014
>                 URL: https://issues.apache.org/jira/browse/BEAM-14014
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Valentyn Tymofieiev
>            Assignee: Ryan Thompson
>            Priority: P2
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)