Hi all! *Context: I previously posted this question on Stack Overflow (<https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>), but I'm hoping to get more answers here.*
I'm trying to compute embeddings with a sentence-transformers <https://github.com/UKPLab/sentence-transformers> model for various rows stored in BigQuery, and then store them in a feather dataframe in Google Cloud Storage. However, I'm having problems saving the actual dataframe. I'm not able to save it locally or in Google Cloud Storage, and I get no error. Here's a reproducible example I've come up with: import apache_beam as beam from apache_beam.ml.inference.base import ( ModelHandler, PredictionResult, RunInference, ) from sentence_transformers import SentenceTransformer import argparse from apache_beam.options.pipeline_options import PipelineOptions from typing import Sequence, Optional, Any, Dict, Iterable from apache_beam.ml.inference.base import KeyedModelHandler from apache_beam.dataframe.convert import to_dataframe ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1" class EmbeddingModelHandler( ModelHandler[str, PredictionResult, SentenceTransformer] ): def __init__(self, model_name: str = ENCODING_MODEL_NAME): self._model_name = model_name def load_model(self) -> SentenceTransformer: from sentence_transformers import ( SentenceTransformer, ) # <- These imports are needed otherwise GCP complains import sentence_transformers return sentence_transformers.SentenceTransformer(self._model_name) def run_inference( self, batch: Sequence[str], model: SentenceTransformer, inference_args: Optional[Dict[str, Any]] = None, ) -> Iterable[PredictionResult]: from sentence_transformers import SentenceTransformer import sentence_transformers embedding_matrix = model.encode( batch, show_progress_bar=True, normalize_embeddings=True ) return embedding_matrix class GetFeatures(beam.DoFn): def process(self, element): feature = element.get("overview", "") iid = element.get("iid") return [(iid, feature)] def run(argv=None): parser = argparse.ArgumentParser() parser.add_argument( "--output", dest="output", required=True, help="Output file to write results to.", ) known_args, 
pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) with beam.Pipeline(options=pipeline_options) as pipeline: embedding_dataframe = ( pipeline | "Read BigQuery" >> beam.io.ReadFromBigQuery( query="""SELECT text_to_embed, identifier FROM [gcp-project:gcp-dataset.gcp-table] LIMIT 20 """, project="gcp-project", gcs_location="gs://ml-apache-beam/tmp/", ) | "Get features" >> beam.ParDo(GetFeatures()) | "Run inference" >> RunInference( KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME)) ) | "To Rows" >> beam.Map( lambda element: __import__("beam").Row( biggint=int(element[0]), embedding=element[1].tolist() ) ) ) df = to_dataframe(embedding_dataframe) df.to_feather(known_args.output) if __name__ == "__main__": run() And my requirements.txt: sentence-transformers==2.2.2 With python 3.8.14 To run it locally, I use: python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather Which runs fine, but I see no embedding_output.feather in the directory. And to run it on GCP: python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1 Also runs fine, but the gs://my-bucket/embedding_output.feather file is not there either. I'm probably doing something wrong. Would love your thoughts 🙂 Thanks! Duarte OC