Hi Team,

We are using this below code snippet in Scala to query the flight server,
but seems to be stuck indefinitely, this issue is seen when we are testing
from our local workstation (Mac to be precise)

Another interesting thing, it's able to propagate the error message
correctly but not the FlightStream data, the same code works fine when we
run inside a linux VM.

Do you folks see any issue in the code?

def fetchDataStreamIterator(details: BaseDataAccessDetails):
Iterator[FlightStream] = {
  logger.info(s"Fetching data for details: ${details.toString}")
  val ticketStr = buildTicketStr(details)
  logger.info(s"Generated ticket string: $ticketStr")

  val allocator = new RootAllocator(Long.MaxValue)
  val client = FlightClient.builder(allocator,
Location.forGrpcInsecure(serverHost, serverPort)).build()

  try {
    val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
    val stream = client.getStream(ticket)

    Iterator.continually {
      if (stream.next()) Some(stream) else {
        // Cleanup when no more batches
        close(stream, client)
        None
      }
    }.takeWhile(_.isDefined).flatten
  } catch {
    case e: FlightRuntimeException =>
      logger.error(s"Error communicating with Flight server: ${e.getMessage}")
      throw new CefFlightServerException(s"Error communicating with
Flight server: ${e.getMessage}", e)
    case e: Exception =>
      logger.error(s"Failed to fetch data: ${e.getMessage}")
      throw new CefFlightServerException("Failed to fetch data from
Flight Server", e)
  }
}


Thanks,

Susmit

Reply via email to