Hi,

But GC is not predictable (even though Python's reference counting reclaims
objects more eagerly than the JVM's GC does), and the following code fails
randomly if `gc.collect()` is not called.
```python

# ENV
# Python 3.8.16 (default, Jun 12 2023, 12:55:15)
# [Clang 14.0.6 ] :: Anaconda, Inc. on darwin

import jpype
import pyarrow as pa
import pyarrow.dataset

# arrow_c, get_java_root_allocator, child_allocator and start_jvm are
# project-local helpers, not shown here.


def wrap_from_java_stream_to_generator(java_arrow_stream,
                                       allocator=None, yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

    import jpype.imports
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)

    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream,
                                              java_wrapped_stream)

    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        for record_batch in reader:
            yield record_batch

            del record_batch  # drop the reference so nothing on the Python side keeps the batch alive


def wrap_from_java_stream(java_arrow_stream, base_allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream,
                                                   base_allocator,
                                                   yield_schema=True)
    schema = next(generator)

    return pa.RecordBatchReader.from_batches(schema, generator)

def test_wrap_from_java_stream(tmp_path):
    start_jvm()

    org = jpype.JPackage("org")

    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)

        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")

        # gc.collect()
```
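To illustrate why the explicit `del record_batch` matters at all: in CPython, a suspended generator frame keeps the most recently yielded object alive even after the consumer has dropped it, so without the `del` the batch's release callback cannot fire until the generator is resumed or collected. A minimal pure-Python sketch (`Batch` is just a stand-in for a real RecordBatch, no Arrow involved):

```python
import weakref


class Batch:
    """Stand-in for a pyarrow RecordBatch."""


def batches():
    for _ in range(2):
        b = Batch()
        yield b
        del b  # drop the frame's reference as soon as the consumer resumes us


g = batches()
first = next(g)
ref = weakref.ref(first)

del first                  # the consumer no longer references the batch...
assert ref() is not None   # ...but the suspended generator frame still does

next(g)                    # resuming runs `del b`, freeing the first batch
assert ref() is None
```

So the `del` bounds each batch's lifetime to a single loop iteration instead of leaving the last reference dangling in the suspended frame.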

Is there any way to ensure that all references have been collected by the
time the allocator is closed? Should I call gc.collect() every time before
closing the allocator, or just use one single root allocator for all
purposes?
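For context, the workaround I'm currently considering is to wrap the child allocator so that `gc.collect()` always runs before it is closed. A rough sketch — `make_allocator` is a hypothetical factory standing in for however the child allocator is really created, not a real API:

```python
import contextlib
import gc


@contextlib.contextmanager
def collected_child_allocator(make_allocator):
    # make_allocator is a placeholder for child_allocator(...) or similar;
    # replace it with the real factory in your code.
    allocator = make_allocator()
    try:
        yield allocator
    finally:
        # Break any reference cycles still holding RecordBatch objects so
        # their release callbacks run while the Java-side allocator is open.
        gc.collect()
        allocator.close()
```

That at least confines the collection to one call per allocator lifetime instead of sprinkling gc.collect() through the consumer code.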

Antoine Pitrou <anto...@python.org> 于2023年6月29日周四 21:50写道:
>
>
> Hi,
>
> To answer precisely:
>
> 1) The exported record batch will live as long as the Python RecordBatch
> object is kept alive. If your script keeps the Python RecordBatch object
> alive until the end, then the exported record batch is kept alive until
> the end.
>
> 2) The rest is standard Python semantics. When an object is not
> referenced by the program anymore, reference counting destroys it. For
> an exported record batch, destroying the Python RecordBatch calls the
> record batch's release callback.
>
> Regards
>
> Antoine.
>
>
>
>
>
>
> Le 29/06/2023 à 15:05, Wenbo Hu a écrit :
> > Thanks for your explanation, Antoine.
> >
> > I figured out why I'm facing the memory leak and need to call delete
> > explicitly.
> > My example code may have misrepresented the situation. The key problem
> > is that when I wrap the code that converts a Java stream into a
> > RecordBatchReader, I create a child allocator from the current context
> > (which lives as long as the RecordBatchReader) to call
> > exportArrayStream in a generator, so the consumer/callback always
> > outlives the RecordBatchReader and its underlying allocator (not the
> > allocator of the Java stream, but that of exportArrayStream).
> >
> > When I give the conversion an allocator with a longer lifetime (as
> > long as the consumer/callback), the code works as expected.
> >
> > Antoine Pitrou <anto...@python.org> 于2023年6月29日周四 17:55写道:
> >>
> >>
> >> Le 29/06/2023 à 09:50, Wenbo Hu a écrit :
> >>> Hi,
> >>>
> >>> I'm using Jpype to pass streams between java and python back and forth.
> >>>
> >>> The following code works fine with its release callback
> >>> ```python
> >>>
> >>>       with child_allocator("test-allocator") as allocator:
> >>>           r = some_package.InMemoryArrowReader.create(allocator)
> >>>           c_stream = arrow_c.new("struct ArrowArrayStream*")
> >>>           c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
> >>>
> >>>           s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
> >>>           org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)
> >>>
> >>>           with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
> >>>               for rb in stream:  # type: pa.RecordBatch
> >>>                   logger.info(rb.num_rows)     # yield weakref.proxy(rb)
> >>>                   del rb     # release callback directly called in current thread?
> >>> ```
> >>>
> >>> But if the del statement is not called, the allocator on the Java
> >>> side raises an exception reporting a memory leak.
> >>
> >> That's not surprising. The `rb` variable keeps the record batch and its
> >> backing memory alive. This is the expected semantics. Otherwise,
> >> accessing `rb`'s contents would crash.
> >>
> >>> Also, a warning message is output to stderr:
> >>> ```
> >>> WARNING: Failed to release Java C Data resource: Failed to attach the
> >>> current thread to a Java VM
> >>> ```
> >>
> >> That's probably because the release callback is called at process exit,
> >> after the JVM is shut down?
> >>
> >>   > Is yielding a weakref-ed `rb` a good idea? Will the weakref-ed
> >>   > RecordBatchReader work with other pyarrow APIs (dataset)?
> >>
> >> That would probably not solve the problem. Users can trivially get a
> >> strong reference from the weakref, and keep it alive too long.
> >>
> >> Regards
> >>
> >> Antoine.
> >
> >
> >



-- 
---------------------
Best Regards,
Wenbo Hu,
