1. For weakref types, the Cython API raises TypeError.
2. All related references need to be explicitly deleted before the allocator closes.
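Point 1 can be reproduced without Arrow at all. A Cython extension type that does not declare a `__weakref__` slot behaves like a pure-Python class using `__slots__` (the `FakeBatch` class here is a made-up stand-in, not a pyarrow type):

```python
import weakref

class FakeBatch:
    """Pure-Python analog of a Cython extension type without a __weakref__ slot."""
    __slots__ = ("data",)

try:
    weakref.ref(FakeBatch())
    raised = False
except TypeError as exc:
    # The weakref cannot be created, mirroring the TypeError seen on Cython types.
    raised = True
    print("TypeError:", exc)
```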
The following code works fine:
```python
    with child_allocator("test-allocator") as allocator:
        r = some_package.InMemoryArrowReader.create(allocator)
        c_stream = arrow_c.new("struct ArrowArrayStream*")
        c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

        s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
        org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)

        with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
            with pa.csv.CSVWriter(csv_path, schema) as writer:
                for rb in stream:  # type: pa.RecordBatch
                    writer.write(rb)
                    del rb
            del writer
```
Before the Java allocator closes, the only reference to the writer is deleted.

But if the writer is created outside the allocator's scope, it fails as well,
even though the writer should not hold on to any record batches:

```python
    with pa.csv.CSVWriter(csv_path, schema) as writer:
        with child_allocator("test-allocator") as allocator:
            r = some_package.InMemoryArrowReader.create(allocator)
            c_stream = arrow_c.new("struct ArrowArrayStream*")
            c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))

            s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
            org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)

            with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
                for rb in stream:  # type: pa.RecordBatch
                    writer.write(rb)
                    del rb
                del writer
```
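The lifetime rule in point 2 can be sketched in pure Python (this `child_allocator` is a stand-in written for illustration, not the real JPype/Java helper): the allocator tracks its allocations with weakrefs and, like the Java allocator, complains at close if anything is still strongly referenced.

```python
import contextlib
import gc
import weakref

class Buffer:
    """Stand-in for a Java-backed Arrow buffer."""

@contextlib.contextmanager
def child_allocator(name):
    # Track every allocation with a weakref; at close, anything still
    # strongly referenced counts as a leak, like the Java allocator check.
    live = []
    def allocate():
        buf = Buffer()
        live.append(weakref.ref(buf))
        return buf
    yield allocate
    gc.collect()
    leaked = sum(1 for r in live if r() is not None)
    if leaked:
        raise RuntimeError(f"{name}: {leaked} allocation(s) still referenced at close")

# Fine: the only reference is deleted before the allocator closes.
with child_allocator("ok") as allocate:
    b = allocate()
    del b

# Fails: `kept` outlives the allocator scope, just like the writer above.
leak_error = None
try:
    with child_allocator("leaky") as allocate:
        kept = allocate()
except RuntimeError as e:
    leak_error = e
    print(e)
```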

Wenbo Hu <huwenbo1...@gmail.com> wrote on Thu, Jun 29, 2023 at 15:50:

>
> Hi,
>
> I'm using Jpype to pass streams between java and python back and forth.
>
> The following code works fine with its release callback:
> ```python
>
>     with child_allocator("test-allocator") as allocator:
>         r = some_package.InMemoryArrowReader.create(allocator)
>         c_stream = arrow_c.new("struct ArrowArrayStream*")
>         c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
>
>         s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
>         org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)
>
>         with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
>             for rb in stream:  # type: pa.RecordBatch
>                 logger.info(rb.num_rows)     # yield weakref.proxy(rb)
>                 del rb     # release callback directly called in current thread?
> ```
>
> But if del statment not called, the allocator from java side would
> raise exception that memory leaks.
> Also, an warning message output to err
> ```
> WARNING: Failed to release Java C Data resource: Failed to attach the
> current thread to a Java VM
> ```
> which comes from
> https://github.com/apache/arrow/blob/5cc6aa3551c6d0dc1285ea1eb587325ae3491a7f/java/c/src/main/cpp/jni_wrapper.cc#L182
>
> Then, how can I find out which thread calls the release callback? Or how
> can I make sure the release callback is called at the last line?
>
> I was trying to wrap the code into a managed RecordBatchReader with a
> generator (replacing the loop body with `yield rb`), but the object is
> still referenced by downstream users.
> Is yielding a weakref-ed `rb` a good idea? Will a weakref-ed
> RecordBatchReader work with other pyarrow APIs (dataset)?
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,
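On the "which thread calls the release callback" question above: in CPython, deallocation (and any finalizers) runs synchronously on whichever thread drops the last reference. A `weakref.finalize` stand-in can show this (an analog written for illustration, not the actual Arrow JNI release callback):

```python
import threading
import weakref

class Batch:
    """Stand-in for an imported record batch."""

released_on = []

def on_release():
    # Record which thread ran the release, mimicking the C Data release callback.
    released_on.append(threading.current_thread().name)

def consume():
    b = Batch()
    weakref.finalize(b, on_release)
    del b  # in CPython, the finalizer runs right here, on this thread

t = threading.Thread(target=consume, name="worker")
t.start()
t.join()
print("release ran on:", released_on)
```

So if batches are dropped on a Python-created thread, the release callback fires on that thread, which the JVM may not have attached.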



--
---------------------
Best Regards,
Wenbo Hu,
