GitHub user kch8306 added a comment to the discussion: Preventing Duplicate 
Queries During Asynchronous Chart Data Processing

@dosu

@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
def load_chart_data_into_cache(
    job_metadata: dict[str, Any],
    form_data: dict[str, Any],
) -> None:
    """Execute a chart-data query asynchronously, deduplicated via a Redis lock.

    The lock key is derived from the query context's cache key, so concurrent
    identical requests serialize: the first task takes the lock and runs the
    query; later tasks wait for the lock to clear and then run the command,
    which is expected to be served from the now-warm cache.

    Args:
        job_metadata: async-query job descriptor; must carry enough info for
            ``_load_user_from_job_metadata`` to resolve the acting user.
        form_data: the chart's form data, used to build the query context.

    Raises:
        SoftTimeLimitExceeded: re-raised when Celery's soft time limit fires.
        Exception: any command failure, after the job status is set to ERROR.
    """
    # pylint: disable=import-outside-toplevel
    import uuid

    from superset.commands.chart.data.get_data_command import ChartDataCommand

    redis_host = os.environ.get("REDIS_HOST", "172.20.0.1")
    redis_port = int(os.environ.get("REDIS_PORT", 26840))
    redis_db = int(os.environ.get("REDIS_DB", 9))
    redis_password = os.environ.get("REDIS_PASSWORD", None)

    redis_client = redis.StrictRedis(
        host=redis_host,
        port=redis_port,
        db=redis_db,
        password=redis_password,
        decode_responses=True,
    )

    LOCK_KEY_PREFIX = "superset:chart_data_lock:"
    LOCK_EXPIRATION_SECONDS = 300
    INITIAL_WAIT_SECONDS = 1
    MAX_WAIT_TIME_SECONDS = 600
    MAX_WAIT_SECONDS = 30

    # Pre-initialize everything the `finally` block reads, so an exception
    # raised before these are assigned (e.g. in _create_query_context_from_form)
    # cannot raise a NameError that masks the original error.
    lock_acquired_by_this_task = False
    t_cache_key: str | None = None
    lock_key: str | None = None
    # Unique per-task token: only the task that set the lock may delete it.
    # Deleting unconditionally could remove a lock re-acquired by another task
    # after this task's lock expired.
    lock_token = uuid.uuid4().hex

    with override_user(_load_user_from_job_metadata(job_metadata), force=False):
        try:
            set_form_data(form_data)
            query_context = _create_query_context_from_form(form_data)
            # NOTE(review): reaches into the private `_processor` attribute;
            # confirm there is no public accessor for the cache key.
            t_cache_key = query_context._processor.cache_key().removeprefix("qc-")
            logger.warning("=================t_cache_key: \n%s", t_cache_key)
            lock_key = f"{LOCK_KEY_PREFIX}{t_cache_key}"
            logger.warning("=================lock_key: \n%s", lock_key)

            # SET NX EX: atomically take the lock only if nobody else holds it.
            lock_acquired_by_this_task = bool(
                redis_client.set(
                    lock_key, lock_token, nx=True, ex=LOCK_EXPIRATION_SECONDS
                )
            )
            if not lock_acquired_by_this_task:
                logger.warning(
                    "=================Failed to acquire Redis lock for %s. Another task is likely processing. Waiting...",
                    t_cache_key,
                )
                # Exponential backoff (each sleep capped at MAX_WAIT_SECONDS)
                # until the holder releases the lock or we give up after
                # MAX_WAIT_TIME_SECONDS in total.
                current_wait_time = INITIAL_WAIT_SECONDS
                total_waited_time = 0
                while (
                    redis_client.exists(lock_key)
                    and total_waited_time < MAX_WAIT_TIME_SECONDS
                ):
                    logger.info(
                        "Waiting for lock %s to be released. Sleeping for %s seconds. Total waited: %s/%s",
                        lock_key,
                        current_wait_time,
                        total_waited_time,
                        MAX_WAIT_TIME_SECONDS,
                    )
                    time.sleep(current_wait_time)
                    total_waited_time += current_wait_time
                    current_wait_time = min(MAX_WAIT_SECONDS, current_wait_time * 2)

                logger.warning(
                    "=================Redis lock %s released. Proceeding to execute ChartDataCommand.",
                    lock_key,
                )
            else:
                logger.warning(
                    "=================Successfully acquired Redis lock for %s.",
                    t_cache_key,
                )

            # Runs in both branches: lock holders compute and fill the cache;
            # waiters re-run the command and hit the warm cache.
            command = ChartDataCommand(query_context)
            result = command.run(cache=True)
            cache_key = result["cache_key"]
            logger.warning("=================cache_key: \n%s", cache_key)
            result_url = f"/api/v1/chart/data/{cache_key}"

            async_query_manager.update_job(
                job_metadata,
                async_query_manager.STATUS_DONE,
                result_url=result_url,
            )

        except SoftTimeLimitExceeded as ex:
            logger.warning("A timeout occurred while loading chart data, error: %s", ex)
            raise ex
        except Exception as ex:
            # TODO: QueryContext should support SIP-40 style errors
            error = str(ex.message if hasattr(ex, "message") else ex)
            errors = [{"message": error}]
            async_query_manager.update_job(
                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
            )
            raise ex
        finally:
            # Release the lock only if this task still owns it (token matches).
            # GET + DELETE is not fully atomic, but the token check shrinks the
            # race to the window between the two calls; a Lua script would
            # close it entirely.
            if lock_acquired_by_this_task and lock_key:
                try:
                    if redis_client.get(lock_key) == lock_token:
                        redis_client.delete(lock_key)
                    logger.warning(
                        "=================Redis lock released for %s.", t_cache_key
                    )
                except Exception as e:
                    logger.error(
                        "Error releasing Redis lock for %s: %s", t_cache_key, str(e)
                    )

GitHub link: 
https://github.com/apache/superset/discussions/34316#discussioncomment-13901063

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to