Hi James,

The Flight server is written in Python; the relevant code snippet is below.
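For context, the do_get handler shown further down lives in a subclass of pyarrow.flight.FlightServerBase. A minimal sketch of that wiring, with the class name and port as placeholders rather than our actual values:

import pyarrow.flight as flight

class ParquetS3FlightServer(flight.FlightServerBase):
    """Hypothetical class name; only do_get (shown below) is overridden."""

    def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
        super().__init__(location, **kwargs)

    # do_get is overridden as shown below; all other handlers keep the
    # FlightServerBase default implementations.

if __name__ == "__main__":
    ParquetS3FlightServer().serve()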
We have overridden only the *do_get* method; getStream() and getFlightInfo() are left untouched (default implementation).

def do_get(self, context, ticket):
    """
    Handles client requests for data.

    The ticket will contain:
      - access_key: S3 access key
      - secret_key: S3 secret key
      - s3_path: Full S3 path (e.g., bucket_name/object_key)
      - mode: 'batch' (for batch streaming) or 'full' (for loading the entire dataset)
    """
    try:
        # Parse the ticket to extract credentials, S3 path, and mode
        access_key, secret_key, s3_path, mode, batch_size = parse_ticket(ticket)
    except InvalidTicketFormatError as e:
        logging.error(str(e))
        raise
    except InvalidModeError as e:
        logging.error(str(e))
        raise

    # s3fs doesn't need the s3:// (s3a) protocol prefix.
    if s3_path.startswith("s3://"):
        s3_path = s3_path.replace("s3://", "", 1)

    logging.info(f"Cloudian S3 Override endpoint: {Config.S3_ENDPOINT_OVERRIDE}")
    logging.info(f"Cloudian S3 Region: {Config.S3_REGION}")
    logging.info(f"Fetching Parquet data from S3: {s3_path} in mode: {mode}")

    # Initialize the S3 handler with credentials
    try:
        s3_handler = S3Handler(
            endpoint=Config.S3_ENDPOINT_OVERRIDE,
            region=Config.S3_REGION,
            access_key=access_key,
            secret_key=secret_key
        )
    except Exception as e:
        logging.error(f"Error initializing S3 handler: {str(e)}")
        raise S3AccessError(f"Error initializing S3 handler: {str(e)}") from e

    if mode == DataRetrievalMode.BATCH:
        try:
            # Use the get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)

            # parquet_data.schema: used when parquet_data is a ds.Dataset
            # (i.e., multiple Parquet files processed as a dataset).
            # parquet_data.schema_arrow: used when parquet_data is a pyarrow.parquet.ParquetFile;
            # a single Parquet file exposes its Arrow schema via schema_arrow.
            schema = parquet_data.schema if isinstance(parquet_data, ds.Dataset) else parquet_data.schema_arrow

            return flight.GeneratorStream(
                schema, s3_handler.stream_parquet_batches(parquet_data, batch_size))
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error streaming Parquet data: {str(e)}")
            raise DataProcessingError(f"Error streaming Parquet data: {str(e)}") from e

    # Handle 'full' mode to load the entire dataset
    elif mode == DataRetrievalMode.FULL:
        try:
            # Warn if the S3 path contains a wildcard in FULL mode
            if "*" in s3_path:
                logging.warning(
                    f"Wildcard pattern detected in S3 path '{s3_path}' with FULL data retrieval mode. "
                    f"This may put pressure on memory as all files will be loaded into memory at once."
                )

            # Use the same get_parquet_data method for both wildcard and non-wildcard cases
            parquet_data = s3_handler.get_parquet_data(s3_path)

            # Load the entire dataset into memory (chance of OOM),
            # handling Dataset vs. ParquetFile separately.
            if isinstance(parquet_data, ds.Dataset):
                table = parquet_data.to_table()
            else:
                table = parquet_data.read()

            return flight.RecordBatchStream(table)
        except OSError as e:
            logging.error(f"AWS S3 access error: {str(e)}")
            raise S3AccessError(f"Failed to access S3: {str(e)}") from e
        except Exception as e:
            logging.error(f"Error loading full Parquet dataset: {str(e)}")
            raise DataProcessingError(f"Error loading full Parquet dataset: {str(e)}") from e
    else:
        logging.error(f"Invalid mode: {DataRetrievalMode.from_string(mode)}. Expected 'batch' or 'full'.")
        raise InvalidModeError()
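parse_ticket() is not shown above; roughly, it does something like the following. The JSON encoding and the optional batch_size default are assumptions for illustration, our actual parser may differ:

import json

def parse_ticket(ticket):
    """Sketch of the ticket parser used by do_get above.

    InvalidTicketFormatError, InvalidModeError and DataRetrievalMode come
    from our own exception/enum modules.
    """
    try:
        payload = json.loads(ticket.ticket.decode("utf-8"))
        access_key = payload["access_key"]
        secret_key = payload["secret_key"]
        s3_path = payload["s3_path"]
        # May raise InvalidModeError for anything other than 'batch' / 'full'.
        mode = DataRetrievalMode.from_string(payload["mode"])
        batch_size = payload.get("batch_size")  # optional; do_get falls back to a default
    except (ValueError, KeyError) as e:
        raise InvalidTicketFormatError(f"Malformed ticket: {e}") from e
    return access_key, secret_key, s3_path, mode, batch_size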
Helper functions:

def get_parquet_data(self, s3_path):
    """
    Retrieves Parquet data from S3. If the path contains a wildcard pattern,
    it lists all matching files manually. If it's a single file, it reads
    the file directly.

    :param s3_path: The S3 path, which could be a wildcard pattern or a direct file path.
    :return: PyArrow Dataset object for a wildcard, or a ParquetFile object for a single file.
    """
    try:
        # Check if the path contains a wildcard
        if "*" in s3_path:
            # Split into directory and pattern (e.g., `*.parquet`)
            directory, pattern = s3_path.rsplit("/", 1)

            # List all files in the directory and filter using the pattern
            logging.info(f"Fetching Parquet files matching wildcard: {s3_path}")
            files = self.s3_fs.get_file_info(fs.FileSelector(directory))

            # Keep files matching the pattern (e.g., *.parquet), sorted by modification time (mtime_ns)
            sorted_file_paths = [
                file.path
                for file in sorted(files, key=lambda file: file.mtime_ns)
                if fnmatch.fnmatch(file.path, f"{directory}/{pattern}")
            ]

            if not sorted_file_paths:
                raise FileNotFoundError(f"No files matching pattern {pattern} found in {directory}")

            logging.info(f"Sorted files: {sorted_file_paths}")

            # Validate schemas across all files
            if not validate_schemas(sorted_file_paths, self.s3_fs):
                raise ValueError("Schema mismatch detected across files.")

            # Create a dataset from the matching files
            dataset = ds.dataset(sorted_file_paths, format="parquet", filesystem=self.s3_fs)
            return dataset
        else:
            # Single-file case: read the specific Parquet file
            logging.info(f"Fetching single Parquet file: {s3_path}")
            parquet_file = pq.ParquetFile(self.s3_fs.open_input_file(s3_path))
            return parquet_file
    except Exception as e:
        logging.error(f"Error fetching Parquet data from S3: {e}")
        raise

@staticmethod
def stream_parquet_batches(parquet_data, batch_size=None):
    """
    Stream the Parquet data in batches. Supports both datasets (multiple files)
    and single Parquet files.

    :param parquet_data: The Dataset or ParquetFile object to stream data from.
    :param batch_size: The size of the batches to stream. Defaults to 100,000 if not provided.
    :return: Generator yielding Parquet record batches.
    """
    try:
        # Ensure batch_size is an integer; fall back to the default if not
        if batch_size is None or not isinstance(batch_size, int):
            batch_size = 100000

        if isinstance(parquet_data, ds.Dataset):
            # Dataset (multiple files): stream batches via Dataset.to_batches()
            logging.info("Streaming Parquet data in batches from a dataset")
            for batch in parquet_data.to_batches(batch_size=batch_size):
                yield batch
        else:
            # Single file (ParquetFile): stream batches via ParquetFile.iter_batches()
            logging.info("Streaming Parquet data in batches from a single file")
            for batch in parquet_data.iter_batches(batch_size=batch_size):
                yield batch
    except Exception as e:
        logging.error(f"Error streaming Parquet batches: {e}")
        raise
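For reference, this is roughly how the server is exercised from a Python client (our real client is the Scala snippet quoted further down the thread). The JSON ticket payload, host, and port here are illustrative assumptions mirroring the fields do_get expects:

import json
import pyarrow.flight as flight

# Hypothetical ticket payload; field names mirror do_get's docstring above.
ticket_bytes = json.dumps({
    "access_key": "<access-key>",
    "secret_key": "<secret-key>",
    "s3_path": "s3://my-bucket/prefix/*.parquet",  # placeholder path
    "mode": "batch",                               # or "full"
    "batch_size": 100000,
}).encode("utf-8")

# Host and port are placeholders for our actual deployment.
client = flight.connect("grpc://flight-server-host:8815")
reader = client.do_get(flight.Ticket(ticket_bytes))

for chunk in reader:
    # Each chunk.data is a pyarrow.RecordBatch produced by stream_parquet_batches().
    print(chunk.data.num_rows)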
""" try: # Ensure batch_size is an integer, set default if None if batch_size is None or not isinstance(batch_size, int): batch_size = 100000 if isinstance(parquet_data, ds.Dataset): # If it's a dataset (multiple files), stream dataset batches using `int_batch_size` logging.info(f"Streaming Parquet data in batches from a dataset") for batch in parquet_data.to_batches(batch_size=batch_size): yield batch else: # If it's a single file (ParquetFile), stream file batches (iter_batches) logging.info(f"Streaming Parquet data in batches from a single file") for batch in parquet_data.iter_batches(batch_size=batch_size): yield batch except Exception as e: logging.error(f"Error streaming Parquet batches: {e}") raise e On Wed, Apr 30, 2025 at 11:14 PM James Duong <james.du...@improving.com.invalid> wrote: > Would you be able to share the server’s getStream() and getFlightInfo() > implementations? > > Note that getStream() needs should be written such that it doesn’t block > the grpc thread. > > > Get Outlook for Mac <https://aka.ms/GetOutlookForMac> > > From: Susmit Sarkar <susmitsir...@gmail.com> > Date: Wednesday, April 30, 2025 at 2:59 AM > To: David Li <lidav...@apache.org> > Cc: nik.9...@gmail.com <nik.9...@gmail.com>, dev@arrow.apache.org < > dev@arrow.apache.org> > Subject: Re: Query on stuck Arrow Flight Client while interacting from > local workstation (mac) > > Hi David > > Sharing the arrow client thread dump for reference. Strangely if we pass a > dummy non existent s3 path we are getting proper error from server > > cef_flight_server.exceptions.S3AccessError: Failed to access S3: [Errno 2] > Path does not exist > 'bg0975-cef-ccmedev-data/pp/load_date=2024-11-21/part-00007.c008.snappy.parquet'. > Detail: [errno 2] No such file or directory > > Which translates the server is reachable and we do see the logs in server > as well > > It works fine if we call the client within a VM issue arises in local > workstation, where its stuck indefinitely. > > Thanks, > Susmit > > On Wed, Apr 30, 2025 at 12:54 PM David Li <lidav...@apache.org<mailto: > lidav...@apache.org>> wrote: > This is not specific to Flight; use jstack or your favorite > instrumentation tool (VisualVM etc.) > > On Wed, Apr 30, 2025, at 15:41, NIKHIL RANJAN wrote: > > Hi David, > > > > How to enable thread dump logs for both client and server code. > > > > As of now, I don't see any error on either client side or server side. It > > just hangs/gets stuck. > > > > Thanks, > > Nikhil > > > > On Thu, 24 Apr, 2025, 14:39 Susmit Sarkar, <susmitsir...@gmail.com > <mailto:susmitsir...@gmail.com>> wrote: > > > >> Hi Team, > >> > >> We are using this below code snippet in Scala to query the flight > server, > >> but seems to be stuck indefinitely, this issue is seen when we are > testing > >> from our local workstation (Mac to be precise) > >> > >> Another interesting thing, it's able to propagate the error message > >> correctly but not the FlightStream data, the same code works fine when > we > >> run inside a linux VM. > >> > >> Do you folks see any issue in the code? 
> >>
> >> def fetchDataStreamIterator(details: BaseDataAccessDetails): Iterator[FlightStream] = {
> >>   logger.info(s"Fetching data for details: ${details.toString}")
> >>   val ticketStr = buildTicketStr(details)
> >>   logger.info(s"Generated ticket string: $ticketStr")
> >>
> >>   val allocator = new RootAllocator(Long.MaxValue)
> >>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
> >>
> >>   try {
> >>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >>     val stream = client.getStream(ticket)
> >>
> >>     Iterator.continually {
> >>       if (stream.next()) Some(stream) else {
> >>         // Cleanup when no more batches
> >>         close(stream, client)
> >>         None
> >>       }
> >>     }.takeWhile(_.isDefined).flatten
> >>   } catch {
> >>     case e: FlightRuntimeException =>
> >>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> >>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
> >>     case e: Exception =>
> >>       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> >>   }
> >> }
> >>
> >> Thanks,
> >>
> >> Susmit