Hi James,

The Flight server is written in Python. Here is the relevant code snippet.

We have overridden only the *do_get* method.

getStream() and getFlightInfo() are left untouched (default implementation).

def do_get(self, context, ticket):
    """
    Handle a client DoGet request by returning Parquet data read from S3.

    The ticket is decoded by ``parse_ticket`` and carries:
        - access_key: S3 access key
        - secret_key: S3 secret key
        - s3_path: full S3 path (e.g., bucket_name/object_key); may contain
          a ``*`` wildcard
        - mode: 'batch' (stream record batches) or 'full' (load the entire
          dataset into memory before sending)
        - batch_size: forwarded to ``stream_parquet_batches`` in batch mode

    :param context: Flight call context (not used by this implementation).
    :param ticket: Flight ticket identifying the data to fetch.
    :return: ``flight.GeneratorStream`` in batch mode,
        ``flight.RecordBatchStream`` in full mode.
    :raises InvalidTicketFormatError: re-raised from ``parse_ticket``.
    :raises InvalidModeError: re-raised from ``parse_ticket``, or raised here
        when the mode is neither batch nor full.
    :raises S3AccessError: if the S3 handler cannot be created or an
        ``OSError`` occurs while opening the data.
    :raises DataProcessingError: for any other failure while preparing the
        stream.
    """
    try:
        # Parse the ticket to extract credentials, S3 path, mode and batch size.
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except (InvalidTicketFormatError, InvalidModeError) as e:
        # NOTE(review): confirm these messages never echo the raw ticket,
        # which carries the S3 secret key.
        logging.error(str(e))
        raise

    # The S3 filesystem is addressed with plain "bucket/key" paths, so drop a
    # leading URI scheme (both the s3 and s3a spellings).
    for scheme in ("s3://", "s3a://"):
        if s3_path.startswith(scheme):
            s3_path = s3_path[len(scheme):]
            break

    logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with the per-request credentials.
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key,
        )
    except Exception as e:
        message = f"Error initializing S3 handler: {e}"
        logging.error(message)
        raise S3AccessError(message) from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # get_parquet_data handles both wildcard and single-file paths.
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # A ds.Dataset (multiple files) exposes its Arrow schema as
            # `.schema`; a single ParquetFile exposes it as `.schema_arrow`.
            if isinstance(parquet_data, ds.Dataset):
                schema = parquet_data.schema
            else:
                schema = parquet_data.schema_arrow
            # stream_parquet_batches is a generator: batches are read lazily
            # while the stream is consumed, i.e. after this method has
            # returned, so the handlers below only cover opening the data.
            return flight.GeneratorStream(
                schema,
                s3_handler.stream_parquet_batches(parquet_data, batch_size),
            )
        except OSError as e:
            logging.error(f"AWS S3 access error: {e}")
            raise S3AccessError(f"Failed to access S3: {e}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {e}")
            raise DataProcessingError(f"Error streaming Parquet data: {e}") from e

    # Handle 'full' mode to load the entire dataset.
    if mode == DataRetrievalMode.FULL:
        try:
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    "This may put pressure on memory as all files will be loaded into memory at once."
                )
            # get_parquet_data handles both wildcard and single-file paths.
            parquet_data = s3_handler.get_parquet_data(s3_path)
            # Materialize everything in memory (risk of OOM on large inputs).
            # Dataset and ParquetFile use different method names for this.
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()
            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {e}")
            raise S3AccessError(f"Failed to access S3: {e}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {e}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {e}") from e

    # Neither BATCH nor FULL. Log the raw value rather than converting it:
    # converting an unrecognised mode could itself fail and mask the
    # InvalidModeError below.
    logging.error(f"Invalid mode: {mode!r}. Expected 'batch' or 'full'.")
    raise InvalidModeError()


# Helper functions (methods of the S3 handler used by do_get above).

def get_parquet_data(self, s3_path):
    """
    Open Parquet data located at ``s3_path`` on ``self.s3_fs``.

    If the path contains a ``*`` wildcard, the parent directory is listed
    (non-recursively) and the files whose base name matches the pattern are
    combined into a dataset, ordered by modification time. Otherwise the
    path is opened directly as a single Parquet file.

    :param s3_path: The S3 path, either a wildcard pattern such as
        ``bucket/prefix/*.parquet`` or a direct file path.
    :return: PyArrow Dataset object if it's a wildcard, or a ParquetFile
        object for a single file.
    :raises FileNotFoundError: if no file matches the wildcard pattern.
    :raises ValueError: if a wildcard path has no directory component, or
        if ``validate_schemas`` reports a schema mismatch.
    """
    try:
        if "*" not in s3_path:
            # Single file case: open the specific Parquet file.
            logging.info(f"Fetching single Parquet file: {s3_path}")
            return pq.ParquetFile(self.s3_fs.open_input_file(s3_path))

        # Split the directory from the file-name pattern (e.g. `*.parquet`).
        directory, separator, pattern = s3_path.rpartition("/")
        if not separator:
            raise ValueError(
                f"Wildcard path '{s3_path}' must include a directory component"
            )

        logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
        entries = self.s3_fs.get_file_info(fs.FileSelector(directory))

        # Filter before sorting: sub-directories may report no modification
        # time, and comparing None with an int would make the sort fail.
        # Match on the base name only, case-sensitively, so that glob
        # characters in the directory part and OS case-folding rules cannot
        # affect which object keys are selected.
        matching = [
            info
            for info in entries
            if info.is_file and fnmatch.fnmatchcase(info.base_name, pattern)
        ]
        matching.sort(key=lambda info: info.mtime_ns or 0)
        sorted_file_paths = [info.path for info in matching]

        if not sorted_file_paths:
            raise FileNotFoundError(
                f"No files matching pattern {pattern} found in {directory}"
            )

        logging.info(f"Sorted files: {sorted_file_paths}")

        # All files must share one schema to be read as a single dataset.
        if not validate_schemas(sorted_file_paths, self.s3_fs):
            raise ValueError("Schema mismatch detected across files.")

        # Create a dataset from the matching files.
        return ds.dataset(
            sorted_file_paths, format="parquet", filesystem=self.s3_fs
        )
    except Exception as e:
        logging.error(f"Error fetching Parquet data from S3: {e}")
        raise

@staticmethod
def stream_parquet_batches(parquet_data, batch_size=None):
    """
    Stream the Parquet data in batches. Supports both datasets (multiple
    files) and single Parquet files.

    This is a generator: nothing is read (or logged) until the caller
    starts iterating.

    :param parquet_data: The Dataset or ParquetFile object to stream data
        from.
    :param batch_size: The size of the batches to stream. Falls back to
        100,000 when it is None, not an integer, or not positive.
    :return: Generator for streaming Parquet batches.
    """
    default_batch_size = 100000
    try:
        # Accept only a positive integer. bool is a subclass of int, so it
        # is rejected explicitly; zero and negative sizes are not usable
        # batch sizes either.
        if (
            isinstance(batch_size, bool)
            or not isinstance(batch_size, int)
            or batch_size <= 0
        ):
            batch_size = default_batch_size

        if isinstance(parquet_data, ds.Dataset):
            # Dataset (multiple files): stream with Dataset.to_batches.
            logging.info("Streaming Parquet data in batches from a dataset")
            yield from parquet_data.to_batches(batch_size=batch_size)
        else:
            # Single file (ParquetFile): stream with iter_batches.
            logging.info("Streaming Parquet data in batches from a single file")
            yield from parquet_data.iter_batches(batch_size=batch_size)
    except Exception as e:
        logging.error(f"Error streaming Parquet batches: {e}")
        raise


On Wed, Apr 30, 2025 at 11:14 PM James Duong
<james.du...@improving.com.invalid> wrote:

> Would you be able to share the server’s getStream() and getFlightInfo()
> implementations?
>
> Note that getStream() should be written such that it doesn’t block
> the gRPC thread.
>
>
> Get Outlook for Mac <https://aka.ms/GetOutlookForMac>
>
> From: Susmit Sarkar <susmitsir...@gmail.com>
> Date: Wednesday, April 30, 2025 at 2:59 AM
> To: David Li <lidav...@apache.org>
> Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org <
> dev@arrow.apache.org>
> Subject: Re: Query on stuck Arrow Flight Client while interacting from
> local workstation (mac)
>
> Hi David
>
> Sharing the arrow client thread dump for reference. Strangely if we pass a
> dummy non existent s3 path we are getting proper error from server
>
> cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2]
> Path does not exist
> 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'.
> Detail: [errno 2] No such file or directory
>
> This indicates that the server is reachable, and we do see the logs on the
> server as well.
>
> It works fine if we call the client from within a VM; the issue arises only
> on the local workstation, where it's stuck indefinitely.
>
> Thanks,
> Susmit
>
> On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org<mailto:
> lidav...@apache.org>> wrote:
> This is not specific to Flight; use jstack or your favorite
> instrumentation tool (VisualVM etc.)
>
> On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote:
> > Hi David,
> >
> > How to enable thread dump logs for both client and server code.
> >
> > As of now, I don't see any error on either client side or server side. It
> > just hangs/gets stuck.
> >
> > Thanks,
> > Nikhil
> >
> > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com
> <mailto:susmitsir...@gmail.com>> wrote:
> >
> >> Hi Team,
> >>
> >> We are using this below code snippet in Scala to query the flight
> server,
> >> but seems to be stuck indefinitely, this issue is seen when we are
> testing
> >> from our local workstation (Mac to be precise)
> >>
> >> Another interesting thing, it's able to propagate the error message
> >> correctly but not the FlightStream data, the same code works fine when
> we
> >> run inside a linux VM.
> >>
> >> Do you folks see any issue in the code?
> >>
> >> def fetchDataStreamIterator(details: BaseDataAccessDetails):
> Iterator[FlightStream] = {
> >>   logger.info<http://logger.info>(s"Fetching data for details:
> ${details.toString}")
> >>   val ticketStr = buildTicketStr(details)
> >>   logger.info<http://logger.info>(s"Generated ticket string:
> $ticketStr")
> >>
> >>   val allocator = new RootAllocator(Long.MaxValue)
> >>   val client = FlightClient.builder(allocator,
> Location.forGrpcInsecure(serverHost, serverPort)).build()
> >>
> >>   try {
> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >>     val stream = client.getStream(ticket)
> >>
> >>     Iterator.continually {
> >>       if (stream.next()) Some(stream) else {
> >>         // Cleanup when no more batches
> >>         close(stream, client)
> >>         None
> >>       }
> >>     }.takeWhile(_.isDefined).flatten
> >>   } catch {
> >>     case e: FlightRuntimeException =>
> >>       logger.error(s"Error communicating with Flight server:
> ${e.getMessage}")
> >>       throw new CefFlightServerException(s"Error communicating with
> Flight server: ${e.getMessage}", e)
> >>     case e: Exception =>
> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >>       throw new CefFlightServerException("Failed to fetch data from
> Flight Server", e)
> >>   }
> >> }
> >>
> >>
> >> Thanks,
> >>
> >> Susmit
> >>
> >>
>
> Warning: The sender of this message could not be validated and may not be
> the actual sender.
>

Reply via email to