Thanks a lot for the help, Brian, and for filing the bug about feather. I've been using parquet and the pipeline is working perfectly.
One extra question (if you're willing): is there any way to control the amount of sharding on the to_parquet call? (I tried adding n_shards=N, but it seems to have no effect.) For extra context: when saving a large parquet file, I would like to control whether it is saved as, say, 10 files instead of 1,000. This would make it easier to load back. Thanks for the help!!

On 10 Nov 2022 at 18.36.43, Brian Hulette <bhule...@google.com> wrote:

> +user <user@beam.apache.org> (adding back the user list)
>
> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com> wrote:
>
>> Thanks, glad to hear that worked!
>>
>> The feather error looks like a bug; I filed an issue [1] to track it. I
>> think using parquet instead of feather is the best workaround for now.
>>
>> [1] https://github.com/apache/beam/issues/24091
>>
>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
>> duarteoca...@gmail.com> wrote:
>>
>>> Hey Brian!
>>>
>>> Wow. You are absolutely correct!!!!
>>>
>>> I hate my brain ahaha.
>>>
>>> Ended up indenting the block. Beam was still giving me problems with
>>>
>>> ValueError: feather does not support serializing a non-default index for
>>> the index; you can .reset_index() to make the index into column(s)
>>>
>>> And reset_index() is not parallelizable.
>>>
>>> So I ended up going with parquet.
>>>
>>> Thanks for the help Brian! Really appreciate it :)
>>>
>>> Will also update Stack Overflow.
>>>
>>> Duarte O. Carmo
>>> ML Engineer / Consultant
>>> duarteocarmo.com
>>>
>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <user@beam.apache.org>
>>> wrote:
>>>
>>>> Hi Duarte,
>>>>
>>>> I commented on the Stack Overflow question. It looks like your
>>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>>> they are being created _after_ the pipeline has already run. Hopefully
>>>> moving them inside the Pipeline context will resolve the issue.
>>>>
>>>> Brian
>>>>
>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>>> duarteoca...@gmail.com> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> *Context: I posted
>>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>>> this question before on Stack Overflow, but am hoping to get more answers here.*
>>>>>
>>>>> I'm trying to compute a sentence-transformers
>>>>> <https://github.com/UKPLab/sentence-transformers> model for various
>>>>> rows stored in BigQuery, and then store the embeddings as a feather file in
>>>>> Google Cloud Storage.
>>>>>
>>>>> However, I'm having problems saving the actual dataframe: it is not
>>>>> saved locally or in Google Cloud Storage, but I get no error.
>>>>>
>>>>> Here's a reproducible example I've come up with:
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.ml.inference.base import (
>>>>>     ModelHandler,
>>>>>     PredictionResult,
>>>>>     RunInference,
>>>>> )
>>>>> from sentence_transformers import SentenceTransformer
>>>>> import argparse
>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>
>>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>>
>>>>>
>>>>> class EmbeddingModelHandler(
>>>>>     ModelHandler[str, PredictionResult, SentenceTransformer]
>>>>> ):
>>>>>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>>>         self._model_name = model_name
>>>>>
>>>>>     def load_model(self) -> SentenceTransformer:
>>>>>         from sentence_transformers import (
>>>>>             SentenceTransformer,
>>>>>         )  # <- These imports are needed otherwise GCP complains
>>>>>         import sentence_transformers
>>>>>
>>>>>         return sentence_transformers.SentenceTransformer(self._model_name)
>>>>>
>>>>>     def run_inference(
>>>>>         self,
>>>>>         batch: Sequence[str],
>>>>>         model: SentenceTransformer,
>>>>>         inference_args: Optional[Dict[str, Any]] = None,
>>>>>     ) -> Iterable[PredictionResult]:
>>>>>         from sentence_transformers import SentenceTransformer
>>>>>         import sentence_transformers
>>>>>
>>>>>         embedding_matrix = model.encode(
>>>>>             batch, show_progress_bar=True, normalize_embeddings=True
>>>>>         )
>>>>>
>>>>>         return embedding_matrix
>>>>>
>>>>>
>>>>> class GetFeatures(beam.DoFn):
>>>>>     def process(self, element):
>>>>>         feature = element.get("text_to_embed", "")
>>>>>         iid = element.get("identifier")
>>>>>         return [(iid, feature)]
>>>>>
>>>>>
>>>>> def run(argv=None):
>>>>>     parser = argparse.ArgumentParser()
>>>>>     parser.add_argument(
>>>>>         "--output",
>>>>>         dest="output",
>>>>>         required=True,
>>>>>         help="Output file to write results to.",
>>>>>     )
>>>>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>     pipeline_options = PipelineOptions(pipeline_args)
>>>>>
>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>         embedding_dataframe = (
>>>>>             pipeline
>>>>>             | "Read BigQuery"
>>>>>             >> beam.io.ReadFromBigQuery(
>>>>>                 query="""SELECT text_to_embed,
>>>>>                                 identifier
>>>>>                          FROM [gcp-project:gcp-dataset.gcp-table]
>>>>>                          LIMIT 20
>>>>>                       """,
>>>>>                 project="gcp-project",
>>>>>                 gcs_location="gs://ml-apache-beam/tmp/",
>>>>>             )
>>>>>             | "Get features" >> beam.ParDo(GetFeatures())
>>>>>             | "Run inference"
>>>>>             >> RunInference(
>>>>>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>>>>>             )
>>>>>             | "To Rows"
>>>>>             >> beam.Map(
>>>>>                 lambda element: beam.Row(
>>>>>                     biggint=int(element[0]), embedding=element[1].tolist()
>>>>>                 )
>>>>>             )
>>>>>         )
>>>>>
>>>>>     df = to_dataframe(embedding_dataframe)
>>>>>     df.to_feather(known_args.output)
>>>>>
>>>>>
>>>>> if __name__ == "__main__":
>>>>>     run()
>>>>>
>>>>> And my requirements.txt:
>>>>>
>>>>> sentence-transformers==2.2.2
>>>>>
>>>>> With Python 3.8.14.
>>>>>
>>>>> To run it locally, I use:
>>>>>
>>>>> python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather
>>>>>
>>>>> This runs fine, but I see no embedding_output.feather in the directory.
>>>>>
>>>>> And to run it on GCP:
>>>>>
>>>>> python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1
>>>>>
>>>>> This also runs fine, but the gs://my-bucket/embedding_output.feather file is not there either.
>>>>>
>>>>> I'm probably doing something wrong. Would love your thoughts :)
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Duarte OC
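The root cause diagnosed upthread reduces to a plain-Python scoping rule: `beam.Pipeline` is a context manager, and exiting the `with` block is what actually runs the job, so deferred DataFrame operations created after the block closes never become part of the pipeline. The class below is a hypothetical stand-in that only illustrates this ordering; it is not Beam's real implementation:

```python
class ToyPipeline:
    """Loosely mimics how beam.Pipeline defers all work until the with-block exits."""

    def __init__(self):
        self.staged = []    # operations registered while building the graph
        self.executed = []  # operations that actually ran

    def apply(self, name):
        self.staged.append(name)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Like beam.Pipeline.__exit__: "run" everything staged so far.
        self.executed = list(self.staged)
        return False


ok = ToyPipeline()
with ok:
    ok.apply("read_bigquery")
    ok.apply("to_feather")   # staged before exit, so it runs

late = ToyPipeline()
with late:
    late.apply("read_bigquery")
late.apply("to_feather")     # staged after the block closed: never executed

assert ok.executed == ["read_bigquery", "to_feather"]
assert late.executed == ["read_bigquery"]
```

This is why moving `to_dataframe`/`to_feather` inside the `with` block (as suggested above) makes the write happen, while the original, dedented version silently did nothing.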