Actually, this is even more complex, as I found another case:


I thought about it too, but this would only be available in 2.4+, and a ton
of people could benefit from it even now.

However, I have a thought. I was waiting for something like this to come up
as a very good reason to implement the "apache-airflow-future" package.

Maybe this is a cool opportunity to add the "apache-airflow-future" package.

We have discussed it in the past: it could be a package containing some of
the tools, utils, etc. that are added in Airflow 2.3/2.4 and later, very
similar to Python's __future__.

In this case we could simply implement the variable check as
"airflow.utils.dag_parsing_context.get_parsing_context()" AND at the same
time make it possible to do "import airflow.__future__" (or similar), which
could simply monkeypatch "get_parsing_context" with this complex,
proctitle-based implementation.
There are already a number of tools/utils like that which are available only
in 2.2 and 2.3, and having a "future" package would make it simple for users
of older versions to use them.

BTW, this is the current version (<exploding head>):

  import sys
  import ast
  import setproctitle
  from airflow.models.dag import DAG

  current_dag = None
  if len(sys.argv) > 3 and sys.argv[1] == "tasks":
      # task executed by starting a new Python interpreter
      current_dag = sys.argv[3]
  else:
      try:
          PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: "
          PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: "
          proctitle = str(setproctitle.getproctitle())
          if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX):
              # task executed via forked process in celery
              args_string = proctitle[len(PROCTITLE_SUPERVISOR_PREFIX) :]
              args = ast.literal_eval(args_string)
              if len(args) > 3 and args[1] == "tasks":
                  current_dag = args[3]
          elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX):
              # task executed via forked process in standard_task_runner
              args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX) :].split(" ")
              if len(args) > 0:
                  current_dag = args[0]
      except Exception:
          pass

  # list_of_things: placeholder for whatever drives your DAG generation
  for thing in list_of_things:
      dag_id = f"generated_dag_{thing}"
      if current_dag is not None and current_dag != dag_id:
          continue  # skip generation of non-selected DAG

      dag = DAG(dag_id=dag_id, ...)
      globals()[dag_id] = dag
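
The three detection paths above are hard to exercise without a real worker. A minimal sketch, assuming the process titles have exactly the formats the snippet matches on, factors the same logic into pure functions that take argv and the process title as arguments. `detect_current_dag` and `dag_ids_to_generate` are hypothetical helper names, not Airflow APIs:

```python
import ast
from typing import Iterable, List, Optional, Sequence

PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: "
PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: "


def detect_current_dag(argv: Sequence[str], proctitle: str) -> Optional[str]:
    """Return the dag_id of the task being run, or None if it cannot be told.

    Mirrors the three cases of the snippet above, but takes argv and the
    process title as arguments so it can be tested without setproctitle.
    """
    # Case 1: new interpreter started as `airflow tasks run <dag_id> ...`.
    if len(argv) > 3 and argv[1] == "tasks":
        return argv[3]
    try:
        if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX):
            # Case 2: forked Celery process; the title ends with the argv list.
            args = ast.literal_eval(proctitle[len(PROCTITLE_SUPERVISOR_PREFIX):])
            if len(args) > 3 and args[1] == "tasks":
                return args[3]
        elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX):
            # Case 3: standard task runner; the title is "<dag_id> <task_id> ...".
            args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX):].split(" ")
            # An empty first field is treated as "unknown", not as a dag_id.
            return args[0] or None
    except Exception:
        # As in the original snippet, any parsing problem means "unknown".
        pass
    return None


def dag_ids_to_generate(things: Iterable[str], current_dag: Optional[str]) -> List[str]:
    """Return the generated dag_ids that should actually be built."""
    dag_ids = [f"generated_dag_{thing}" for thing in things]
    if current_dag is None:
        # Regular parsing: build every DAG.
        return dag_ids
    return [dag_id for dag_id in dag_ids if dag_id == current_dag]
```

In the DAG file, the inline detection would then become current_dag = detect_current_dag(sys.argv, str(setproctitle.getproctitle())), and the loop would iterate over dag_ids_to_generate(list_of_things, current_dag).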


J.


On Tue, Jul 19, 2022 at 4:59 PM Jed Cunningham <jedcunning...@apache.org>
wrote:

> I think we should only document the "DAG author friendly" approach (env
> var or otherwise) and hold the existing PR until it is ready. My 2c.
>
