HeartSaVioR commented on PR #52479:
URL: https://github.com/apache/spark/pull/52479#issuecomment-3344930975

   https://github.com/HeartSaVioR/spark/actions/runs/18084423876/job/51454363578
   
   The regression test fails on the master branch as shown below:
   
   ```
   ======================================================================
   ERROR [3.811s]: test_transform_with_state_list_state_large_list 
(pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests.test_transform_with_state_list_state_large_list)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 380, in test_transform_with_state_list_state_large_list
       q.processAllAvailable()
     File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, 
in processAllAvailable
       return self._jsq.processAllAvailable()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 
1362, in __call__
       return_value = get_return_value(
                      ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 
294, in deco
       raise converted from None
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
644, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
       ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
       raise e
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
       ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 312, in check_results
       batch_df.collect()
       ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line 443, 
in collect
       sock_info = self._jdf.collectToPython()
       ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 
1362, in __call__
       return_value = get_return_value(
       ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 
294, in deco
       raise converted from None
       ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3238, in main
       process()
              ^^^
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3230, in process
       serializer.dump_stream(out_iter, outfile)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1751, in dump_stream
       super().dump_stream(flatten_iterator(), stream)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 768, in dump_stream
       return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 121, in dump_stream
       for batch in iterator:
              ^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 761, in init_stream_yield_batches
       for series in iterator:
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1748, in flatten_iterator
       for pdf in iter_pdf:
              ^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py",
 line 968, in handleInputRows
       self.list_state.put(new_elements)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py",
 line 145, in put
       self._listStateClient.put(self._stateName, newState)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py",
 line 195, in put
       self._stateful_processor_api_client._send_arrow_state(self.schema, 
values)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py",
 line 535, in _send_arrow_state
       pandas_df = convert_pandas_using_numpy_type(
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 
1463, in convert_pandas_using_numpy_type
       df[field.name] = df[field.name].astype(np_type)
              ^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", 
line 6662, in astype
       new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 430, in astype
       return self.apply(
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 363, in apply
       applied = getattr(b, f)(**kwargs)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 
784, in astype
       new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
237, in astype_array_safe
       new_values = astype_array(values, dtype, copy=copy)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
182, in astype_array
       values = _astype_nansafe(values, dtype, copy=copy)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
133, in _astype_nansafe
       return arr.astype(dtype, copy=True)
       ^^^^^^^^^^^^^^^^^
   pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] 
Query [id = d4fcfcbc-6eca-495c-9899-6cb37669a20d, runId = 
11cdc97a-093a-4310-a079-18ed5082a114] terminated with exception: 
[FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided 
function in foreach batch sink. Reason: An exception was raised by the Python 
Proxy. Return Message: Traceback (most recent call last):
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
644, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
       raise e
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 312, in check_results
       batch_df.collect()
     File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line 443, 
in collect
       sock_info = self._jdf.collectToPython()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 
1362, in __call__
       return_value = get_return_value(
                      ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 
294, in deco
       raise converted from None
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3238, in main
       process()
              ^^^
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3230, in process
       serializer.dump_stream(out_iter, outfile)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1751, in dump_stream
       super().dump_stream(flatten_iterator(), stream)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 768, in dump_stream
       return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 121, in dump_stream
       for batch in iterator:
              ^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 761, in init_stream_yield_batches
       for series in iterator:
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1748, in flatten_iterator
       for pdf in iter_pdf:
              ^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py",
 line 968, in handleInputRows
       self.list_state.put(new_elements)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py",
 line 145, in put
       self._listStateClient.put(self._stateName, newState)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py",
 line 195, in put
       self._stateful_processor_api_client._send_arrow_state(self.schema, 
values)
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py",
 line 535, in _send_arrow_state
       pandas_df = convert_pandas_using_numpy_type(
              ^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 
1463, in convert_pandas_using_numpy_type
       df[field.name] = df[field.name].astype(np_type)
              ^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", 
line 6662, in astype
       new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 430, in astype
       return self.apply(
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 363, in apply
       applied = getattr(b, f)(**kwargs)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 
784, in astype
       new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
237, in astype_array_safe
       new_values = astype_array(values, dtype, copy=copy)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
182, in astype_array
       values = _astype_nansafe(values, dtype, copy=copy)
       ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
133, in _astype_nansafe
       return arr.astype(dtype, copy=True)
       ^^^^^^^^^^^^^^^^^
   pyspark.errors.exceptions.captured.PythonException: 
     An exception was thrown from the Python worker. Please see the stack trace 
below.
   Traceback (most recent call last):
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3238, in main
       process()
     File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
3230, in process
       serializer.dump_stream(out_iter, outfile)
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1751, in dump_stream
       super().dump_stream(flatten_iterator(), stream)
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 768, in dump_stream
       return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
              
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 121, in dump_stream
       for batch in iterator:
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 761, in init_stream_yield_batches
       for series in iterator:
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", 
line 1748, in flatten_iterator
       for pdf in iter_pdf:
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py",
 line 968, in handleInputRows
       self.list_state.put(new_elements)
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py",
 line 145, in put
       self._listStateClient.put(self._stateName, newState)
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py",
 line 195, in put
       self._stateful_processor_api_client._send_arrow_state(self.schema, 
values)
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py",
 line 535, in _send_arrow_state
       pandas_df = convert_pandas_using_numpy_type(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 
1463, in convert_pandas_using_numpy_type
       df[field.name] = df[field.name].astype(np_type)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", 
line 6662, in astype
       new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 430, in astype
       return self.apply(
              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", 
line 363, in apply
       applied = getattr(b, f)(**kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 
784, in astype
       new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
237, in astype_array_safe
       new_values = astype_array(values, dtype, copy=copy)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
182, in astype_array
       values = _astype_nansafe(values, dtype, copy=copy)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 
133, in _astype_nansafe
       return arr.astype(dtype, copy=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: int() argument must be a string, a bytes-like object or a real 
number, not 'NoneType'
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to