Hi Community! We are running a single DAG with thousands of tasks, but we can't get the workers to pick up new tasks from Celery sooner than 3-5 minutes after finishing the previous task.
I'll include our complete env setup at the bottom, but here are a few setup details:
- Airflow version 2.0.2
- 60 workers, each set to process 1 task at a time
- DAG concurrency of 90, which leaves us ~30 tasks in the queue at most times (according to Flower)
- Single scheduler, 7 threads on an 8 CPU box with row locking off (2 schedulers with row locking peg the CPU on Postgres, causing task failures)

Here are some other interesting tidbits:
- Flower always shows the workers as busy, but the Airflow DB & UI show at most 30 tasks running at a time (usually 16-25)
- Loading the DagBag takes around 40 seconds because of the number of tasks
- Long worker idle times are measured from the time between the task logs
- Running a beefy AWS-hosted Postgres DB (r6g.2xlarge), Redis (m4.xlarge), and Scheduler/Web/Workers (c5.2xlarge)
- None of the systems are maxed out
- I have already reviewed the Scheduler <https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html> and FAQ <https://airflow.apache.org/docs/apache-airflow/stable/faq.html> documents for performance improvement tips (nicely written, by the way)

Any ideas for getting the workers to pick up tasks faster (preferably a few seconds after finishing the previous task)? We would appreciate any advice on things to consider!
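For reference, here is roughly how we measure the idle gaps: we take the last log timestamp of one task and the first log timestamp of the next task on the same worker and diff them. This is just a sketch with hypothetical log lines; the bracketed-timestamp prefix matches Airflow's default log format, but the helper name and sample lines are made up for illustration.

```python
from datetime import datetime

# Airflow's default log lines start with a bracketed timestamp like
# "[2021-06-01 12:00:40,000]"; we parse that prefix and diff two lines.
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

def idle_seconds(prev_task_last_line: str, next_task_first_line: str) -> float:
    """Seconds between the final log line of one task and the first of the next."""
    def parse_ts(line: str) -> datetime:
        stamp = line[line.index("[") + 1 : line.index("]")]
        return datetime.strptime(stamp, LOG_TS_FORMAT)
    return (parse_ts(next_task_first_line) - parse_ts(prev_task_last_line)).total_seconds()

# Hypothetical example: a 4-minute gap between tasks on the same worker.
gap = idle_seconds(
    "[2021-06-01 12:00:40,000] {taskinstance.py} INFO - Task exited with return code 0",
    "[2021-06-01 12:04:40,000] {taskinstance.py} INFO - Starting attempt 1 of 1",
)
print(gap)  # 240.0
```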
-Michael

*Environment Setup:*
AIRFLOW__CORE__DAG_CONCURRENCY=1000
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=600
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=600
AIRFLOW__CORE__DAGS_FOLDER=/home/airflow/pipeline_dag_bag
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION='true'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING='true'
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__FERNET_KEY='XXXXXXXXXXXXXXXXXXXXXXXXXX='
AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800
AIRFLOW__CORE__LOAD_EXAMPLES='false'
AIRFLOW__CORE__HOSTNAME_CALLABLE='airflow.utils.net.get_host_ip_address'
AIRFLOW__CORE__PARALLELISM=1000
AIRFLOW__CORE__REMOTE_LOGGING='true'
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER='s3://XXXXXXXXX/airflow'
AIRFLOW__CORE__REMOTE_LOG_CONN_ID='s3_logging'
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:[email protected]/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:[email protected]/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@XXXXXXXXXX.usw2.cache.amazonaws.com:6379/0
AIRFLOW__CELERY__WORKER_CONCURRENCY=1
AIRFLOW__CELERY__OPERATION_TIMEOUT=10
AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC=30
AIRFLOW__SCHEDULER__CLEAN_TIS_WITHOUT_DAGRUN_INTERVAL=30
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=30
AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL=5
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=300
AIRFLOW__SCHEDULER__PRINT_STATS_INTERVAL=300
AIRFLOW__SCHEDULER__POOL_METRICS_INTERVAL=60
AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD=60
AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL=300
AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=600
AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING='false'
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION='true'
AIRFLOW__SCHEDULER__PARSING_PROCESSES=7
AIRFLOW_HOME=/home/airflow
AIRFLOW__WEBSERVER__WEB_SERVER_MASTER_TIMEOUT=600
AIRFLOW__WEBSERVER__WEB_SERVER_WORKER_TIMEOUT=600
AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=60
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=60
AIRFLOW__WEBSERVER__DEFAULT_DAG_RUN_DISPLAY_NUMBER=5
