This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a50d2ac  Ensure Spark driver response is valid before setting UNKNOWN status (#19978)
a50d2ac is described below

commit a50d2ac872da7e27d4cb32a2eb12cb75545c4a60
Author: PApostol <[email protected]>
AuthorDate: Thu Dec 2 21:12:25 2021 +0000

    Ensure Spark driver response is valid before setting UNKNOWN status (#19978)
---
 airflow/providers/apache/spark/hooks/spark_submit.py    |  7 ++++++-
 tests/providers/apache/spark/hooks/test_spark_submit.py | 15 +++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index b17eebe..677ea7b 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -520,10 +520,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         :param itr: An iterator which iterates over the input of the subprocess
         """
         driver_found = False
+        valid_response = False
         # Consume the iterator
         for line in itr:
             line = line.strip()
 
+            # A valid Spark status response should contain a submissionId
+            if "submissionId" in line:
+                valid_response = True
+
             # Check if the log line is about the driver status and extract the status.
             if "driverState" in line:
                 self._driver_status = line.split(' : ')[1].replace(',', '').replace('\"', '').strip()
@@ -531,7 +536,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 
             self.log.debug("spark driver status log: %s", line)
 
-        if not driver_found:
+        if valid_response and not driver_found:
             self._driver_status = "UNKNOWN"
 
     def _start_driver_status_tracking(self) -> None:
diff --git a/tests/providers/apache/spark/hooks/test_spark_submit.py b/tests/providers/apache/spark/hooks/test_spark_submit.py
index be47bfb..041d038 100644
--- a/tests/providers/apache/spark/hooks/test_spark_submit.py
+++ b/tests/providers/apache/spark/hooks/test_spark_submit.py
@@ -728,6 +728,21 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         assert hook._driver_status == 'RUNNING'
 
+    def test_process_spark_driver_status_log_bad_response(self):
+        # Given
+        hook = SparkSubmitHook(conn_id='spark_standalone_cluster')
+        log_lines = [
+            'curl: Failed to connect to http://spark-standalone-master:6066'
+            'This is an invalid Spark response',
+            'Timed out',
+        ]
+        # When
+        hook._process_spark_status_log(log_lines)
+
+        # Then
+
+        assert hook._driver_status is None
+
     @patch('airflow.providers.apache.spark.hooks.spark_submit.renew_from_kt')
     @patch('airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen')
     def test_yarn_process_on_kill(self, mock_popen, mock_renew_from_kt):

Reply via email to