Thanks Susmit. I can't quite see a bug, but I'm willing to believe that Flight 
is buggy in Java. (I have never understood why the code tries to handle flow 
control manually, and without my memories from 6 years ago the code looks 
suspect to me now.)

Do you know whether (1) the Python server thinks the RPC is complete, and 
(2) the Java client got any (or all) of the data before getting stuck? It may 
also be interesting to step through the Java code with a debugger attached and 
see what the values of `pending` and `completed` are in the FlightStream 
instance, and whether the methods here[1] are all being hit as expected.

[1] 
https://github.com/apache/arrow-java/blob/b9e37f0ccecc2651fec3487472c203bd223290e8/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java#L356
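
For (1), one low-effort check is to wrap the generator you hand to
GeneratorStream so that the server logs each batch as Flight pulls it, and logs
once more when the generator finishes. A minimal sketch, not tested against
your server: `traced_batches` and its `label` and `log` parameters are
hypothetical names of mine, not part of pyarrow. Note that an exhausted
generator only shows that the server side consumed every batch, not that the
client received them.

```python
import logging


def traced_batches(batches, label="do_get", log=logging.info):
    """Yield every item from `batches`, logging progress and how iteration ended.

    Hypothetical helper for debugging; `batches` can be any iterable.
    """
    count = 0
    exhausted = False
    try:
        for batch in batches:
            count += 1
            log(f"{label}: yielding batch {count}")
            yield batch
        # Only reached when the wrapped iterable ran to completion.
        exhausted = True
    finally:
        # Also runs if the generator is closed early or an error propagates.
        state = "exhausted" if exhausted else "stopped early"
        log(f"{label}: generator {state} after {count} batch(es)")
```

In do_get this would be used as
`flight.GeneratorStream(schema, traced_batches(s3_handler.stream_parquet_batches(parquet_data, batch_size)))`.
If the log stops partway and the final line never appears, the server is
presumably blocked while writing, which would point at flow control rather
than at S3.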

On Fri, May 2, 2025, at 14:56, Susmit Sarkar wrote:
> Hi James,
>
> The Flight server is written in Python. The code snippet is below.
>
> We have overridden only the *do_get* method.
>
> getStream() and getFlightInfo() are left untouched (default implementation).
>
> def do_get(self, context, ticket):
>     """
>         Handles client requests for data. The ticket will contain:
>         - access_key: S3 access key
>         - secret_key: S3 secret key
>         - s3_path: Full S3 path (e.g., bucket_name/object_key)
>         - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
>     """
>     try:
>         # Parse the ticket to extract credentials, S3 path, and mode
>         access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
>     except InvalidTicketFormatError as e:
>         logging.error(str(e))
>         raise
>     except InvalidModeError as e:
>         logging.error(str(e))
>         raise
>
>     # s3fs doesn't need the protocol prefix.
>     if s3_path.startswith("s3://"):
>         s3_path = s3_path.replace("s3://", "", 1)
>
>     logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
>     logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
>     logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")
>
>     # Initialize the S3 handler with credentials
>     try:
>         s3_handler = S3Handler(
>             endpoint=Config.S3_ENDPOINT_OVERRIDE,
>             region=Config.S3_REGION,
>             access_key=access_key,
>             secret_key=secret_key
>         )
>     except Exception as e:
>         logging.error(f"Error initializing S3 handler: {str(e)}")
>         raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e
>
>     if mode == DataRetrievalMode.BATCH:
>         try:
>             # Use the get_parquet_data method for both wildcard and non-wildcard cases
>             parquet_data = s3_handler.get_parquet_data(s3_path)
>             # parquet_data.schema: This is used when parquet_data is an instance of ds.Dataset
>             # (i.e., when multiple Parquet files are being processed as a dataset).
>             #
>             # parquet_data.schema_arrow: This is used when parquet_data is a ParquetFile
>             # from the pq (pyarrow.parquet) module. A single Parquet file has its own
>             # schema, accessible via schema_arrow in PyArrow.
>             schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow
>             return flight.GeneratorStream(schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
>         except OSError as e:
>             logging.error(f"AWS S3 access error: {str(e)}")
>             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>         except Exception as e:
>             logging.error(f"Error streaming Parquet data: {str(e)}")
>             raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e
>
>     # Handle 'full' mode to load the entire dataset
>     elif mode == DataRetrievalMode.FULL:
>         try:
>             # Check if the S3 path contains a wildcard and the mode is FULL
>             if "*" in s3_path:
>                 logging.warning(
>                     f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
>                     f"This may put pressure on memory as all files will be loaded into memory at once."
>                 )
>             # Use the same get_parquet_data method for both wildcard and non-wildcard cases
>             parquet_data = s3_handler.get_parquet_data(s3_path)
>             # Load the entire dataset into memory / Chance of OOM.
>             # table = parquet_data.to_table() if isinstance(parquet_data, ds.Dataset) else parquet_data.read_table()
>             # Load the entire dataset into memory, with consideration for Dataset vs. ParquetFile
>             if isinstance(parquet_data, ds.Dataset):
>                 table = parquet_data.to_table()
>             else:
>                 table = parquet_data.read()
>             return flight.RecordBatchStream(table)
>         except OSError as e:
>             logging.error(f"AWS S3 access error: {str(e)}")
>             raise S3AccessError(f"Failed to access S3: {str(e)}") from e
>         except Exception as e:
>             logging.error(f"Error loading full Parquet dataset: {str(e)}")
>             raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
>
>     else:
>         logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
>         raise InvalidModeError()
>
>
> # Helper functions.
>
> def get_parquet_data(self, s3_path):
>     """
>         Retrieves Parquet data from S3. If the path contains a wildcard pattern,
>         it lists all matching files manually.
>         If it's a single file, it reads the file directly.
>
>         :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
>         :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile object for a single file.
>     """
>     try:
>         # Check if the path contains a wildcard
>         if "*" in s3_path:
>             # Split the directory and pattern (e.g., `*.parquet`)
>             directory, pattern = s3_path.rsplit("/", 1)
>
>             # List all files in the directory and filter using the pattern
>             logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
>             files = self.s3_fs.get_file_info(fs.FileSelector(directory))
>
>             # Filter files matching the pattern (e.g., *.parquet) and
>             # sort them by modification time (mtime_ns)
>             sorted_file_paths = [
>                 file.path
>                 for file in sorted(files, key=lambda file: file.mtime_ns)
>                 if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
>             ]
>
>             if not sorted_file_paths:
>                 raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")
>
>             logging.info(f"Sorted files: {sorted_file_paths}")
>
>             # Validate schemas across all files
>             if not validate_schemas(sorted_file_paths, self.s3_fs):
>                 raise ValueError("Schema mismatch detected across files.")
>
>             # Create a dataset from the matching files
>             dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
>             return dataset
>         else:
>             # Handle single file case: read the specific Parquet file
>             logging.info(f"Fetching single Parquet file: {s3_path}")
>             parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
>             return parquet_file
>     except Exception as e:
>         logging.error(f"Error fetching Parquet data from S3: {e}")
>         raise
>
> @staticmethod
> def stream_parquet_batches(parquet_data, batch_size=None):
>     """
>         Stream the Parquet data in batches. Supports both datasets
>         (multiple files) and single Parquet files.
>
>         :param parquet_data: The Dataset or ParquetFile object to stream data from.
>         :param batch_size: The size of the batches to stream. Default is 100,000 if not provided.
>         :return: Generator for streaming Parquet batches.
>     """
>     try:
>         # Ensure batch_size is an integer, set default if None
>         if batch_size is None or not isinstance(batch_size, int):
>             batch_size = 100000
>
>         if isinstance(parquet_data, ds.Dataset):
>             # If it's a dataset (multiple files), stream dataset batches using `batch_size`
>             logging.info("Streaming Parquet data in batches from a dataset")
>             for batch in parquet_data.to_batches(batch_size=batch_size):
>                 yield batch
>         else:
>             # If it's a single file (ParquetFile), stream file batches (iter_batches)
>             logging.info("Streaming Parquet data in batches from a single file")
>             for batch in parquet_data.iter_batches(batch_size=batch_size):
>                 yield batch
>     except Exception as e:
>         logging.error(f"Error streaming Parquet batches: {e}")
>         raise
>
>
> On Wed, Apr 30, 2025 at 11:14 PM James Duong
> <james.du...@improving.com.invalid> wrote:
>
>> Would you be able to share the server’s getStream() and getFlightInfo()
>> implementations?
>>
>> Note that getStream() should be written such that it doesn’t block
>> the gRPC thread.
>>
>>
>> From: Susmit Sarkar <susmitsir...@gmail.com>
>> Date: Wednesday, April 30, 2025 at 2:59 AM
>> To: David Li <lidav...@apache.org>
>> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
>> dev@arrow.apache.org>
>> Subject: Re: Query on stuck Arrow Flight Client while interacting from
>> local workstation (mac)
>>
>> Hi David
>>
>> Sharing the Arrow client thread dump for reference. Strangely, if we pass a
>> dummy non-existent S3 path, we get a proper error from the server:
>>
>> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
>> Path does not exist
>> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
>> Detail: [errno 2] No such file or directory
>>
>> This means the server is reachable, and we do see the logs on the server
>> as well.
>>
>> It works fine if we call the client from within a VM. The issue arises on
>> the local workstation, where it is stuck indefinitely.
>>
>> Thanks,
>> Susmit
>>
>> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org> wrote:
>> This is not specific to Flight; use jstack or your favorite
>> instrumentation tool (VisualVM etc.)
>>
>> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
>> > Hi David,
>> >
>> > How do we enable thread dump logs for both the client and the server code?
>> >
>> > As of now, I don't see any error on either client side or server side. It
>> > just hangs/gets stuck.
>> >
>> > Thanks,
>> > Nikhil
>> >
>> > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com> wrote:
>> >
>> >> Hi Team,
>> >>
>> >> We are using the code snippet below in Scala to query the Flight server,
>> >> but it seems to get stuck indefinitely. The issue is seen when we test
>> >> from our local workstation (a Mac, to be precise).
>> >>
>> >> Another interesting thing: it is able to propagate the error message
>> >> correctly but not the FlightStream data. The same code works fine when
>> >> we run it inside a Linux VM.
>> >>
>> >> Do you folks see any issue in the code?
>> >>
>> >> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
>> >>   logger.info(s"Fetching data for details: ${details.toString}")
>> >>   val ticketStr = buildTicketStr(details)
>> >>   logger.info(s"Generated ticket string: $ticketStr")
>> >>
>> >>   val allocator = new RootAllocator(Long.MaxValue)
>> >>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>> >>
>> >>   try {
>> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>> >>     val stream = client.getStream(ticket)
>> >>
>> >>     Iterator.continually {
>> >>       if (stream.next()) Some(stream) else {
>> >>         // Cleanup when no more batches
>> >>         close(stream, client)
>> >>         None
>> >>       }
>> >>     }.takeWhile(_.isDefined).flatten
>> >>   } catch {
>> >>     case e: FlightRuntimeException =>
>> >>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>> >>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>> >>     case e: Exception =>
>> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>> >>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>> >>   }
>> >> }
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Susmit
>> >>
>> >>
>>
