amoghrajesh commented on code in PR #61932:
URL: https://github.com/apache/airflow/pull/61932#discussion_r2811330581
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -247,6 +251,13 @@ def test_ti_run_state_to_running(
)
assert response.status_code == 409
+ # Test that no audit log was created on the second request when
resulting in conflict
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == TaskInstanceState.RUNNING.value
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].run_id == ti.run_id
Review Comment:
Don't think we need to validate this. The test is targetted to test
something else, we do not need to add validations more than the scope of the
test
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -837,6 +891,127 @@ def test_ti_update_state_to_terminal(
assert ti.state == expected_state
assert ti.end_date == end_date
+ @pytest.mark.parametrize(
+ ("state", "end_date"),
+ [
+ (State.SUCCESS, DEFAULT_END_DATE),
+ (State.FAILED, DEFAULT_END_DATE),
+ (State.SKIPPED, DEFAULT_END_DATE),
+ ],
+ )
+ def test_ti_update_state_creates_audit_log(self, client, session,
create_task_instance, state, end_date):
+ """Test that transitioning to a terminal state creates an audit log
record."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_creates_audit_log",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": state,
+ "end_date": end_date.isoformat(),
+ },
+ )
+
+ assert response.status_code == 204
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == state
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+ assert logs[0].run_id == ti.run_id
+ assert logs[0].map_index == ti.map_index
+ assert logs[0].try_number == ti.try_number
+
+ def test_ti_update_state_to_deferred_creates_audit_log(
+ self, client, session, create_task_instance, time_machine
+ ):
+ """Test that transitioning to DEFERRED creates an audit log record."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_to_deferred_creates_audit_log",
+ state=State.RUNNING,
+ session=session,
+ )
+ session.commit()
+
+ instant = timezone.datetime(2024, 11, 22)
+ time_machine.move_to(instant, tick=False)
+
+ payload = {
+ "state": "deferred",
+ "trigger_kwargs": {"key": "value", "moment":
"2024-12-18T00:00:00Z"},
+ "trigger_timeout": "P1D",
+ "classpath": "my-classpath",
+ "next_method": "execute_callback",
+ }
+
+ response = client.patch(f"/execution/task-instances/{ti.id}/state",
json=payload)
+ assert response.status_code == 204
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == TaskInstanceState.DEFERRED.value
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+
+ def test_ti_update_state_to_reschedule_creates_audit_log(
+ self, client, session, create_task_instance, time_machine
+ ):
+ """Test that transitioning to UP_FOR_RESCHEDULE creates an audit log
record."""
+ instant = timezone.datetime(2024, 10, 30)
+ time_machine.move_to(instant, tick=False)
+
+ ti = create_task_instance(
+ task_id="test_ti_update_state_to_reschedule_creates_audit_log",
+ state=State.RUNNING,
+ session=session,
+ )
+ ti.start_date = instant
+ session.commit()
+
+ payload = {
+ "state": "up_for_reschedule",
+ "reschedule_date": "2024-10-31T11:03:00+00:00",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ }
+
+ response = client.patch(f"/execution/task-instances/{ti.id}/state",
json=payload)
+ assert response.status_code == 204
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == TaskInstanceState.UP_FOR_RESCHEDULE.value
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+
+ def test_ti_update_state_to_retry_creates_audit_log(self, client, session,
create_task_instance):
+ """Test that transitioning to UP_FOR_RETRY creates an audit log
record."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_to_retry_creates_audit_log",
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": State.UP_FOR_RETRY,
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ },
+ )
+
+ assert response.status_code == 204
+
+ logs = session.scalars(select(Log).where(Log.dag_id ==
ti.dag_id)).all()
+ assert len(logs) == 1
+ assert logs[0].event == TaskInstanceState.UP_FOR_RETRY.value
+ assert logs[0].task_id == ti.task_id
+ assert logs[0].dag_id == ti.dag_id
+
Review Comment:
Can we have this be a single test with _parameters_ instead? Since we
validate almost the same thing in every case for various state changes
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1395,6 +1580,9 @@ def test_ti_update_state_not_running(self, client,
session, create_task_instance
session.refresh(ti)
assert ti.state == State.SUCCESS
+ # Test that no audit log was created when TI state is not RUNNING
+ assert session.scalars(select(Log)).all() == []
Review Comment:
No need to validate this
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1076,6 +1258,9 @@ def test_ti_update_state_database_error(self, client,
session, create_task_insta
assert response.status_code == 500
assert response.json()["detail"] == "Database error occurred"
+ # Test that no audit log was created when database error occurred
Review Comment:
No need to validate this
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1025,6 +1200,9 @@ def test_ti_update_state_not_found(self, client, session):
"message": "Task Instance not found",
}
+ # Test that no audit log was created when TI not found
+ assert session.scalars(select(Log)).all() == []
Review Comment:
No need to validate this
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -683,6 +694,9 @@ def test_ti_run_state_conflict_if_not_queued(
assert session.scalar(select(TaskInstance.state).where(TaskInstance.id
== ti.id)) == initial_ti_state
+ # Test that no audit log was created on conflict
+ assert session.scalars(select(Log)).all() == []
Review Comment:
Same as above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]