Nataneljpwd commented on code in PR #62604:
URL: https://github.com/apache/airflow/pull/62604#discussion_r2894516173


##########
providers/standard/src/airflow/providers/standard/operators/python.py:
##########
@@ -677,13 +679,58 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
 
             return self._read_result(output_path)
 
+    def _get_additional_jinja_context(self) -> dict:
+        """Return additional Jinja context variables for the virtualenv script template."""
+        return {}
+
     def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
         keyword_params = KeywordParameters.determine(self.python_callable, self.op_args, context)
         if AIRFLOW_V_3_0_PLUS:
             return keyword_params.unpacking()
         return keyword_params.serializing()  # type: ignore[attr-defined]
 
 
+def _pendulum_to_native_datetime(obj):
+    """
+    Recursively convert pendulum DateTime objects to native Python datetime.
+
+    When the virtualenv has a different major version of pendulum than the host,
+    pendulum's internal pickle representations are incompatible. This function
+    converts pendulum DateTime objects to stdlib datetime.datetime with
+    zoneinfo.ZoneInfo timezone info, which can be serialized and deserialized
+    without pendulum.
+    """
+    import pendulum
+
+    if isinstance(obj, pendulum.DateTime):
+        tz = None
+        if obj.timezone_name:
+            from zoneinfo import ZoneInfo
+
+            try:
+                tz = ZoneInfo(obj.timezone_name)
+            except KeyError:
+                # Fall back to fixed UTC offset if the timezone name is not recognized
+                utc_offset = obj.utcoffset()
+                tz = dt.timezone(utc_offset) if utc_offset is not None else None
+        return dt.datetime(
+            obj.year,
+            obj.month,
+            obj.day,
+            obj.hour,
+            obj.minute,
+            obj.second,
+            obj.microsecond,
+            tzinfo=tz,
+        )
+    if isinstance(obj, dict):
+        return {k: _pendulum_to_native_datetime(v) for k, v in obj.items()}
+    if isinstance(obj, (list, tuple)):
+        converted = [_pendulum_to_native_datetime(v) for v in obj]
+        return type(obj)(converted)

Review Comment:
   This looks a little 'hacky' if the type is always datetime



##########
providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2:
##########
@@ -78,6 +78,50 @@ with open(sys.argv[1], "rb") as file:
 arg_dict = {"args": [], "kwargs": {}}
 {% endif %}
 
+{% if pendulum_version_mismatch | default(false) %}
+import datetime as _dt
+
+try:
+    import pendulum as _pendulum
+    _pendulum_available = True
+except ImportError:
+    _pendulum_available = False
+
+def _native_datetime_to_pendulum(obj):
+    """Convert native datetime to pendulum DateTime (using virtualenv's pendulum version)."""
+    if _pendulum_available and isinstance(obj, _dt.datetime) and not isinstance(obj, _pendulum.DateTime):

Review Comment:
   If it is a datetime, doesn't it mean it is not pendulum?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to