Hi,

But GC is not predictable (though Python's is more eager than the JVM's, thanks to reference counting), and the following code fails randomly if `gc.collect` is not called.

```python
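# The flow below, in short (my understanding, as discussed in the quoted
# thread further down): the generator exports the Java stream through the
# Arrow C Data Interface, re-imports it as a pa.RecordBatchReader, and the
# Java-side memory is only released once Python drops every reference to
# the imported batches.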
# ENV
# Python 3.8.16 (default, Jun 12 2023, 12:55:15)
# [Clang 14.0.6 ] :: Anaconda, Inc. on darwin
# Type "help", "copyright", "credits" or "license" for more information.

import jpype
import jpype.imports
import pyarrow as pa
import pyarrow.dataset
from pyarrow.cffi import ffi as arrow_c

# get_java_root_allocator, child_allocator and start_jvm are helpers from
# my own wrapper code.

def wrap_from_java_stream_to_generator(java_arrow_stream, allocator=None,
                                       yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream,
                                              java_wrapped_stream)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        for record_batch in reader:
            yield record_batch
            del record_batch  # drop the reference so nothing lingers in Python


def wrap_from_java_stream(java_arrow_stream, base_allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream,
                                                   base_allocator,
                                                   yield_schema=True)
    schema = next(generator)
    return pa.RecordBatchReader.from_batches(schema, generator)


def test_wrap_from_java_stream(tmp_path):
    start_jvm()
    org = jpype.JPackage("org")
    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)
        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")
        # gc.collect()
```

Is there any way to ensure that all references are collected when the allocator is closed? Should I call `gc.collect()` every time before the allocator is closed, or just use one single root allocator for everything?
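
To make the first option concrete, this is roughly what I mean (just a sketch, reusing `child_allocator`, `start_jvm` and `wrap_from_java_stream` from the code above):

```python
import gc

def test_wrap_from_java_stream_collected(tmp_path):
    start_jvm()
    org = jpype.JPackage("org")
    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)
        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")
        # Drop the remaining Python references and collect *before* the
        # child allocator is closed, so every release callback has run by
        # the time the Java side checks for leaks.
        del stream
        gc.collect()
```
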
Antoine Pitrou <anto...@python.org> wrote on Thu, Jun 29, 2023 at 21:50:
>
>
> Hi,
>
> To answer precisely:
>
> 1) The exported record batch will live as long as the Python RecordBatch
> object is kept alive. If your script keeps the Python RecordBatch object
> alive until the end, then the exported record batch is kept alive until
> the end.
>
> 2) The rest is standard Python semantics. When an object is no longer
> referenced by the program, reference counting destroys it. For an
> exported record batch, destroying the Python RecordBatch calls the
> record batch's release callback.
>
> Regards
>
> Antoine.
>
>
> On 29/06/2023 15:05, Wenbo Hu wrote:
> > Thanks for your explanation, Antoine.
> >
> > I figured out why I'm facing the memory leak and need to call del
> > explicitly. My example code may have misrepresented the situation.
> > The key problem is that when I wrap the Java-stream-to-RecordBatchReader
> > conversion, I create a child allocator from the current context (which
> > lives as long as the RecordBatchReader) to call exportArrayStream in a
> > generator, so the consumer/callback always outlives the
> > RecordBatchReader and its underlying allocator (not the allocator of
> > the Java stream, but that of exportArrayStream).
> >
> > When I give the conversion a longer-lived allocator (one that lives as
> > long as the consumer/callback), the code works as expected.
> >
> > Antoine Pitrou <anto...@python.org> wrote on Thu, Jun 29, 2023 at 17:55:
> >>
> >>
> >> On 29/06/2023 09:50, Wenbo Hu wrote:
> >>> Hi,
> >>>
> >>> I'm using JPype to pass streams between Java and Python back and forth.
> >>>
> >>> The following code works fine with its release callback:
> >>> ```python
> >>> with child_allocator("test-allocator") as allocator:
> >>>     r = some_package.InMemoryArrowReader.create(allocator)
> >>>     c_stream = arrow_c.new("struct ArrowArrayStream*")
> >>>     c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
> >>>
> >>>     s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
> >>>     org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)
> >>>
> >>>     with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
> >>>         for rb in stream:  # type: pa.RecordBatch
> >>>             logger.info(rb.num_rows)  # yield weakref.proxy(rb)
> >>>             del rb  # release callback directly called in current thread?
> >>> ```
> >>>
> >>> But if the del statement is not called, the allocator on the Java side
> >>> raises an exception saying that memory leaked.
> >>
> >> That's not surprising. The `rb` variable keeps the record batch and its
> >> backing memory alive. This is the expected semantics. Otherwise,
> >> accessing `rb`'s contents would crash.
> >>
> >>> Also, a warning message is output to stderr:
> >>> ```
> >>> WARNING: Failed to release Java C Data resource: Failed to attach the
> >>> current thread to a Java VM
> >>> ```
> >>
> >> That's probably because the release callback is called at process exit,
> >> after the JVM is shut down?
> >>
> >>> Is yielding a weakref-ed `rb` a good idea? Will the weakref-ed
> >>> RecordBatchReader work with other pyarrow APIs (dataset)?
> >>
> >> That would probably not solve the problem. Users can trivially get a
> >> strong reference from the weakref, and keep it alive too long.
> >>
> >> Regards
> >>
> >> Antoine.

-- 
---------------------
Best Regards,
Wenbo Hu