I see, your point is very valid. In my use case, consumers will use an SDK through which we expose the following API:
The method reads the FlightStream to completion with a tunable batch size; the iterator will be left partially traversed only if the process crashes or the user explicitly stops the process.

def getStream(details: ObjectStoreDetails): Iterator[FlightStream]

On Sat, Oct 26, 2024 at 4:03 AM Laurent Goujon <laur...@dremio.com.invalid> wrote:

> Yes, the first example was returning an iterator to the caller which would
> interact with the stream after the client is closed (lazy evaluation). The
> new code should work but may cause a leak if the iterator is not fully
> traversed. You may want to return a type which has both the Iterator and
> AutoCloseable traits so that the caller can actually explicitly close the
> resources.
>
> On Fri, Oct 25, 2024 at 11:52 AM Susmit Sarkar <susmitsir...@gmail.com>
> wrote:
>
> > I believe I figured it out. I changed the logic slightly; after the
> > changes the issue no longer occurs.
> >
> > def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
> >   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
> >   val ticketStr = buildTicketStr(details)
> >   logger.info(s"Generated ticket string: $ticketStr")
> >
> >   val allocator = new RootAllocator(Long.MaxValue)
> >   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
> >   var stream: FlightStream = null
> >
> >   try {
> >     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >     stream = client.getStream(ticket)
> >     // Return an iterator for the stream with proper cleanup
> >     Iterator.continually(stream).takeWhile { _ =>
> >       val hasNext = stream.next()
> >       if (!hasNext) {
> >         // Close the stream explicitly once data is fully read
> >         close(stream, client)
> >         if (logger.isDebugEnabled()) {
> >           logger.debug("Closed FlightStream and client..")
> >         }
> >       }
> >       hasNext
> >     }
> >   } catch {
> >     case e: FlightRuntimeException =>
> >       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> >       close(stream, client)
> >       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
> >     case e: Exception =>
> >       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >       close(stream, client)
> >       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> >   }
> > }
> >
> > The finally block was getting executed first. Now that I have changed the
> > approach a bit, I don't see any memory leaks. If the client is closed, the
> > underlying memory allocator is closed too.
> >
> > On Fri, Oct 25, 2024 at 4:09 PM Susmit Sarkar <susmitsir...@gmail.com>
> > wrote:
> >
> > > Hi Team,
> > >
> > > We are often seeing this memory leak issue:
> > >
> > > *JDK 11*
> > >
> > > "org.apache.arrow" % "arrow-flight" % "17.0.0",
> > > "org.apache.arrow" % "arrow-vector" % "17.0.0",
> > > "org.apache.arrow" % "flight-core" % "17.0.0",
> > >
> > > 4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > 2024-10-25 15:25:06.395 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > > Outstanding child allocators :
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > Allocator(ROOT) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > 2024-10-25 15:25:06.396 [main] ERROR c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > Exception in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed to fetch data from Flight Server
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
> > >     at scala.util.Failure.recover(Try.scala:233)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
> > >     at com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
> > >     at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120)
> > >     at com.tesco.cef.samples.client.Main$.main(Main.scala:60)
> > >     at com.tesco.cef.samples.client.Main.main(Main.scala)
> > > Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > >     at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501)
> > >     at org.apache.arrow.flight.FlightClient.close(FlightClient.java:754)
> > >     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
> > >     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
> > >     at scala.util.Using$.$anonfun$apply$1(Using.scala:268)
> > >     at scala.util.Using$.apply(Using.scala:113)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
> > >     at scala.util.Using$.$anonfun$apply$1(Using.scala:262)
> > >     at scala.util.Using$.apply(Using.scala:113)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
> > >     ... 4 more
> > >     Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> > >
> > > def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
> > >   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
> > >   val ticketStr = buildTicketStr(details)
> > >   logger.info(s"Generated ticket string: $ticketStr")
> > >
> > >   val allocator = new RootAllocator(Long.MaxValue)
> > >   val client = FlightClient.builder(allocator, Location.forGrpcInsecure(serverHost, serverPort)).build()
> > >   val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> > >   // Using Try for proper resource cleanup
> > >   try {
> > >     val stream = client.getStream(ticket)
> > >     // Collect and return an iterator for each batch
> > >     Iterator.continually(stream).takeWhile(_.next())
> > >   } catch {
> > >     case e: FlightRuntimeException =>
> > >       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> > >       throw new CefFlightServerException(s"Error communicating with Flight server: ${e.getMessage}", e)
> > >     case e: Exception =>
> > >       logger.error(s"Failed to fetch data: ${e.getMessage}")
> > >       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> > >   } finally {
> > >     if (client != null) client.close()
> > >     if (allocator != null) allocator.close()
> > >   }
> > > }
> > >
> > > Sharing the above code snippet for reference.
> > > Are we doing anything wrong here? How can we avoid this memory leak issue?
> > >
> > > Thanks,
> > > Susmit
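
For reference, the suggestion in the thread (return a type that is both an Iterator and AutoCloseable, so the caller can release resources even when the iterator is not fully traversed) could be sketched roughly as below. This is only a minimal sketch: CloseableIterator and FakeStream are hypothetical names that are not part of Arrow, and FakeStream merely stands in for a FlightStream so the example runs without a Flight server. The Arrow wiring shown in the comment is an assumption, not tested code.

```scala
// Hypothetical helper (not an Arrow API): an Iterator that also owns resources.
trait CloseableIterator[A] extends Iterator[A] with AutoCloseable

object CloseableIterator {
  // hasMore advances the source and reports whether a batch is available
  // (the role FlightStream.next() plays in the thread); current returns the
  // element to hand out; resources are closed once, in the order given.
  def apply[A](hasMore: () => Boolean, current: () => A,
               resources: AutoCloseable*): CloseableIterator[A] =
    new CloseableIterator[A] {
      private var closed = false
      private var lookedAhead = false
      private var more = false

      override def hasNext: Boolean =
        if (closed) false
        else {
          if (!lookedAhead) {
            more = hasMore()
            lookedAhead = true
            if (!more) close() // fully traversed: release eagerly
          }
          more
        }

      override def next(): A = {
        if (!hasNext) throw new NoSuchElementException("iterator exhausted or closed")
        lookedAhead = false
        current()
      }

      // Idempotent; closes every resource and rethrows the first failure.
      override def close(): Unit = if (!closed) {
        closed = true
        var first: Throwable = null
        resources.foreach { r =>
          try r.close()
          catch { case t: Throwable => if (first == null) first = t else first.addSuppressed(t) }
        }
        if (first != null) throw first
      }
    }
}

// Stand-in for a FlightStream so the sketch runs without a Flight server.
final class FakeStream(batches: Int) extends AutoCloseable {
  private var served = 0
  var closed = false
  def next(): Boolean = if (served < batches) { served += 1; true } else false
  def close(): Unit = closed = true
}

object Demo {
  def main(args: Array[String]): Unit = {
    val stream = new FakeStream(3)
    // Assumed Arrow wiring (untested):
    //   CloseableIterator(() => stream.next(), () => stream, stream, client, allocator)
    val it = CloseableIterator(() => stream.next(), () => stream, stream)
    try {
      if (it.hasNext) it.next() // caller stops after the first batch
    } finally it.close()        // resources are still released
    println(s"stream closed: ${stream.closed}")
  }
}
```

With this shape the SDK method could return CloseableIterator[FlightStream] instead of Iterator[FlightStream]; existing callers that traverse everything keep working, and callers that stop early can call close() in a finally block or use scala.util.Using.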