I think you should probably use an allocator that lives long enough, yes.
Regards
Antoine.
On 30/06/2023 at 10:23, Wenbo Hu wrote:
Hi,
But gc is not predictable (though Python's is more eager than the
JVM's); the following code fails randomly if `gc.collect` is not
called.
```
# ENV
# Python 3.8.16 (default, Jun 12 2023, 12:55:15)
# [Clang 14.0.6 ] :: Anaconda, Inc. on darwin
# Type "help", "copyright", "credits" or "license" for more information.
def wrap_from_java_stream_to_generator(java_arrow_stream,
                                       allocator=None, yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    import jpype.imports
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream,
                                              java_wrapped_stream)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        for record_batch in reader:
            yield record_batch
            del record_batch  # delete reference, make sure no reference in python

def wrap_from_java_stream(java_arrow_stream, base_allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream,
                                                   base_allocator,
                                                   yield_schema=True)
    schema = next(generator)
    return pa.RecordBatchReader.from_batches(schema, generator)

def test_wrap_from_java_stream(tmp_path):
    start_jvm()
    org = jpype.JPackage("org")
    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)
        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")
            # gc.collect()
```
Is there any way to ensure that all references are collected when the
allocator is closed? Should I run gc every time before the allocator is
closed, or just use a single root allocator for all purposes?
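As a sketch of the first option (forcing a gc pass before the allocator closes), one could wrap the allocator context manager. `collected` below is a hypothetical helper, not part of pyarrow or JPype; it assumes the allocator is used as a context manager, like `child_allocator` above:

```python
import contextlib
import gc

@contextlib.contextmanager
def collected(allocator_cm):
    # Wrap an allocator context manager so that a gc pass runs before the
    # allocator closes, giving unreferenced record batches a chance to run
    # their release callbacks first.  `allocator_cm` is any context manager
    # yielding an allocator (e.g. child_allocator("...")).
    with allocator_cm as allocator:
        try:
            yield allocator
        finally:
            gc.collect()
```

This only shortens the window; as noted in the replies, the robust fix is an allocator that outlives every consumer.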
Antoine Pitrou <anto...@python.org> wrote on Thu, Jun 29, 2023 at 21:50:
Hi,
To answer precisely:
1) The exported record batch will live as long as the Python RecordBatch
object is kept alive. If your script keeps the Python RecordBatch object
alive until the end, then the exported record batch is kept alive until
the end.
2) The rest is standard Python semantics. When an object is not
referenced by the program anymore, reference counting destroys it. For
an exported record batch, destroying the Python RecordBatch calls the
record batch's release callback.
Regards
Antoine.
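The reference-counting semantics described above can be illustrated with a plain Python object, using `weakref.finalize` to stand in for the C Data release callback (the `Batch` class below is just an illustrative stand-in, not a pyarrow type):

```python
import weakref

class Batch:
    """Stand-in for a Python RecordBatch backed by exported memory."""

released = []

b = Batch()
# weakref.finalize stands in for the release callback: it fires exactly
# when the last strong reference to the object disappears.
weakref.finalize(b, released.append, True)

assert released == []   # `b` is still referenced, nothing released yet
del b                   # refcount drops to zero -> finalizer runs at once
assert released == [True]
```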
On 29/06/2023 at 15:05, Wenbo Hu wrote:
Thanks for your explanation, Antoine.
I figured out why I'm facing the memory leak and need to call delete
explicitly. My example code may have misrepresented the situation. The
key problem is that when I wrap the code converting a Java stream into
a RecordBatchReader, I create a child allocator from the current
context (living only as long as the RecordBatchReader) to call
exportArrayStream in a generator, so the consumer/callback always
outlives the RecordBatchReader and its underlying allocator (not the
allocator of the Java stream, but that of exportArrayStream).
When I give the conversion a longer-lived allocator (living as long as
the consumer/callback), the code works as expected.
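The lifetime inversion described above can be sketched with plain Python context managers (all names below are illustrative stand-ins, not Arrow APIs):

```python
import contextlib

events = []

@contextlib.contextmanager
def child_allocator(name):
    # Stand-in for an Arrow child allocator's open/close lifetime.
    events.append(f"open {name}")
    yield name
    events.append(f"close {name}")

def batches_with_inner_allocator():
    # Anti-pattern: the child allocator closes as soon as the generator is
    # exhausted, even though the consumer may still hold the yielded values.
    with child_allocator("inner") as alloc:
        for i in range(2):
            yield (alloc, i)

consumed = list(batches_with_inner_allocator())
assert events == ["open inner", "close inner"]
# `consumed` still references the yielded values after "close inner";
# with real Arrow memory this is exactly the window where the consumer
# outlives the allocator.
```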
Antoine Pitrou <anto...@python.org> wrote on Thu, Jun 29, 2023 at 17:55:
On 29/06/2023 at 09:50, Wenbo Hu wrote:
Hi,
I'm using JPype to pass streams back and forth between Java and Python.
The following code works fine with its release callback:
```python
with child_allocator("test-allocator") as allocator:
    r = some_package.InMemoryArrowReader.create(allocator)
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
        for rb in stream:  # type: pa.RecordBatch
            logger.info(rb.num_rows)  # yield weakref.proxy(rb)
            del rb  # release callback directly called in current thread?
```
But if the del statement is not called, the allocator on the Java side
raises an exception reporting that memory leaks.
That's not surprising. The `rb` variable keeps the record batch and its
backing memory alive. This is the expected semantics. Otherwise,
accessing `rb`'s contents would crash.
Also, a warning message is output to stderr:
```
WARNING: Failed to release Java C Data resource: Failed to attach the
current thread to a Java VM
```
That's probably because the release callback is called at process exit,
after the JVM is shut down?
> Is yielding a weakref-ed `rb` a good idea? Will the weakref-ed
> RecordBatchReader work with other pyarrow APIs (dataset)?
That would probably not solve the problem. Users can trivially get a
strong reference from the weakref, and keep it alive too long.
Regards
Antoine.
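For illustration, here is how trivially a strong reference leaks out of a `weakref.proxy` in plain Python (the `Batch` class is an illustrative stand-in, not a pyarrow type):

```python
import weakref

class Batch:
    """Stand-in for a pyarrow RecordBatch; not an Arrow type."""
    def num_rows(self):
        return 3

b = Batch()
p = weakref.proxy(b)

# Attribute access on a proxy is forwarded to the referent, so a bound
# method's __self__ is a *strong* reference to the underlying object:
strong = p.num_rows.__self__
assert strong is b

del b
# The proxy alone would not keep the batch alive, but `strong` does:
assert strong.num_rows() == 3
```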