maciej-szuszkiewicz commented on issue #41816: URL: https://github.com/apache/airflow/issues/41816#issuecomment-2405033993
Hey, I've run into the same issue today. In our case, we're using an in-house DAG factory to generate DAGs from configuration files. This can result in both long dag ids and long task ids, since the task ids also contain the task group names. For example, I have a dag id that's already 81 characters long, and on top of that the `DatabricksWorkflowTaskGroup` is nested in another group. So for me the task key generated by `DatabricksTaskBaseOperator._get_databricks_task_id` is 125 characters long, and I have no way of shortening it. When I try to run this dag, the `DatabricksWorkflowTaskGroup.launch` operator fails with:

```
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://<redacted>.cloud.databricks.com/api/2.1/jobs/create

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 201, in execute
    job_id = self._create_or_reset_job(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 178, in _create_or_reset_job
    job_id = self._hook.create_job(job_spec)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks.py", line 226, in create_job
    response = self._do_api_call(CREATE_ENDPOINT, json)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 579, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"The provided task key (of 125 characters) exceeds the maximum allowed length of 100 characters."}, Status Code: 400
```

I see three options here:

- Let users configure the `task_key` on their own in `DatabricksTaskBaseOperator`. I was able to set `task_key` in the `DatabricksTaskOperator.task_config` attribute; `DatabricksWorkflowTaskGroup.launch` executed successfully and created a job in Databricks. However, the execution of that `DatabricksTaskOperator` then failed, because it looked for a Databricks task with the key generated by `_get_databricks_task_id`, which didn't exist in that job.
- Remove `dag_id` from `DatabricksTaskBaseOperator._get_databricks_task_id`. The dag_id adds nothing to the uniqueness of the values returned by `_get_databricks_task_id`, since it's the same for every task in the dag; only the task id matters. But this is an incomplete fix, as it won't cover all cases: for example, deeply nested groups can still push the Airflow task id itself past 100 characters.
- Trim the `_get_databricks_task_id` return value to the last 100 characters, e.g. `return f"{self.dag_id}__{task_id.replace('.', '__')}"[-100:]`. It may not produce super pretty values for longer ids, but it should do the trick.
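For illustration, here is a minimal standalone sketch of the trimming approach from the third option. The function name and constant are hypothetical (not the provider's actual API); it just mirrors the key format used by `_get_databricks_task_id` and keeps the last 100 characters, so the varying task-id portion survives rather than the shared dag_id prefix:

```python
# Databricks' documented task key limit (assumption: 100 chars, per the error above).
DATABRICKS_TASK_KEY_MAX_LEN = 100

def get_databricks_task_id(dag_id: str, task_id: str) -> str:
    """Hypothetical helper mirroring the provider's key format, then trimming.

    The provider builds keys as "<dag_id>__<task_id with '.' -> '__'>";
    this sketch trims to the LAST 100 chars so the task-id portion
    (the part that actually differs between tasks) is preserved.
    """
    task_key = f"{dag_id}__{task_id.replace('.', '__')}"
    return task_key[-DATABRICKS_TASK_KEY_MAX_LEN:]

# Example: an 81-char dag id plus a nested task group still yields a valid key.
long_dag_id = "x" * 81
key = get_databricks_task_id(long_dag_id, "outer_group.inner_group.my_task")
print(len(key), key[-20:])  # stays within the 100-char limit, ends with the task name
```

One caveat worth noting: trimming from the left means two very long task ids could in principle collide if they share a long suffix, so any real fix in the provider would want to guarantee uniqueness as well.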