+user <user@beam.apache.org> (adding back the user list)

On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com> wrote:
Thanks, glad to hear that worked!

The feather error looks like a bug, I filed an issue [1] to track it. I think
using parquet instead of feather is the best workaround for now.

[1] https://github.com/apache/beam/issues/24091

On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo
<duarteoca...@gmail.com> wrote:

Hey Brian!

Wow. You are absolutely correct! I hate my brain, ahaha.

I ended up indenting the block. Beam was still giving me problems with:

    ValueError: feather does not support serializing a non-default index for the
    index; you can .reset_index() to make the index into column(s)

And reset_index() is not parallelizable, so I ended up going with parquet.

Thanks for the help Brian! Really appreciate it. I will also update Stack
Overflow.

Duarte O. Carmo
ML Engineer / Consultant
duarteocarmo.com

On 10 Nov 2022 at 01.41.29, Brian Hulette via user <user@beam.apache.org>
wrote:

Hi Duarte,

I commented on the Stack Overflow question. It looks like your to_dataframe
and to_feather calls are outside of the Pipeline context, so they are being
created _after_ the pipeline has already run. Hopefully moving them inside
the Pipeline context will resolve the issue.

Brian

On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo
<duarteoca...@gmail.com> wrote:

Hi all!

Context: I posted this question before on Stack Overflow
<https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>,
but I am hoping to get more answers here.

I'm trying to compute a sentence-transformers
<https://github.com/UKPLab/sentence-transformers> model for various rows
stored in BigQuery, and then store the result as a feather dataframe in
Google Cloud Storage.

However, I'm having problems saving the actual dataframe: I'm not able to
save it locally or in Google Cloud Storage, yet I get no error.

Here's a reproducible example I've come up with:

    import apache_beam as beam
    from apache_beam.ml.inference.base import (
        ModelHandler,
        PredictionResult,
        RunInference,
    )
    from sentence_transformers import SentenceTransformer
    import argparse
    from apache_beam.options.pipeline_options import PipelineOptions
    from typing import Sequence, Optional, Any, Dict, Iterable
    from apache_beam.ml.inference.base import KeyedModelHandler
    from apache_beam.dataframe.convert import to_dataframe

    ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"


    class EmbeddingModelHandler(
        ModelHandler[str, PredictionResult, SentenceTransformer]
    ):
        def __init__(self, model_name: str = ENCODING_MODEL_NAME):
            self._model_name = model_name

        def load_model(self) -> SentenceTransformer:
            from sentence_transformers import (
                SentenceTransformer,
            )  # <- These imports are needed otherwise GCP complains
            import sentence_transformers

            return sentence_transformers.SentenceTransformer(self._model_name)

        def run_inference(
            self,
            batch: Sequence[str],
            model: SentenceTransformer,
            inference_args: Optional[Dict[str, Any]] = None,
        ) -> Iterable[PredictionResult]:
            from sentence_transformers import SentenceTransformer
            import sentence_transformers

            embedding_matrix = model.encode(
                batch, show_progress_bar=True, normalize_embeddings=True
            )

            return embedding_matrix


    class GetFeatures(beam.DoFn):
        def process(self, element):
            feature = element.get("overview", "")
            iid = element.get("iid")
            return [(iid, feature)]


    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--output",
            dest="output",
            required=True,
            help="Output file to write results to.",
        )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_options = PipelineOptions(pipeline_args)

        with beam.Pipeline(options=pipeline_options) as pipeline:

            embedding_dataframe = (
                pipeline
                | "Read BigQuery"
                >> beam.io.ReadFromBigQuery(
                    query="""SELECT text_to_embed,
                                    identifier
                             FROM [gcp-project:gcp-dataset.gcp-table]
                             LIMIT 20
                          """,
                    project="gcp-project",
                    gcs_location="gs://ml-apache-beam/tmp/",
                )
                | "Get features" >> beam.ParDo(GetFeatures())
                | "Run inference"
                >> RunInference(
                    KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
                )
                | "To Rows"
                >> beam.Map(
                    lambda element: beam.Row(
                        biggint=int(element[0]), embedding=element[1].tolist()
                    )
                )
            )

        df = to_dataframe(embedding_dataframe)
        df.to_feather(known_args.output)


    if __name__ == "__main__":
        run()

And my requirements.txt:

    sentence-transformers==2.2.2

With Python 3.8.14.

To run it locally, I use:

    python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather

This runs fine, but I see no embedding_output.feather in the directory.

And to run it on GCP:

    python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1

This also runs fine, but the gs://my-bucket/embedding_output.feather file is
not there either.

I'm probably doing something wrong. Would love your thoughts.

Thanks!

Duarte OC