shivaam commented on issue #59707:
URL: https://github.com/apache/airflow/issues/59707#issuecomment-4108310186
## Investigation Report
I spent time investigating this end-to-end on a live Airflow 3.2 instance
(CeleryExecutor, Redis broker, PostgreSQL backend, Celery 5.6.2). Here's what I
found.
### Regression Source
The bug was introduced by commit `16829d7694` — "Add duplicate hostname
check for Celery workers (#58591)" — which landed in **celery provider
3.14.0**. This is confirmed by:
- `git log --oneline providers-celery/3.13.1..providers-celery/3.14.0 -- providers/celery/src/airflow/providers/celery/cli/celery_command.py` showing it as the **only** CLI change between those versions
- Multiple comments on this issue confirming that downgrading to 3.13.1
fixes it
### Root Cause
The duplicate hostname check (lines 222-236 of `celery_command.py`) calls
`celery_app.control.inspect().active_queues()` **before**
`celery_app.worker_main()`. This is the problem.
**The chain:**
1. `inspect().active_queues()` broadcasts a control message to Redis and
waits for replies
2. This lazily initializes `celery_app.amqp._producer_pool` and opens TCP
sockets to the Redis broker
3. These pools and sockets register in **`kombu.pools`** — a
**process-global** registry keyed by broker URL
4. `worker_main()` then calls `fork()` to create prefork pool workers
5. Children inherit the parent's open socket file descriptors
6. Parent (consumer) and children (pool workers) now share the same Redis
sockets
7. The `-O fair` scheduling strategy requires consumer-to-pool IPC over
these connections
8. Shared sockets cause silent communication failure — tasks are received
but never dispatched
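Steps 4-6 are the heart of the failure: after `fork()`, parent and children share the same open file descriptors. A minimal stdlib sketch (not Airflow code; the `socketpair` stands in for the broker connection) shows two processes writing on one inherited socket:

```python
import os
import socket

# A connected Unix stream socket pair stands in for the Redis connection.
parent_sock, peer = socket.socketpair()

pid = os.fork()
if pid == 0:
    # Child (analogous to a prefork pool worker): it inherited parent_sock's
    # file descriptor, so it writes on the same connection as the parent.
    parent_sock.sendall(b"child-bytes")
    parent_sock.close()
    os._exit(0)

# Parent (analogous to the Celery consumer) waits for the child, then
# writes on the *same* socket the child just used.
os.waitpid(pid, 0)
parent_sock.sendall(b"parent-bytes")
parent_sock.close()

# The peer sees both processes' writes mixed on one connection. In a real
# broker protocol this corrupts framing, and replies can be consumed by
# the wrong process.
expected = b"child-bytesparent-bytes"
received = b""
while len(received) < len(expected):
    received += peer.recv(1024)
print(received)  # b'child-bytesparent-bytes'
```

In the real bug the damage is subtler than interleaved bytes: broker replies meant for the consumer can be read by a pool worker (or vice versa), which is why the failure is silent rather than a crash.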
**Key discovery: `kombu.pools` is global, not per-app.** Even creating a
separate "temp" Celery app for the inspection pollutes the shared global pool
because `kombu.pools` is keyed by broker URL, not app identity:
```
# From our diagnostic script:
A.pool is B.pool: True # Same object!
A.amqp.producer_pool is B.amqp.producer_pool: True # Same object!
```
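The registry behavior can be modeled without a broker. The following is a toy mimic of a process-global pool registry keyed by URL (the names `Pool`, `get_pool`, and `App` are invented for illustration; this is not kombu's actual code), showing why a separate "temp" app still collides with the real one:

```python
class Pool:
    """Stand-in for a connection/producer pool tied to one broker URL."""
    def __init__(self, url):
        self.url = url
        self.open_sockets = []

# Process-global registry keyed by broker URL, like kombu.pools.
_global_pools = {}

def get_pool(broker_url):
    # Same URL -> same pool object, regardless of which app asks.
    return _global_pools.setdefault(broker_url, Pool(broker_url))

class App:
    """Stand-in for a Celery app; the pool lookup goes through the global registry."""
    def __init__(self, broker_url):
        self.broker_url = broker_url

    @property
    def pool(self):
        return get_pool(self.broker_url)

a = App("redis://localhost:6379/0")      # the "real" app
b = App("redis://localhost:6379/0")      # a separate "temp" app, same broker

print(a.pool is b.pool)                  # True: one shared global pool

b.pool.open_sockets.append("fd-42")      # "inspect()" on the temp app opens a socket...
print(a.pool.open_sockets)               # ...and the real app now holds it too
```

This is why the temp app alone was not enough: only clearing the shared registry (the real `kombu.pools.reset()`) removes the polluted entry before `fork()`.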
### Live Test Results
**Environment:** Airflow 3.2.0.dev0, Celery 5.6.2, Redis 6.2.20, PostgreSQL
(RDS), EC2 (Amazon Linux 2023)
| Test | Worker hostname | Result |
|------|----------------|--------|
| Baseline (no `--celery-hostname`) | `celery@ip-10-0-2-61...` | Task **succeeded** in 8.1s |
| Bug repro (with `--celery-hostname`) | `myworker@ip-10-0-2-61...` | Task **stuck in RESERVED** 60s+ (`acknowledged=False`, `worker_pid=None`) |
| Fix applied (temp app + `kombu.pools.reset()`) | `myworker@ip-10-0-2-61...` | Task **succeeded** in 2.6s |
**Bug repro logs (task stuck):**
```
Task execute_workload[5c97cd55...] received
<< NO FURTHER OUTPUT — task never dispatched to pool worker >>
celery inspect reserved:
hostname: [email protected]
time_start: None
acknowledged: False
worker_pid: None
DB: test_celery_hostname | say_hello | queued (never progressed)
```
**Fix verified logs (task executes):**
```
[DEBUG] app.amqp._producer_pool is None (GOOD) at worker_main() time.
Task execute_workload[b41f1956...] received
[b41f1956...] Executing workload in Celery: task_id='say_hello'
Task finished exit_code=0 final_state=success
Task execute_workload[b41f1956...] succeeded in 2.596s
DB: say_hello | success
```
### The Fix
Use a temporary Celery app for the `inspect()` call, then **reset
`kombu.pools`** in a `finally` block to clear any global state pollution:
```python
if args.celery_hostname:
    from celery import Celery as _TempCelery

    temp_app = _TempCelery(broker=celery_app.conf.broker_url)
    try:
        active_workers = temp_app.control.inspect().active_queues()
        if active_workers:
            celery_hostname = args.celery_hostname
            if any(
                name == celery_hostname or name.endswith(f"@{celery_hostname}")
                for name in active_workers
            ):
                raise SystemExit(
                    f"Error: A worker with hostname '{celery_hostname}' is already running."
                )
    finally:
        temp_app.close()
        import kombu.pools

        kombu.pools.reset()  # CRITICAL: clear global pool state before fork
```
The `kombu.pools.reset()` call is essential: without it, even the temp app's connections persist in the global registry and get inherited after fork.
**Secondary fix:** The duplicate hostname detection also has a bug where
hostnames containing `@` (e.g., `myworker@mymachine`) fail the
`endswith("@myworker@mymachine")` check. Added exact-match fallback: `name ==
celery_hostname or name.endswith(...)`.
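The matching bug is easy to demonstrate with plain strings (the hostnames here are made up for illustration):

```python
# inspect() keys results by full node name; suppose the user passed a
# --celery-hostname that itself contains an "@".
active = ["myworker@mymachine"]
celery_hostname = "myworker@mymachine"

# Original check: endswith only. It looks for "@myworker@mymachine",
# which "myworker@mymachine" does not end with.
endswith_only = any(n.endswith(f"@{celery_hostname}") for n in active)
print(endswith_only)  # False: duplicate worker goes undetected

# Fixed check: exact match first, endswith as fallback.
with_exact = any(
    n == celery_hostname or n.endswith(f"@{celery_hostname}")
    for n in active
)
print(with_exact)  # True: the duplicate is caught
```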
### Approaches We Tried and Discarded
| Approach | Why it didn't work |
|----------|-------------------|
| Convert `args.concurrency` from `int` to `str` | Correct style fix but unrelated to the root cause; Celery handles int coercion |
| Reset `app.amqp._producer_pool = None` after inspect | Insufficient: `kombu.pools` still holds open sockets in the global registry |
| Temp app alone (no `kombu.pools.reset()`) | `kombu.pools` is keyed by broker URL, not app; the temp app pollutes the same global entry |
| `app.pool.force_close_all()` | Closes the pool permanently; Celery can't create new connections later |
| Remove `-O fair` flag | Workaround, not a fix; `-O fair` prevents task starvation in production |
| Set hostname via `celery_app.conf` instead of CLI | Doesn't address the connection pool pollution |
### How to Reproduce
```bash
# 1. Configure Airflow with CeleryExecutor + Redis broker
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
# 2. Start scheduler + api-server
airflow api-server &
airflow scheduler &
# 3. Start worker WITH --celery-hostname
airflow celery worker --queues default --concurrency 1 --celery-hostname "myworker@%h"
# 4. Trigger any DAG
airflow dags trigger <any_dag>
# 5. After 60s, check:
celery -A airflow.providers.celery.executors.celery_executor_utils.app inspect reserved
# → Task stuck with acknowledged=False, worker_pid=None
```
Full investigation docs and diagnostic scripts are on branch
[`investigate/celery-hostname-59707`](https://github.com/shivaam/airflow/tree/investigate/celery-hostname-59707)
in `.claude/celery-hostname-issue-59707/` and `dev/`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]