Ah - that’s unfortunate.  Will keep an eye on that issue!

Thanks for all the help Brian - great work on Beam!

Duarte OC

On 10 Nov 2022 at 19.13.33, Brian Hulette <bhule...@google.com> wrote:

>
>
> On Thu, Nov 10, 2022 at 9:39 AM Duarte Oliveira e Carmo <
> duarteoca...@gmail.com> wrote:
>
>> Thanks a lot for the help Brian, and for filing the bug about feather.
>>
>> Been using parquet and the pipeline is working perfectly.
>>
>> One extra question (if you’re willing): Is there any way to control the
>> amount of sharding on the to_parquet call? (I tried adding n_shards = N, but
>> it seems to have no effect)
>>
>
> Unfortunately not, but this is a good feature request. I filed (another)
> issue for this: https://github.com/apache/beam/issues/24094
>
>>
>> For extra context, when saving a large parquet dataset, I wish we could
>> make it write 10 files instead of 1000, for example. This would
>> make it easier to load back.
>>
>> Thanks for the help!!
>>
>
> Of course!
>
>>
>> On 10 Nov 2022 at 18.36.43, Brian Hulette <bhule...@google.com> wrote:
>>
>>> +user <user@beam.apache.org> (adding back the user list)
>>>
>>> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Thanks, glad to hear that worked!
>>>>
>>>> The feather error looks like a bug; I filed an issue [1] to track it. I
>>>> think using parquet instead of feather is the best workaround for now.
>>>>
>>>> [1] https://github.com/apache/beam/issues/24091
>>>>
>>>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
>>>> duarteoca...@gmail.com> wrote:
>>>>
>>>>> Hey Brian!
>>>>>
>>>>> Wow. You are absolutely correct!!!!
>>>>>
>>>>> I hate my brain ahaha.
>>>>>
>>>>> Ended up indenting the block. Beam was still giving me problems with
>>>>>
>>>>> ValueError: feather does not support serializing a non-default index for the index; you can .reset_index() to make the index into column(s)
>>>>>
>>>>> And reset_index() is not parallelizable.
>>>>>
>>>>> So I ended up going with parquet.
>>>>>
>>>>> Thanks for the help Brian! Really appreciate it 🙂
>>>>>
>>>>> Will also update stack overflow
>>>>>
>>>>>
>>>>> Duarte O.Carmo
>>>>> ML Engineer / Consultant
>>>>> duarteocarmo.com
>>>>>
>>>>>
>>>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>> Hi Duarte,
>>>>>>
>>>>>> I commented on the Stack Overflow question. It looks like your
>>>>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>>>>> they are being created _after_ the pipeline has already run. Hopefully
>>>>>> moving them inside the Pipeline context will resolve the issue.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>>>>> duarteoca...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all!
>>>>>>>
>>>>>>> *Context: I posted this question
>>>>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>>>>> on Stack Overflow before, but I'm hoping to get more answers here.*
>>>>>>>
>>>>>>> I'm trying to compute sentence-transformers
>>>>>>> <https://github.com/UKPLab/sentence-transformers> embeddings for
>>>>>>> rows stored in BigQuery, and then store them as a feather file
>>>>>>> in Google Cloud Storage.
>>>>>>>
>>>>>>> However, I'm having problems in saving the actual dataframe. I'm not
>>>>>>> able to save it locally or in Google Cloud Storage, but get no error.
>>>>>>>
>>>>>>> Here's a reproducible example I've come up with:
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.ml.inference.base import (
>>>>>>>     ModelHandler,
>>>>>>>     PredictionResult,
>>>>>>>     RunInference,
>>>>>>> )
>>>>>>> from sentence_transformers import SentenceTransformer
>>>>>>> import argparse
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>>>
>>>>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>>>>
>>>>>>>
>>>>>>> class EmbeddingModelHandler(
>>>>>>>     ModelHandler[str, PredictionResult, SentenceTransformer]
>>>>>>> ):
>>>>>>>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>>>>>         self._model_name = model_name
>>>>>>>
>>>>>>>     def load_model(self) -> SentenceTransformer:
>>>>>>>         from sentence_transformers import (
>>>>>>>             SentenceTransformer,
>>>>>>>         )  # <- These imports are needed otherwise GCP complains
>>>>>>>         import sentence_transformers
>>>>>>>
>>>>>>>         return sentence_transformers.SentenceTransformer(self._model_name)
>>>>>>>
>>>>>>>     def run_inference(
>>>>>>>         self,
>>>>>>>         batch: Sequence[str],
>>>>>>>         model: SentenceTransformer,
>>>>>>>         inference_args: Optional[Dict[str, Any]] = None,
>>>>>>>     ) -> Iterable[PredictionResult]:
>>>>>>>         from sentence_transformers import SentenceTransformer
>>>>>>>         import sentence_transformers
>>>>>>>
>>>>>>>         embedding_matrix = model.encode(
>>>>>>>             batch, show_progress_bar=True, normalize_embeddings=True
>>>>>>>         )
>>>>>>>
>>>>>>>         return embedding_matrix
>>>>>>>
>>>>>>>
>>>>>>> class GetFeatures(beam.DoFn):
>>>>>>>     def process(self, element):
>>>>>>>         feature = element.get("overview", "")
>>>>>>>         iid = element.get("iid")
>>>>>>>         return [(iid, feature)]
>>>>>>>
>>>>>>>
>>>>>>> def run(argv=None):
>>>>>>>
>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>     parser.add_argument(
>>>>>>>         "--output",
>>>>>>>         dest="output",
>>>>>>>         required=True,
>>>>>>>         help="Output file to write results to.",
>>>>>>>     )
>>>>>>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>>>     pipeline_options = PipelineOptions(pipeline_args)
>>>>>>>
>>>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>
>>>>>>>         embedding_dataframe = (
>>>>>>>             pipeline
>>>>>>>             | "Read BigQuery"
>>>>>>>             >> beam.io.ReadFromBigQuery(
>>>>>>>                 query="""SELECT text_to_embed,
>>>>>>>                                 identifier
>>>>>>>                                 FROM [gcp-project:gcp-dataset.gcp-table]
>>>>>>>                                 LIMIT 20
>>>>>>>                                 """,
>>>>>>>                 project="gcp-project",
>>>>>>>                 gcs_location="gs://ml-apache-beam/tmp/",
>>>>>>>             )
>>>>>>>             | "Get features" >> beam.ParDo(GetFeatures())
>>>>>>>             | "Run inference"
>>>>>>>             >> RunInference(
>>>>>>>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>>>>>>>             )
>>>>>>>             | "To Rows"
>>>>>>>             >> beam.Map(
>>>>>>>                 lambda element: __import__("apache_beam").Row(
>>>>>>>                     biggint=int(element[0]),
>>>>>>>                     embedding=element[1].tolist(),
>>>>>>>                 )
>>>>>>>             )
>>>>>>>         )
>>>>>>>
>>>>>>>     df = to_dataframe(embedding_dataframe)
>>>>>>>     df.to_feather(known_args.output)
>>>>>>>
>>>>>>>
>>>>>>> if __name__ == "__main__":
>>>>>>>     run()
>>>>>>>
>>>>>>> And my requirements.txt:
>>>>>>>
>>>>>>> sentence-transformers==2.2.2
>>>>>>>
>>>>>>> With python 3.8.14
>>>>>>>
>>>>>>> To run it locally, I use:
>>>>>>>
>>>>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>>>>     --output embedding_output.feather
>>>>>>>
>>>>>>> Which runs fine, but I see no embedding_output.feather in the
>>>>>>> directory.
>>>>>>>
>>>>>>> And to run it on GCP:
>>>>>>>
>>>>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>>>>     --output "gs://my-bucket/embedding_output.feather" \
>>>>>>>     --runner DataflowRunner --project my-gcp-project --region us-central1
>>>>>>>
>>>>>>> This also runs fine, but the gs://my-bucket/embedding_output.feather
>>>>>>> file is not there either.
>>>>>>>
>>>>>>> I’m probably doing something wrong. Would love your thoughts 🙂
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Duarte OC