Hi Duarte, I commented on the Stack Overflow question. It looks like your to_dataframe and to_feather calls are outside of the Pipeline context, so they are being created _after_ the pipeline has already run. Hopefully moving them inside the Pipeline context will resolve the issue.
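To illustrate why the placement matters, here is a toy model in plain Python. It is only a sketch of the general "build the graph, run on exit" behaviour, not Beam's implementation: ToyPipeline, add, and executed are made-up names for this example. The assumption it encodes is that leaving the `with` block is what runs the pipeline, so any step registered after that point is recorded but never executed.

```python
class ToyPipeline:
    """Hypothetical stand-in for a deferred pipeline (not a Beam class).

    Steps are only recorded while the graph is being built; they are
    "executed" once, when the with-block exits.
    """

    def __init__(self):
        self.steps = []      # steps registered so far
        self.executed = []   # steps that actually ran

    def add(self, name):
        # Registering a step does not run anything by itself.
        self.steps.append(name)
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Run whatever was registered up to this point, then finish.
        if exc_type is None:
            self.executed = list(self.steps)
        return False


# Write step added after the with-block: the run has already happened.
with ToyPipeline() as broken:
    broken.add("read").add("embed")
broken.add("write")

# Write step added inside the with-block: it is part of the run.
with ToyPipeline() as fixed:
    fixed.add("read").add("embed")
    fixed.add("write")

print(broken.executed)
print(fixed.executed)
```

In the script quoted below, the equivalent change would be indenting the to_dataframe and to_feather lines one level so they sit inside the `with beam.Pipeline(...)` block.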
Brian

On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <duarteoca...@gmail.com> wrote:

> Hi all!
>
> *Context: I posted
> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
> this question before on stack overflow, but hoping to get more answers
> here.*
>
> I'm trying to compute a sentence-transformers
> <https://github.com/UKPLab/sentence-transformers> model for various rows
> stored in BigQuery, and then store them in a feather dataframe in Google
> Cloud Storage.
>
> However, I'm having problems in saving the actual dataframe. I'm not able
> to save it locally or in Google Cloud Storage, but get no error.
>
> Here's a reproducible example I've come up with:
>
> import apache_beam as beam
> from apache_beam.ml.inference.base import (
>     ModelHandler,
>     PredictionResult,
>     RunInference,
> )
> from sentence_transformers import SentenceTransformer
> import argparse
> from apache_beam.options.pipeline_options import PipelineOptions
> from typing import Sequence, Optional, Any, Dict, Iterable
> from apache_beam.ml.inference.base import KeyedModelHandler
> from apache_beam.dataframe.convert import to_dataframe
>
> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>
>
> class EmbeddingModelHandler(
>     ModelHandler[str, PredictionResult, SentenceTransformer]
> ):
>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>         self._model_name = model_name
>
>     def load_model(self) -> SentenceTransformer:
>         from sentence_transformers import (
>             SentenceTransformer,
>         )  # <- These imports are needed otherwise GCP complains
>         import sentence_transformers
>
>         return sentence_transformers.SentenceTransformer(self._model_name)
>
>     def run_inference(
>         self,
>         batch: Sequence[str],
>         model: SentenceTransformer,
>         inference_args: Optional[Dict[str, Any]] = None,
>     ) -> Iterable[PredictionResult]:
>         from sentence_transformers import SentenceTransformer
>         import sentence_transformers
>
>         embedding_matrix = model.encode(
>             batch, show_progress_bar=True, normalize_embeddings=True
>         )
>
>         return embedding_matrix
>
>
> class GetFeatures(beam.DoFn):
>     def process(self, element):
>         feature = element.get("overview", "")
>         iid = element.get("iid")
>         return [(iid, feature)]
>
>
> def run(argv=None):
>
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--output",
>         dest="output",
>         required=True,
>         help="Output file to write results to.",
>     )
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     pipeline_options = PipelineOptions(pipeline_args)
>
>     with beam.Pipeline(options=pipeline_options) as pipeline:
>
>         embedding_dataframe = (
>             pipeline
>             | "Read BigQuery"
>             >> beam.io.ReadFromBigQuery(
>                 query="""SELECT text_to_embed,
>                                 identifier
>                          FROM [gcp-project:gcp-dataset.gcp-table]
>                          LIMIT 20
>                          """,
>                 project="gcp-project",
>                 gcs_location="gs://ml-apache-beam/tmp/",
>             )
>             | "Get features" >> beam.ParDo(GetFeatures())
>             | "Run inference"
>             >> RunInference(
>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>             )
>             | "To Rows"
>             >> beam.Map(
>                 lambda element: __import__("beam").Row(
>                     biggint=int(element[0]), embedding=element[1].tolist()
>                 )
>             )
>         )
>
>     df = to_dataframe(embedding_dataframe)
>     df.to_feather(known_args.output)
>
>
> if __name__ == "__main__":
>     run()
>
> And my requirements.txt:
>
> sentence-transformers==2.2.2
>
> With python 3.8.14
>
> To run it locally, I use:
>
> python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather
>
> Which runs fine, but I see no embedding_output.feather in the directory.
>
> And to run it on GCP:
>
> python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1
>
> Also runs fine, but the gs://my-bucket/embedding_output.feather file is
> not there as well.
>
> I'm probably doing something wrong. Would love your thoughts
>
> Thanks!
>
> Duarte OC