+user <user@beam.apache.org> (adding back the user list)
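
For anyone finding this thread in the archive: the diagnosis quoted below comes down to deferred execution. Calls made inside the `with beam.Pipeline(...)` block only register work, and that work runs when the block exits, so a write registered afterwards never runs. Here is a toy model in plain Python. `ToyPipeline` and `add_write` are hypothetical names made up for illustration; this is not Beam's API or implementation, only a sketch of the behaviour under that assumption.

```python
class ToyPipeline:
    """Hypothetical stand-in for a deferred pipeline; NOT Beam's API."""

    def __init__(self):
        self.steps = []    # work registered so far
        self.outputs = []  # results produced when the pipeline runs

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the `with` block is what actually runs the registered steps.
        if exc_type is None:
            for step in self.steps:
                self.outputs.append(step())
        return False

    def add_write(self, name):
        # Registering a step only records it; nothing is executed yet.
        self.steps.append(lambda: f"wrote {name}")


with ToyPipeline() as p:
    p.add_write("inside.parquet")   # registered before the run

p.add_write("outside.feather")      # registered after the run: never executed

print(p.outputs)  # ['wrote inside.parquet']
```

In the script quoted below, the equivalent fix is to indent the to_dataframe and write calls so that they sit inside the `with` block, which is what Duarte ended up doing.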

On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette <bhule...@google.com> wrote:

> Thanks, glad to hear that worked!
>
> The feather error looks like a bug, so I filed an issue [1] to track it. I
> think using parquet instead of feather is the best workaround for now.
>
> [1] https://github.com/apache/beam/issues/24091
>
> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
> duarteoca...@gmail.com> wrote:
>
>> Hey Brian!
>>
>> Wow. You are absolutely correct!!!!
>>
>> I hate my brain ahaha.
>>
>> Ended up indenting the block. Beam was still giving me problems with
>>
>> ValueError: feather does not support serializing a non-default index for the 
>> index; you can .reset_index() to make the index into column(s)
>>
>>
>>
>> And reset_index() is not parallelizable.
>>
>> So I ended up going with parquet.
>>
>> Thanks for the help Brian! Really appreciate it 🙂
>>
>> I'll also update the Stack Overflow question.
>>
>>
>> Duarte O.Carmo
>> ML Engineer / Consultant
>> duarteocarmo.com
>>
>>
>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <user@beam.apache.org>
>> wrote:
>>
>>> Hi Duarte,
>>>
>>> I commented on the Stack Overflow question. It looks like your
>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>> they are being created _after_ the pipeline has already run. Hopefully
>>> moving them inside the Pipeline context will resolve the issue.
>>>
>>> Brian
>>>
>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>> duarteoca...@gmail.com> wrote:
>>>
>>>> Hi all!
>>>>
>>>> *Context: I posted this question
>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>> on Stack Overflow before, but I'm hoping to get more answers here.*
>>>>
>>>> I'm trying to compute sentence-transformers
>>>> <https://github.com/UKPLab/sentence-transformers> embeddings for
>>>> rows stored in BigQuery, and then store them as a Feather file in
>>>> Google Cloud Storage.
>>>>
>>>> However, I'm having problems saving the actual dataframe. The file is
>>>> not written, either locally or in Google Cloud Storage, and I get no error.
>>>>
>>>> Here's a reproducible example I've come up with:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.ml.inference.base import (
>>>>     ModelHandler,
>>>>     PredictionResult,
>>>>     RunInference,
>>>> )
>>>> from sentence_transformers import SentenceTransformer
>>>> import argparse
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>
>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>
>>>>
>>>> class EmbeddingModelHandler(
>>>>     ModelHandler[str, PredictionResult, SentenceTransformer]
>>>> ):
>>>>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>>         self._model_name = model_name
>>>>
>>>>     def load_model(self) -> SentenceTransformer:
>>>>         from sentence_transformers import (
>>>>             SentenceTransformer,
>>>>         )  # <- These imports are needed otherwise GCP complains
>>>>         import sentence_transformers
>>>>
>>>>         return sentence_transformers.SentenceTransformer(self._model_name)
>>>>
>>>>     def run_inference(
>>>>         self,
>>>>         batch: Sequence[str],
>>>>         model: SentenceTransformer,
>>>>         inference_args: Optional[Dict[str, Any]] = None,
>>>>     ) -> Iterable[PredictionResult]:
>>>>         from sentence_transformers import SentenceTransformer
>>>>         import sentence_transformers
>>>>
>>>>         embedding_matrix = model.encode(
>>>>             batch, show_progress_bar=True, normalize_embeddings=True
>>>>         )
>>>>
>>>>         return embedding_matrix
>>>>
>>>>
>>>> class GetFeatures(beam.DoFn):
>>>>     def process(self, element):
>>>>         feature = element.get("overview", "")
>>>>         iid = element.get("iid")
>>>>         return [(iid, feature)]
>>>>
>>>>
>>>> def run(argv=None):
>>>>
>>>>     parser = argparse.ArgumentParser()
>>>>     parser.add_argument(
>>>>         "--output",
>>>>         dest="output",
>>>>         required=True,
>>>>         help="Output file to write results to.",
>>>>     )
>>>>     known_args, pipeline_args = parser.parse_known_args(argv)
>>>>     pipeline_options = PipelineOptions(pipeline_args)
>>>>
>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>
>>>>         embedding_dataframe = (
>>>>             pipeline
>>>>             | "Read BigQuery"
>>>>             >> beam.io.ReadFromBigQuery(
>>>>                 query="""SELECT text_to_embed,
>>>>                                 identifier
>>>>                                 FROM  [gcp-project:gcp-dataset.gcp-table]
>>>>                                 LIMIT 20
>>>>                                 """,
>>>>                 project="gcp-project",
>>>>                 gcs_location="gs://ml-apache-beam/tmp/",
>>>>             )
>>>>             | "Get features" >> beam.ParDo(GetFeatures())
>>>>             | "Run inference"
>>>>             >> RunInference(
>>>>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>>>>             )
>>>>             | "To Rows"
>>>>             >> beam.Map(
>>>>                 lambda element: __import__("apache_beam").Row(
>>>>                     biggint=int(element[0]), embedding=element[1].tolist()
>>>>                 )
>>>>             )
>>>>         )
>>>>
>>>>     df = to_dataframe(embedding_dataframe)
>>>>     df.to_feather(known_args.output)
>>>>
>>>>
>>>> if __name__ == "__main__":
>>>>     run()
>>>>
>>>> And my requirements.txt:
>>>>
>>>> sentence-transformers==2.2.2
>>>>
>>>> With Python 3.8.14
>>>>
>>>> To run it locally, I use:
>>>>
>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>     --output embedding_output.feather
>>>>
>>>> Which runs fine, but I see no embedding_output.feather in the
>>>> directory.
>>>>
>>>> And to run it on GCP:
>>>>
>>>> python beam_pipeline.py --requirements_file requirements.txt \
>>>>     --output "gs://my-bucket/embedding_output.feather" \
>>>>     --runner DataflowRunner --project my-gcp-project --region us-central1
>>>>
>>>> This also runs fine, but the gs://my-bucket/embedding_output.feather
>>>> file is not there either.
>>>>
>>>> I'm probably doing something wrong. Would love your thoughts 🙂
>>>>
>>>> Thanks!
>>>>
>>>> Duarte OC