villebro commented on code in PR #29912:
URL: https://github.com/apache/superset/pull/29912#discussion_r1736740946
##########
superset/async_events/async_query_manager.py:
##########
@@ -230,14 +257,35 @@ def submit_chart_data_job(
def read_events(
self, channel: str, last_id: Optional[str]
) -> list[Optional[dict[str, Any]]]:
+ if not self._cache:
+ raise CacheBackendNotInitialized("Cache backend not initialized")
+
stream_name = f"{self._stream_prefix}{channel}"
start_id = increment_id(last_id) if last_id else "-"
- results = self._redis.xrange(stream_name, start_id, "+",
self.MAX_EVENT_COUNT)
+ results = self._cache.xrange(stream_name, start_id, "+",
self.MAX_EVENT_COUNT)
+ # Decode bytes to strings, decode_responses is not supported at
RedisCache and RedisSentinelCache
+ if isinstance(self._cache, (RedisSentinelCacheBackend,
RedisCacheBackend)):
+ decoded_results = [
+ (
+ event_id.decode("utf-8"),
+ {
+ key.decode("utf-8"): value.decode("utf-8")
+ for key, value in event_data.items()
+ },
+ )
+ for event_id, event_data in results
+ ]
+ return (
+ [] if not decoded_results else list(map(parse_event,
decoded_results))
+ )
Review Comment:
Got it. I hadn't noticed we were using `redis.Redis`; I thought we were
using `RedisCache` from `flask_caching` here. Thanks for clarifying.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]