GitHub user kch8306 edited a discussion: Preventing Duplicate Queries During 
Asynchronous Chart Data Processing

Hello!

We are using Superset 3.1 with Redis as our cache database. To handle 
long-running queries, we have enabled GLOBAL_ASYNC_QUERIES. However, we’ve 
noticed an issue: when a dashboard is loaded and the charts are not yet cached, 
if multiple users access the same dashboard simultaneously, the same queries 
are sent repeatedly to the database. This is inefficient and needlessly increases database load.

To address this, I’m considering modifying the "load_chart_data_into_cache" 
function in "/var/local/idk2/heartset/superset/tasks/async_queries.py" 
(proposed changes below). I’d like to hear feedback from the community before 
proceeding.

If you have a specific code diff or further context you want included, let me 
know and I can update the body!


```
@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
def load_chart_data_into_cache(
    job_metadata: dict[str, Any],
    form_data: dict[str, Any],
) -> None:
    """Compute chart data asynchronously, deduplicating identical in-flight queries.

    Before running the (potentially expensive) query, this worker tries to
    insert a row keyed by the query-context cache key into ``cache_keys``.
    The insert itself is the atomic "am I first?" check: relying on the
    unique constraint on ``cache_key`` closes the check-then-insert race
    that a separate SELECT followed by an INSERT would have.  A worker that
    loses the race waits (bounded) for the row to disappear, at which point
    the result should already be in the cache and its own
    ``command.run(cache=True)`` becomes a cheap cache read.

    :param job_metadata: async-query job descriptor used to report status
    :param form_data: chart form data from which the query context is built
    :raises SoftTimeLimitExceeded: when the Celery soft time limit fires
    :raises Exception: any query error, after marking the job as errored
    """
    # pylint: disable=import-outside-toplevel
    from superset.commands.chart.data.get_data_command import ChartDataCommand

    with override_user(_load_user_from_job_metadata(job_metadata), force=False):
        try:
            set_form_data(form_data)
            query_context = _create_query_context_from_form(form_data)
            # NOTE(review): reaches into the private `_processor` attribute;
            # prefer a public accessor if Superset exposes one.
            dedup_key = query_context._processor.cache_key().removeprefix("qc-")

            # True => we own the query; False => someone else is running it.
            owner = _acquire_query_lock(dedup_key)
            if not owner:
                _wait_for_query_lock(dedup_key)

            try:
                command = ChartDataCommand(query_context)
                result = command.run(cache=True)
            finally:
                # Release even when the query fails — otherwise every other
                # waiter spins forever on a key that will never be removed.
                # Only the owner deletes; a waiter must not remove a row it
                # does not hold.
                if owner:
                    _release_query_lock(dedup_key)

            result_url = f"/api/v1/chart/data/{result['cache_key']}"
            async_query_manager.update_job(
                job_metadata,
                async_query_manager.STATUS_DONE,
                result_url=result_url,
            )

        except SoftTimeLimitExceeded as ex:
            logger.warning("A timeout occurred while loading chart data, error: %s", ex)
            # Bare raise preserves the original traceback.
            raise
        except Exception as ex:
            # TODO: QueryContext should support SIP-40 style errors
            error = str(ex.message if hasattr(ex, "message") else ex)
            errors = [{"message": error}]
            async_query_manager.update_job(
                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
            )
            raise


def _acquire_query_lock(dedup_key: str) -> bool:
    """Try to register *dedup_key* as in-flight.

    Returns True when this worker inserted the row (it owns the query),
    False when another worker already holds it (the commit fails on the
    unique constraint).
    """
    session = db.create_scoped_session()
    try:
        # TODO(review): datasource_uid=1 is a placeholder — derive the real
        # uid from the query context's datasource before merging.
        session.add(CacheKey(cache_key=dedup_key, datasource_uid=1))
        session.commit()
        return True
    except Exception:
        # Most likely an IntegrityError from a concurrent insert; treat any
        # failure as "not the owner" and fall back to waiting.
        session.rollback()
        return False
    finally:
        session.close()


def _wait_for_query_lock(
    dedup_key: str,
    poll_seconds: float = 5.0,
    max_wait_seconds: float = 600.0,
) -> None:
    """Poll until the lock row for *dedup_key* disappears or the wait times out.

    The bounded wait prevents workers from spinning forever if the owner
    crashed before it could delete its row; on timeout we simply proceed
    and run the query ourselves.
    """
    deadline = time.monotonic() + max_wait_seconds
    while time.monotonic() < deadline:
        session = db.create_scoped_session()
        try:
            in_flight = session.query(CacheKey).filter_by(cache_key=dedup_key).count()
        finally:
            session.close()
        if not in_flight:
            return
        logger.warning("=================wait=================:%s", dedup_key)
        time.sleep(poll_seconds)
    logger.warning("Timed out waiting for in-flight query, proceeding: %s", dedup_key)


def _release_query_lock(dedup_key: str) -> None:
    """Delete the lock row for *dedup_key*. Best-effort: errors are logged, not raised."""
    session = db.create_scoped_session()
    try:
        entry = session.query(CacheKey).filter_by(cache_key=dedup_key).first()
        if entry is not None:
            session.delete(entry)
            session.commit()
    except Exception:
        session.rollback()
        logger.exception("Error releasing query lock for %s", dedup_key)
    finally:
        session.close()
```

GitHub link: https://github.com/apache/superset/discussions/34316

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to