You *should* probably see a spike in your CPU when increasing max_threads:
https://github.com/apache/airflow/blob/1.10.12/airflow/utils/dag_processing.py#L1244
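For reference, the two settings involved here live in airflow.cfg (key names as of the 1.10 series; do double-check against your exact version):

```ini
[core]
# Hard limit, in seconds, on how long importing a single DAG file may take.
dagbag_import_timeout = 30

[scheduler]
# Number of DAG-file parsing processes the scheduler runs in parallel.
# (This key was renamed to parsing_processes in later releases.)
max_threads = 40
```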
Regarding the timeout in your DAG: it is difficult to say without checking your DAG code. Do you have code outside of the DAG and Task/Operator objects that gets executed whenever the DAG file is parsed?

Regards,
Kaxi

On Mon, Apr 26, 2021 at 1:52 PM Maulik Soneji <[email protected]> wrote:

> Hello everyone,
>
> My team and I require some input on configuring max_threads for the
> airflow scheduler.
>
> *Deployment setup:*
> We are running Airflow on Kubernetes with the CeleryExecutor.
> We currently have thousands of DAGs and are running with a max_threads
> configuration of 40.
> Airflow version: 1.10.12
>
> We have a heartbeat DAG which runs at a frequency of 5 minutes, and we
> observed that the heartbeat DAG itself has a schedule delay of over one
> hour. Here are the DagFileProcessor logs for our heartbeat DAG:
>
> [2021-04-22 06:27:10,027] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
> [2021-04-22 07:54:18,286] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
> [2021-04-22 09:07:28,994] {scheduler_job.py:1589} INFO - Processing file
> /home/airflow/airflow/dags/heartbeat.py for tasks to queue
>
> Our observations from changing the parameter have been as follows:
>
> a. Reducing max_threads to 10: the DAG load time in DagFileProcessor
> dropped, and the schedule delay fell to 45 minutes, which is still
> significantly high.
>
> b. Increasing max_threads to 100 and 200: in both cases, we see timeout
> errors while importing the DAG in the following block of code, where we
> have set DAGBAG_IMPORT_TIMEOUT to 30 seconds:
>
> with timeout(self.DAGBAG_IMPORT_TIMEOUT):
>     try:
>         m = imp.load_source(mod_name, filepath)
>         mods.append(m)
>     except Exception as e:
>         self.log.exception("Failed to import: %s", filepath)
>         self.import_errors[filepath] = str(e)
>         self.file_last_changed[filepath] = file_last_changed_on_disk
>
> CPU and memory usage do not spike with the increased max_threads, but
> there is a timeout while importing DAGs, so we are not able to raise the
> max_threads configuration. Can someone please share why we are facing DAG
> import timeouts when we increase max_threads, and how to properly
> configure the max_threads parameter for a setup at this scale?
>
> Thanks and Regards,
> Maulik
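To illustrate the parse-time question above: anything at module level in a DAG file is executed on every scheduler parse, so slow module-level work inflates DagFileProcessor time and can trip dagbag_import_timeout. A minimal stdlib-only sketch (no Airflow imports; `expensive_lookup` and `heartbeat_callable` are hypothetical stand-ins for an API/DB call and a PythonOperator callable):

```python
import time

def expensive_lookup():
    """Stands in for an API call or database query a DAG file might make."""
    time.sleep(0.2)
    return {"key": "value"}

# BAD pattern: module-level call. This line runs every single time the
# scheduler imports (parses) the DAG file, adding 200 ms to every parse.
start = time.monotonic()
config_at_parse = expensive_lookup()
parse_cost = time.monotonic() - start

# GOOD pattern: defer the work into the task callable. Parsing the file
# only has to define and reference the function, which is nearly free;
# the expensive call happens when the task actually executes.
def heartbeat_callable(**context):
    return expensive_lookup()

start = time.monotonic()
task = heartbeat_callable  # what parsing pays: a name binding, not the call
defer_cost = time.monotonic() - start
```

Moving work out of module scope and into operator callables (or templated fields) is usually the first thing to try before tuning max_threads further.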
