Haven't tested the code, but isn't `FlightStream` closeable as well?

On Fri, Oct 25, 2024 at 3:40 AM Susmit Sarkar <susmitsir...@gmail.com> wrote:
> Hi Team,
>
> We are often seeing a memory leak issue:
>
> *JDK 11*
>
> "org.apache.arrow" % "arrow-flight" % "17.0.0",
> "org.apache.arrow" % "arrow-vector" % "17.0.0",
> "org.apache.arrow" % "flight-core" % "17.0.0",
>
> 4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
>
> 2024-10-25 15:25:06.395 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> Outstanding child allocators :
> Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> Allocator(ROOT) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
>
> 2024-10-25 15:25:06.396 [main] ERROR c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by query. Memory leaked: (10485760)
> Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
>
> Exception in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed to fetch data from Flight Server
>     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
>     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
>     at scala.util.Failure.recover(Try.scala:233)
>     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
>     at com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
>     at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120)
>     at com.tesco.cef.samples.client.Main$.main(Main.scala:60)
>     at com.tesco.cef.samples.client.Main.main(Main.scala)
> Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
>     at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501)
>     at org.apache.arrow.flight.FlightClient.close(FlightClient.java:754)
>     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
>     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
>     at scala.util.Using$.$anonfun$apply$1(Using.scala:268)
>     at scala.util.Using$.apply(Using.scala:113)
>     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
>     at scala.util.Using$.$anonfun$apply$1(Using.scala:262)
>     at scala.util.Using$.apply(Using.scala:113)
>     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
>     ... 4 more
>     Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
>
> def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
>   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
>   val ticketStr = buildTicketStr(details)
>   logger.info(s"Generated ticket string: $ticketStr")
>
>   val allocator = new RootAllocator(Long.MaxValue)
>   val client = FlightClient.builder(allocator,
>     Location.forGrpcInsecure(serverHost, serverPort)).build()
>   val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>   // Using Try for proper resource cleanup
>   try {
>     val stream = client.getStream(ticket)
>     // Collect and return an iterator for each batch
>     Iterator.continually(stream).takeWhile(_.next())
>   } catch {
>     case e: FlightRuntimeException =>
>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>     case e: Exception =>
>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>   } finally {
>     if (client != null) client.close()
>     if (allocator != null) allocator.close()
>   }
> }
>
> Sharing the code snippet above for reference.
> Are we doing anything wrong here? How can we avoid this memory leak?
>
> Thanks,
> Susmit