Hi Duarte,

I commented on the Stack Overflow question. It looks like your to_dataframe
and to_feather calls are outside of the Pipeline context, so they are being
created _after_ the pipeline has already run. Hopefully moving them inside
the Pipeline context will resolve the issue.

Brian

On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
duarteoca...@gmail.com> wrote:

> Hi all!
>
> *Context: I posted
> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
> this question before on stack overflow, but hoping to get more answers
> here.  *
>
> I'm trying to compute a sentence-transformers
> <https://github.com/UKPLab/sentence-transformers> model for various rows
> stored in BigQuery, and then store them in a feather dataframe in Google
> Cloud Storage.
>
> However, I'm having problems in saving the actual dataframe. I'm not able
> to save it locally or in Google Cloud Storage, but get no error.
>
> Here's a reproducible example I've come up with:
>
> import apache_beam as beam
> from apache_beam.ml.inference.base import (
>     ModelHandler,
>     PredictionResult,
>     RunInference,
> )
> from sentence_transformers import SentenceTransformer
> import argparse
> from apache_beam.options.pipeline_options import PipelineOptions
> from typing import Sequence, Optional, Any, Dict, Iterable
> from apache_beam.ml.inference.base import KeyedModelHandler
> from apache_beam.dataframe.convert import to_dataframe
>
> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>
>
> class EmbeddingModelHandler(
>     ModelHandler[str, PredictionResult, SentenceTransformer]
> ):
>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>         self._model_name = model_name
>
>     def load_model(self) -> SentenceTransformer:
>         from sentence_transformers import (
>             SentenceTransformer,
>         )  # <- These imports are needed otherwise GCP complains
>         import sentence_transformers
>
>         return sentence_transformers.SentenceTransformer(self._model_name)
>
>     def run_inference(
>         self,
>         batch: Sequence[str],
>         model: SentenceTransformer,
>         inference_args: Optional[Dict[str, Any]] = None,
>     ) -> Iterable[PredictionResult]:
>         from sentence_transformers import SentenceTransformer
>         import sentence_transformers
>
>         embedding_matrix = model.encode(
>             batch, show_progress_bar=True, normalize_embeddings=True
>         )
>
>         return embedding_matrix
>
>
> class GetFeatures(beam.DoFn):
>     def process(self, element):
>         feature = element.get("overview", "")
>         iid = element.get("iid")
>         return [(iid, feature)]
>
>
> def run(argv=None):
>
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--output",
>         dest="output",
>         required=True,
>         help="Output file to write results to.",
>     )
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     pipeline_options = PipelineOptions(pipeline_args)
>
>     with beam.Pipeline(options=pipeline_options) as pipeline:
>
>         embedding_dataframe = (
>             pipeline
>             | "Read BigQuery"
>             >> beam.io.ReadFromBigQuery(
>                 query="""SELECT text_to_embed,
>                                 identifier
>                                 FROM  [gcp-project:gcp-dataset.gcp-table]
>                                 LIMIT 20
>                                 """,
>                 project="gcp-project",
>                 gcs_location="gs://ml-apache-beam/tmp/",
>             )
>             | "Get features" >> beam.ParDo(GetFeatures())
>             | "Run inference"
>             >> RunInference(
>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>             )
>             | "To Rows"
>             >> beam.Map(
>                 lambda element: __import__("beam").Row(
>                     biggint=int(element[0]), embedding=element[1].tolist()
>                 )
>             )
>         )
>
>     df = to_dataframe(embedding_dataframe)
>     df.to_feather(known_args.output)
>
>
> if __name__ == "__main__":
>     run()
>
> And my requirements.txt:
>
> sentence-transformers==2.2.2
>
> With python 3.8.14
>
> To run it locally, I use:
>
> python beam_pipeline.py   --requirements_file requirements.txt  --output 
> embedding_output.feather
>
> Which runs fine, but I see no embedding_output.feather in the directory.
>
> And to run it on GCP:
>
> python beam_pipeline.py   --requirements_file requirements.txt  --output 
> "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project 
> my-gcp-project --region us-central1
>
> Also runs fine, but the gs://my-bucket/embedding_output.feather file is
> not there as well.
>
> I’m probably doing something wrong. Would love your thoughts šŸ™‚
>
> Thanks!
>
> Duarte OC
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Reply via email to