Hello,

I have observed that for long-running queries (say, > 1 hour) submitted from
PySpark via Spark Connect (3.5.2), the client often gets stuck: even when the
query completes successfully, the client keeps waiting in
_execute_and_fetch_as_iterator (see the traceback below).

The client session still shows as active in the Connect tab of the Spark UI.

I have tried setting
spark.connect.execute.reattachable.senderMaxStreamDuration=0 on the server,
but it did not help. Setting SPARK_CONNECT_LOG_LEVEL=debug on the client also
did not produce any output beyond the plan, which was logged when the query
started.

Is this a known issue? How can I troubleshoot it?
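A generic way to see where a hung Python client is blocked, without attaching
a debugger, is the standard-library faulthandler module, which can dump every
thread's stack on a timer. This is not specific to Spark Connect. A minimal
sketch; start_stack_dumps and stop_stack_dumps are hypothetical helper names,
and the interval is an arbitrary choice:

```python
import faulthandler
import sys
from typing import TextIO


def start_stack_dumps(interval_s: float, out: TextIO = sys.stderr) -> None:
    """Dump the stack of every thread to `out` every `interval_s` seconds.

    faulthandler writes the dump from its own watchdog thread, so it still
    fires while the main thread is blocked (e.g. in a lock/condition wait).
    `out` must be a real file with a file descriptor (stderr or an open file).
    """
    # repeat=True: keep dumping on every interval until cancelled.
    faulthandler.dump_traceback_later(interval_s, repeat=True, file=out)


def stop_stack_dumps() -> None:
    """Cancel the periodic dumps started by start_stack_dumps()."""
    faulthandler.cancel_dump_traceback_later()
```

For example, calling start_stack_dumps(600, log_file) before submitting the
long query and stop_stack_dumps() after it returns would record, every 10
minutes, whether the main thread is still parked in grpc/_channel.py's _next.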

Thank you!

Traceback:

  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py", line 982, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py", line 1256, in _execute_and_fetch_as_iterator
    for b in generator:
  File "<frozen _collections_abc>", line 330, in __next__
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/reattach.py", line 131, in send
    if not self._has_next():
           ^^^^^^^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/reattach.py", line 179, in _has_next
    self._current = self._call_iter(
                    ^^^^^^^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/reattach.py", line 267, in _call_iter
    return iter_fun()
           ^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/pyspark/sql/connect/client/reattach.py", line 180, in <lambda>
    lambda: next(self._iterator) # type: ignore[arg-type]
            ^^^^^^^^^^^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/grpc/_channel.py", line 960, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/opt/pipelines/.venv/lib/python3.11/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/home/spark/.local/share/uv/python/cpython-3.11.10-linux-x86_64-gnu/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
