GitHub user kch8306 edited a comment on the discussion: Preventing Duplicate
Queries During Asynchronous Chart Data Processing
@dosu
```
@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
def load_chart_data_into_cache(
    job_metadata: dict[str, Any],
    form_data: dict[str, Any],
) -> None:
    """Run a chart-data query asynchronously, deduplicated via a Redis lock.

    A lock keyed on the query-context cache key (``SET NX EX``) ensures that
    only one worker computes a given result. Concurrent tasks for the same
    cache key wait with exponential backoff for the lock holder to finish,
    then run ``ChartDataCommand`` (which should hit the now-warm cache).

    :param job_metadata: async job metadata used to report status and result
    :param form_data: chart form data describing the query to execute
    :raises SoftTimeLimitExceeded: re-raised after logging when the Celery
        soft time limit fires
    :raises Exception: re-raised after the job is marked ``STATUS_ERROR``
    """
    # pylint: disable=import-outside-toplevel
    from superset.commands.chart.data.get_data_command import ChartDataCommand

    redis_host = os.environ.get("REDIS_HOST", "172.20.0.1")
    redis_port = int(os.environ.get("REDIS_PORT", 26840))
    redis_db = int(os.environ.get("REDIS_DB", 9))
    redis_password = os.environ.get("REDIS_PASSWORD", None)
    redis_client = redis.StrictRedis(
        host=redis_host,
        port=redis_port,
        db=redis_db,
        password=redis_password,
        decode_responses=True,
    )

    LOCK_KEY_PREFIX = "superset:chart_data_lock:"
    LOCK_EXPIRATION_SECONDS = 600  # TTL backstop: a crashed worker cannot deadlock waiters
    INITIAL_WAIT_SECONDS = 1
    MAX_WAIT_TIME_SECONDS = 600  # stop waiting for the lock after this long
    MAX_WAIT_SECONDS = 30  # cap for the exponential backoff sleep

    # Initialize before the try block so the finally clause can reference
    # these names safely even when an exception is raised before they are
    # assigned (otherwise finally would raise NameError and mask the error).
    t_cache_key = None
    lock_key = None
    lock_acquired_by_this_task = False

    with override_user(_load_user_from_job_metadata(job_metadata), force=False):
        try:
            set_form_data(form_data)
            query_context = _create_query_context_from_form(form_data)
            # NOTE(review): relies on the private QueryContext._processor
            # attribute — confirm it survives Superset upgrades.
            t_cache_key = query_context._processor.cache_key().removeprefix("qc-")
            logger.warning("=================t_cache_key: \n%s", t_cache_key)
            lock_key = f"{LOCK_KEY_PREFIX}{t_cache_key}"
            logger.warning("=================lock_key: \n%s", lock_key)
            # SET NX EX is a single atomic acquire-with-expiry.
            lock_acquired_by_this_task = redis_client.set(
                lock_key, "locked", nx=True, ex=LOCK_EXPIRATION_SECONDS
            )
            if not lock_acquired_by_this_task:
                logger.warning(
                    "=================Failed to acquire Redis lock for %s. "
                    "Another task is likely processing. Waiting...",
                    t_cache_key,
                )
                # Exponential backoff while another worker holds the lock.
                current_wait_time = INITIAL_WAIT_SECONDS
                total_waited_time = 0
                while (
                    redis_client.exists(lock_key)
                    and total_waited_time < MAX_WAIT_TIME_SECONDS
                ):
                    logger.info(
                        "Waiting for lock %s to be released. Sleeping for %s "
                        "seconds. Total waited: %s/%s",
                        lock_key,
                        current_wait_time,
                        total_waited_time,
                        MAX_WAIT_TIME_SECONDS,
                    )
                    time.sleep(current_wait_time)
                    total_waited_time += current_wait_time
                    current_wait_time = min(MAX_WAIT_SECONDS, current_wait_time * 2)
                logger.warning(
                    "=================Redis lock %s released. "
                    "Proceeding to execute ChartDataCommand.",
                    lock_key,
                )
            else:
                logger.warning(
                    "=================Successfully acquired Redis lock for %s.",
                    t_cache_key,
                )
            # Run the query (or hit the cache warmed by the lock holder) and
            # publish the polling URL for the async-query client.
            command = ChartDataCommand(query_context)
            result = command.run(cache=True)
            cache_key = result["cache_key"]
            logger.warning("=================cache_key: \n%s", cache_key)
            result_url = f"/api/v1/chart/data/{cache_key}"
            async_query_manager.update_job(
                job_metadata,
                async_query_manager.STATUS_DONE,
                result_url=result_url,
            )
        except SoftTimeLimitExceeded as ex:
            logger.warning(
                "A timeout occurred while loading chart data, error: %s", ex
            )
            # Bare raise preserves the original traceback.
            raise
        except Exception as ex:
            # TODO: QueryContext should support SIP-40 style errors
            error = str(ex.message if hasattr(ex, "message") else ex)
            errors = [{"message": error}]
            async_query_manager.update_job(
                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
            )
            raise
        finally:
            # Only the task that acquired the lock may delete it; waiters
            # must never remove a lock still held by another worker.
            if lock_acquired_by_this_task and t_cache_key:
                try:
                    redis_client.delete(lock_key)
                    logger.warning(
                        "=================Redis lock released for %s.", t_cache_key
                    )
                except Exception as e:
                    # Best-effort cleanup; the key's TTL is the backstop.
                    logger.error(
                        "Error releasing Redis lock for %s: %s", t_cache_key, str(e)
                    )
```
GitHub link:
https://github.com/apache/superset/discussions/34316#discussioncomment-13901063
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]