Hi Susmit,

Since you submitted a GitHub issue as well [1], maybe we can consolidate the discussion there?
[1]: https://github.com/apache/arrow/issues/44560

Best,
David

On Tue, Oct 29, 2024, at 22:08, Susmit Sarkar wrote:
> Hi Team,
>
> We are writing polyglot sdk wrappers on top of Flight Client, with
> Scala/Java, I don't see any issues, but with pyarrow, it's showing sporadic
> behavior.
>
> The calls are getting stuck in between after iterating few loops,
> this happens specifically with pyarrow flight rpc
>
> Requesting your help
>
> Thanks,
> Susmit
>
> Sharing the snippet below
>
> def process_stream(client, object_store_details):
>     """
>     Fetches the data stream from the Flight server and displays row
>     counts and top 10 rows.
>
>     :param client: CefFlightDataClient instance to interact with the
>         Flight server.
>     :param object_store_details: ObjectStoreDetails instance
>         containing the access and retrieval details.
>     """
>
>     for batch in client.get_stream_iterator(object_store_details):
>         print("Number of rows:", batch.num_rows)
>
>         # Optionally convert to pandas for easier handling
>         df = batch.to_pandas()
>         print("Top 10 rows:\n", df.head(10))
>
>
> def get_stream_iterator(self, object_store_details: ObjectStoreDetails):
>
>     ticket_str = build_ticket_str(object_store_details)
>     ticket = flight.Ticket(ticket_str.encode('utf-8'))
>     client = None
>     try:
>         client = self._connect_to_flight_server()  # Initialize a new client
>         data_stream = client.do_get(ticket, FlightCallOptions(timeout=10))
>         for chunk in data_stream:
>             yield chunk.data
>     except Exception as e:
>         logging.error(f"Error processing stream: {e}")
>         raise CefFlightException("Error processing stream", e)
>     finally:
>         if client:
>             client.close()
>             logging.info("Flight client closed after streaming.")
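One detail of the quoted get_stream_iterator is worth keeping in mind while debugging. Because it is a generator, the finally block that calls client.close() runs only when the generator is exhausted, explicitly closed, or garbage-collected, not at the moment the caller stops iterating. The minimal sketch below shows that standard Python behavior. It uses a hypothetical HypotheticalClient stand-in (not pyarrow.flight.FlightClient) and a hypothetical stream_iterator modeled on the snippet, and it illustrates cleanup timing only.

```python
from contextlib import closing


class HypotheticalClient:
    """Hypothetical stand-in for a Flight client; records whether close() ran."""

    def __init__(self, chunks):
        self.chunks = chunks
        self.closed = False

    def do_get(self):
        # Yield chunks one at a time, like iterating a stream reader.
        yield from self.chunks

    def close(self):
        self.closed = True


def stream_iterator(client):
    """Hypothetical generator that closes the client in `finally`, as the snippet does."""
    try:
        for chunk in client.do_get():
            yield chunk
    finally:
        client.close()


# Case 1: the consumer takes one chunk and stops. The generator is suspended,
# not finished, so its `finally` block has not run and the client is still open.
client_a = HypotheticalClient([1, 2, 3])
gen = stream_iterator(client_a)
first = next(gen)
still_open = not client_a.closed
gen.close()  # raises GeneratorExit at the paused `yield`, which runs `finally`
closed_after_close = client_a.closed

# Case 2: contextlib.closing() calls gen.close() when the `with` block exits,
# so the client is closed even though the loop breaks early.
client_b = HypotheticalClient([1, 2, 3])
with closing(stream_iterator(client_b)) as chunks:
    for chunk in chunks:
        if chunk == 2:
            break
closed_with_closing = client_b.closed

print(first, still_open, closed_after_close, closed_with_closing)
# prints: 1 True True True
```

Wrapping the generator in contextlib.closing, as in the second case, makes the client's close() run at a predictable point even if the consuming loop breaks early or raises.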