[ 
https://issues.apache.org/jira/browse/BEAM-14068?focusedWorklogId=778242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-778242
 ]

ASF GitHub Bot logged work on BEAM-14068:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jun/22 18:06
            Start Date: 03/Jun/22 18:06
    Worklog Time Spent: 10m 
      Work Description: AnandInguva commented on code in PR #17462:
URL: https://github.com/apache/beam/pull/17462#discussion_r889212195


##########
sdks/python/apache_beam/examples/inference/pytorch_image_classification.py:
##########
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline that uses RunInference API to perform classification task on 
imagenet dataset"""  # pylint: disable=line-too-long
+
+import argparse
+import io
+import os
+from functools import partial
+from typing import Any
+from typing import Iterable
+from typing import Tuple
+from typing import Union
+
+import apache_beam as beam
+import torch
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from PIL import Image
+from torchvision import transforms
+from torchvision.models.mobilenetv2 import MobileNetV2
+
+
+def read_image(image_file_name: str,
+               path_to_dir: str = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(data: Image) -> torch.Tensor:
+  image_size = (224, 224)
+  # to use models in torch with imagenet weights,
+  # normalize the images using the below values.
+  # ref: https://pytorch.org/vision/stable/models.html#
+  normalize = transforms.Normalize(
+      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+  transform = transforms.Compose([
+      transforms.Resize(image_size),
+      transforms.ToTensor(),
+      normalize,
+  ])
+  return transform(data)
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Union[PredictionResult, Tuple[Any, PredictionResult]]
+  ) -> Iterable[str]:
+    filename, prediction_result = element
+    prediction = torch.argmax(prediction_result.inference, dim=0)
+    yield filename + ',' + str(prediction.item())
+
+
+def run_pipeline(options: PipelineOptions, args=None):
+  """Sets up PyTorch RunInference pipeline"""
+  # reference to the class definition of the model.
+  model_class = MobileNetV2
+  # params for model class constructor. These values will be used in
+  # RunInference API to instantiate the model object.
+  model_params = {'num_classes': 1000}  # imagenet has 1000 classes.
+  # for this example, the pretrained weights are downloaded from
+  # "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth";
+  # and saved on GCS bucket gs://apache-beam-ml/models/imagenet_classification_mobilenet_v2.pt,
+  # which will be used to load the model state_dict in the RunInference API.
+  model_loader = PytorchModelLoader(
+      state_dict_path=args.model_state_dict_path,
+      model_class=model_class,
+      model_params=model_params)
+  with beam.Pipeline(options=options) as p:
+    filename_value_pair = (
+        p
+        | 'Read from csv file' >> beam.io.ReadFromText(
+            args.input, skip_header_lines=1)
+        | 'Parse and read files from the input_file' >> beam.Map(
+            partial(read_image, path_to_dir=args.images_dir))
+        | 'Preprocess images' >> beam.MapTuple(
+            lambda file_name, data: (file_name, preprocess_image(data))))
+    predictions = (
+        filename_value_pair
+        | 'PyTorch RunInference' >> RunInference(model_loader)
+        | 'Process output' >> beam.ParDo(PostProcessor()))
+
+    if args.output:
+      predictions | "Write output to GCS" >> beam.io.WriteToText( # pylint: 
disable=expression-not-assigned
+        args.output,
+        shard_name_template='',
+        append_trailing_newlines=True)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the CSV file containing image names')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Predictions are saved to the output'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      required=True,
+      help='Path to load the model.')

Review Comment:
   Added model_class and model_params as input arguments to the run() method. This 
way, the example can be used to run different models that can predict on 
ImageNet data.
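
   For context, a minimal sketch (not the PR's actual code) of what a run() entry 
point taking model_class and model_params could look like. PytorchModelLoader, 
PipelineOptions, MobileNetV2 and the pipeline wiring come from the diff above; the 
assumption that parse_known_args returns (known_args, pipeline_args) and the 
default handling are illustrative only:

       # Illustrative sketch only; the PR's real run() signature may differ.
       from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
       from apache_beam.options.pipeline_options import PipelineOptions
       from torchvision.models.mobilenetv2 import MobileNetV2


       def run(argv=None, model_class=None, model_params=None):
         """Entry point that lets callers swap in any ImageNet-capable model."""
         # Assumes parse_known_args() returns (known_args, pipeline_args).
         known_args, pipeline_args = parse_known_args(argv)
         pipeline_options = PipelineOptions(pipeline_args)
         if model_class is None:
           # Fall back to the MobileNetV2 defaults used in this example.
           model_class = MobileNetV2
           model_params = {'num_classes': 1000}  # ImageNet has 1000 classes.
         model_loader = PytorchModelLoader(
             state_dict_path=known_args.model_state_dict_path,
             model_class=model_class,
             model_params=model_params)
         # The remaining wiring (read CSV, preprocess, RunInference,
         # post-process, optional WriteToText) matches run_pipeline() in the
         # diff above, using pipeline_options and this model_loader.

   With such a signature, another ImageNet classifier (e.g. a torchvision ResNet) 
could be benchmarked by calling run() with its class, constructor params, and a 
matching --model_state_dict_path, without changing the pipeline itself.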



Issue Time Tracking
-------------------

    Worklog Id:     (was: 778242)
    Time Spent: 9h 20m  (was: 9h 10m)

> RunInference Benchmarking tests
> -------------------------------
>
>                 Key: BEAM-14068
>                 URL: https://issues.apache.org/jira/browse/BEAM-14068
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Anand Inguva
>            Assignee: Anand Inguva
>            Priority: P2
>          Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> RunInference benchmarks will evaluate the performance of pipelines that 
> represent common use cases of Beam + Dataflow with PyTorch, scikit-learn and 
> possibly TFX. These benchmarks would be integration tests that exercise 
> several software components using Beam, PyTorch, scikit-learn and TensorFlow 
> Extended.
> We would use publicly available datasets (e.g., Kaggle). 
> Size: small / 10 GB / 1 TB, etc.
> The default execution runner would be Dataflow unless specified otherwise.
> These tests would be run infrequently (every release cycle).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
