Yes.  Great minds think alike.

Fully agree - I have just found the code that determines whether you are
in "task execution" context or not, and it is ... well ... convoluted in
the current version.
So yes - that was exactly my thought: add "API-guaranteed" env vars that
will indicate what context the DAG is parsed in. I even mentioned in the PR
that we should do it as a follow-up PR. I will add it next.

This is the current "detection" of whether we are in a task or not :):


  import ast
  import sys

  import setproctitle

  current_dag = None
  # Case 1: the file is parsed by "airflow tasks run <dag_id> ..." -
  # the dag id is then the third positional argument.
  if len(sys.argv) > 3 and sys.argv[1] == "tasks":
      current_dag = sys.argv[3]
  else:
      # Case 2: the file is parsed inside the task supervisor process -
      # recover the original argv list from the process title it sets.
      try:
          PROCTITLE_PREFIX = "airflow task supervisor: "
          proctitle = str(setproctitle.getproctitle())
          if proctitle.startswith(PROCTITLE_PREFIX):
              args_string = proctitle[len(PROCTITLE_PREFIX) :]
              args = ast.literal_eval(args_string)
              if len(args) > 3 and args[1] == "tasks":
                  current_dag = args[3]
      except Exception:
          # Best effort only - if anything goes wrong, behave as if we
          # were not in task execution and parse all DAGs.
          pass
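For comparison, here is a sketch of what a dynamic-DAG file could look like once such env vars exist. AIRFLOW_PARSING_ENV and AIRFLOW_DAG_ID are the names suggested in the quoted message below - neither is an existing Airflow API, so treat this purely as an illustration of the idea:

```python
import os


def should_build_dag(dag_id, environ=None):
    """Return True if this DAG should be generated during this parse.

    Assumes the hypothetical env vars AIRFLOW_PARSING_ENV (scheduler or
    worker) and AIRFLOW_DAG_ID (the single dag the worker needs).
    """
    env = os.environ if environ is None else environ
    if env.get("AIRFLOW_PARSING_ENV") != "worker":
        # Scheduler / DAG processor context: build every DAG.
        return True
    target = env.get("AIRFLOW_DAG_ID")
    # Unknown target: fall back to building everything rather than
    # silently producing an empty file.
    return target is None or target == dag_id


# Simulate worker-side parsing of a 1000-DAG loop: only the one DAG
# the task needs is generated, the other 999 iterations are skipped.
env = {"AIRFLOW_PARSING_ENV": "worker", "AIRFLOW_DAG_ID": "my_dag_7"}
built = [
    f"my_dag_{i}"
    for i in range(1000)
    if should_build_dag(f"my_dag_{i}", env)
]
```

The key property (unlike the sys.argv probing above) is that the contract is explicit: if the env vars are absent, the file degrades gracefully to parsing all DAGs instead of breaking silently.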




On Tue, Jul 19, 2022 at 6:36 AM Ping Zhang <pin...@umich.edu> wrote:

> Hi Jarek,
>
> Sorry for the late reply. It is very interesting! I like this idea. A very
> simple optimization can have huge improvements.
>
> One thing I am thinking: Airflow core could expose certain env vars to the
> dag_file, something like AIRFLOW_PARSING_ENV=[scheduler|worker], and/or
> AIRFLOW_DAG_ID=<the dag id to get from the dag_file>, instead of using
> `sys.argv`, as the sys.argv format can change - just as it changed from
> `airflow run` to `airflow tasks run` between 1.10 and 2. This kind of
> interface change won't trigger any errors; the dag file parsing just
> silently falls back to the 'original' way.
>
> Please let me know your thoughts on this.
>
> Thanks,
>
> Ping
>
>
> On Tue, Jul 12, 2022 at 12:45 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Not interesting :) ?
>>
>> On Thu, Jul 7, 2022 at 10:41 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >
>> > Hello everyone,
>> >
>> > We have just published a blog on our medium -
>> https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629 -
>> that is a blog of one of our users Itay Bittan (thanks!) who had been
>> inspired by our discussion on Slack on how they struggle with delays of
>> loading dynamic dags in their K8S.
>> >
>> > The problem that they had was that they have dynamic dags that are
>> created in a big loop (1000s of DAGs), which caused ~2 minute delays on
>> starting their tasks on K8S, because all DAGs have to be created by the loop.
>> >
>> > What I proposed to try (since the DAGs were connected by the loop but
>> really isolated from each other) is to skip "all other" DAG creation in the
>> loop when it is parsed in the worker. That resulted in cutting the delay to
>> ~ 200ms.
>> >
>> > His case seems to be general enough to maybe suggest it even as a
>> "general" solution - currently it is based on several possibly
>> undocumented assumptions (that dag_id is passed in a certain way to the
>> worker and that you can use it to filter out such a loop).
>> >
>> > However, maybe it's a good idea to document it and convert it into a
>> "best practice" when you have similar Dynamic DAGs.
>> >
>> > I can think of several caveats of such an approach - not all DAGs
>> created in a loop can be isolated; sometimes there might be side-effects
>> that make your dag have a different structure if you skip other DAGs. But
>> I thought we could add some "guidelines" that could be easily replicated
>> by other users.
>> >
>> > WDYT?
>> >
>> > J.
>>
>
