Yes. Great minds think alike. Fully agree - I have just found the right code to determine whether you are in "task execution" or not and it is ... well... convoluted ... in the current version. So yeah - that was exactly my thought to add "API guaranteed" env vars that will indicate what context the DAG is parsed in. I even mentioned in the PR that we should do it as a follow-up PR. I will add it next.
This is the current "detection" of whether we are in the task or not :):

    import sys
    import ast
    import setproctitle

    current_dag = None
    if len(sys.argv) > 3 and sys.argv[1] == "tasks":
        current_dag = sys.argv[3]
    else:
        try:
            PROCTITLE_PREFIX = "airflow task supervisor: "
            proctitle = str(setproctitle.getproctitle())
            if proctitle.startswith(PROCTITLE_PREFIX):
                args_string = proctitle[len(PROCTITLE_PREFIX):]
                args = ast.literal_eval(args_string)
                if len(args) > 3 and args[1] == "tasks":
                    current_dag = args[3]
        except Exception:
            pass

On Tue, Jul 19, 2022 at 6:36 AM Ping Zhang <pin...@umich.edu> wrote:

> Hi Jarek,
>
> Sorry for the late reply. It is very interesting! I like this idea. A very
> simple optimization can have huge improvements.
>
> One thing I am thinking is that Airflow core could expose certain env vars
> to the dag_file, something like AIRFLOW_PARSING_ENV=[scheduler|worker],
> and/or AIRFLOW_DAG_ID=<the dag id to get from the dag_file>, instead of
> using `sys.argv`, as the sys.argv format can change - just like from 1.10
> to 2 it was changed from `airflow run` to `airflow tasks run`. Such an
> interface change won't trigger any errors, but the dag file parsing will
> silently fall back to the 'original' (unoptimized) way.
>
> Please let me know your thoughts on this.
>
> Thanks,
>
> Ping
>
>
> On Tue, Jul 12, 2022 at 12:45 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Not interesting :) ?
>>
>> On Thu, Jul 7, 2022 at 10:41 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >
>> > Hello everyone,
>> >
>> > We have just published a blog on our medium -
>> https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629 -
>> that is a blog post by one of our users, Itay Bittan (thanks!), who was
>> inspired by our discussion on Slack about how they struggle with delays
>> when loading dynamic dags on their K8S.
>> >
>> > The problem they had was that their dynamic dags are created in a big
>> loop (1000s of DAGs), and that caused ~2 minute delays when starting
>> their tasks on K8S, because all DAGs have to be created by the loop.
>> >
>> > What I proposed to try (since the DAGs were connected by the loop but
>> really isolated from each other) was to skip "all other" DAG creation in
>> the loop when the file is parsed in the worker. That cut the delay to
>> ~200ms.
>> >
>> > His case seems general enough to maybe suggest it even as a "general"
>> solution - currently it is based on possibly several "non-documented"
>> assumptions (that the dag_id is passed in a certain way to the worker and
>> that you can use it to filter out such a loop).
>> >
>> > However, maybe it's a good idea to document it and turn it into a
>> "best practice" when you have similar Dynamic DAGs.
>> >
>> > I can think of several caveats of such an approach - not all DAGs
>> created in a loop can be isolated; sometimes there might be side-effects
>> that make your dag have a different structure if you skip other DAGs. But
>> I thought we could add some "guidelines" that could be easily followed
>> by other users.
>> >
>> > WDYT?
>> >
>> > J.
>>
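For reference, here is a rough sketch of what Ping's proposed interface could look like from the DAG file's side. Note that AIRFLOW_PARSING_ENV and AIRFLOW_DAG_ID are the names *proposed* in this thread, not an existing, guaranteed Airflow API:

```python
import os


def get_current_dag_id():
    """Return the dag_id being parsed for, or None when all DAGs are needed.

    Sketch of the env-var interface proposed in this thread:
    AIRFLOW_PARSING_ENV and AIRFLOW_DAG_ID are proposed names,
    not an existing Airflow API.
    """
    if os.environ.get("AIRFLOW_PARSING_ENV") == "worker":
        # A worker only needs the single DAG of the task it is running.
        return os.environ.get("AIRFLOW_DAG_ID")
    # Scheduler / DAG processor context: parse everything.
    return None
```

The advantage over the sys.argv / proctitle detection above is that the env vars would be an explicit contract: if Airflow core guarantees them, a CLI format change (like `airflow run` -> `airflow tasks run`) cannot silently break the optimization.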
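The "magic loop" skip described in the blog post can be sketched roughly as follows. The sys.argv check mirrors the undocumented assumption discussed above; get_dag_configs and create_dag are hypothetical placeholders for however your DAG file generates its dynamic DAGs:

```python
import sys


# Hypothetical stand-ins for your own dynamic-DAG generation logic.
def get_dag_configs():
    """Pretend source of 1000s of per-DAG configs."""
    return {f"dag_{i}": {"schedule": "@daily"} for i in range(1000)}


def create_dag(dag_id, config):
    """In a real DAG file this would build and return an airflow DAG object."""
    return {"dag_id": dag_id, **config}


# Undocumented assumption: when a worker runs
# `airflow tasks run <dag_id> ...`, the target dag_id is sys.argv[3].
current_dag = None
if len(sys.argv) > 3 and sys.argv[1] == "tasks":
    current_dag = sys.argv[3]

for dag_id, config in get_dag_configs().items():
    if current_dag is not None and current_dag != dag_id:
        # Worker context: skip creating all the DAGs we don't need.
        continue
    # Top-level name so Airflow's DAG discovery can find it.
    globals()[dag_id] = create_dag(dag_id, config)
```

This only pays off when the DAGs in the loop are truly isolated; as noted above, side-effects between loop iterations can change a DAG's structure if the others are skipped.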