Hi Community!

We are running a single DAG with thousands of tasks, but we can't get the
workers to pick up the next task from Celery until 3-5 minutes after
finishing the previous one.

I'll include our complete env setup at the bottom, but here are a few setup
details:

   - Airflow version 2.0.2
   - 60 workers set to process 1 task at a time
   - DAG concurrency of 90, which leaves us about 30 tasks in the queue most
   of the time (according to Flower)
   - Single scheduler, 7 threads on an 8-CPU box with row locking off (2
   schedulers with row locking peg the CPU on Postgres, causing task failures)
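
For reference, the queue-depth figure above is just arithmetic on the
numbers in these bullets — a quick sketch (the variable names are
illustrative, not Airflow settings):

```python
# Back-of-envelope queue math from the setup details above (illustrative only).
workers = 60                 # worker boxes
tasks_per_worker = 1         # each worker processes 1 task at a time
dag_concurrency = 90         # max simultaneously running/queued tasks for the DAG

running_capacity = workers * tasks_per_worker   # 60 tasks can run at once
queued = dag_concurrency - running_capacity     # ~30 tasks waiting in Celery
print(f"{running_capacity} running, ~{queued} queued")
```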

Here are some other interesting tidbits:

   - Flower always shows the workers as busy, but the Airflow DB & UI show
   at most 30 tasks running at a time (usually 16-25)
   - Loading the DagBag takes around 40 seconds because of the number of
   tasks
   - Long worker idle times are measured from the gaps between consecutive
   task log timestamps
   - Running a beefy AWS-hosted Postgres DB (r6g.2xlarge), Redis (m4.xlarge),
   and Scheduler/Web/Workers (c5.2xlarge)
   - None of the systems are maxed out
   - I have already reviewed the Scheduler
   <https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html>
   and FAQ <https://airflow.apache.org/docs/apache-airflow/stable/faq.html>
   documents for performance improvement tips (nicely written by the way)
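
To make the idle-time measurement concrete, here's roughly how we compute
the gap from the log timestamps — a minimal sketch; the timestamp values
and format here are illustrative, not our exact logs or parser:

```python
from datetime import datetime

# Two illustrative log timestamps (format assumed): the last line logged by
# the previous task and the first line logged by the next task on the same worker.
prev_task_end = "2021-06-01 12:00:05"
next_task_start = "2021-06-01 12:04:10"

fmt = "%Y-%m-%d %H:%M:%S"
idle_seconds = (datetime.strptime(next_task_start, fmt)
                - datetime.strptime(prev_task_end, fmt)).total_seconds()
print(f"Worker idle for {idle_seconds:.0f} seconds")
```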

Any ideas for getting the workers to pick up tasks faster (preferably
within a few seconds of finishing the previous task)?

We would appreciate any advice for things to consider!

-Michael



*Environment Setup:*
AIRFLOW__CORE__DAG_CONCURRENCY=1000
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=600
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=600
AIRFLOW__CORE__DAGS_FOLDER=/home/airflow/pipeline_dag_bag
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION='true'
AIRFLOW__CORE__ENABLE_XCOM_PICKLING='true'
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__FERNET_KEY='XXXXXXXXXXXXXXXXXXXXXXXXXX='
AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800
AIRFLOW__CORE__LOAD_EXAMPLES='false'
AIRFLOW__CORE__HOSTNAME_CALLABLE='airflow.utils.net.get_host_ip_address'
AIRFLOW__CORE__PARALLELISM=1000
AIRFLOW__CORE__REMOTE_LOGGING='true'
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER='s3://XXXXXXXXX/airflow'
AIRFLOW__CORE__REMOTE_LOG_CONN_ID='s3_logging'
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:[email protected]/airflow

AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:[email protected]/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@XXXXXXXXXX.usw2.cache.amazonaws.com:6379/0
AIRFLOW__CELERY__WORKER_CONCURRENCY=1
AIRFLOW__CELERY__OPERATION_TIMEOUT=10

AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC=30
AIRFLOW__SCHEDULER__CLEAN_TIS_WITHOUT_DAGRUN_INTERVAL=30
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=30
AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL=5
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=300
AIRFLOW__SCHEDULER__PRINT_STATS_INTERVAL=300
AIRFLOW__SCHEDULER__POOL_METRICS_INTERVAL=60
AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD=60
AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL=300
AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=600
AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING='false'
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION='true'
AIRFLOW__SCHEDULER__PARSING_PROCESSES=7

AIRFLOW_HOME=/home/airflow

AIRFLOW__WEBSERVER__WEB_SERVER_MASTER_TIMEOUT=600
AIRFLOW__WEBSERVER__WEB_SERVER_WORKER_TIMEOUT=600
AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=60
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=60
AIRFLOW__WEBSERVER__DEFAULT_DAG_RUN_DISPLAY_NUMBER=5
