[ 
https://issues.apache.org/jira/browse/BEAM-4032?focusedWorklogId=707923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-707923
 ]

ASF GitHub Bot logged work on BEAM-4032:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jan/22 22:18
            Start Date: 12/Jan/22 22:18
    Worklog Time Spent: 10m 
      Work Description: tvalentyn commented on a change in pull request #16448:
URL: https://github.com/apache/beam/pull/16448#discussion_r783453023



##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -233,10 +240,17 @@ def create_job_resources(options,  # type: PipelineOptions
         resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
         # Populate cache with packages from PyPI requirements and stage
         # the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-                tf.name, requirements_cache_path)
+        if setup_options.view_as(WorkerOptions).sdk_container_image is None:

Review comment:
       You should be passing `tf.name` instead of `setup_options.requirements` to preserve the current behavior.
   
   Also, I am not sure why the `if pypi_requirements:` branch was necessary in the first place; I asked on https://github.com/apache/beam/pull/13607.
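A minimal sketch of what the `if pypi_requirements:` branch amounts to, assuming the requirements arrive as a list of strings and the cache populator is any callable taking `(requirements_file, cache_dir)`. `stage_pypi_requirements` is a hypothetical name, not the stager's API. It illustrates why the populator must receive `tf.name`: the temporary file, not the user-supplied option value, is what lists the packages to download.

```python
import tempfile
from typing import Callable, List


def stage_pypi_requirements(
    pypi_requirements,  # type: List[str]
    cache_dir,  # type: str
    populate,  # type: Callable[[str, str], None]
):
  # type: (...) -> str
  """Writes requirements to a temp file and populates the cache from it.

  Hypothetical helper illustrating the review comment; not Beam's API.
  """
  # delete=False so the file still exists after the with-block closes it.
  with tempfile.NamedTemporaryFile(
      mode='w', suffix='.txt', delete=False) as tf:
    tf.write('\n'.join(pypi_requirements))
  # Pass tf.name: the temp file is what actually lists the packages.
  populate(tf.name, cache_dir)
  return tf.name
```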

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options,  # type: PipelineOptions
                 setup_options.requirements_file, REQUIREMENTS_FILE))
         # Populate cache with packages from the requirement file option and
         # stage the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-                setup_options.requirements_file, requirements_cache_path)
+        if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+          # downloads the binary distributions
+          (
+              populate_requirements_cache if populate_requirements_cache else

Review comment:
       Could you please correct the type hint for `populate_requirements_cache=None,  # type: Optional[str]`? It should be a callable.
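For reference, a sketch of the corrected annotation in the type-comment style the stager uses. The diff calls the populator as `populate(requirements_file, cache_dir)`, so `Callable[[str, str], None]` is assumed here; the function below is a trimmed, hypothetical stand-in for `create_job_resources`, not its real signature.

```python
from typing import Callable, List, Optional


def _default_populate_requirements_cache(requirements_file, cache_dir):
  # type: (str, str) -> None
  # Stand-in for Stager._populate_requirements_cache; does nothing here.
  pass


def create_job_resources(
    requirements_file,  # type: str
    requirements_cache_path,  # type: str
    populate_requirements_cache=None,  # type: Optional[Callable[[str, str], None]]
):
  # type: (...) -> List[str]
  """Trimmed, hypothetical stand-in showing only the cache-populator argument."""
  populate = (
      populate_requirements_cache or _default_populate_requirements_cache)
  populate(requirements_file, requirements_cache_path)
  return [requirements_file]
```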

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options,  # type: PipelineOptions
                 setup_options.requirements_file, REQUIREMENTS_FILE))
         # Populate cache with packages from the requirement file option and
         # stage the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-                setup_options.requirements_file, requirements_cache_path)
+        if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+          # downloads the binary distributions

Review comment:
       If the user passes a custom image, we should log a warning advising against passing `--requirements_file`.
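One possible shape for that warning, as a sketch only: `warn_if_requirements_with_custom_image` is a hypothetical helper and the message wording is illustrative. The reasoning it encodes is that dependencies belong in a custom image itself rather than being staged with every job.

```python
import logging
from typing import Optional

_LOGGER = logging.getLogger(__name__)


def warn_if_requirements_with_custom_image(
    sdk_container_image,  # type: Optional[str]
    requirements_file,  # type: Optional[str]
):
  # type: (...) -> bool
  """Returns True (and logs a warning) if both options are set.

  Hypothetical helper; the message wording is illustrative only.
  """
  if sdk_container_image is None or requirements_file is None:
    return False
  _LOGGER.warning(
      'A custom SDK container image (%s) was specified together with '
      '--requirements_file=%s. Consider installing these dependencies in '
      'the image itself instead of staging them with the job.',
      sdk_container_image, requirements_file)
  return True
```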

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options,  # type: PipelineOptions
                 setup_options.requirements_file, REQUIREMENTS_FILE))
         # Populate cache with packages from the requirement file option and
         # stage the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-                setup_options.requirements_file, requirements_cache_path)
+        if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+          # downloads the binary distributions
+          (
+              populate_requirements_cache if populate_requirements_cache else
+              Stager._populate_requirements_cache_with_whl)(

Review comment:
       I would keep one method, `_populate_requirements_cache`, and add a flag such as `prefer_bdists=True/False` (or `fetch_binary`, to match other flags) to avoid code duplication.
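A sketch of the suggested consolidation, assuming the only difference between the two code paths is which `pip download` format flags are passed. `populate_requirements_cache` and its `run` parameter are hypothetical (the `run` hook just makes the command observable without invoking pip); `--dest`, `-r`, `--only-binary`, `--no-binary` and `--platform` are standard `pip download` options. pip accepts `--platform` only together with `--only-binary :all:` (or `--no-deps`), which is why the platform is added only in the binary branch.

```python
import subprocess
import sys
from typing import Callable, List


def populate_requirements_cache(
    requirements_file,  # type: str
    cache_dir,  # type: str
    fetch_binary=False,  # type: bool
    platform_tag='manylinux2014_x86_64',  # type: str
    run=subprocess.check_call,  # type: Callable[[List[str]], object]
):
  # type: (...) -> None
  """One entry point for both sdist and wheel caching (hypothetical sketch)."""
  cmd = [
      sys.executable, '-m', 'pip', 'download',
      '--dest', cache_dir,
      '-r', requirements_file,
  ]
  if fetch_binary:
    # Wheels for the worker's platform, not the submitting machine's.
    cmd += ['--only-binary', ':all:', '--platform', platform_tag]
  else:
    # Source distributions only.
    cmd += ['--no-binary', ':all:']
  run(cmd)
```

Callers would then pass `fetch_binary=True` where the diff currently calls `_populate_requirements_cache_with_whl`.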

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -884,3 +946,49 @@ def _download_pypi_sdk_package(
         'Failed to download a distribution for the running SDK. '
         'Expected either one of %s to be found in the download folder.' %
         (expected_files))
+
+  @staticmethod
+  def _download_pypi_package(
+      temp_dir,
+      fetch_binary=False,
+      language_version_tag='27',
+      language_implementation_tag='cp',
+      abi_tag='cp27mu',
+      platform_tag='manylinux2014_x86_64',
+      package_name=None):
+    """Downloads SDK package from PyPI and returns path to local path."""
+    package_name = package_name or Stager.get_sdk_package_name()
+    cmd_args = [
+        Stager._get_python_executable(),
+        '-m',
+        'pip',
+        'download',
+        package_name,
+        '--dest',
+        temp_dir,
+        '--no-binary',
+        ':all:'
+    ]
+
+    if fetch_binary:
+      cmd_args.pop()  # remove the no binary flag

Review comment:
       I would make it part of the condition:
   
   ```
   if fetch_binary:
     # add binary flags:
   else:
     # add no-binary flags
   # run the command.
   ```
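Applied to `_download_pypi_package` from the diff, the suggested structure might look like the sketch below. Parameter names and defaults are copied from the diff; `build_pypi_download_command` is a hypothetical name, `sys.executable` stands in for `Stager._get_python_executable()`, and `--no-deps` is an assumption (only the named package is wanted). Explicit branches also avoid a problem with `cmd_args.pop()`: it removes only the trailing `':all:'`, leaving `--no-binary` behind to consume whatever argument is appended next.

```python
import sys
from typing import List


def build_pypi_download_command(
    temp_dir,  # type: str
    package_name,  # type: str
    fetch_binary=False,  # type: bool
    language_version_tag='27',  # type: str
    language_implementation_tag='cp',  # type: str
    abi_tag='cp27mu',  # type: str
    platform_tag='manylinux2014_x86_64',  # type: str
):
  # type: (...) -> List[str]
  cmd_args = [
      sys.executable, '-m', 'pip', 'download', package_name,
      '--dest', temp_dir,
      '--no-deps',  # assumption: only the named package is needed
  ]
  if fetch_binary:
    # Add binary flags: request a wheel built for the target worker.
    cmd_args += [
        '--only-binary', ':all:',
        '--python-version', language_version_tag,
        '--implementation', language_implementation_tag,
        '--abi', abi_tag,
        '--platform', platform_tag,
    ]
  else:
    # Add no-binary flags: request the source distribution.
    cmd_args += ['--no-binary', ':all:']
  return cmd_args
```

The caller then runs the list, for example with `subprocess.check_call(cmd_args)`.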

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -815,9 +876,10 @@ def _download_pypi_sdk_package(
       language_version_tag='27',
       language_implementation_tag='cp',
       abi_tag='cp27mu',
-      platform_tag='manylinux1_x86_64'):
+      platform_tag='manylinux1_x86_64',
+      package_name=None):
     """Downloads SDK package from PyPI and returns path to local path."""
-    package_name = Stager.get_sdk_package_name()
+    package_name = package_name or Stager.get_sdk_package_name()

Review comment:
       Leftover? 
   
   Or did you mean to combine `_download_pypi_package` and `_download_pypi_sdk_package` into the same method (that is, you would pass `package_name=Stager.get_sdk_package_name()` into `_download_pypi_package` to download the SDK)? That would make sense, as it's mostly the same code. We can remove the warning specific to the Apache Beam SDK package.
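If the two methods are merged as suggested, the SDK-specific function can shrink to a thin wrapper that supplies the package name. A minimal sketch, assuming the generic helper returns the files pip wrote into `temp_dir`; all function names here are hypothetical (`get_sdk_package_name` stands in for `Stager.get_sdk_package_name`), and `run` is injectable so the example can be exercised without network access.

```python
import os
import subprocess
import sys
from typing import Callable, List


def download_pypi_package(
    temp_dir,  # type: str
    package_name,  # type: str
    fetch_binary=False,  # type: bool
    run=subprocess.check_call,  # type: Callable[[List[str]], object]
):
  # type: (...) -> List[str]
  """Downloads one package with pip and returns the paths pip wrote."""
  before = set(os.listdir(temp_dir))
  cmd = [sys.executable, '-m', 'pip', 'download', package_name,
         '--dest', temp_dir, '--no-deps']
  if fetch_binary:
    cmd += ['--only-binary', ':all:']
  else:
    cmd += ['--no-binary', ':all:']
  run(cmd)
  new_files = sorted(set(os.listdir(temp_dir)) - before)
  return [os.path.join(temp_dir, name) for name in new_files]


def get_sdk_package_name():
  # type: () -> str
  # Hypothetical stand-in for Stager.get_sdk_package_name().
  return 'apache-beam'


def download_pypi_sdk_package(temp_dir, fetch_binary=False,
                              run=subprocess.check_call):
  # type: (str, bool, Callable[[List[str]], object]) -> str
  """SDK download expressed as a call to the generic helper."""
  paths = download_pypi_package(
      temp_dir, get_sdk_package_name(), fetch_binary, run)
  if not paths:
    raise RuntimeError(
        'Failed to download a distribution for the running SDK.')
  return paths[0]
```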




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 707923)
    Time Spent: 3.5h  (was: 3h 20m)

> Support staging binary distributions of dependency packages.
> ------------------------------------------------------------
>
>                 Key: BEAM-4032
>                 URL: https://issues.apache.org/jira/browse/BEAM-4032
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Anand Inguva
>            Priority: P2
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> requirements.txt only supports source-distribution dependencies [1].
> --extra_packages does not officially support wheel files [2].
> It is possible to extend this to support binary distributions, as long as we
> know the target platform.
> We should take into consideration the mechanisms for staging dependencies
> through the portability framework, and perhaps consolidate some of the existing
> options.
> [1] https://github.com/apache/beam/blob/a79d1b4fc27eb81db0d9a773047820a206f3d238/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L260
> [2] https://github.com/apache/beam/blob/a79d1b4fc27eb81db0d9a773047820a206f3d238/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L188
>  
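"Knowledge of the target platform" can be made concrete: `pip download` can fetch wheels for an interpreter and platform other than the one it runs on, given `--python-version`, `--implementation`, `--abi` and `--platform` together with `--only-binary :all:`. A minimal sketch, assuming CPython 3.6 or newer on manylinux2014 x86_64 workers; `TargetPlatform`, `target_for_cpython` and `pip_target_flags` are hypothetical names.

```python
from typing import List, NamedTuple


class TargetPlatform(NamedTuple):
  """Worker environment that downloaded wheels must match (hypothetical)."""
  python_version: str  # e.g. '38' for Python 3.8
  implementation: str  # 'cp' for CPython
  abi: str  # e.g. 'cp38'
  platform: str  # e.g. 'manylinux2014_x86_64'


def target_for_cpython(major, minor, platform='manylinux2014_x86_64'):
  # type: (int, int, str) -> TargetPlatform
  if (major, minor) < (3, 6):
    raise ValueError('sketch only covers CPython 3.6+')
  version = '%d%d' % (major, minor)
  # CPython 3.6 and 3.7 ABI tags carry the pymalloc 'm' suffix; 3.8 dropped it.
  abi = 'cp' + version + ('m' if (major, minor) < (3, 8) else '')
  return TargetPlatform(version, 'cp', abi, platform)


def pip_target_flags(target):
  # type: (TargetPlatform) -> List[str]
  # pip requires --only-binary :all: (or --no-deps) with these options.
  return [
      '--only-binary', ':all:',
      '--python-version', target.python_version,
      '--implementation', target.implementation,
      '--abi', target.abi,
      '--platform', target.platform,
  ]
```

The target should describe the workers (for example, the interpreter in the SDK container image), which is not necessarily the interpreter submitting the pipeline.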



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
