Hi Team,

We are writing polyglot SDK wrappers on top of the Flight client. With
Scala/Java I don't see any issues, but with pyarrow the behavior is
sporadic.

The calls get stuck partway through, after a few iterations of the loop.
This happens specifically with the pyarrow Flight RPC client.

Requesting your help

Thanks,
Susmit

Sharing the snippet below:

import logging

from pyarrow import flight
from pyarrow.flight import FlightCallOptions


def process_stream(client, object_store_details):
    """
    Fetches the data stream from the Flight server and displays row
    counts and top 10 rows.

    :param client: CefFlightDataClient instance to interact with the
        Flight server.
    :param object_store_details: ObjectStoreDetails instance
        containing the access and retrieval details.
    """

    for batch in client.get_stream_iterator(object_store_details):
        print("Number of rows:", batch.num_rows)

        # Optionally convert to pandas for easier handling
        df = batch.to_pandas()
        print("Top 10 rows:\n", df.head(10))


def get_stream_iterator(self, object_store_details: ObjectStoreDetails):

    ticket_str = build_ticket_str(object_store_details)
    ticket = flight.Ticket(ticket_str.encode('utf-8'))
    client = None
    try:
        client = self._connect_to_flight_server()  # Initialize a new client
        data_stream = client.do_get(ticket, FlightCallOptions(timeout=10))
        for chunk in data_stream:
            yield chunk.data
    except Exception as e:
        logging.error(f"Error processing stream: {e}")
        # Chain the original exception so its traceback is preserved.
        raise CefFlightException("Error processing stream", e) from e
    finally:
        if client:
            client.close()
            logging.info("Flight client closed after streaming.")
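
One detail of the snippet that may be relevant: get_stream_iterator is a
generator, so its finally block (and therefore client.close()) runs only
when the generator is exhausted, explicitly closed, or garbage collected.
The sketch below uses only the standard library and a hypothetical
FakeClient stand-in (not a pyarrow class) to show that behavior, and how
contextlib.closing makes the cleanup deterministic. It does not reproduce
the hang itself; it only isolates the cleanup pattern.

```python
from contextlib import closing


class FakeClient:
    """Hypothetical stand-in for a Flight client; not a pyarrow class."""

    def __init__(self):
        self.closed = False

    def do_get(self):
        # Pretend the server streams five batches.
        return iter(range(5))

    def close(self):
        self.closed = True


def stream_batches(client):
    """Generator with the same try/finally shape as get_stream_iterator."""
    try:
        for batch in client.do_get():
            yield batch
    finally:
        client.close()


def consume_two(client):
    """Read two batches, then stop early; closing() runs the finally block."""
    seen = []
    with closing(stream_batches(client)) as batches:
        for batch in batches:
            seen.append(batch)
            if len(seen) == 2:
                break
    return seen


client = FakeClient()
print(consume_two(client), client.closed)
```

Without closing(), a consumer that stops early leaves the generator
suspended, and the client stays open until the generator object is
collected.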
