austin362667 opened a new issue, #37:
URL: https://github.com/apache/datafusion-ray/issues/37

   **Describe the bug**
   
   [DataFusion Ray](https://github.com/apache/datafusion-ray) installs 
[DataFusion Python](https://github.com/apache/datafusion-python) as a 
dependency after the merge of PR [Remove Session 
Context](https://github.com/apache/datafusion-ray/commit/b792eb0aaaa7704de5156bb87bc406d831be8fbf#diff-3165d0fca35aeb3c7c29f472edf128187ee5107cad705b253eafe5a5acb4bc35R30).
   
   I'm encountering the following bug, and I suspect it's a misused of 
DataFusion Python bindings exposed by PyO3 within DataFusion Ray .
   ```
   Traceback (most recent call last):
     File 
"/tmp/ray/session_2024-10-21_04-23-14_317497_1/runtime_resources/working_dir_files/_ray_pkg_dcc19145fca26cc6/tips.py",
 line 55, in <module>
       df_ctx.create_dataframe([ray_results]).show()
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/ray/anaconda3/lib/python3.11/site-packages/datafusion/context.py", line 
554, in create_dataframe
       return DataFrame(self.ctx.create_dataframe(partitions, name, schema))
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: argument 'partitions': 'RecordBatch' object cannot be converted 
to 'PyList'
   ```
   
   **To Reproduce**
   
   In [DataFusion Ray](https://github.com/apache/datafusion-ray), [set up the 
cluster](https://github.com/apache/datafusion-ray/pull/23#pullrequestreview-2352261550)
 and register the Python script to the Ray cluster:
   ```
   export RAY_ADDRESS="http://127.0.0.1:8265";
   ray job submit --working-dir ./examples/ -- python3 tips.py
   ```
   
   **Expected behavior**
   
   Should return successfully
   ```
   Job submission server address: http://127.0.0.1:8265
   2024-10-07 22:59:35,002      INFO dashboard_sdk.py:338 -- Uploading package 
gcs://_ray_pkg_3dd815f5033d4050.zip.
   2024-10-07 22:59:35,003      INFO packaging.py:531 -- Creating a file 
package for local directory './examples/'.
   
   -------------------------------------------------------
   Job 'raysubmit_XeVwVxmEhvpayxXW' submitted successfully
   -------------------------------------------------------
   
   Next steps
     Query the logs of the job:
       ray job logs raysubmit_XeVwVxmEhvpayxXW
     Query the status of the job:
       ray job status raysubmit_XeVwVxmEhvpayxXW
     Request the job to be stopped:
       ray job stop raysubmit_XeVwVxmEhvpayxXW
   
   Tailing logs until the job exits (disable with --no-wait):
   2024-10-07 07:59:35,419      INFO job_manager.py:529 -- Runtime env is 
setting up.
   2024-10-07 07:59:36,933      INFO worker.py:1474 -- Using address 
192.168.194.51:6379 set in the environment variable RAY_ADDRESS
   2024-10-07 07:59:36,933      INFO worker.py:1614 -- Connecting to existing 
Ray cluster at address: 192.168.194.51:6379...
   2024-10-07 07:59:36,938      INFO worker.py:1790 -- Connected to Ray 
cluster. View the dashboard at http://192.168.194.51:8265
   Planning select sex, smoker, avg(tip/total_bill) as tip_pct from tips group 
by sex, smoker
   Query stage #0:
   RayShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: 
"sex", index: 0 }, Column { name: "smoker", index: 1 }], 2))
     AggregateExec: mode=Partial, gby=[sex@2 as sex, smoker@3 as smoker], 
aggr=[avg(tips.tip / tips.total_bill)]
       ParquetExec: file_groups={1 group: 
[[tmp/ray/session_2024-10-07_07-57-04_914630_1/runtime_resources/working_dir_files/_ray_pkg_3dd815f5033d4050/tips.parquet]]},
 projection=[total_bill, tip, sex, smoker]
   
   Query stage #1:
   CoalescePartitionsExec
     ProjectionExec: expr=[sex@0 as sex, smoker@1 as smoker, avg(tips.tip / 
tips.total_bill)@2 as tip_pct]
       AggregateExec: mode=FinalPartitioned, gby=[sex@0 as sex, smoker@1 as 
smoker], aggr=[avg(tips.tip / tips.total_bill)]
         CoalesceBatchesExec: target_batch_size=16384
           RayShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { 
name: "sex", index: 0 }, Column { name: "smoker", index: 1 }], 2))
   
   Scheduling query stage #0 with 1 input partitions and 2 output partitions
   Forcing reduce stage concurrency from 2 to 1
   Scheduling query stage #1 with 1 input partitions and 1 output partitions
   (execute_query_partition pid=943) 
RayShuffleWriterExec[stage=0].execute(input_partition=0)
   (execute_query_partition pid=943) RayShuffleWriterExec[stage=0] Finished 
writing shuffle partition 0. Batches: 1. Rows: 2. Bytes: 720.
   (execute_query_partition pid=943) RayShuffleWriterExec[stage=0] Finished 
writing shuffle partition 1. Batches: 1. Rows: 2. Bytes: 720.
   (execute_query_partition pid=943) {"cat": "0-0", "name": "0-0", "pid": 
"192.168.194.51", "tid": 943, "ts": 1728313178020260, "dur": 3577, "ph": 
"X"},:task_name:execute_query_partition
   (execute_query_partition pid=943) 
RayShuffleReaderExec[stage=0].execute(input_partition=1) with 1 shuffle inputs
   (execute_query_partition pid=943) 
RayShuffleReaderExec[stage=0].execute(input_partition=0) with 1 shuffle inputs
   (execute_query_partition pid=943) {"cat": "1-0", "name": "1-0", "pid": 
"192.168.194.51", "tid": 943, "ts": 1728313178027011, "dur": 769, "ph": "X"},
         sex smoker   tip_pct
   0    Male     No  0.160669
   1  Female    Yes  0.182150
         sex smoker   tip_pct
   0  Female     No  0.156921
   1    Male    Yes  0.152771
   
   ------------------------------------------
   Job 'raysubmit_XeVwVxmEhvpayxXW' succeeded
   ------------------------------------------
   ```
   
   **Additional context**
   Add any other context about the problem here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to