[ https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=778242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778242 ]
ASF GitHub Bot logged work on BEAM-14068: ----------------------------------------- Author: ASF GitHub Bot Created on: 03/Jun/22 18:06 Start Date: 03/Jun/22 18:06 Worklog Time Spent: 10m Work Description: AnandInguva commented on code in PR #17462: URL: https://github.com/apache/beam/pull/17462#discussion_r889212195 ########## sdks/python/apache_beam/examples/inference/pytorch_image_classification.py: ########## @@ -0,0 +1,146 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Pipeline that uses RunInference API to perform classification task on imagenet dataset""" # pylint: disable=line-too-long + +import argparse +import io +import os +from functools import partial +from typing import Any +from typing import Iterable +from typing import Tuple +from typing import Union + +import apache_beam as beam +import torch +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.api import PredictionResult +from apache_beam.ml.inference.api import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from PIL import Image +from torchvision import transforms +from torchvision.models.mobilenetv2 import MobileNetV2 + + +def read_image(image_file_name: str, + path_to_dir: str = None) -> Tuple[str, Image.Image]: + if path_to_dir is not None: + image_file_name = os.path.join(path_to_dir, image_file_name) + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + return image_file_name, data + + +def preprocess_image(data: Image) -> torch.Tensor: + image_size = (224, 224) + # to use models in torch with imagenet weights, + # normalize the images using the below values. 
+ # ref: https://pytorch.org/vision/stable/models.html# + normalize = transforms.Normalize( + mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.Resize(image_size), + transforms.ToTensor(), + normalize, + ]) + return transform(data) + + +class PostProcessor(beam.DoFn): + def process( + self, element: Union[PredictionResult, Tuple[Any, PredictionResult]] + ) -> Iterable[str]: + filename, prediction_result = element + prediction = torch.argmax(prediction_result.inference, dim=0) + yield filename + ',' + str(prediction.item()) + + +def run_pipeline(options: PipelineOptions, args=None): + """Sets up PyTorch RunInference pipeline""" + # reference to the class definition of the model. + model_class = MobileNetV2 + # params for model class constructor. These values will be used in + # RunInference API to instantiate the model object. + model_params = {'num_classes': 1000} # imagenet has 1000 classes. + # for this example, the pretrained weights are downloaded from + # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" + # and saved on GCS bucket gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt, + # which will be used to load the model state_dict in the RunInference API. 
+ model_loader = PytorchModelLoader( + state_dict_path=args.model_state_dict_path, + model_class=model_class, + model_params=model_params) + with beam.Pipeline(options=options) as p: + filename_value_pair = ( + p + | 'Read from csv file' >> beam.io.ReadFromText( + args.input, skip_header_lines=1) + | 'Parse and read files from the input_file' >> beam.Map( + partial(read_image, path_to_dir=args.images_dir)) + | 'Preprocess images' >> beam.MapTuple( + lambda file_name, data: (file_name, preprocess_image(data)))) + predictions = ( + filename_value_pair + | 'PyTorch RunInference' >> RunInference(model_loader) + | 'Process output' >> beam.ParDo(PostProcessor())) + + if args.output: + predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned + args.output, + shard_name_template='', + append_trailing_newlines=True) + + +def parse_known_args(argv): + """Parses args for the workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + required=True, + help='Path to the CSV file containing image names') + parser.add_argument( + '--output', + dest='output', + help='Predictions are saved to the output' + ' text file.') + parser.add_argument( + '--model_state_dict_path', + dest='model_state_dict_path', + required=True, + help='Path to load the model.') Review Comment: Added model_class and model_params as input argument to the run() method. In this way, this example can be used to run different models that can predict on Imagenet Data ########## sdks/python/apache_beam/examples/inference/pytorch_image_classification.py: ########## @@ -0,0 +1,146 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pipeline that uses RunInference API to perform classification task on imagenet dataset""" # pylint: disable=line-too-long + +import argparse +import io +import os +from functools import partial +from typing import Any +from typing import Iterable +from typing import Tuple +from typing import Union + +import apache_beam as beam +import torch +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.api import PredictionResult +from apache_beam.ml.inference.api import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from PIL import Image +from torchvision import transforms +from torchvision.models.mobilenetv2 import MobileNetV2 + + +def read_image(image_file_name: str, + path_to_dir: str = None) -> Tuple[str, Image.Image]: + if path_to_dir is not None: + image_file_name = os.path.join(path_to_dir, image_file_name) + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + return image_file_name, data + + +def preprocess_image(data: Image) -> torch.Tensor: + image_size = (224, 224) + # to use models in torch with imagenet weights, + # normalize the images using the below values. 
+ # ref: https://pytorch.org/vision/stable/models.html# + normalize = transforms.Normalize( + mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.Resize(image_size), + transforms.ToTensor(), + normalize, + ]) + return transform(data) + + +class PostProcessor(beam.DoFn): + def process( + self, element: Union[PredictionResult, Tuple[Any, PredictionResult]] + ) -> Iterable[str]: + filename, prediction_result = element + prediction = torch.argmax(prediction_result.inference, dim=0) + yield filename + ',' + str(prediction.item()) + + +def run_pipeline(options: PipelineOptions, args=None): + """Sets up PyTorch RunInference pipeline""" + # reference to the class definition of the model. + model_class = MobileNetV2 + # params for model class constructor. These values will be used in + # RunInference API to instantiate the model object. + model_params = {'num_classes': 1000} # imagenet has 1000 classes. + # for this example, the pretrained weights are downloaded from + # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" + # and saved on GCS bucket gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt, + # which will be used to load the model state_dict in the RunInference API. 
+ model_loader = PytorchModelLoader( + state_dict_path=args.model_state_dict_path, + model_class=model_class, + model_params=model_params) + with beam.Pipeline(options=options) as p: + filename_value_pair = ( + p + | 'Read from csv file' >> beam.io.ReadFromText( + args.input, skip_header_lines=1) + | 'Parse and read files from the input_file' >> beam.Map( + partial(read_image, path_to_dir=args.images_dir)) + | 'Preprocess images' >> beam.MapTuple( + lambda file_name, data: (file_name, preprocess_image(data)))) + predictions = ( + filename_value_pair + | 'PyTorch RunInference' >> RunInference(model_loader) + | 'Process output' >> beam.ParDo(PostProcessor())) + + if args.output: + predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: disable=expression-not-assigned + args.output, + shard_name_template='', + append_trailing_newlines=True) + + +def parse_known_args(argv): + """Parses args for the workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + required=True, + help='Path to the CSV file containing image names') + parser.add_argument( + '--output', + dest='output', + help='Predictions are saved to the output' + ' text file.') + parser.add_argument( + '--model_state_dict_path', + dest='model_state_dict_path', + required=True, + help='Path to load the model.') Review Comment: Added model_class and model_params as input argument to the run() method. 
In this way, this example can be used to run different models that can predict on Imagenet Data Issue Time Tracking ------------------- Worklog Id: (was: 778242) Time Spent: 9h 20m (was: 9h 10m) > RunInference Benchmarking tests > ------------------------------- > > Key: BEAM-14068 > URL: https://issues.apache.org/jira/browse/BEAM-14068 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Reporter: Anand Inguva > Assignee: Anand Inguva > Priority: P2 > Time Spent: 9h 20m > Remaining Estimate: 0h > > RunInference benchmarks will evaluate performance of Pipelines, which > represent common use cases of Beam + Dataflow in Pytorch, sklearn and > possibly TFX. These benchmarks would be the integration tests that exercise > several software components using Beam, PyTorch, Scikit learn and TensorFlow > extended. > We would use datasets that are publicly available (e.g., Kaggle). > Size: small / 10 GB / 1 TB etc > The default execution runner would be Dataflow unless specified otherwise. > These tests would be run infrequently (every release cycle). -- This message was sent by Atlassian Jira (v8.20.7#820007)