Hi Susmit,

Since you also submitted a GitHub issue [1], maybe we can consolidate the
discussion there?

[1]: https://github.com/apache/arrow/issues/44560

Best,
David

On Tue, Oct 29, 2024, at 22:08, Susmit Sarkar wrote:
> Hi Team,
>
> We are writing polyglot SDK wrappers on top of the Flight client. With
> Scala/Java I don't see any issues, but with pyarrow the behavior is
> sporadic.
>
> The calls get stuck partway through, after a few iterations of the loop.
> This happens only with pyarrow Flight RPC.
>
> Requesting your help.
>
> Thanks,
> Susmit
>
> Sharing the snippet below:
>
> def process_stream(client, object_store_details):
>     """
>     Fetches the data stream from the Flight server and displays row counts
>     and top 10 rows.
>
>     :param client: CefFlightDataClient instance to interact with the
>         Flight server.
>     :param object_store_details: ObjectStoreDetails instance containing the
>         access and retrieval details.
>     """
>
>     for batch in client.get_stream_iterator(object_store_details):
>         print("Number of rows:", batch.num_rows)
>
>         # Optionally convert to pandas for easier handling
>         df = batch.to_pandas()
>         print("Top 10 rows:\n", df.head(10))
>
>
> def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
>
>     ticket_str = build_ticket_str(object_store_details)
>     ticket = flight.Ticket(ticket_str.encode('utf-8'))
>     client = None
>     try:
>         client = self._connect_to_flight_server()  # Initialize a new client
>         data_stream = client.do_get(ticket, FlightCallOptions(timeout=10))
>         for chunk in data_stream:
>             yield chunk.data
>     except Exception as e:
>         logging.error(f"Error processing stream: {e}")
>         raise CefFlightException("Error processing stream", e)
>     finally:
>         if client:
>             client.close()
>             logging.info("Flight client closed after streaming.")
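
One thing the quoted generator relies on is its `finally` block to close the client. That block runs only when the generator is exhausted, closed, or finalized, so if a consumer stops iterating early, the cleanup timing is left to the interpreter. This may or may not be related to the hang. The sketch below shows one way to make the cleanup deterministic with `contextlib.closing`. `FakeClient` and its `do_get()` are hypothetical stand-ins, not the pyarrow API, so the example runs without a Flight server.

```python
import contextlib
import logging


class FakeClient:
    """Hypothetical stand-in for a Flight client; it only records close()."""

    def __init__(self):
        self.closed = False

    def do_get(self):
        # Pretend the server streams three small batches.
        yield from ([1, 2], [3, 4], [5, 6])

    def close(self):
        self.closed = True


def get_stream_iterator(client):
    """Same shape as the quoted generator: cleanup lives in `finally`."""
    try:
        for chunk in client.do_get():
            yield chunk
    finally:
        client.close()
        logging.info("Client closed after streaming.")


client = FakeClient()

# closing() calls the generator's close() on exit. That raises GeneratorExit
# at the paused `yield`, so the `finally` block runs immediately.
with contextlib.closing(get_stream_iterator(client)) as batches:
    for batch in batches:
        break  # the consumer stops after the first batch

closed_after_early_exit = client.closed
```

With this pattern the client is closed as soon as the `with` block exits, whether the loop finished, broke out early, or raised.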
