On Thu, Nov 10, 2022 at 9:39 AM Duarte Oliveira e Carmo <
duarteoca...@gmail.com> wrote:

> Thanks a lot for the help Brian, and for filing the bug about feather.
>
> Been using parquet and the pipeline is working perfectly.
>
> One extra question (if you're willing): Is there any way to control the
> amount of sharding on the to_parquet call? (I tried adding n_shards = N,
> but it seems to have no effect.)
>

Unfortunately not, but this is a good feature request. I filed (another)
issue for this: https://github.com/apache/beam/issues/24094
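
In the meantime, a possible workaround (just a sketch, untested) is to
convert the deferred dataframe back to a PCollection with to_pcollection
and write it with WriteToParquet, which does accept num_shards. The schema
below is an assumption based on your example pipeline (an integer id plus
a list of floats):

    import apache_beam as beam
    import pyarrow as pa
    from apache_beam.dataframe.convert import to_pcollection

    # Inside the pipeline context, after building the deferred dataframe df:
    pcoll = to_pcollection(df)  # elements come back as named tuples
    _ = (
        pcoll
        | "To dicts" >> beam.Map(lambda row: row._asdict())
        | "Write parquet"
        >> beam.io.WriteToParquet(
            known_args.output,
            schema=pa.schema(
                [("biggint", pa.int64()), ("embedding", pa.list_(pa.float32()))]
            ),
            num_shards=10,  # desired number of output files
        )
    )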

>
> For extra context: when saving a large parquet file, I'd like to be able to
> control whether it's written as 10 files instead of 1000, for example. That
> would make it easier to load back.
>
> Thanks for the help!!
>

Of course!

>
> On 10 Nov 2022 at 18.36.43, Brian Hulette <bhule...@google.com> wrote:
>
>> +user <user@beam.apache.org> (adding back the user list)
>>
>> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Thanks, glad to hear that worked!
>>>
>>> The feather error looks like a bug; I filed an issue [1] to track it. I
>>> think using parquet instead of feather is the best workaround for now.
>>>
>>> [1] https://github.com/apache/beam/issues/24091
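>>>
>>> For reference, the swap is just replacing the to_feather call with
>>> to_parquet, still inside the pipeline context (a sketch):
>>>
>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>         ...  # your existing transforms
>>>         df = to_dataframe(embedding_dataframe)
>>>         df.to_parquet(known_args.output)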
>>>
>>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
>>> duarteoca...@gmail.com> wrote:
>>>
>>>> Hey Brian!
>>>>
>>>> Wow. You are absolutely correct!!!!
>>>>
>>>> I hate my brain ahaha.
>>>>
>>>> Ended up indenting the block. Beam was still giving me problems with:
>>>>
>>>> ValueError: feather does not support serializing a non-default index
>>>> for the index; you can .reset_index() to make the index into column(s)
>>>>
>>>> And reset_index() is not parallelizable, so I ended up going with parquet.
>>>>
>>>> Thanks for the help Brian! Really appreciate it 🙂
>>>>
>>>> Will also update Stack Overflow.
>>>>
>>>>
>>>> Duarte O.Carmo
>>>> ML Engineer / Consultant
>>>> duarteocarmo.com
>>>>
>>>>
>>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> Hi Duarte,
>>>>>
>>>>> I commented on the Stack Overflow question. It looks like your
>>>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>>>> they only run _after_ the pipeline has already finished. Hopefully
>>>>> moving them inside the Pipeline context will resolve the issue.
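>>>>>
>>>>> Concretely, that just means indenting those two calls so they sit under
>>>>> the with block:
>>>>>
>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>         embedding_dataframe = (
>>>>>             ...  # your existing transforms
>>>>>         )
>>>>>         df = to_dataframe(embedding_dataframe)
>>>>>         df.to_feather(known_args.output)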
>>>>>
>>>>> Brian
>>>>>
>>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>>>> duarteoca...@gmail.com> wrote:
>>>>>
>>>>>> Hi all!
>>>>>>
>>>>>> *Context: I posted this question
>>>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>>>> on Stack Overflow before, but I'm hoping to get more answers here.*
>>>>>>
>>>>>> I'm trying to compute sentence-transformers
>>>>>> <https://github.com/UKPLab/sentence-transformers> embeddings for various
>>>>>> rows stored in BigQuery, and then store them as a feather dataframe in
>>>>>> Google Cloud Storage.
>>>>>>
>>>>>> However, I'm having problems saving the actual dataframe: nothing is
>>>>>> written locally or to Google Cloud Storage, yet I get no error.
>>>>>>
>>>>>> Here's a reproducible example I've come up with:
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.ml.inference.base import (
>>>>>>     ModelHandler,
>>>>>>     PredictionResult,
>>>>>>     RunInference,
>>>>>> )
>>>>>> from sentence_transformers import SentenceTransformer
>>>>>> import argparse
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>>
>>>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>>>
>>>>>>
>>>>>> class EmbeddingModelHandler(
>>>>>>     ModelHandler[str, PredictionResult, SentenceTransformer]
>>>>>> ):
>>>>>>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>>>>         self._model_name = model_name
>>>>>>
>>>>>>     def load_model(self) -> SentenceTransformer:
>>>>>>         from sentence_transformers import (
>>>>>>             SentenceTransformer,
>>>>>>         )  # <- These imports are needed otherwise GCP complains
>>>>>>         import sentence_transformers
>>>>>>
>>>>>>         return sentence_transformers.SentenceTransformer(self._model_name)
>>>>>>
>>>>>>     def run_inference(
>>>>>>         self,
>>>>>>         batch: Sequence[str],
>>>>>>         model: SentenceTransformer,
>>>>>>         inference_args: Optional[Dict[str, Any]] = None,
>>>>>>     ) -> Iterable[PredictionResult]:
>>>>>>         from sentence_transformers import SentenceTransformer
>>>>>>         import sentence_transformers
>>>>>>
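>>>>>>         # Encode the whole batch at once; with normalize_embeddings=True
>>>>>>         # this returns one unit-length vector per input string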
>>>>>>         embedding_matrix = model.encode(
>>>>>>             batch, show_progress_bar=True, normalize_embeddings=True
>>>>>>         )
>>>>>>
>>>>>>         return embedding_matrix
>>>>>>
>>>>>>
>>>>>> class GetFeatures(beam.DoFn):
>>>>>>     def process(self, element):
>>>>>>         # Pull the text to embed and its identifier out of each BigQuery row
>>>>>>         feature = element.get("text_to_embed", "")
>>>>>>         iid = element.get("identifier")
>>>>>>         return [(iid, feature)]
>>>>>>
>>>>>>
>>>>>> def run(argv=None):
>>>>>>
>>>>>>     parser = argparse.ArgumentParser()
>>>>>>     parser.add_argument(
>>>>>>         "--output",
>>>>>>         dest="output",
>>>>>>         required=True,
>>>>>>         help="Output file to write results to.",
>>>>>>     )
>>>>>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>>     pipeline_options = PipelineOptions(pipeline_args)
>>>>>>
>>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>
>>>>>>         embedding_dataframe = (
>>>>>>             pipeline
>>>>>>             | "Read BigQuery"
>>>>>>             >> beam.io.ReadFromBigQuery(
>>>>>>                 query="""SELECT text_to_embed,
>>>>>>                                 identifier
>>>>>>                                 FROM  [gcp-project:gcp-dataset.gcp-table]
>>>>>>                                 LIMIT 20
>>>>>>                                 """,
>>>>>>                 project="gcp-project",
>>>>>>                 gcs_location="gs://ml-apache-beam/tmp/",
>>>>>>             )
>>>>>>             | "Get features" >> beam.ParDo(GetFeatures())
>>>>>>             | "Run inference"
>>>>>>             >> RunInference(
>>>>>>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>>>>>>             )
>>>>>>             | "To Rows"
>>>>>>             >> beam.Map(
>>>>>>                 lambda element: beam.Row(
>>>>>>                     biggint=int(element[0]), embedding=element[1].tolist()
>>>>>>                 )
>>>>>>             )
>>>>>>         )
>>>>>>
>>>>>>     df = to_dataframe(embedding_dataframe)
>>>>>>     df.to_feather(known_args.output)
>>>>>>
>>>>>>
>>>>>> if __name__ == "__main__":
>>>>>>     run()
>>>>>>
>>>>>> And my requirements.txt:
>>>>>>
>>>>>> sentence-transformers==2.2.2
>>>>>>
>>>>>> Using Python 3.8.14.
>>>>>>
>>>>>> To run it locally, I use:
>>>>>>
>>>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>>>     --output embedding_output.feather
>>>>>>
>>>>>> Which runs fine, but I see no embedding_output.feather in the
>>>>>> directory.
>>>>>>
>>>>>> And to run it on GCP:
>>>>>>
>>>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>>>     --output "gs://my-bucket/embedding_output.feather" \
>>>>>>     --runner DataflowRunner --project my-gcp-project --region us-central1
>>>>>>
>>>>>> This also runs fine, but the gs://my-bucket/embedding_output.feather
>>>>>> file isn't there either.
>>>>>>
>>>>>> I'm probably doing something wrong. Would love your thoughts 🙂
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Duarte OC
