I think you should probably use an allocator that lives long enough, yes.

Regards

Antoine.


On 30/06/2023 at 10:23, Wenbo Hu wrote:
Hi,

But gc is not predictable (though Python's collection is more eager than
the JVM's); the following code fails randomly if `gc.collect` is not
called:
```python
# ENV
# Python 3.8.16 (default, Jun 12 2023, 12:55:15)
# [Clang 14.0.6 ] :: Anaconda, Inc. on darwin


def wrap_from_java_stream_to_generator(java_arrow_stream, allocator=None,
                                       yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

    import jpype.imports
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)

    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream,
                                              java_wrapped_stream)

    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        for record_batch in reader:
            yield record_batch
            del record_batch  # drop the reference, so no Python reference survives


def wrap_from_java_stream(java_arrow_stream, base_allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream,
                                                   base_allocator,
                                                   yield_schema=True)
    schema = next(generator)

    return pa.RecordBatchReader.from_batches(schema, generator)


def test_wrap_from_java_stream(tmp_path):
    start_jvm()

    org = jpype.JPackage("org")

    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)

        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")

        # gc.collect()
```

Is there any way to ensure that all references have been collected by the
time the allocator is closed? Should I run gc every time before the
allocator is closed, or just use a single root allocator for everything?

Antoine Pitrou <anto...@python.org> wrote on Thu, 29 Jun 2023 at 21:50:


Hi,

To answer precisely:

1) The exported record batch will live as long as the Python RecordBatch
object is kept alive. If your script keeps the Python RecordBatch object
alive until the end, then the exported record batch is kept alive until
the end.

2) The rest is standard Python semantics. When an object is not
referenced by the program anymore, reference counting destroys it. For
an exported record batch, destroying the Python RecordBatch calls the
record batch's release callback.
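
The difference from gc-managed runtimes can be seen in a minimal
pure-Python sketch (no Java involved), using pyarrow's default memory
pool as a stand-in for the exported memory:

```python
import pyarrow as pa

batch = pa.record_batch([pa.array(range(100_000))], names=["x"])
print(pa.total_allocated_bytes())  # > 0: the RecordBatch pins its buffers

del batch  # last reference gone: CPython refcounting frees it immediately
print(pa.total_allocated_bytes())  # back to 0, no gc.collect() needed
```

For an exported record batch, that same drop of the last reference is
what invokes the release callback.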

Regards

Antoine.






On 29/06/2023 at 15:05, Wenbo Hu wrote:
Thanks for your explanation, Antoine.

I figured out why I'm facing the memory leak and needing to call delete
explicitly. My example code may have obscured the situation. The key
problem is that when I wrap the conversion from a Java stream to a
RecordBatchReader, I create a child allocator from the current context
(which lives only as long as the RecordBatchReader) and use it for
exportArrayStream inside a generator, so the consumer/callback always
outlives the RecordBatchReader and its underlying allocator (not the
allocator of the Java stream, but the one passed to exportArrayStream).

When I give the conversion an allocator with a longer lifetime (as long
as the consumer/callback), the code works as expected.
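
In code terms, the fix amounts to something like this sketch
(hypothetical helper names, following the wrappers shown earlier in the
thread):

```python
# Broken: the exporting allocator dies when the `with` block exits,
# while the returned reader is still consuming the stream.
with child_allocator("short-lived") as export_allocator:
    reader = wrap_from_java_stream(java_stream, export_allocator)
# `reader` escapes here, but its allocator is already closed.

# Fixed: tie the exporting allocator to the consumer's lifetime.
root = get_java_root_allocator().allocator  # outlives every consumer
reader = wrap_from_java_stream(java_stream, root)
```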

Antoine Pitrou <anto...@python.org> wrote on Thu, 29 Jun 2023 at 17:55:


On 29/06/2023 at 09:50, Wenbo Hu wrote:
Hi,

I'm using JPype to pass streams back and forth between Java and Python.

The following code works fine, with its release callback being called:
```python
with child_allocator("test-allocator") as allocator:
    r = some_package.InMemoryArrowReader.create(allocator)
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

    s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)

    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
        for rb in stream:  # type: pa.RecordBatch
            logger.info(rb.num_rows)  # yield weakref.proxy(rb)
            del rb  # release callback directly called in current thread?
```

But if the del statement is not called, the allocator on the Java side
raises an exception reporting a memory leak.

That's not surprising. The `rb` variable keeps the record batch and its
backing memory alive. This is the expected semantics. Otherwise,
accessing `rb`'s contents would crash.
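
A small illustration of why the explicit `del` matters: Python has no
block scope, so a `for` loop's variable stays bound after the loop ends.
A self-contained sketch:

```python
data = [bytearray(10**6) for _ in range(3)]  # stand-ins for record batches

for rb in data:
    pass  # ... process rb ...

# The loop has ended, but `rb` is still bound to the last element:
assert rb is data[-1]
del rb  # without this, the last "batch" stays referenced by the frame
```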

Also, a warning message is output to stderr:
```
WARNING: Failed to release Java C Data resource: Failed to attach the
current thread to a Java VM
```

That's probably because the release callback is called at process exit,
after the JVM is shut down?
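
If so, one possible mitigation (a sketch, assuming JPype manages the JVM
lifecycle and reusing `rb`/`stream` from the snippet above) is to drop
every imported Arrow object and flush pending finalizers while the JVM
is still up:

```python
import gc
import jpype

del rb, stream        # drop the last Python references to imported data
gc.collect()          # run any pending release callbacks now
jpype.shutdownJVM()   # only stop the JVM once all callbacks have fired
```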

 > Is yielding a weakref-ed `rb` a good idea? Will the weakref-ed
 > RecordBatchReader work with other pyarrow APIs (dataset)?

That would probably not solve the problem. Users can trivially get a
strong reference from the weakref, and keep it alive too long.
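
For example (a minimal sketch with a stand-in class, since this is
ordinary weakref behavior):

```python
import weakref

class Batch:  # stand-in for a record batch
    def num_rows(self):
        return 0

b = Batch()

r = weakref.ref(b)
strong = r()  # calling the weakref trivially yields a strong reference

p = weakref.proxy(b)
also_strong = p.num_rows.__self__  # a bound method's __self__ is the referent

del b
# `strong` and `also_strong` still pin the object, defeating the weakref.
```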

Regards

Antoine.





