This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 500c9c921939ac8f6c5e9fca6494939ab0e1fd0e
Author: shyjsarah <[email protected]>
AuthorDate: Wed Apr 1 01:44:22 2026 -0700

    [python] Fix token cache isolation by adding user identity to cache key 
(#7571)
    
    Revert #7562 and fix token cache pollution with a minimal approach.
    
    **Problem:**
    In #7562, the class-level `_TOKEN_CACHE` was completely removed to fix
    token pollution across different catalog instances. However, this also
    removed the ability to reuse tokens for the same user on the same table,
    causing unnecessary token refresh requests.
    
    The root cause of the original pollution was that `_TOKEN_CACHE` was
    keyed only by table identifier (`str(self.identifier)`), so different
    users (different AK/SK credentials) operating on the same table within
    one process would share the same cached data token.
    
    **Fix:**
    Instead of removing the cache entirely, this PR adds **user identity**
    to the cache key to isolate tokens across different credentials while
    preserving token reuse for the same user.
---
 .../pypaimon/catalog/rest/rest_token_file_io.py    |  76 +++++-
 .../pypaimon/tests/rest/rest_token_file_io_test.py | 254 ++++++++++++++-------
 2 files changed, 233 insertions(+), 97 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py 
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 16b16e6972..7d4553175b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -27,6 +27,8 @@ from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.rest.rest_token import RESTToken
 from pypaimon.common.file_io import FileIO
 from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+from pypaimon.api.auth.bearer import BearTokenAuthProvider
+from pypaimon.api.auth.dlf_provider import DLFAuthProvider
 from pypaimon.common.identifier import Identifier, SYSTEM_TABLE_SPLITTER
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import CatalogOptions, OssOptions
@@ -44,6 +46,10 @@ class RESTTokenFileIO(FileIO):
     _FILE_IO_CACHE: TTLCache = None
     _FILE_IO_CACHE_LOCK = threading.Lock()
 
+    _TOKEN_CACHE: dict = {}
+    _TOKEN_LOCKS: dict = {}
+    _TOKEN_LOCKS_LOCK = threading.Lock()
+
     @classmethod
     def _get_file_io_cache(cls) -> TTLCache:
         if cls._FILE_IO_CACHE is None:
@@ -69,14 +75,12 @@ class RESTTokenFileIO(FileIO):
         self.properties = self.catalog_options or Options({})  # For 
compatibility with refresh_token()
         self.token: Optional[RESTToken] = None
         self.api_instance: Optional[RESTApi] = None
-        self.lock = threading.Lock()
         self.log = logging.getLogger(__name__)
         self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
 
     def __getstate__(self):
         state = self.__dict__.copy()
         # Remove non-serializable objects
-        state.pop('lock', None)
         state.pop('api_instance', None)
         state.pop('_uri_reader_factory_cache', None)
         # token can be serialized, but we'll refresh it on deserialization
@@ -84,8 +88,6 @@ class RESTTokenFileIO(FileIO):
 
     def __setstate__(self, state):
         self.__dict__.update(state)
-        # Recreate lock after deserialization
-        self.lock = threading.Lock()
         self._uri_reader_factory_cache = None
         # api_instance will be recreated when needed
         self.api_instance = None
@@ -201,17 +203,69 @@ class RESTTokenFileIO(FileIO):
     def filesystem(self):
         return self.file_io().filesystem
 
+    def _build_cache_key(self) -> str:
+        """Build cache key with user identity to isolate tokens across 
different credentials."""
+        return f"{self._get_user_identity()}:{self.identifier}"
+
+    def _get_user_identity(self) -> str:
+        """Get user identity for token cache isolation.
+
+        For DLF auth (AK/SK, STS, ECS Role): returns access_key_id via 
get_token().
+        For Bear Token auth: returns the bearer token string.
+        For unknown auth: returns 'anonymous'.
+        """
+        if self.api_instance is None:
+            self.api_instance = RESTApi(self.properties, False)
+        auth_provider = self.api_instance.rest_auth_function.auth_provider
+        if isinstance(auth_provider, DLFAuthProvider):
+            return auth_provider.get_token().access_key_id or 'anonymous'
+        if isinstance(auth_provider, BearTokenAuthProvider):
+            return auth_provider.token or 'anonymous'
+        return 'anonymous'
+
     def try_to_refresh_token(self):
-        if self._should_refresh():
-            with self.lock:
-                if self._should_refresh():
-                    self.refresh_token()
+        identifier_str = self._build_cache_key()
 
-    def _should_refresh(self):
-        if self.token is None:
+        if self.token is not None and not self._is_token_expired(self.token):
+            return
+
+        cached_token = self._get_cached_token(identifier_str)
+        if cached_token and not self._is_token_expired(cached_token):
+            self.token = cached_token
+            return
+
+        global_lock = self._get_global_token_lock(identifier_str)
+
+        with global_lock:
+            cached_token = self._get_cached_token(identifier_str)
+            if cached_token and not self._is_token_expired(cached_token):
+                self.token = cached_token
+                return
+
+            token_to_check = cached_token if cached_token else self.token
+            if token_to_check is None or 
self._is_token_expired(token_to_check):
+                self.refresh_token()
+                self._set_cached_token(identifier_str, self.token)
+
+    def _get_cached_token(self, identifier_str: str) -> Optional[RESTToken]:
+        with self._TOKEN_LOCKS_LOCK:
+            return self._TOKEN_CACHE.get(identifier_str)
+
+    def _set_cached_token(self, identifier_str: str, token: RESTToken):
+        with self._TOKEN_LOCKS_LOCK:
+            self._TOKEN_CACHE[identifier_str] = token
+
+    def _is_token_expired(self, token: Optional[RESTToken]) -> bool:
+        if token is None:
             return True
         current_time = int(time.time() * 1000)
-        return (self.token.expire_at_millis - current_time) < 
RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+        return (token.expire_at_millis - current_time) < 
RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+
+    def _get_global_token_lock(self, identifier_str: str) -> threading.Lock:
+        with self._TOKEN_LOCKS_LOCK:
+            if identifier_str not in self._TOKEN_LOCKS:
+                self._TOKEN_LOCKS[identifier_str] = threading.Lock()
+            return self._TOKEN_LOCKS[identifier_str]
 
     def refresh_token(self):
         self.log.info(f"begin refresh data token for identifier 
[{self.identifier}]")
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py 
b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index cdcd5ed36c..b94ad58b24 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -148,9 +148,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
                 self.catalog_options
             )
 
-            self.assertTrue(hasattr(original_file_io, 'lock'))
-            self.assertIsNotNone(original_file_io.lock)
-
             pickled = pickle.dumps(original_file_io)
 
             deserialized_file_io = pickle.loads(pickled)
@@ -159,10 +156,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
             self.assertEqual(deserialized_file_io.path, original_file_io.path)
             self.assertEqual(deserialized_file_io.properties.data, 
original_file_io.properties.data)
 
-            self.assertTrue(hasattr(deserialized_file_io, 'lock'))
-            self.assertIsNotNone(deserialized_file_io.lock)
-            self.assertIsNot(deserialized_file_io.lock, original_file_io.lock)
-
             self.assertIsNone(deserialized_file_io.api_instance)
 
             test_file_path = f"file://{self.temp_dir}/pickle_test.txt"
@@ -306,56 +299,183 @@ class RESTTokenFileIOTest(unittest.TestCase):
             self.assertIsNotNone(restored_file_io.uri_reader_factory,
                                  "uri_reader_factory should work after 
deserialization")
 
-    def test_should_refresh_when_token_is_none(self):
-        """_should_refresh() returns True when token is None."""
-        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
-            file_io = RESTTokenFileIO(
-                self.identifier, self.warehouse_path, self.catalog_options)
-            self.assertIsNone(file_io.token)
-            self.assertTrue(file_io._should_refresh())
+    def test_build_cache_key_with_dlf_auth(self):
+        """Test that _build_cache_key includes DLF access_key_id for cache 
isolation."""
+        from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+        from pypaimon.api.token_loader import DLFToken
+
+        catalog_options = Options({
+            CatalogOptions.URI.key(): "http://test-uri",
+            CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+            CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+            CatalogOptions.DLF_ACCESS_KEY_ID.key(): "test-ak-123",
+            CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "test-sk-456",
+        })
 
-    def test_should_refresh_when_token_not_expired(self):
-        """_should_refresh() returns False when token is far from expiry."""
         with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
             file_io = RESTTokenFileIO(
-                self.identifier, self.warehouse_path, self.catalog_options)
-            # Token that expires 2 hours from now (well beyond the 1-hour safe 
margin)
-            future_millis = int(time.time() * 1000) + 7200_000
-            file_io.token = RESTToken({'ak': 'v'}, future_millis)
-            self.assertFalse(file_io._should_refresh())
-
-    def test_should_refresh_when_token_expired(self):
-        """_should_refresh() returns True when token is already expired."""
+                self.identifier,
+                self.warehouse_path,
+                catalog_options
+            )
+
+        mock_api = MagicMock()
+        mock_dlf_provider = MagicMock(spec=DLFAuthProvider)
+        mock_dlf_provider.get_token.return_value = DLFToken(
+            access_key_id="test-ak-123",
+            access_key_secret="test-sk-456",
+            security_token=None
+        )
+        mock_api.rest_auth_function.auth_provider = mock_dlf_provider
+        file_io.api_instance = mock_api
+
+        cache_key = file_io._build_cache_key()
+        self.assertTrue(cache_key.startswith("test-ak-123:"),
+                        f"Cache key should start with access_key_id, got: 
{cache_key}")
+        self.assertIn(str(self.identifier), cache_key)
+
+    def test_build_cache_key_with_bear_auth(self):
+        """Test that _build_cache_key includes bearer token for cache 
isolation."""
+        from pypaimon.api.auth.bearer import BearTokenAuthProvider
+
+        catalog_options = Options({
+            CatalogOptions.URI.key(): "http://test-uri",
+            CatalogOptions.TOKEN_PROVIDER.key(): "bear",
+            CatalogOptions.TOKEN.key(): "my-bearer-token",
+        })
+
         with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
             file_io = RESTTokenFileIO(
-                self.identifier, self.warehouse_path, self.catalog_options)
-            # Token that expired 1 second ago
-            past_millis = int(time.time() * 1000) - 1000
-            file_io.token = RESTToken({'ak': 'v'}, past_millis)
-            self.assertTrue(file_io._should_refresh())
-
-    def test_try_to_refresh_token_calls_refresh_once(self):
-        """try_to_refresh_token() calls refresh_token() exactly once via 
double-check."""
-        file_io = RESTTokenFileIO(
-            self.identifier, self.warehouse_path, self.catalog_options)
-        self.assertIsNone(file_io.token)
-
-        mock_response = MagicMock()
-        mock_response.token = {'ak': 'test-ak'}
-        mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+                self.identifier,
+                self.warehouse_path,
+                catalog_options
+            )
 
         mock_api = MagicMock()
-        mock_api.load_table_token.return_value = mock_response
+        mock_bear_provider = MagicMock(spec=BearTokenAuthProvider)
+        mock_bear_provider.token = "my-bearer-token"
+        mock_api.rest_auth_function.auth_provider = mock_bear_provider
         file_io.api_instance = mock_api
 
-        file_io.try_to_refresh_token()
+        cache_key = file_io._build_cache_key()
+        self.assertTrue(cache_key.startswith("my-bearer-token:"),
+                        f"Cache key should start with bearer token, got: 
{cache_key}")
+        self.assertIn(str(self.identifier), cache_key)
+
+    def test_different_ak_same_table_token_isolation(self):
+        """Test that two RESTTokenFileIO instances with different AKs on the 
same table
+        do not share cached tokens."""
+        from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+        from pypaimon.api.token_loader import DLFToken
+
+        # Clear class-level token cache before test
+        RESTTokenFileIO._TOKEN_CACHE.clear()
+        RESTTokenFileIO._TOKEN_LOCKS.clear()
 
-        mock_api.load_table_token.assert_called_once()
-        self.assertIsNotNone(file_io.token)
+        identifier = Identifier.from_string("db.same_table")
+        future_expiry = int(time.time() * 1000) + 7_200_000  # 2 hours from now
 
-        # Second call should NOT trigger refresh again (token is valid)
-        file_io.try_to_refresh_token()
-        mock_api.load_table_token.assert_called_once()
+        # Create file_io_1 with AK "read-ak"
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io_1 = RESTTokenFileIO(
+                identifier, self.warehouse_path,
+                Options({
+                    CatalogOptions.URI.key(): "http://test-uri",
+                    CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+                    CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+                    CatalogOptions.DLF_ACCESS_KEY_ID.key(): "read-ak",
+                    CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "read-sk",
+                })
+            )
+
+        mock_api_1 = MagicMock()
+        mock_dlf_1 = MagicMock(spec=DLFAuthProvider)
+        mock_dlf_1.get_token.return_value = DLFToken("read-ak", "read-sk", 
None)
+        mock_api_1.rest_auth_function.auth_provider = mock_dlf_1
+        file_io_1.api_instance = mock_api_1
+
+        read_token = RESTToken({"fs.oss.accessKeyId": "read-data-ak"}, 
future_expiry)
+        file_io_1.token = read_token
+        cache_key_1 = file_io_1._build_cache_key()
+        file_io_1._set_cached_token(cache_key_1, read_token)
+
+        # Create file_io_2 with AK "write-ak"
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io_2 = RESTTokenFileIO(
+                identifier, self.warehouse_path,
+                Options({
+                    CatalogOptions.URI.key(): "http://test-uri",
+                    CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+                    CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+                    CatalogOptions.DLF_ACCESS_KEY_ID.key(): "write-ak",
+                    CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "write-sk",
+                })
+            )
+
+        mock_api_2 = MagicMock()
+        mock_dlf_2 = MagicMock(spec=DLFAuthProvider)
+        mock_dlf_2.get_token.return_value = DLFToken("write-ak", "write-sk", 
None)
+        mock_api_2.rest_auth_function.auth_provider = mock_dlf_2
+        file_io_2.api_instance = mock_api_2
+
+        cache_key_2 = file_io_2._build_cache_key()
+
+        # Cache keys should be different
+        self.assertNotEqual(cache_key_1, cache_key_2,
+                            "Different AKs on the same table should produce 
different cache keys")
+
+        # file_io_2 should NOT get file_io_1's cached token
+        cached_for_2 = file_io_2._get_cached_token(cache_key_2)
+        self.assertIsNone(cached_for_2,
+                          "file_io_2 should not see file_io_1's cached token")
+
+    def test_same_ak_same_table_token_reuse(self):
+        """Test that two RESTTokenFileIO instances with the same AK on the 
same table
+        can share cached tokens."""
+        from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+        from pypaimon.api.token_loader import DLFToken
+
+        # Clear class-level token cache before test
+        RESTTokenFileIO._TOKEN_CACHE.clear()
+        RESTTokenFileIO._TOKEN_LOCKS.clear()
+
+        identifier = Identifier.from_string("db.same_table")
+        future_expiry = int(time.time() * 1000) + 7_200_000
+
+        def make_file_io():
+            with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+                fio = RESTTokenFileIO(
+                    identifier, self.warehouse_path,
+                    Options({
+                        CatalogOptions.URI.key(): "http://test-uri",
+                        CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+                        CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+                        CatalogOptions.DLF_ACCESS_KEY_ID.key(): "same-ak",
+                        CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "same-sk",
+                    })
+                )
+            mock_api = MagicMock()
+            mock_dlf = MagicMock(spec=DLFAuthProvider)
+            mock_dlf.get_token.return_value = DLFToken("same-ak", "same-sk", 
None)
+            mock_api.rest_auth_function.auth_provider = mock_dlf
+            fio.api_instance = mock_api
+            return fio
+
+        file_io_1 = make_file_io()
+        file_io_2 = make_file_io()
+
+        # Cache keys should be the same
+        self.assertEqual(file_io_1._build_cache_key(), 
file_io_2._build_cache_key(),
+                         "Same AK on the same table should produce the same 
cache key")
+
+        # Set token via file_io_1
+        shared_token = RESTToken({"fs.oss.accessKeyId": "shared-data-ak"}, 
future_expiry)
+        file_io_1._set_cached_token(file_io_1._build_cache_key(), shared_token)
+
+        # file_io_2 should see the same token
+        cached = file_io_2._get_cached_token(file_io_2._build_cache_key())
+        self.assertIsNotNone(cached, "file_io_2 should see file_io_1's cached 
token")
+        self.assertEqual(cached.token.get("fs.oss.accessKeyId"), 
"shared-data-ak")
 
     def test_refresh_token_strips_system_table_suffix(self):
         """refresh_token() strips $snapshots suffix before requesting token."""
@@ -365,7 +485,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
 
         mock_response = MagicMock()
         mock_response.token = {'ak': 'test-ak'}
-        mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+        mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000
 
         mock_api = MagicMock()
         mock_api.load_table_token.return_value = mock_response
@@ -373,7 +493,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
 
         file_io.refresh_token()
 
-        # Verify load_table_token was called with base table identifier (no 
$snapshots)
         called_identifier = mock_api.load_table_token.call_args[0][0]
         self.assertEqual(called_identifier.get_database_name(), "db")
         self.assertEqual(called_identifier.get_object_name(), "my_table")
@@ -386,7 +505,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
 
         mock_response = MagicMock()
         mock_response.token = {'ak': 'test-ak'}
-        mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+        mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000
 
         mock_api = MagicMock()
         mock_api.load_table_token.return_value = mock_response
@@ -397,43 +516,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
         called_identifier = mock_api.load_table_token.call_args[0][0]
         self.assertEqual(called_identifier.get_object_name(), "my_table")
 
-    def test_different_instances_do_not_share_token(self):
-        """Two instances with same identifier get independent tokens (no 
class-level cache)."""
-        same_identifier = Identifier.from_string("db.shared_table")
-
-        file_io_a = RESTTokenFileIO(
-            same_identifier, self.warehouse_path, self.catalog_options)
-        file_io_b = RESTTokenFileIO(
-            same_identifier, self.warehouse_path, self.catalog_options)
-
-        token_a = RESTToken({'ak': 'ak-A'}, int(time.time() * 1000) + 7200_000)
-        token_b = RESTToken({'ak': 'ak-B'}, int(time.time() * 1000) + 7200_000)
-
-        mock_response_a = MagicMock()
-        mock_response_a.token = token_a.token
-        mock_response_a.expires_at_millis = token_a.expire_at_millis
-
-        mock_response_b = MagicMock()
-        mock_response_b.token = token_b.token
-        mock_response_b.expires_at_millis = token_b.expire_at_millis
-
-        mock_api_a = MagicMock()
-        mock_api_a.load_table_token.return_value = mock_response_a
-        file_io_a.api_instance = mock_api_a
-
-        mock_api_b = MagicMock()
-        mock_api_b.load_table_token.return_value = mock_response_b
-        file_io_b.api_instance = mock_api_b
-
-        # Refresh both
-        file_io_a.try_to_refresh_token()
-        file_io_b.try_to_refresh_token()
-
-        # Each instance should hold its own token
-        self.assertEqual(file_io_a.token.token['ak'], 'ak-A')
-        self.assertEqual(file_io_b.token.token['ak'], 'ak-B')
-        self.assertIsNot(file_io_a.token, file_io_b.token)
-
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to