You should probably see a spike in your CPU when increasing max_threads.
https://github.com/apache/airflow/blob/1.10.12/airflow/utils/dag_processing.py#L1244
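
For reference, a sketch of where these settings live in airflow.cfg on 1.10.x (values below simply mirror the ones discussed in this thread; max_threads was renamed to parsing_processes in later releases):

```ini
[scheduler]
; Number of DAG-parsing processes the scheduler spawns
; (renamed to parsing_processes in later Airflow releases)
max_threads = 40

[core]
; Seconds a single DAG file is allowed to take to import
; before DagBag records a timeout
dagbag_import_timeout = 30
```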


Regarding the timeout in your DAG, it is difficult to say without seeing
your DAG code. Do you have code outside of the DAG and Task/Operator
objects that gets executed whenever the DAG file is parsed?
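
A common culprit is module-level work in the DAG file. As a self-contained sketch of why that hurts (plain Python, no Airflow needed; the file contents below are made up, with a sleep standing in for an expensive DB/API call), compare importing a file that does its work at the top level versus one that defers it into a callable:

```python
import importlib.util
import os
import tempfile
import time

# Top-level sleep stands in for an expensive API/DB call made at parse time.
TOP_LEVEL = "import time\ntime.sleep(0.2)\n"
# The same work deferred into a callable: it no longer runs on import.
DEFERRED = "def do_work():\n    import time\n    time.sleep(0.2)\n"

def time_import(source):
    """Write `source` to a temp .py file and time how long importing it
    takes, which is roughly the cost the DagFileProcessor pays on every
    parse of the file."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(source)
        path = f.name
    try:
        spec = importlib.util.spec_from_file_location("dag_under_test", path)
        module = importlib.util.module_from_spec(spec)
        start = time.monotonic()
        spec.loader.exec_module(module)
        return time.monotonic() - start
    finally:
        os.unlink(path)

slow = time_import(TOP_LEVEL)   # pays the 0.2s on every parse
fast = time_import(DEFERRED)    # parse is cheap; work happens at run time
print(f"top-level: {slow:.2f}s, deferred: {fast:.2f}s")
```

With thousands of DAG files, even a fraction of a second of top-level work per file adds up to the kind of parsing backlog described below.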

Regards,
Kaxi

On Mon, Apr 26, 2021 at 1:52 PM Maulik Soneji <[email protected]>
wrote:

> Hello everyone,
>
> My team and I need some input on configuring max_threads for the
> airflow scheduler.
>
> *Deployment setup:*
> We are running Airflow on Kubernetes with the CeleryExecutor.
> We currently have thousands of DAGs and run with a max_threads
> configuration of 40.
> Airflow version: 1.10.12
>
> We have a heartbeat DAG that runs every 5 minutes, and we observed
> that the heartbeat DAG itself has a scheduling delay of over one
> hour.
> Here are the DagFileProcessor logs for our heartbeat DAG:
> [2021-04-22 06:27:10,027] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
> [2021-04-22 07:54:18,286] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
> [2021-04-22 09:07:28,994] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
>
> Our observations from changing the parameter have been as follows:
> a. Reducing max_threads to 10
> The DAG load time in the DagFileProcessor went down, and the schedule
> delay dropped to 45 minutes, but that is still significantly high.
> b. Increasing max_threads to 100 and 200
> In both cases, we see timeout errors while importing the DAG, in the
> following block of code (we have set DAGBAG_IMPORT_TIMEOUT to 30
> seconds):
>     with timeout(self.DAGBAG_IMPORT_TIMEOUT):
>         try:
>             m = imp.load_source(mod_name, filepath)
>             mods.append(m)
>         except Exception as e:
>             self.log.exception("Failed to import: %s", filepath)
>             self.import_errors[filepath] = str(e)
>             self.file_last_changed[filepath] = file_last_changed_on_disk
>
> CPU and memory usage do not spike with the increased max_threads, but
> DAG imports time out, so we are unable to raise the max_threads
> configuration. Can someone please explain why we see DAG import
> timeouts when we increase max_threads? And can someone share their
> learnings on how to properly configure max_threads for a setup at
> this scale?
>
> Thanks and Regards,
> Maulik
>
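
(For anyone following along: the timeout(...) context manager in the quoted block is alarm-signal based in Airflow 1.10.x. A minimal sketch of the idea, not the actual Airflow source, which raises its own exception type:)

```python
import signal
import time

class timeout:
    """Minimal sketch of a SIGALRM-based timeout context manager,
    similar in spirit to airflow.utils.timeout (Unix-only; NOT the
    actual Airflow implementation)."""

    def __init__(self, seconds):
        self.seconds = seconds

    def _handler(self, signum, frame):
        raise TimeoutError(f"timed out after {self.seconds}s")

    def __enter__(self):
        self._old = signal.signal(signal.SIGALRM, self._handler)
        signal.alarm(self.seconds)  # deliver SIGALRM after N seconds
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.alarm(0)                        # cancel the pending alarm
        signal.signal(signal.SIGALRM, self._old)
        return False                           # let TimeoutError propagate

# A slow "import" is interrupted once the alarm fires:
try:
    with timeout(1):
        time.sleep(2)                          # stands in for a slow DAG import
except TimeoutError as exc:
    print("caught:", exc)
```

One relevant property of this scheme: SIGALRM handlers can only be set in the main thread of a process, which fits the DagFileProcessor's design of parsing each file in its own child process.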
