Hi all!

*Context: I posted
<https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
this question before on stack overflow, but hoping to get more answers
here.  *

I'm trying to compute a sentence-transformers
<https://github.com/UKPLab/sentence-transformers> model for various rows
stored in BigQuery, and then store them in a feather dataframe in Google
Cloud Storage.

However, I'm having problems in saving the actual dataframe. I'm not able
to save it locally or in Google Cloud Storage, but get no error.

Here's a reproducible example I've come up with:

import apache_beam as beam
from apache_beam.ml.inference.base import (
    ModelHandler,
    PredictionResult,
    RunInference,
)
from sentence_transformers import SentenceTransformer
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Sequence, Optional, Any, Dict, Iterable
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.dataframe.convert import to_dataframe

ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"


class EmbeddingModelHandler(
    ModelHandler[str, PredictionResult, SentenceTransformer]
):
    def __init__(self, model_name: str = ENCODING_MODEL_NAME):
        self._model_name = model_name

    def load_model(self) -> SentenceTransformer:
        from sentence_transformers import (
            SentenceTransformer,
        )  # <- These imports are needed otherwise GCP complains
        import sentence_transformers

        return sentence_transformers.SentenceTransformer(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: SentenceTransformer,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        from sentence_transformers import SentenceTransformer
        import sentence_transformers

        embedding_matrix = model.encode(
            batch, show_progress_bar=True, normalize_embeddings=True
        )

        return embedding_matrix


class GetFeatures(beam.DoFn):
    def process(self, element):
        feature = element.get("overview", "")
        iid = element.get("iid")
        return [(iid, feature)]


def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output file to write results to.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:

        embedding_dataframe = (
            pipeline
            | "Read BigQuery"
            >> beam.io.ReadFromBigQuery(
                query="""SELECT text_to_embed,
                                identifier
                                FROM  [gcp-project:gcp-dataset.gcp-table]
                                LIMIT 20
                                """,
                project="gcp-project",
                gcs_location="gs://ml-apache-beam/tmp/",
            )
            | "Get features" >> beam.ParDo(GetFeatures())
            | "Run inference"
            >> RunInference(
                KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
            )
            | "To Rows"
            >> beam.Map(
                lambda element: __import__("beam").Row(
                    biggint=int(element[0]), embedding=element[1].tolist()
                )
            )
        )

    df = to_dataframe(embedding_dataframe)
    df.to_feather(known_args.output)


if __name__ == "__main__":
    run()

And my requirements.txt:

sentence-transformers==2.2.2

With python 3.8.14

To run it locally, I use:

python beam_pipeline.py   --requirements_file requirements.txt
--output embedding_output.feather

Which runs fine, but I see no embedding_output.feather in the directory.

And to run it on GCP:

python beam_pipeline.py   --requirements_file requirements.txt
--output "gs://my-bucket/embedding_output.feather" --runner
DataflowRunner --project my-gcp-project --region us-central1

Also runs fine, but the gs://my-bucket/embedding_output.feather file is not
there as well.

I’m probably doing something wrong. Would love your thoughts šŸ™‚

Thanks!

Duarte OC

Reply via email to