vincbeck commented on code in PR #42900:
URL: https://github.com/apache/airflow/pull/42900#discussion_r1795829850


##########
providers/src/airflow/providers/amazon/aws/operators/redshift_data.py:
##########
@@ -197,16 +196,38 @@ def execute_complete(
             raise AirflowException("statement_id should not be empty.")
 
         self.log.info("%s completed successfully.", self.task_id)
-        if self.return_sql_result:
-            result = self.hook.conn.get_statement_result(Id=statement_id)
-            self.log.debug("Statement result: %s", result)
-            return result
 
-        return statement_id
+        # Use the get_sql_results method to return the results of the SQL 
query, or the statement_ids,
+        # depending on the value of self.return_sql_result
+        return self.get_sql_results(return_sql_result=self.return_sql_result)
+
+    def get_sql_results(self, return_sql_result: bool) -> 
list[GetStatementResultResponseTypeDef] | list[str]:
+        """
+        Retrieve either the result of the SQL query, or the statement ID(s).
+
+        :param return_sql_result:
+        """
+        # ISSUE-40427: Pull the statement, and check to see if there are 
sub-statements. If that is the
+        # case, pull each of the sub-statement ID's, and grab the results. 
Otherwise, just use
+        # self.statement_id
+        statement: DescribeStatementResponseTypeDef = 
self.hook.conn.describe_statement(Id=self.statement_id)

Review Comment:
   In case of deferable operator, I dont think it will work. `get_sql_results` 
is called by `execute_complete`, `self.statement_id` is not defined there. You 
should pass the statement id as parameter of `get_sql_results`



##########
providers/src/airflow/providers/amazon/aws/operators/redshift_data.py:
##########
@@ -124,12 +126,11 @@ def __init__(
                 poll_interval,
             )
         self.return_sql_result = return_sql_result
-        self.statement_id: str | None = None
         self.deferrable = deferrable
         self.session_id = session_id
         self.session_keep_alive_seconds = session_keep_alive_seconds
 
-    def execute(self, context: Context) -> GetStatementResultResponseTypeDef | 
str:
+    def execute(self, context: Context) -> 
list[GetStatementResultResponseTypeDef] | list[str]:

Review Comment:
   This is a breaking change I guess we can consider it as a bug fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to