filippzorin opened a new pull request, #62916:
URL: https://github.com/apache/airflow/pull/62916

   # Add `commit_offset` option to `AwaitMessageSensor` for manual offset management
   
   This PR adds a new parameter `commit_offset` (default `True`) to 
`AwaitMessageSensor` and its underlying `AwaitMessageTrigger`. When set to 
`False`, the sensor does **not** automatically commit the Kafka message offset 
after processing, allowing downstream tasks to handle offset committing 
manually (e.g., after successful business logic execution).
   
   ## Motivation
   Currently, the sensor always commits the offset as soon as a matching message is found. If downstream processing then fails, the committed message is not redelivered to the consumer group, so it is effectively lost. In some use cases it is preferable to commit the offset only after a subsequent task or the entire DAG has processed the message successfully. This change gives users control over offset commit behavior.
   
   ## Changes
   - Added `commit_offset: bool = True` parameter to 
`AwaitMessageSensor.__init__`
   - Added `commit_offset` to `template_fields` of `AwaitMessageSensor`
   - Passed `commit_offset` from the sensor to `AwaitMessageTrigger`
   - Added `commit_offset` parameter to `AwaitMessageTrigger.__init__` and 
`serialize`
   - Modified trigger's `run()` method to conditionally call `async_commit()` 
based on `commit_offset`
   - Updated docstrings to document the new parameter and adjust behavior 
descriptions
   - Fixed type hints for `apply_function` in `AwaitMessageTrigger` and 
`AwaitMessageTriggerFunctionSensor` (`str | None` instead of `str`, to match 
`AwaitMessageTrigger`)
   - Added a type hint for `xcom_push_key` in `AwaitMessageSensor` (`str | None`) 
for better clarity
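
To illustrate the conditional commit described in the list above, here is a minimal, self-contained sketch. It is not the provider's code: `FakeConsumer` and `await_message` are hypothetical stand-ins for the Kafka consumer and the trigger's `run()` loop, and the sketch only models what happens for a matching message. How non-matching messages are committed is not covered by this description, so the sketch simply skips them.

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; counts commits instead of contacting a broker."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def poll(self):
        # Return the next queued message, or None once the queue is empty.
        return self._messages.pop(0) if self._messages else None

    def commit(self):
        # Record that an offset commit was requested.
        self.commits += 1


async def await_message(consumer, apply_function, commit_offset=True):
    """Return the first truthy result of apply_function.

    The offset is committed for the matching message only when
    commit_offset is True (the default, mirroring the PR description).
    """
    while True:
        message = consumer.poll()
        if message is None:
            # A real trigger would keep polling; the sketch stops so it terminates.
            return None
        result = apply_function(message)
        if result:
            if commit_offset:
                consumer.commit()
            return result
        # Non-matching message: skipped here (assumption, see lead-in).
        await asyncio.sleep(0)


def is_match(message):
    # Hypothetical filter: return the message itself when it matches, else None.
    return message if message == "match" else None


default_consumer = FakeConsumer(["skip", "match"])
default_result = asyncio.run(await_message(default_consumer, is_match))

manual_consumer = FakeConsumer(["skip", "match"])
manual_result = asyncio.run(
    await_message(manual_consumer, is_match, commit_offset=False)
)

print(default_result, default_consumer.commits)
print(manual_result, manual_consumer.commits)
```

With `commit_offset=False` the matching message is returned without a commit, so a downstream task would be responsible for committing the offset once its own work succeeds.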
   
   ## Testing
   - `prek --all-files` passed
   
   ## Notes
   - The `AwaitMessageTriggerFunctionSensor` was intentionally **not** modified 
because its behavior is more complex and may require separate consideration. If 
needed, support can be added in a future PR.
   - This change is backward compatible: existing DAGs using the sensor will 
keep the same behavior (commit always) because `commit_offset` defaults to 
`True`.
   - Related discussion: #62854 
   

