maciej-szuszkiewicz commented on issue #41816:
URL: https://github.com/apache/airflow/issues/41816#issuecomment-2405033993

   Hey, I've run into the same issue today. In our case, we're using an in-house DAG factory that generates DAGs from configuration files. This can result in both long dag ids and long task ids, since the task ids also contain the names of enclosing task groups.
   For example, I have a dag id that's already 81 chars long, and on top of that the DatabricksWorkflowTaskGroup is nested in another group.
   So for me the task key generated by `DatabricksTaskBaseOperator._get_databricks_task_id` is 125 chars long, and I have no way of shortening it.
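
   For context, the key is built roughly like this (a simplified standalone reconstruction of `_get_databricks_task_id`, based on the return expression quoted in the last option below; the dag/task ids are made-up placeholders, not my real ones):

   ```python
   # Simplified reconstruction of DatabricksTaskBaseOperator._get_databricks_task_id.
   # The dag_id/task_id values below are made-up placeholders for illustration.
   def get_databricks_task_id(dag_id: str, task_id: str) -> str:
       # Task group names are part of task_id ("group.subgroup.task"),
       # so every nesting level lengthens the resulting key.
       return f"{dag_id}__{task_id.replace('.', '__')}"

   dag_id = "x" * 81  # a dag id that is already 81 chars long
   task_id = "outer_group.databricks_workflow.notebook_task"
   print(len(get_databricks_task_id(dag_id, task_id)))  # 130, well over the 100-char limit
   ```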
   
   When I try to run this dag, the `DatabricksWorkflowTaskGroup.launch` operator fails with:
   ```
   requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://<redacted>.cloud.databricks.com/api/2.1/jobs/create
   During handling of the above exception, another exception occurred:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
       return func(self, *args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 201, in execute
       job_id = self._create_or_reset_job(context)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 178, in _create_or_reset_job
       job_id = self._hook.create_job(job_spec)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks.py", line 226, in create_job
       response = self._do_api_call(CREATE_ENDPOINT, json)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 579, in _do_api_call
       raise AirflowException(msg)
   airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"The provided task key (of 125 characters) exceeds the maximum allowed length of 100 characters."}, Status Code: 400
   ```
   
   I see three options here:
   - let users configure the `task_key` on their own in `DatabricksTaskBaseOperator`.
   I was able to set `task_key` in the `DatabricksTaskOperator.task_config` attribute. `DatabricksWorkflowTaskGroup.launch` executed successfully and created a job in Databricks. However, the execution of that `DatabricksTaskOperator` failed, because it looked for a Databricks task with the key generated by `_get_databricks_task_id`, which didn't exist in that job.

   - remove `dag_id` from `DatabricksTaskBaseOperator._get_databricks_task_id`. The dag_id adds nothing to the uniqueness of the values returned by `_get_databricks_task_id`, since it is the same for every task in the DAG; only the task id matters. But this is an incomplete fix: it won't cover all cases, for example deeply nested task groups where the Airflow task id alone is longer than 100 chars.

   - trim the `_get_databricks_task_id` return value to the last 100 characters, e.g. `return f"{self.dag_id}__{task_id.replace('.', '__')}"[-100:]` (see the sketch below). It may not produce pretty values for longer ids, but it should do the trick.
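
   A minimal sketch of that last option (a standalone helper for illustration, not an actual patch against the provider):

   ```python
   # Illustrative sketch of the trimming idea; the helper name and `limit`
   # parameter are my own, only the f-string mirrors the provider's current
   # return expression.
   def get_trimmed_databricks_task_id(dag_id: str, task_id: str, limit: int = 100) -> str:
       # Keep the last `limit` chars: the tail (task group path + task name) is
       # what distinguishes tasks within a job, while the dag_id prefix is
       # identical for all of them anyway.
       return f"{dag_id}__{task_id.replace('.', '__')}"[-limit:]

   key = get_trimmed_databricks_task_id("x" * 81, "outer_group.databricks_workflow.notebook_task")
   assert len(key) <= 100  # always within the Databricks task_key limit now
   ```

   As long as both the job-spec creation and the later task lookup go through the same helper, the trimmed keys stay consistent, which is exactly what broke in the first option above.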

