[ https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=758027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758027 ]
ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/22 17:41
            Start Date: 18/Apr/22 17:41
    Worklog Time Spent: 10m
      Work Description: ryanthompson591 commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r852285231


##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import torch
+  from apache_beam.ml.inference.api import PredictionResult
+  from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+  from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+  raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+  return (
+      torch.equal(a.inference, b.inference) and
+      torch.equal(a.example, b.example))
+
+
+class PytorchLinearRegression(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, x):
+    out = self.linear(x)
+    return out
+
+
+@pytest.mark.uses_pytorch
+class PytorchRunInferenceTest(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_inference_runner_single_tensor_feature(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 0.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_inference_runner_multiple_tensor_features(self):
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    examples = [
+        torch.from_numpy(np.array([1, 5], dtype="float32")),
+        torch.from_numpy(np.array([3, 10], dtype="float32")),
+        torch.from_numpy(np.array([-14, 0], dtype="float32")),
+        torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                          for f1, f2 in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=2, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
+  def test_num_bytes(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    examples = torch.from_numpy(
+        np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                 dtype="float32")).reshape(-1, 2)
+    self.assertEqual((examples[0].element_size()) * 8,
+                     inference_runner.get_num_bytes(examples))
+
+  def test_namespace(self):
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    self.assertEqual(
+        'RunInferencePytorch', inference_runner.get_metrics_namespace())
+
+  def test_pipeline_local_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+                   dtype="float32")).reshape(-1, 2)
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+                            for f1, f2 in examples]).reshape(-1, 1))
+      ]
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=2, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      predictions = pcoll | RunInference(model_loader)
+      assert_that(
+          predictions,
+          equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+  def test_pipeline_local_model_with_key(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+      keyed_examples = list(zip(range(len(examples)), examples))
+      expected_values = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([example * 2.0 + 0.5
+                            for example in examples]).reshape(-1, 1))
+      ]
+      expected_predictions = list(zip(range(len(examples)), expected_values))
+
+      state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                                ('linear.bias', torch.Tensor([0.5]))])
+      path = os.path.join(self.tmpdir, 'my_state_dict_path')
+      torch.save(state_dict, path)
+
+      model_loader = PytorchModelLoader(
+          state_dict_path=path,
+          model_class=PytorchLinearRegression(input_dim=1, output_dim=1))
+
+      pcoll = pipeline | 'start' >> beam.Create(keyed_examples)
+      predictions = pcoll | RunInference(model_loader)
+
+      def _compare_keyed_prediction_result(a, b):
+        key_equal = a[0] == b[0]
+        return (
+            torch.equal(a[1].inference, b[1].inference) and
+            torch.equal(a[1].example, b[1].example) and key_equal)
+
+      assert_that(
+          predictions,
+          equal_to(
+              expected_predictions, equals_fn=_compare_keyed_prediction_result))
+
+  def test_pipeline_gcs_model(self):
+    with TestPipeline() as pipeline:
+      examples = torch.from_numpy(
+          np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+      expected_predictions = [
+          PredictionResult(ex, pred) for ex,
+          pred in zip(
+              examples,
+              torch.Tensor([example * 2.0 + 0.5
+                            for example in examples]).reshape(-1, 1))
+      ]
+
+      gs_pth = 'gs://apache-beam-ml/pytorch_lin_reg_model_2x+0.5_state_dict.pth'

Review Comment:
   I didn't test GCS in my implementation. Does it make sense to break these larger E2E tests out into another module, instead of mixing them with the more unit-test-like tests?


##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, device: torch.device):
+    self._device = device
+
+  def run_inference(self, batch: List[torch.Tensor],
+                    model: torch.nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+
+    if not batch:

Review Comment:
   This should never be None or an empty list, right? I'm not sure we need this check.
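   For reference, if empty batches can in fact reach the runner (whether they can depends on the base RunInference batching contract, which isn't quoted in this thread), keeping the guard as an early return stays cheap because it fires before any device transfer. A minimal sketch; the stacking and no_grad details below are assumptions, since the quoted hunk cuts off right after the guard:

   import torch

   def run_inference_sketch(batch, model, device=torch.device('cpu')):
     # Sketch only, not the PR's implementation: the stacking and device
     # transfer are assumptions; the quoted hunk ends at `if not batch:`.
     if not batch:
       return []  # nothing to score; avoid touching the device at all
     stacked = torch.stack(batch).to(device)
     with torch.no_grad():  # pure inference; skip autograd bookkeeping
       predictions = model(stacked)
     return [p for p in predictions]  # one output tensor per example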
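   On the earlier question about splitting out the larger E2E tests: whether or not they move to another module, one option is to gate anything that needs GCS behind a dedicated pytest marker so plain unit runs never hit the network. A rough sketch; the marker name `uses_gcs` is hypothetical, not an existing Beam marker:

   import unittest

   import pytest

   # Sketch only: 'uses_gcs' is a hypothetical marker; the project would
   # register whatever marker fits its postcommit/E2E suites.
   @pytest.mark.uses_gcs
   class PytorchGcsModelTest(unittest.TestCase):
     def test_pipeline_gcs_model(self):
       # Placeholder body: the real test would load the state dict from
       # the gs:// path quoted above and assert on the predictions.
       self.skipTest('requires GCS access')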
##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -0,0 +1,246 @@
+  def test_pipeline_local_model_with_key(self):

Review Comment:
   You might not need to worry about keyed vs. unkeyed inputs as much in these tests, since that is already covered by the base RunInference tests.


##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -0,0 +1,97 @@
+  def run_inference(self, batch: List[torch.Tensor],
+                    model: torch.nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""

Review Comment:
   This docstring is copied from the base inference class. I suggest fixing the docs here and above.
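   One possible rewrite of the copied docstring, specific to the PyTorch runner; this wording is only a suggestion. The tests above compare PredictionResult.inference and PredictionResult.example, which suggests the return annotation may also want to be Iterable[PredictionResult] rather than Iterable[torch.Tensor] -- worth confirming against the base class:

   def run_inference(self, batch: List[torch.Tensor],
                     model: torch.nn.Module) -> Iterable[PredictionResult]:
     """Runs inference on a batch of Tensors with a PyTorch model.

     Moves the examples to the device this runner was constructed with,
     invokes the given torch.nn.Module, and returns one PredictionResult
     per input example, pairing each example with its model output.
     """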
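   And on the earlier keys/no-keys comment: if the base RunInference transform is what strips keys before calling the runner and re-attaches them afterwards (that is how the comment reads; the base implementation isn't quoted here), then the framework-specific suite only needs to exercise the unkeyed path. A toy illustration of that split, with a hypothetical helper name:

   # Hypothetical helper, for illustration only: the actual key handling
   # lives in the base RunInference transform, not in the PyTorch runner.
   def split_keys(keyed_examples):
     keys, examples = zip(*keyed_examples)
     return list(keys), list(examples)

   keys, examples = split_keys([(0, 'ex0'), (1, 'ex1')])
   assert keys == [0, 1]
   assert examples == ['ex0', 'ex1']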
Issue Time Tracking
-------------------

    Worklog Id:     (was: 758027)
    Time Spent: 3h  (was: 2h 50m)

> Implement RunInference for PyTorch
> ----------------------------------
>
>                 Key: BEAM-13984
>                 URL: https://issues.apache.org/jira/browse/BEAM-13984
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Andy Ye
>            Assignee: Andy Ye
>            Priority: P2
>              Labels: run-inference
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains PyTorchModelLoader and
> PyTorchInferenceRunner classes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)