Ah - that’s unfortunate. Will keep an eye on that issue! Thanks for all the help Brian - great work on Beam!
Duarte OC

On 10 Nov 2022 at 19.13.33, Brian Hulette <bhule...@google.com> wrote:

> On Thu, Nov 10, 2022 at 9:39 AM Duarte Oliveira e Carmo <duarteoca...@gmail.com> wrote:
>
>> Thanks a lot for the help Brian, and for filing the bug about feather.
>>
>> Been using parquet, and the pipeline is working perfectly.
>>
>> One extra question (if you're willing): is there any way to control the amount of sharding on the to_parquet call? (I tried adding n_shards = N, but it seems to have no effect.)
>
> Unfortunately not, but this is a good feature request. I filed (another) issue for this: https://github.com/apache/beam/issues/24094
>
>> For extra context: when saving a large parquet file, I'd like to be able to make it save to 10 files instead of 1000, for example. That would make it easier to load back.
>>
>> Thanks for the help!!
>
> Of course!
>
>> On 10 Nov 2022 at 18.36.43, Brian Hulette <bhule...@google.com> wrote:
>>
>>> +user <user@beam.apache.org> (adding back the user list)
>>>
>>> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com> wrote:
>>>
>>>> Thanks, glad to hear that worked!
>>>>
>>>> The feather error looks like a bug; I filed an issue [1] to track it. I think using parquet instead of feather is the best workaround for now.
>>>>
>>>> [1] https://github.com/apache/beam/issues/24091
>>>>
>>>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <duarteoca...@gmail.com> wrote:
>>>>
>>>>> Hey Brian!
>>>>>
>>>>> Wow. You are absolutely correct!!!!
>>>>>
>>>>> I hate my brain ahaha.
>>>>>
>>>>> Ended up indenting the block. Beam was still giving me problems with:
>>>>>
>>>>>     ValueError: feather does not support serializing a non-default index for the index; you can .reset_index() to make the index into column(s)
>>>>>
>>>>> And reset_index() is not parallelizable, so I ended up going with parquet.
>>>>>
>>>>> Thanks for the help Brian! Really appreciate it 🙂
>>>>>
>>>>> Will also update Stack Overflow.
>>>>>
>>>>> Duarte O. Carmo
>>>>> ML Engineer / Consultant
>>>>> duarteocarmo.com
>>>>>
>>>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <user@beam.apache.org> wrote:
>>>>>
>>>>>> Hi Duarte,
>>>>>>
>>>>>> I commented on the Stack Overflow question. It looks like your to_dataframe and to_feather calls are outside of the Pipeline context, so they are being created _after_ the pipeline has already run. Hopefully moving them inside the Pipeline context will resolve the issue.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <duarteoca...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all!
>>>>>>>
>>>>>>> *Context: I posted this question on Stack Overflow <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner> before, but I'm hoping to get more answers here.*
>>>>>>>
>>>>>>> I'm trying to compute a sentence-transformers <https://github.com/UKPLab/sentence-transformers> model for various rows stored in BigQuery, and then store the result as a feather dataframe in Google Cloud Storage.
>>>>>>>
>>>>>>> However, I'm having problems saving the actual dataframe: it isn't written locally or to Google Cloud Storage, but I get no error.
>>>>>>> Here's a reproducible example I've come up with:
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.ml.inference.base import (
>>>>>>>     ModelHandler,
>>>>>>>     PredictionResult,
>>>>>>>     RunInference,
>>>>>>> )
>>>>>>> from sentence_transformers import SentenceTransformer
>>>>>>> import argparse
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>>>
>>>>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>>>>
>>>>>>>
>>>>>>> class EmbeddingModelHandler(
>>>>>>>     ModelHandler[str, PredictionResult, SentenceTransformer]
>>>>>>> ):
>>>>>>>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>>>>>         self._model_name = model_name
>>>>>>>
>>>>>>>     def load_model(self) -> SentenceTransformer:
>>>>>>>         from sentence_transformers import (
>>>>>>>             SentenceTransformer,
>>>>>>>         )  # <- These imports are needed, otherwise GCP complains
>>>>>>>         import sentence_transformers
>>>>>>>
>>>>>>>         return sentence_transformers.SentenceTransformer(self._model_name)
>>>>>>>
>>>>>>>     def run_inference(
>>>>>>>         self,
>>>>>>>         batch: Sequence[str],
>>>>>>>         model: SentenceTransformer,
>>>>>>>         inference_args: Optional[Dict[str, Any]] = None,
>>>>>>>     ) -> Iterable[PredictionResult]:
>>>>>>>         from sentence_transformers import SentenceTransformer
>>>>>>>         import sentence_transformers
>>>>>>>
>>>>>>>         embedding_matrix = model.encode(
>>>>>>>             batch, show_progress_bar=True, normalize_embeddings=True
>>>>>>>         )
>>>>>>>         return embedding_matrix
>>>>>>>
>>>>>>>
>>>>>>> class GetFeatures(beam.DoFn):
>>>>>>>     def process(self, element):
>>>>>>>         feature = element.get("overview", "")
>>>>>>>         iid = element.get("iid")
>>>>>>>         return [(iid, feature)]
>>>>>>>
>>>>>>>
>>>>>>> def run(argv=None):
>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>     parser.add_argument(
>>>>>>>         "--output",
>>>>>>>         dest="output",
>>>>>>>         required=True,
>>>>>>>         help="Output file to write results to.",
>>>>>>>     )
>>>>>>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>     pipeline_options = PipelineOptions(pipeline_args)
>>>>>>>
>>>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>         embedding_dataframe = (
>>>>>>>             pipeline
>>>>>>>             | "Read BigQuery"
>>>>>>>             >> beam.io.ReadFromBigQuery(
>>>>>>>                 query="""SELECT text_to_embed,
>>>>>>>                                 identifier
>>>>>>>                          FROM [gcp-project:gcp-dataset.gcp-table]
>>>>>>>                          LIMIT 20
>>>>>>>                       """,
>>>>>>>                 project="gcp-project",
>>>>>>>                 gcs_location="gs://ml-apache-beam/tmp/",
>>>>>>>             )
>>>>>>>             | "Get features" >> beam.ParDo(GetFeatures())
>>>>>>>             | "Run inference"
>>>>>>>             >> RunInference(
>>>>>>>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>>>>>>>             )
>>>>>>>             | "To Rows"
>>>>>>>             >> beam.Map(
>>>>>>>                 lambda element: beam.Row(
>>>>>>>                     biggint=int(element[0]),
>>>>>>>                     embedding=element[1].tolist(),
>>>>>>>                 )
>>>>>>>             )
>>>>>>>         )
>>>>>>>
>>>>>>>     df = to_dataframe(embedding_dataframe)
>>>>>>>     df.to_feather(known_args.output)
>>>>>>>
>>>>>>>
>>>>>>> if __name__ == "__main__":
>>>>>>>     run()
>>>>>>>
>>>>>>> And my requirements.txt:
>>>>>>>
>>>>>>>     sentence-transformers==2.2.2
>>>>>>>
>>>>>>> With Python 3.8.14.
>>>>>>>
>>>>>>> To run it locally, I use:
>>>>>>>
>>>>>>>     python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather
>>>>>>>
>>>>>>> Which runs fine, but I see no embedding_output.feather in the directory.
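Brian's diagnosis earlier in the thread — the to_dataframe and to_feather calls sit outside the `with beam.Pipeline(...)` block, so they are created after the pipeline has already run — is a general context-manager pitfall. A minimal stdlib-only toy illustrates it; `TinyPipeline` is hypothetical, not Beam's API:

```python
# Toy illustration of the pitfall: work registered inside the
# `with` block is executed when the block exits, so anything
# added afterwards is silently ignored -- no error, no output.
class TinyPipeline:
    def __init__(self):
        self.steps = []   # transforms registered so far
        self.ran = []     # transforms actually executed

    def apply(self, name):
        self.steps.append(name)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # The pipeline "runs" here, at the end of the with-block.
        self.ran = list(self.steps)
        return False

with TinyPipeline() as p:
    p.apply("read_bigquery")
    p.apply("run_inference")

# Too late: the pipeline already ran on __exit__.
p.apply("to_feather")

print(p.ran)  # ['read_bigquery', 'run_inference'] -- no 'to_feather'
```

The real pipeline behaves the same way: the runner executes whatever graph exists when the with-block exits, so a write attached afterwards never becomes part of the job, which is why the run "succeeds" with no file produced.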
>>>>>>> And to run it on GCP:
>>>>>>>
>>>>>>>     python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1
>>>>>>>
>>>>>>> This also runs fine, but the gs://my-bucket/embedding_output.feather file isn't there either.
>>>>>>>
>>>>>>> I'm probably doing something wrong. Would love your thoughts 🙂
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Duarte OC
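The ValueError quoted in the thread comes from feather's refusal to serialize anything but a default RangeIndex. In plain pandas, the reset_index() the message suggests is a one-liner that demotes the index to an ordinary column (though, as noted above, the deferred Beam version isn't parallelizable, which is why parquet won out). A small sketch assuming only pandas is installed; the `biggint`/`embedding` names are reused from the example in the thread:

```python
import pandas as pd

# A frame whose index is not the default RangeIndex -- the shape the
# keyed pipeline output ends up with, and the shape feather rejects.
df = pd.DataFrame(
    {"embedding": [[0.1, 0.2], [0.3, 0.4]]},
    index=pd.Index([101, 202], name="biggint"),
)

# reset_index() turns the named index into a regular column and
# leaves behind a default RangeIndex, which feather accepts.
flat = df.reset_index()

print(list(flat.columns))         # ['biggint', 'embedding']
print(type(flat.index).__name__)  # RangeIndex
```

Parquet, by contrast, can round-trip a non-default index, so switching the write to parquet sidesteps the problem without the extra reset_index step.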