Yes, the first example returned an iterator to the caller, and because the iterator is evaluated lazily, it only interacted with the stream after the finally block had already closed the client. The new code should work, but it may still leak if the iterator is not fully traversed, since the cleanup only runs once stream.next() returns false. You may want to return a type that has both the Iterator and AutoCloseable traits so that the caller can explicitly close the resources.
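To illustrate that last suggestion, here is a minimal sketch using only the Scala standard library. The names CloseableIterator, release and CloseableIteratorDemo are hypothetical and not part of Arrow. In the Flight case the release callback would presumably close the stream, then the client, then the allocator, but that wiring is an assumption and is left out so the sketch stays self-contained.

```scala
import scala.util.Using

// Hypothetical helper: an Iterator that also owns the resources behind it.
// `release` is whatever cleanup the producer needs (an assumption in this sketch).
final class CloseableIterator[A](underlying: Iterator[A], release: () => Unit)
    extends Iterator[A] with AutoCloseable {

  private var closed = false

  override def hasNext: Boolean =
    if (closed) false
    else {
      val more = underlying.hasNext
      if (!more) close() // release eagerly once the data is exhausted
      more
    }

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("iterator is closed or exhausted")

  // Idempotent, so exhausting the iterator and then closing it again is safe.
  override def close(): Unit =
    if (!closed) {
      closed = true
      release()
    }
}

object CloseableIteratorDemo {
  // Stand-in for a stream/client pair: counts how many times cleanup runs
  // when the caller stops reading early.
  def takeTwo(): (List[Int], Int) = {
    var releases = 0
    val it = new CloseableIterator[Int](Iterator(1, 2, 3, 4), () => releases += 1)
    // Using.resource closes the iterator even though it was not fully traversed.
    val taken = Using.resource(it)(_.take(2).toList)
    (taken, releases)
  }
}
```

With this shape the caller decides when the resources go away: a full traversal releases them automatically, and a partial read can be wrapped in Using.resource (or a try/finally) so that nothing is left open.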

On Fri, Oct 25, 2024 at 11:52 AM Susmit Sarkar <susmitsir...@gmail.com> wrote:

> I believe I figured it out. I changed the logic slightly, and after the
> changes the issue no longer shows up.
>
> def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
>   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
>   val ticketStr = buildTicketStr(details)
>   logger.info(s"Generated ticket string: $ticketStr")
>
>   val allocator = new RootAllocator(Long.MaxValue)
>   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
>   var stream: FlightStream = null
>
>   try {
>     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
>     stream = client.getStream(ticket)
>     // Return an iterator for the stream with proper cleanup
>     Iterator.continually(stream).takeWhile { _ =>
>       val hasNext = stream.next()
>       if (!hasNext) {
>         // Close the stream explicitly once data is fully read
>         close(stream, client)
>         if (logger.isDebugEnabled()) {
>           logger.debug("Closed FlightStream and client..")
>         }
>       }
>       hasNext
>     }
>   } catch {
>     case e: FlightRuntimeException =>
>       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
>       close(stream, client)
>       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
>     case e: Exception =>
>       logger.error(s"Failed to fetch data: ${e.getMessage}")
>       close(stream, client)
>       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
>   }
> }
>
> The finally block was getting executed first. Now I have changed the
> approach a bit and don't see any memory leaks.
> If the client is closed, the underlying memory allocator is closed too.
>
> On Fri, Oct 25, 2024 at 4:09 PM Susmit Sarkar <susmitsir...@gmail.com> wrote:
>
> > Hi Team,
> >
> > We are often seeing this memory leak issue:
> >
> > *JDK 11*
> >
> > "org.apache.arrow" % "arrow-flight" % "17.0.0",
> > "org.apache.arrow" % "arrow-vector" % "17.0.0",
> > "org.apache.arrow" % "flight-core" % "17.0.0",
> >
> > 4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > 2024-10-25 15:25:06.395 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > Outstanding child allocators :
> >   Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > Allocator(ROOT) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > 2024-10-25 15:25:06.396 [main] ERROR c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by query. Memory leaked: (10485760)
> > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > Exception in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed to fetch data from Flight Server
> >   at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
> >   at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
> >   at scala.util.Failure.recover(Try.scala:233)
> >   at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
> >   at com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
> >   at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120)
> >   at com.tesco.cef.samples.client.Main$.main(Main.scala:60)
> >   at com.tesco.cef.samples.client.Main.main(Main.scala)
> > Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> >   at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501)
> >   at org.apache.arrow.flight.FlightClient.close(FlightClient.java:754)
> >   at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
> >   at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
> >   at scala.util.Using$.$anonfun$apply$1(Using.scala:268)
> >   at scala.util.Using$.apply(Using.scala:113)
> >   at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
> >   at scala.util.Using$.$anonfun$apply$1(Using.scala:262)
> >   at scala.util.Using$.apply(Using.scala:113)
> >   at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
> >   ... 4 more
> >   Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> >
> > def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
> >   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
> >   val ticketStr = buildTicketStr(details)
> >   logger.info(s"Generated ticket string: $ticketStr")
> >
> >   val allocator = new RootAllocator(Long.MaxValue)
> >   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
> >   val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >   // Using Try for proper resource cleanup
> >   try {
> >     val stream = client.getStream(ticket)
> >     // Collect and return an iterator for each batch
> >     Iterator.continually(stream).takeWhile(_.next())
> >   } catch {
> >     case e: FlightRuntimeException =>
> >       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> >       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
> >     case e: Exception =>
> >       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> >   } finally {
> >     if (client != null) client.close()
> >     if (allocator != null) allocator.close()
> >   }
> > }
> >
> > Sharing the above code snippet for reference.
> >
> > Are we doing anything wrong here? How can we avoid the memory leak issue?
> >
> > Thanks,
> > Susmit