HeartSaVioR commented on PR #52479: URL: https://github.com/apache/spark/pull/52479#issuecomment-3344930975
https://github.com/HeartSaVioR/spark/actions/runs/18084423876/job/51454363578 The regression test fails against the master branch, as shown below: ``` ====================================================================== ERROR [3.811s]: test_transform_with_state_list_state_large_list (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests.test_transform_with_state_list_state_large_list) ---------------------------------------------------------------------- Traceback (most recent call last): File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 380, in test_transform_with_state_list_state_large_list q.processAllAvailable() File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in processAllAvailable return self._jsq.processAllAvailable() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__ return_value = get_return_value( ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 294, in deco raise converted from None File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call raise e File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 312, in check_results batch_df.collect() ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line 443, in collect sock_info = self._jdf.collectToPython() ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__ return_value = get_return_value( ^^^^^^^^^^^^^^^^^ File 
"/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 294, in deco raise converted from None ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3238, in main process() ^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3230, in process serializer.dump_stream(out_iter, outfile) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1751, in dump_stream super().dump_stream(flatten_iterator(), stream) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 768, in dump_stream return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 121, in dump_stream for batch in iterator: ^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 761, in init_stream_yield_batches for series in iterator: ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1748, in flatten_iterator for pdf in iter_pdf: ^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py", line 968, in handleInputRows self.list_state.put(new_elements) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py", line 145, in put self._listStateClient.put(self._stateName, newState) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py", line 195, in put self._stateful_processor_api_client._send_arrow_state(self.schema, values) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py", line 535, in _send_arrow_state pandas_df = convert_pandas_using_numpy_type( ^^^^^^^^^^^^^^^^^ File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 1463, in convert_pandas_using_numpy_type df[field.name] = df[field.name].astype(np_type) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", line 6662, in astype new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 430, in astype return self.apply( ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 363, in apply applied = getattr(b, f)(**kwargs) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 784, in astype new_values = astype_array_safe(values, dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 237, in astype_array_safe new_values = astype_array(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 182, in astype_array values = _astype_nansafe(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 133, in _astype_nansafe return arr.astype(dtype, copy=True) ^^^^^^^^^^^^^^^^^ pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = d4fcfcbc-6eca-495c-9899-6cb37669a20d, runId = 11cdc97a-093a-4310-a079-18ed5082a114] terminated with exception: [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach batch sink. Reason: An exception was raised by the Python Proxy. 
Return Message: Traceback (most recent call last): File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call raise e File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 312, in check_results batch_df.collect() File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line 443, in collect sock_info = self._jdf.collectToPython() ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__ return_value = get_return_value( ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 294, in deco raise converted from None File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3238, in main process() ^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3230, in process serializer.dump_stream(out_iter, outfile) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1751, in dump_stream super().dump_stream(flatten_iterator(), stream) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 768, in dump_stream return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 121, in dump_stream for batch in iterator: ^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 761, in init_stream_yield_batches for series in iterator: ^^^^^^^^^^^^^^^^^ File 
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1748, in flatten_iterator for pdf in iter_pdf: ^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py", line 968, in handleInputRows self.list_state.put(new_elements) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py", line 145, in put self._listStateClient.put(self._stateName, newState) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py", line 195, in put self._stateful_processor_api_client._send_arrow_state(self.schema, values) ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py", line 535, in _send_arrow_state pandas_df = convert_pandas_using_numpy_type( ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 1463, in convert_pandas_using_numpy_type df[field.name] = df[field.name].astype(np_type) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", line 6662, in astype new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 430, in astype return self.apply( ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 363, in apply applied = getattr(b, f)(**kwargs) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 784, in astype new_values = astype_array_safe(values, dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 237, in astype_array_safe new_values = astype_array(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^ File 
"/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 182, in astype_array values = _astype_nansafe(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 133, in _astype_nansafe return arr.astype(dtype, copy=True) ^^^^^^^^^^^^^^^^^ pyspark.errors.exceptions.captured.PythonException: An exception was thrown from the Python worker. Please see the stack trace below. Traceback (most recent call last): File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3238, in main process() File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3230, in process serializer.dump_stream(out_iter, outfile) File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1751, in dump_stream super().dump_stream(flatten_iterator(), stream) File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 768, in dump_stream return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 121, in dump_stream for batch in iterator: File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 761, in init_stream_yield_batches for series in iterator: File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 1748, in flatten_iterator for pdf in iter_pdf: File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py", line 968, in handleInputRows self.list_state.put(new_elements) File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor.py", line 145, in put self._listStateClient.put(self._stateName, newState) File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/list_state_client.py", line 195, 
in put self._stateful_processor_api_client._send_arrow_state(self.schema, values) File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/streaming/stateful_processor_api_client.py", line 535, in _send_arrow_state pandas_df = convert_pandas_using_numpy_type( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 1463, in convert_pandas_using_numpy_type df[field.name] = df[field.name].astype(np_type) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py", line 6662, in astype new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 430, in astype return self.apply( ^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/managers.py", line 363, in apply applied = getattr(b, f)(**kwargs) ^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/internals/blocks.py", line 784, in astype new_values = astype_array_safe(values, dtype, copy=copy, errors=errors) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 237, in astype_array_safe new_values = astype_array(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 182, in astype_array values = _astype_nansafe(values, dtype, copy=copy) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/dist-packages/pandas/core/dtypes/astype.py", line 133, in _astype_nansafe return arr.astype(dtype, copy=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType' ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
