villebro commented on code in PR #29912:
URL: https://github.com/apache/superset/pull/29912#discussion_r1733477617


##########
superset/async_events/async_query_manager.py:
##########
@@ -230,14 +257,35 @@ def submit_chart_data_job(
     def read_events(
         self, channel: str, last_id: Optional[str]
     ) -> list[Optional[dict[str, Any]]]:
+        if not self._cache:
+            raise CacheBackendNotInitialized("Cache backend not initialized")
+
         stream_name = f"{self._stream_prefix}{channel}"
         start_id = increment_id(last_id) if last_id else "-"
-        results = self._redis.xrange(stream_name, start_id, "+", self.MAX_EVENT_COUNT)
+        results = self._cache.xrange(stream_name, start_id, "+", self.MAX_EVENT_COUNT)
+        # Decode bytes to strings, decode_responses is not supported at RedisCache and RedisSentinelCache
+        if isinstance(self._cache, (RedisSentinelCacheBackend, RedisCacheBackend)):
+            decoded_results = [
+                (
+                    event_id.decode("utf-8"),
+                    {
+                        key.decode("utf-8"): value.decode("utf-8")
+                        for key, value in event_data.items()
+                    },
+                )
+                for event_id, event_data in results
+            ]
+            return (
+                [] if not decoded_results else list(map(parse_event, decoded_results))
+            )

Review Comment:
   I'm not sure this part was mentioned in the description. Can you elaborate 
on the need for this change?
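
   As a point of reference, the decoding in the hunk above appears to amount to the sketch below. `decode_stream_entries` is a hypothetical name and the sample entry is made up for illustration; the assumption is that the backend returns `(event_id, fields)` pairs as bytes, as the diff's comment suggests.

   ```python
   def decode_stream_entries(
       results: list[tuple[bytes, dict[bytes, bytes]]],
   ) -> list[tuple[str, dict[str, str]]]:
       """Decode (event_id, fields) pairs from bytes to str (hypothetical helper)."""
       return [
           (
               event_id.decode("utf-8"),
               {
                   key.decode("utf-8"): value.decode("utf-8")
                   for key, value in fields.items()
               },
           )
           for event_id, fields in results
       ]


   # Made-up sample entry shaped like the results handled in the diff.
   raw = [(b"1607477697866-0", {b"data": b'{"status": "done"}'})]
   decoded = decode_stream_entries(raw)
   ```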



##########
superset/async_events/async_query_manager.py:
##########
@@ -16,18 +16,27 @@
 # under the License.
 import logging
 import uuid
-from typing import Any, Literal, Optional
+from typing import Any, Dict, Literal, Optional, Union

Review Comment:
   nit: we can now use the built-in `dict` in type annotations, as long as we 
add the following import at the top of the file:
   ```python
   from __future__ import annotations
   ```
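
   A minimal sketch of what that looks like in practice; `first_event` is a hypothetical helper used only to show the annotation style, not something from this PR:

   ```python
   from __future__ import annotations

   from typing import Any


   # Hypothetical helper: built-in generics replace typing.Dict / typing.List,
   # and "X | None" can stand in for Optional[X] inside annotations.
   def first_event(events: list[dict[str, Any]]) -> dict[str, Any] | None:
       return events[0] if events else None
   ```

   With the `__future__` import the annotations are not evaluated at definition time, so this style should also work on interpreter versions that predate the built-in generic syntax.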



##########
tests/unit_tests/async_events/async_query_manager_tests.py:
##########
@@ -26,17 +27,27 @@
     AsyncQueryManager,
     AsyncQueryTokenException,
 )
+from superset.async_events.cache_backend import (
+    RedisCacheBackend,
+    RedisSentinelCacheBackend,
+)
 
 JWT_TOKEN_SECRET = "some_secret"
 JWT_TOKEN_COOKIE_NAME = "superset_async_jwt"
 
+# Define the cache backends once as mocks
+cache_backends = {
+    "RedisCacheBackend": mock.Mock(spec=RedisCacheBackend),
+    "RedisSentinelCacheBackend": mock.Mock(spec=RedisSentinelCacheBackend),
+    "redis.Redis": mock.Mock(spec=redis.Redis),
+}
+

Review Comment:
   Could this loop be replaced by `pytest.mark.parametrize`? It feels more 
idiomatic (unless I'm missing some reason why it needs to be implemented like 
this).
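
   A rough sketch of the parametrized form, under some assumptions: the two classes below are hypothetical stand-ins so the snippet runs without Superset or redis installed, and the test name and assertions are illustrative rather than taken from the PR.

   ```python
   from unittest import mock

   import pytest


   # Hypothetical stand-ins for the real backend classes.
   class RedisCacheBackend:
       def xrange(self, name, start, end, count):
           raise NotImplementedError


   class RedisSentinelCacheBackend(RedisCacheBackend):
       pass


   @pytest.mark.parametrize(
       "backend_cls",
       [RedisCacheBackend, RedisSentinelCacheBackend],
       ids=lambda cls: cls.__name__,
   )
   def test_xrange_is_called(backend_cls):
       # A fresh mock per case avoids state leaking between backends.
       cache = mock.Mock(spec=backend_cls)
       cache.xrange.return_value = []

       assert cache.xrange("stream", "-", "+", 100) == []
       cache.xrange.assert_called_once_with("stream", "-", "+", 100)
   ```

   Each backend then shows up as its own test case in the report, and a failure for one backend does not hide the results for the others.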



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

