I see your point, and it is very valid. In my use case, the consumer will be
using an SDK through which we expose the following API.

The method reads the FlightStream in full with a tunable batch size; the
iterator will be left partially traversed only if the process crashes or
the user explicitly stops the process.

def getStream(details: ObjectStoreDetails): Iterator[FlightStream]
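
For reference, here is a minimal sketch of the kind of return type Laurent
suggests below: an Iterator that is also AutoCloseable. CloseableIterator and
CloseableIteratorDemo are hypothetical names, not part of Arrow, and the sketch
uses only the Scala standard library. How it would wrap the FlightStream,
FlightClient and allocator appears only as a comment, and the close order there
is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not part of Arrow): an Iterator that owns the
// resources backing it and releases them on exhaustion or explicit close().
final class CloseableIterator[A](underlying: Iterator[A], resources: AutoCloseable*)
    extends Iterator[A] with AutoCloseable {

  private var closed = false

  override def hasNext: Boolean =
    if (closed) false
    else {
      val more = underlying.hasNext
      if (!more) close() // fully traversed: release resources eagerly
      more
    }

  override def next(): A =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("iterator is exhausted or closed")

  // Idempotent; closes resources in the order given and keeps going if one fails.
  override def close(): Unit =
    if (!closed) {
      closed = true
      var failure: Exception = null
      resources.foreach { r =>
        try r.close()
        catch {
          case e: Exception =>
            if (failure == null) failure = e else failure.addSuppressed(e)
        }
      }
      if (failure != null) throw failure
    }
}

// With Arrow Flight this might be wired as follows
// (assumed close order: stream, then client, then allocator):
//   new CloseableIterator(
//     Iterator.continually(stream).takeWhile(_.next()), stream, client, allocator)

object CloseableIteratorDemo {
  // Stand-in for a real resource; records its name when it is closed.
  def tracked(name: String, log: ArrayBuffer[String]): AutoCloseable =
    new AutoCloseable { override def close(): Unit = { log += name; () } }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer.empty[String]
    val it = new CloseableIterator(
      Iterator(1, 2, 3), tracked("stream", log), tracked("client", log))
    println(it.next())          // consume only the first element
    it.close()                  // caller stops early and releases everything
    println(it.hasNext)         // no further elements after close()
    println(log.mkString(", ")) // names of the resources that were closed
  }
}
```

A caller that may stop early could wrap the returned value in scala.util.Using
or a try/finally, so the resources are released even when the iterator is not
fully traversed.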


On Sat, Oct 26, 2024 at 4:03 AM Laurent Goujon <laur...@dremio.com.invalid>
wrote:

> Yes, the first example was returning an iterator to the caller which would
> interact with the stream after the client is closed (lazy evaluation). The
> new code should work but may cause a leak if the iterator is not fully
> traversed. You may want to return a type which has both the Iterator and
> AutoCloseable traits so that the caller can actually explicitly close the
> resources.
>
> On Fri, Oct 25, 2024 at 11:52 AM Susmit Sarkar <susmitsir...@gmail.com>
> wrote:
>
> > I believe I figured it out. I changed the logic slightly, and after the
> > change the issue no longer occurs.
> >
> > def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
> >   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
> >   val ticketStr = buildTicketStr(details)
> >   logger.info(s"Generated ticket string: $ticketStr")
> >
> >   val allocator = new RootAllocator(Long.MaxValue)
> >   val client = FlightClient.builder(allocator,
> >     Location.forGrpcInsecure(serverHost, serverPort)).build()
> >   var stream: FlightStream = null
> >
> >   try {
> >     val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> >     stream = client.getStream(ticket)
> >     // Return an iterator for the stream with proper cleanup
> >     Iterator.continually(stream).takeWhile { _ =>
> >       val hasNext = stream.next()
> >       if (!hasNext) {
> >         // Close the stream explicitly once data is fully read
> >         close(stream, client)
> >         if (logger.isDebugEnabled()) {
> >           logger.debug("Closed FlightStream and client..")
> >         }
> >       }
> >       hasNext
> >     }
> >   } catch {
> >     case e: FlightRuntimeException =>
> >       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> >       close(stream, client)
> >       throw new CefFlightServerException(
> >         s"Error communicating with Flight server: ${e.getMessage}", e)
> >     case e: Exception =>
> >       logger.error(s"Failed to fetch data: ${e.getMessage}")
> >       close(stream, client)
> >       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> >   }
> > }
> >
> > The finally block was being executed first, before the iterator was
> > consumed. I have now changed the approach a bit and don't see any memory
> > leaks. If the client is closed, the underlying memory allocator is closed too.
> >
> >
> >
> >
> > On Fri, Oct 25, 2024 at 4:09 PM Susmit Sarkar <susmitsir...@gmail.com>
> > wrote:
> >
> > > Hi Team,
> > >
> > > We are often seeing a memory leak:
> > >
> > > *JDK 11*
> > >
> > > "org.apache.arrow" % "arrow-flight" % "17.0.0",
> > > "org.apache.arrow" % "arrow-vector" % "17.0.0",
> > > "org.apache.arrow" % "flight-core" % "17.0.0",
> > >
> > >
> > > 4-10-25 15:25:06.394 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > 2024-10-25 15:25:06.395 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (10485760)
> > > Outstanding child allocators :
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > Allocator(ROOT) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > 2024-10-25 15:25:06.396 [main] ERROR c.t.c.d.ArrowFlightDataFetcher - Failed to fetch data: Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > > Exception in thread "main" com.tesco.cef.exceptions.CefFlightServerException: Failed to fetch data from Flight Server
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:47)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher$$anon$1.applyOrElse(ArrowFlightDataFetcher.scala:41)
> > >     at scala.util.Failure.recover(Try.scala:233)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:47)
> > >     at com.tesco.cef.client.CefFlightDataClient.getStream(CefFlightDataClient.scala:27)
> > >     at com.tesco.cef.samples.client.Main$.processStream(Main.scala:120)
> > >     at com.tesco.cef.samples.client.Main$.main(Main.scala:60)
> > >     at com.tesco.cef.samples.client.Main.main(Main.scala)
> > > Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> > > Allocator(flight-client) 0/10485760/10485760/9223372036854775807 (res/actual/peak/limit)
> > >     at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:501)
> > >     at org.apache.arrow.flight.FlightClient.close(FlightClient.java:754)
> > >     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:392)
> > >     at scala.util.Using$Releasable$AutoCloseableIsReleasable$.release(Using.scala:391)
> > >     at scala.util.Using$.$anonfun$apply$1(Using.scala:268)
> > >     at scala.util.Using$.apply(Using.scala:113)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream$$anonfun$2(ArrowFlightDataFetcher.scala:40)
> > >     at scala.util.Using$.$anonfun$apply$1(Using.scala:262)
> > >     at scala.util.Using$.apply(Using.scala:113)
> > >     at com.tesco.cef.datafetcher.ArrowFlightDataFetcher.fetchDataStream(ArrowFlightDataFetcher.scala:41)
> > >     ... 4 more
> > >     Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (10485760)
> > >
> > > def fetchDataStream(details: ObjectStoreDetails): Iterator[FlightStream] = {
> > >   logger.info(s"Fetching data for S3 path: ${details.s3Path}")
> > >   val ticketStr = buildTicketStr(details)
> > >   logger.info(s"Generated ticket string: $ticketStr")
> > >
> > >   val allocator = new RootAllocator(Long.MaxValue)
> > >   val client = FlightClient.builder(allocator,
> > >     Location.forGrpcInsecure(serverHost, serverPort)).build()
> > >   val ticket = new Ticket(ticketStr.getBytes(StandardCharsets.UTF_8))
> > >   // Using Try for proper resource cleanup
> > >   try {
> > >     val stream = client.getStream(ticket)
> > >     // Collect and return an iterator for each batch
> > >     Iterator.continually(stream).takeWhile(_.next())
> > >   } catch {
> > >     case e: FlightRuntimeException =>
> > >       logger.error(s"Error communicating with Flight server: ${e.getMessage}")
> > >       throw new CefFlightServerException(
> > >         s"Error communicating with Flight server: ${e.getMessage}", e)
> > >     case e: Exception =>
> > >       logger.error(s"Failed to fetch data: ${e.getMessage}")
> > >       throw new CefFlightServerException("Failed to fetch data from Flight Server", e)
> > >   } finally {
> > >     if (client != null) client.close()
> > >     if (allocator != null) allocator.close()
> > >   }
> > > }
> > >
> > > Sharing the above code snippet for reference.
> > > Are we doing anything wrong here? How can we avoid this memory leak?
> > >
> > > Thanks,
> > > Susmit
> > >
> > >
> > >
> >
>
