On Thu, Nov 10, 2022 at 9:39 AM Duarte Oliveira e Carmo <duarteoca...@gmail.com> wrote:
> Thanks a lot for the help Brian, and for filing the bug about feather.
>
> Been using parquet, and the pipeline is working perfectly.
>
> One extra question (if you're willing): Is there any way to control the
> amount of sharding on the to_parquet call? (I tried adding n_shards = N,
> but it seems to have no effect)

Unfortunately not, but this is a good feature request. I filed (another)
issue for this: https://github.com/apache/beam/issues/24094

> For extra context, when saving a large parquet file, I wish we could
> control that it saves in 10 files instead of 1000, for example. This
> would make it easier to load back.
>
> Thanks for the help!!

Of course!

> On 10 Nov 2022 at 18.36.43, Brian Hulette <bhule...@google.com> wrote:
>
>> +user <user@beam.apache.org> (adding back the user list)
>>
>> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Thanks, glad to hear that worked!
>>>
>>> The feather error looks like a bug; I filed an issue [1] to track it. I
>>> think using parquet instead of feather is the best workaround for now.
>>>
>>> [1] https://github.com/apache/beam/issues/24091
>>>
>>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo
>>> <duarteoca...@gmail.com> wrote:
>>>
>>>> Hey Brian!
>>>>
>>>> Wow. You are absolutely correct!
>>>>
>>>> I hate my brain, haha.
>>>>
>>>> Ended up indenting the block. Beam was still giving me problems with
>>>>
>>>> ValueError: feather does not support serializing a non-default index
>>>> for the index; you can .reset_index() to make the index into column(s)
>>>>
>>>> And reset_index() is not parallelizable.
>>>>
>>>> So I ended up going with parquet.
>>>>
>>>> Thanks for the help Brian!
>>>> Really appreciate it 🙂
>>>>
>>>> Will also update Stack Overflow.
>>>>
>>>> Duarte O. Carmo
>>>> ML Engineer / Consultant
>>>> duarteocarmo.com
>>>>
>>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user
>>>> <user@beam.apache.org> wrote:
>>>>
>>>>> Hi Duarte,
>>>>>
>>>>> I commented on the Stack Overflow question. It looks like your
>>>>> to_dataframe and to_feather calls are outside of the Pipeline context,
>>>>> so they are being created _after_ the pipeline has already run.
>>>>> Hopefully moving them inside the Pipeline context will resolve the
>>>>> issue.
>>>>>
>>>>> Brian
>>>>>
>>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo
>>>>> <duarteoca...@gmail.com> wrote:
>>>>>
>>>>>> Hi all!
>>>>>>
>>>>>> *Context: I posted this question
>>>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>>>> on Stack Overflow before, but I'm hoping to get more answers here.*
>>>>>>
>>>>>> I'm trying to compute a sentence-transformers
>>>>>> <https://github.com/UKPLab/sentence-transformers> model for various
>>>>>> rows stored in BigQuery, and then store the results as a feather
>>>>>> dataframe in Google Cloud Storage.
>>>>>>
>>>>>> However, I'm having problems saving the actual dataframe. I'm not
>>>>>> able to save it locally or in Google Cloud Storage, but I get no
>>>>>> error.
>>>>>> Here's a reproducible example I've come up with:

import apache_beam as beam
from apache_beam.ml.inference.base import (
    ModelHandler,
    PredictionResult,
    RunInference,
)
from sentence_transformers import SentenceTransformer
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Sequence, Optional, Any, Dict, Iterable
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.dataframe.convert import to_dataframe

ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"


class EmbeddingModelHandler(
    ModelHandler[str, PredictionResult, SentenceTransformer]
):
    def __init__(self, model_name: str = ENCODING_MODEL_NAME):
        self._model_name = model_name

    def load_model(self) -> SentenceTransformer:
        from sentence_transformers import (
            SentenceTransformer,
        )  # <- These imports are needed, otherwise GCP complains
        import sentence_transformers

        return sentence_transformers.SentenceTransformer(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: SentenceTransformer,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        from sentence_transformers import SentenceTransformer
        import sentence_transformers

        embedding_matrix = model.encode(
            batch, show_progress_bar=True, normalize_embeddings=True
        )

        return embedding_matrix


class GetFeatures(beam.DoFn):
    def process(self, element):
        feature = element.get("overview", "")
        iid = element.get("iid")
        return [(iid, feature)]


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output file to write results to.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:

        embedding_dataframe = (
            pipeline
            | "Read BigQuery"
            >> beam.io.ReadFromBigQuery(
                query="""SELECT text_to_embed,
                                identifier
                         FROM [gcp-project:gcp-dataset.gcp-table]
                         LIMIT 20
                      """,
                project="gcp-project",
                gcs_location="gs://ml-apache-beam/tmp/",
            )
            | "Get features" >> beam.ParDo(GetFeatures())
            | "Run inference"
            >> RunInference(
                KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
            )
            | "To Rows"
            >> beam.Map(
                lambda element: beam.Row(
                    biggint=int(element[0]),
                    embedding=element[1].tolist(),
                )
            )
        )

    df = to_dataframe(embedding_dataframe)
    df.to_feather(known_args.output)


if __name__ == "__main__":
    run()

>>>>>> And my requirements.txt:

sentence-transformers==2.2.2

>>>>>> With Python 3.8.14.
>>>>>>
>>>>>> To run it locally, I use:

python beam_pipeline.py --requirements_file requirements.txt \
    --output embedding_output.feather

>>>>>> Which runs fine, but I see no embedding_output.feather in the
>>>>>> directory.
>>>>>>
>>>>>> And to run it on GCP:

python beam_pipeline.py --requirements_file requirements.txt \
    --output "gs://my-bucket/embedding_output.feather" \
    --runner DataflowRunner --project my-gcp-project --region us-central1

>>>>>> That also runs fine, but the gs://my-bucket/embedding_output.feather
>>>>>> file is not there either.
>>>>>>
>>>>>> I'm probably doing something wrong. Would love your thoughts 🙂
>>>>>>
>>>>>> Thanks!
>>>>>> Duarte OC