This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a50d2ac Ensure Spark driver response is valid before setting UNKNOWN status (#19978)
a50d2ac is described below
commit a50d2ac872da7e27d4cb32a2eb12cb75545c4a60
Author: PApostol <[email protected]>
AuthorDate: Thu Dec 2 21:12:25 2021 +0000
Ensure Spark driver response is valid before setting UNKNOWN status (#19978)
---
airflow/providers/apache/spark/hooks/spark_submit.py | 7 ++++++-
tests/providers/apache/spark/hooks/test_spark_submit.py | 15 +++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index b17eebe..677ea7b 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -520,10 +520,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
:param itr: An iterator which iterates over the input of the subprocess
"""
driver_found = False
+ valid_response = False
# Consume the iterator
for line in itr:
line = line.strip()
+ # A valid Spark status response should contain a submissionId
+ if "submissionId" in line:
+ valid_response = True
+
# Check if the log line is about the driver status and extract the
status.
if "driverState" in line:
self._driver_status = line.split(' : ')[1].replace(',',
'').replace('\"', '').strip()
@@ -531,7 +536,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
self.log.debug("spark driver status log: %s", line)
- if not driver_found:
+ if valid_response and not driver_found:
self._driver_status = "UNKNOWN"
def _start_driver_status_tracking(self) -> None:
diff --git a/tests/providers/apache/spark/hooks/test_spark_submit.py b/tests/providers/apache/spark/hooks/test_spark_submit.py
index be47bfb..041d038 100644
--- a/tests/providers/apache/spark/hooks/test_spark_submit.py
+++ b/tests/providers/apache/spark/hooks/test_spark_submit.py
@@ -728,6 +728,21 @@ class TestSparkSubmitHook(unittest.TestCase):
assert hook._driver_status == 'RUNNING'
+ def test_process_spark_driver_status_log_bad_response(self):
+ # Given: a status response with no "submissionId" line (e.g. curl failed to reach the master)
+ hook = SparkSubmitHook(conn_id='spark_standalone_cluster')
+ log_lines = [
+ 'curl: Failed to connect to http://spark-standalone-master:6066',
+ 'This is an invalid Spark response',
+ 'Timed out',
+ ]
+ # When
+ hook._process_spark_status_log(log_lines)
+
+ # Then
+ # Without a submissionId the response is not considered valid, so the status must not become UNKNOWN
+ assert hook._driver_status is None
+
@patch('airflow.providers.apache.spark.hooks.spark_submit.renew_from_kt')
@patch('airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen')
def test_yarn_process_on_kill(self, mock_popen, mock_renew_from_kt):