[ https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=758027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758027 ]

ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/22 17:41
            Start Date: 18/Apr/22 17:41
    Worklog Time Spent: 10m 
      Work Description: ryanthompson591 commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r852285231


##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import torch
+  from apache_beam.ml.inference.api import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+  from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+  raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+  return (
+      torch.equal(a.inference, b.inference) and
+      torch.equal(a.example, b.example))
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+
+@pytest.mark.uses_pytorch
+class PytorchRunInferenceTest(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_inference_runner_single_tensor_feature(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 0.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_inference_runner_multiple_tensor_features(self):
+    examples = [
+        torch.from_numpy(np.array([1, 5], dtype="float32")),
+        torch.from_numpy(np.array([3, 10], dtype="float32")),
+        torch.from_numpy(np.array([-14, 0], dtype="float32")),
+        torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                          for f1, f2 in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_num_bytes(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    self.assertEqual((examples[0].element_size()) * 8,
+                     inference_runner.get_num_bytes(examples))
+
+  def test_namespace(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    self.assertEqual(
+        'RunInferencePytorch', inference_runner.get_metrics_namespace())
+
+  def test_pipeline_local_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                   dtype="float32")).reshape(-1, 2)
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                            for f1, f2 in examples]).reshape(-1, 1))
+      ]
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=2, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_loader)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_pipeline_local_model_with_key(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+      keyed_examples = list(zip(range(len(examples)), examples))
+      expected_values = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([example * 2.0 + 0.5
+                            for example in examples]).reshape(-1, 1))
+      ]
+      expected_predictions = list(zip(range(len(examples)), expected_values))
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=1, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(keyed_examples)
+      predictions = pcoll | RunInference(model_loader)
+
+      def _compare_keyed_prediction_result(a, b):
+        key_equal = a[0] == b[0]
+        return (
+            torch.equal(a[1].inference, b[1].inference) and
+            torch.equal(a[1].example, b[1].example) and key_equal)
+
+      assert_that(
+          predictions,
+          equal_to(
+              expected_predictions,
+              equals_fn=_compare_keyed_prediction_result))
+
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([example * 2.0 + 0.5
+                            for example in examples]).reshape(-1, 1))
+      ]
+
+      gs_pth = 'gs://apache-beam-ml/pytorch_lin_reg_model_2x+0.5_state_dict.pth'

Review Comment:
   I didn't test GCS in my implementation. Does it make sense to break these
larger E2E tests out into a separate module, away from the more
unit-test-like tests?

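   One possible shape for that split, as a hedged sketch: the uses_gcs marker
and the module name below are hypothetical, not part of this PR.

    # pytorch_inference_it_test.py -- hypothetical integration-test module
    import unittest

    import pytest


    @pytest.mark.uses_gcs  # assumed marker, registered in pytest.ini
    class PytorchRunInferenceE2ETest(unittest.TestCase):
      def test_pipeline_gcs_model(self):
        # The GCS-backed pipeline test would move here, so unit-style tests
        # and network-dependent E2E tests can be selected separately, e.g.:
        #   pytest -m "not uses_gcs" apache_beam/ml/inference
        ...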


##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, device: torch.device):
+    self._device = device
+
+  def run_inference(self, batch: List[torch.Tensor],
+                    model: torch.nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+
+    if not batch:

Review Comment:
   This should never be None or an empty list, right? I'm not sure we need
this check.

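   For context, a standalone sketch of the trade-off; the early return on an
empty batch is illustrative, not necessarily the PR's final behavior.

    import torch

    def run_inference_sketch(batch, model, device=torch.device('cpu')):
      # Without a guard, torch.stack([]) raises "RuntimeError: stack expects
      # a non-empty TensorList", so the check only matters if an upstream
      # batching bug could ever produce an empty batch.
      if not batch:
        return []
      stacked = torch.stack(batch).to(device)
      with torch.no_grad():
        return model(stacked)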


##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import torch
+  from apache_beam.ml.inference.api import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+  from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+  raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+  return (
+      torch.equal(a.inference, b.inference) and
+      torch.equal(a.example, b.example))
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+
+@pytest.mark.uses_pytorch
+class PytorchRunInferenceTest(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_inference_runner_single_tensor_feature(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 0.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_inference_runner_multiple_tensor_features(self):
+    examples = [
+        torch.from_numpy(np.array([1, 5], dtype="float32")),
+        torch.from_numpy(np.array([3, 10], dtype="float32")),
+        torch.from_numpy(np.array([-14, 0], dtype="float32")),
+        torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                          for f1, f2 in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_num_bytes(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    self.assertEqual((examples[0].element_size()) * 8,
+                     inference_runner.get_num_bytes(examples))
+
+  def test_namespace(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    self.assertEqual(
+        'RunInferencePytorch', inference_runner.get_metrics_namespace())
+
+  def test_pipeline_local_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                   dtype="float32")).reshape(-1, 2)
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                            for f1, f2 in examples]).reshape(-1, 1))
+      ]
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=2, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_loader)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_pipeline_local_model_with_key(self):

Review Comment:
   You might not need to worry about keyed vs. unkeyed inputs as much in
these tests, since that is already covered by the base RunInference tests.

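   For reference, the keyed form handled by the base transform looks like
this (a minimal illustration; the keys and values are arbitrary):

    import torch

    # Unkeyed: an iterable of examples.
    examples = [torch.tensor([1.0]), torch.tensor([5.0])]
    # Keyed: (key, example) tuples in, (key, PredictionResult) tuples out.
    keyed_examples = [(0, torch.tensor([1.0])), (1, torch.tensor([5.0]))]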


##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, device: torch.device):
+    self._device = device
+
+  def run_inference(self, batch: List[torch.Tensor],
+                    model: torch.nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of

Review Comment:
   This docstring is copied from the base inference class. I suggest
tailoring the docs here and above to the PyTorch implementation.

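   Purely illustrative wording for a PyTorch-specific docstring, not final
text:

    class _DocstringSketch:
      def run_inference(self, batch, model):
        """Runs inference on a batch of torch.Tensor examples using the
        given torch.nn.Module and returns an Iterable of PredictionResults,
        one per input example."""
        raise NotImplementedError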




Issue Time Tracking
-------------------

    Worklog Id:     (was: 758027)
    Time Spent: 3h  (was: 2h 50m)

> Implement RunInference for PyTorch
> ----------------------------------
>
>                 Key: BEAM-13984
>                 URL: https://issues.apache.org/jira/browse/BEAM-13984
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Andy Ye
>            Assignee: Andy Ye
>            Priority: P2
>              Labels: run-inference
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc:
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains the PyTorchModelLoader
> and PyTorchInferenceRunner classes.
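>
> A minimal usage sketch of that API, hedged: the names follow the code in
> this PR (module pytorch.py, class PytorchModelLoader), and the state dict
> path and the model below are placeholders.
>
>   import apache_beam as beam
>   import torch
>   from apache_beam.ml.inference.base import RunInference
>   from apache_beam.ml.inference.pytorch import PytorchModelLoader
>
>   class TinyModel(torch.nn.Module):  # stand-in model for illustration
>     def __init__(self):
>       super().__init__()
>       self.linear = torch.nn.Linear(1, 1)
>
>     def forward(self, x):
>       return self.linear(x)
>
>   examples = [torch.tensor([1.0]), torch.tensor([5.0])]
>   with beam.Pipeline() as p:
>     _ = (
>         p
>         | beam.Create(examples)
>         | RunInference(PytorchModelLoader(
>             state_dict_path='/tmp/state_dict.pth',  # placeholder path
>             model_class=TinyModel())))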



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
