This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 500c9c921939ac8f6c5e9fca6494939ab0e1fd0e Author: shyjsarah <[email protected]> AuthorDate: Wed Apr 1 01:44:22 2026 -0700 [python] Fix token cache isolation by adding user identity to cache key (#7571) Revert the cache removal from #7562 and fix the token cache pollution with a minimal approach. **Problem:** In #7562, the class-level `_TOKEN_CACHE` was completely removed to fix token pollution across different catalog instances. However, this also removed the ability to reuse tokens for the same user on the same table, causing unnecessary token refresh requests. The root cause of the original pollution was that `_TOKEN_CACHE` was keyed only by table identifier (`str(self.identifier)`), so different users (different AK/SK credentials) operating on the same table within one process would share the same cached data token. **Fix:** Instead of removing the cache entirely, this PR adds **user identity** to the cache key to isolate tokens across different credentials while preserving token reuse for the same user. 
--- .../pypaimon/catalog/rest/rest_token_file_io.py | 76 +++++- .../pypaimon/tests/rest/rest_token_file_io_test.py | 254 ++++++++++++++------- 2 files changed, 233 insertions(+), 97 deletions(-) diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 16b16e6972..7d4553175b 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -27,6 +27,8 @@ from pypaimon.api.rest_util import RESTUtil from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.common.file_io import FileIO from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO +from pypaimon.api.auth.bearer import BearTokenAuthProvider +from pypaimon.api.auth.dlf_provider import DLFAuthProvider from pypaimon.common.identifier import Identifier, SYSTEM_TABLE_SPLITTER from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions @@ -44,6 +46,10 @@ class RESTTokenFileIO(FileIO): _FILE_IO_CACHE: TTLCache = None _FILE_IO_CACHE_LOCK = threading.Lock() + _TOKEN_CACHE: dict = {} + _TOKEN_LOCKS: dict = {} + _TOKEN_LOCKS_LOCK = threading.Lock() + @classmethod def _get_file_io_cache(cls) -> TTLCache: if cls._FILE_IO_CACHE is None: @@ -69,14 +75,12 @@ class RESTTokenFileIO(FileIO): self.properties = self.catalog_options or Options({}) # For compatibility with refresh_token() self.token: Optional[RESTToken] = None self.api_instance: Optional[RESTApi] = None - self.lock = threading.Lock() self.log = logging.getLogger(__name__) self._uri_reader_factory_cache: Optional[UriReaderFactory] = None def __getstate__(self): state = self.__dict__.copy() # Remove non-serializable objects - state.pop('lock', None) state.pop('api_instance', None) state.pop('_uri_reader_factory_cache', None) # token can be serialized, but we'll refresh it on deserialization @@ -84,8 +88,6 @@ class RESTTokenFileIO(FileIO): def 
__setstate__(self, state): self.__dict__.update(state) - # Recreate lock after deserialization - self.lock = threading.Lock() self._uri_reader_factory_cache = None # api_instance will be recreated when needed self.api_instance = None @@ -201,17 +203,69 @@ class RESTTokenFileIO(FileIO): def filesystem(self): return self.file_io().filesystem + def _build_cache_key(self) -> str: + """Build cache key with user identity to isolate tokens across different credentials.""" + return f"{self._get_user_identity()}:{self.identifier}" + + def _get_user_identity(self) -> str: + """Get user identity for token cache isolation. + + For DLF auth (AK/SK, STS, ECS Role): returns access_key_id via get_token(). + For Bear Token auth: returns the bearer token string. + For unknown auth: returns 'anonymous'. + """ + if self.api_instance is None: + self.api_instance = RESTApi(self.properties, False) + auth_provider = self.api_instance.rest_auth_function.auth_provider + if isinstance(auth_provider, DLFAuthProvider): + return auth_provider.get_token().access_key_id or 'anonymous' + if isinstance(auth_provider, BearTokenAuthProvider): + return auth_provider.token or 'anonymous' + return 'anonymous' + def try_to_refresh_token(self): - if self._should_refresh(): - with self.lock: - if self._should_refresh(): - self.refresh_token() + identifier_str = self._build_cache_key() - def _should_refresh(self): - if self.token is None: + if self.token is not None and not self._is_token_expired(self.token): + return + + cached_token = self._get_cached_token(identifier_str) + if cached_token and not self._is_token_expired(cached_token): + self.token = cached_token + return + + global_lock = self._get_global_token_lock(identifier_str) + + with global_lock: + cached_token = self._get_cached_token(identifier_str) + if cached_token and not self._is_token_expired(cached_token): + self.token = cached_token + return + + token_to_check = cached_token if cached_token else self.token + if token_to_check is None or 
self._is_token_expired(token_to_check): + self.refresh_token() + self._set_cached_token(identifier_str, self.token) + + def _get_cached_token(self, identifier_str: str) -> Optional[RESTToken]: + with self._TOKEN_LOCKS_LOCK: + return self._TOKEN_CACHE.get(identifier_str) + + def _set_cached_token(self, identifier_str: str, token: RESTToken): + with self._TOKEN_LOCKS_LOCK: + self._TOKEN_CACHE[identifier_str] = token + + def _is_token_expired(self, token: Optional[RESTToken]) -> bool: + if token is None: return True current_time = int(time.time() * 1000) - return (self.token.expire_at_millis - current_time) < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS + return (token.expire_at_millis - current_time) < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS + + def _get_global_token_lock(self, identifier_str: str) -> threading.Lock: + with self._TOKEN_LOCKS_LOCK: + if identifier_str not in self._TOKEN_LOCKS: + self._TOKEN_LOCKS[identifier_str] = threading.Lock() + return self._TOKEN_LOCKS[identifier_str] def refresh_token(self): self.log.info(f"begin refresh data token for identifier [{self.identifier}]") diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index cdcd5ed36c..b94ad58b24 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -148,9 +148,6 @@ class RESTTokenFileIOTest(unittest.TestCase): self.catalog_options ) - self.assertTrue(hasattr(original_file_io, 'lock')) - self.assertIsNotNone(original_file_io.lock) - pickled = pickle.dumps(original_file_io) deserialized_file_io = pickle.loads(pickled) @@ -159,10 +156,6 @@ class RESTTokenFileIOTest(unittest.TestCase): self.assertEqual(deserialized_file_io.path, original_file_io.path) self.assertEqual(deserialized_file_io.properties.data, original_file_io.properties.data) - self.assertTrue(hasattr(deserialized_file_io, 'lock')) - 
self.assertIsNotNone(deserialized_file_io.lock) - self.assertIsNot(deserialized_file_io.lock, original_file_io.lock) - self.assertIsNone(deserialized_file_io.api_instance) test_file_path = f"file://{self.temp_dir}/pickle_test.txt" @@ -306,56 +299,183 @@ class RESTTokenFileIOTest(unittest.TestCase): self.assertIsNotNone(restored_file_io.uri_reader_factory, "uri_reader_factory should work after deserialization") - def test_should_refresh_when_token_is_none(self): - """_should_refresh() returns True when token is None.""" - with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): - file_io = RESTTokenFileIO( - self.identifier, self.warehouse_path, self.catalog_options) - self.assertIsNone(file_io.token) - self.assertTrue(file_io._should_refresh()) + def test_build_cache_key_with_dlf_auth(self): + """Test that _build_cache_key includes DLF access_key_id for cache isolation.""" + from pypaimon.api.auth.dlf_provider import DLFAuthProvider + from pypaimon.api.token_loader import DLFToken + + catalog_options = Options({ + CatalogOptions.URI.key(): "http://test-uri", + CatalogOptions.TOKEN_PROVIDER.key(): "dlf", + CatalogOptions.DLF_REGION.key(): "cn-hangzhou", + CatalogOptions.DLF_ACCESS_KEY_ID.key(): "test-ak-123", + CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "test-sk-456", + }) - def test_should_refresh_when_token_not_expired(self): - """_should_refresh() returns False when token is far from expiry.""" with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): file_io = RESTTokenFileIO( - self.identifier, self.warehouse_path, self.catalog_options) - # Token that expires 2 hours from now (well beyond the 1-hour safe margin) - future_millis = int(time.time() * 1000) + 7200_000 - file_io.token = RESTToken({'ak': 'v'}, future_millis) - self.assertFalse(file_io._should_refresh()) - - def test_should_refresh_when_token_expired(self): - """_should_refresh() returns True when token is already expired.""" + self.identifier, + self.warehouse_path, + catalog_options + ) + + 
mock_api = MagicMock() + mock_dlf_provider = MagicMock(spec=DLFAuthProvider) + mock_dlf_provider.get_token.return_value = DLFToken( + access_key_id="test-ak-123", + access_key_secret="test-sk-456", + security_token=None + ) + mock_api.rest_auth_function.auth_provider = mock_dlf_provider + file_io.api_instance = mock_api + + cache_key = file_io._build_cache_key() + self.assertTrue(cache_key.startswith("test-ak-123:"), + f"Cache key should start with access_key_id, got: {cache_key}") + self.assertIn(str(self.identifier), cache_key) + + def test_build_cache_key_with_bear_auth(self): + """Test that _build_cache_key includes bearer token for cache isolation.""" + from pypaimon.api.auth.bearer import BearTokenAuthProvider + + catalog_options = Options({ + CatalogOptions.URI.key(): "http://test-uri", + CatalogOptions.TOKEN_PROVIDER.key(): "bear", + CatalogOptions.TOKEN.key(): "my-bearer-token", + }) + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): file_io = RESTTokenFileIO( - self.identifier, self.warehouse_path, self.catalog_options) - # Token that expired 1 second ago - past_millis = int(time.time() * 1000) - 1000 - file_io.token = RESTToken({'ak': 'v'}, past_millis) - self.assertTrue(file_io._should_refresh()) - - def test_try_to_refresh_token_calls_refresh_once(self): - """try_to_refresh_token() calls refresh_token() exactly once via double-check.""" - file_io = RESTTokenFileIO( - self.identifier, self.warehouse_path, self.catalog_options) - self.assertIsNone(file_io.token) - - mock_response = MagicMock() - mock_response.token = {'ak': 'test-ak'} - mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000 + self.identifier, + self.warehouse_path, + catalog_options + ) mock_api = MagicMock() - mock_api.load_table_token.return_value = mock_response + mock_bear_provider = MagicMock(spec=BearTokenAuthProvider) + mock_bear_provider.token = "my-bearer-token" + mock_api.rest_auth_function.auth_provider = mock_bear_provider file_io.api_instance = 
mock_api - file_io.try_to_refresh_token() + cache_key = file_io._build_cache_key() + self.assertTrue(cache_key.startswith("my-bearer-token:"), + f"Cache key should start with bearer token, got: {cache_key}") + self.assertIn(str(self.identifier), cache_key) + + def test_different_ak_same_table_token_isolation(self): + """Test that two RESTTokenFileIO instances with different AKs on the same table + do not share cached tokens.""" + from pypaimon.api.auth.dlf_provider import DLFAuthProvider + from pypaimon.api.token_loader import DLFToken + + # Clear class-level token cache before test + RESTTokenFileIO._TOKEN_CACHE.clear() + RESTTokenFileIO._TOKEN_LOCKS.clear() - mock_api.load_table_token.assert_called_once() - self.assertIsNotNone(file_io.token) + identifier = Identifier.from_string("db.same_table") + future_expiry = int(time.time() * 1000) + 7_200_000 # 2 hours from now - # Second call should NOT trigger refresh again (token is valid) - file_io.try_to_refresh_token() - mock_api.load_table_token.assert_called_once() + # Create file_io_1 with AK "read-ak" + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + file_io_1 = RESTTokenFileIO( + identifier, self.warehouse_path, + Options({ + CatalogOptions.URI.key(): "http://test-uri", + CatalogOptions.TOKEN_PROVIDER.key(): "dlf", + CatalogOptions.DLF_REGION.key(): "cn-hangzhou", + CatalogOptions.DLF_ACCESS_KEY_ID.key(): "read-ak", + CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "read-sk", + }) + ) + + mock_api_1 = MagicMock() + mock_dlf_1 = MagicMock(spec=DLFAuthProvider) + mock_dlf_1.get_token.return_value = DLFToken("read-ak", "read-sk", None) + mock_api_1.rest_auth_function.auth_provider = mock_dlf_1 + file_io_1.api_instance = mock_api_1 + + read_token = RESTToken({"fs.oss.accessKeyId": "read-data-ak"}, future_expiry) + file_io_1.token = read_token + cache_key_1 = file_io_1._build_cache_key() + file_io_1._set_cached_token(cache_key_1, read_token) + + # Create file_io_2 with AK "write-ak" + with 
patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + file_io_2 = RESTTokenFileIO( + identifier, self.warehouse_path, + Options({ + CatalogOptions.URI.key(): "http://test-uri", + CatalogOptions.TOKEN_PROVIDER.key(): "dlf", + CatalogOptions.DLF_REGION.key(): "cn-hangzhou", + CatalogOptions.DLF_ACCESS_KEY_ID.key(): "write-ak", + CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "write-sk", + }) + ) + + mock_api_2 = MagicMock() + mock_dlf_2 = MagicMock(spec=DLFAuthProvider) + mock_dlf_2.get_token.return_value = DLFToken("write-ak", "write-sk", None) + mock_api_2.rest_auth_function.auth_provider = mock_dlf_2 + file_io_2.api_instance = mock_api_2 + + cache_key_2 = file_io_2._build_cache_key() + + # Cache keys should be different + self.assertNotEqual(cache_key_1, cache_key_2, + "Different AKs on the same table should produce different cache keys") + + # file_io_2 should NOT get file_io_1's cached token + cached_for_2 = file_io_2._get_cached_token(cache_key_2) + self.assertIsNone(cached_for_2, + "file_io_2 should not see file_io_1's cached token") + + def test_same_ak_same_table_token_reuse(self): + """Test that two RESTTokenFileIO instances with the same AK on the same table + can share cached tokens.""" + from pypaimon.api.auth.dlf_provider import DLFAuthProvider + from pypaimon.api.token_loader import DLFToken + + # Clear class-level token cache before test + RESTTokenFileIO._TOKEN_CACHE.clear() + RESTTokenFileIO._TOKEN_LOCKS.clear() + + identifier = Identifier.from_string("db.same_table") + future_expiry = int(time.time() * 1000) + 7_200_000 + + def make_file_io(): + with patch.object(RESTTokenFileIO, 'try_to_refresh_token'): + fio = RESTTokenFileIO( + identifier, self.warehouse_path, + Options({ + CatalogOptions.URI.key(): "http://test-uri", + CatalogOptions.TOKEN_PROVIDER.key(): "dlf", + CatalogOptions.DLF_REGION.key(): "cn-hangzhou", + CatalogOptions.DLF_ACCESS_KEY_ID.key(): "same-ak", + CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "same-sk", + }) + ) + mock_api = 
MagicMock() + mock_dlf = MagicMock(spec=DLFAuthProvider) + mock_dlf.get_token.return_value = DLFToken("same-ak", "same-sk", None) + mock_api.rest_auth_function.auth_provider = mock_dlf + fio.api_instance = mock_api + return fio + + file_io_1 = make_file_io() + file_io_2 = make_file_io() + + # Cache keys should be the same + self.assertEqual(file_io_1._build_cache_key(), file_io_2._build_cache_key(), + "Same AK on the same table should produce the same cache key") + + # Set token via file_io_1 + shared_token = RESTToken({"fs.oss.accessKeyId": "shared-data-ak"}, future_expiry) + file_io_1._set_cached_token(file_io_1._build_cache_key(), shared_token) + + # file_io_2 should see the same token + cached = file_io_2._get_cached_token(file_io_2._build_cache_key()) + self.assertIsNotNone(cached, "file_io_2 should see file_io_1's cached token") + self.assertEqual(cached.token.get("fs.oss.accessKeyId"), "shared-data-ak") def test_refresh_token_strips_system_table_suffix(self): """refresh_token() strips $snapshots suffix before requesting token.""" @@ -365,7 +485,7 @@ class RESTTokenFileIOTest(unittest.TestCase): mock_response = MagicMock() mock_response.token = {'ak': 'test-ak'} - mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000 + mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000 mock_api = MagicMock() mock_api.load_table_token.return_value = mock_response @@ -373,7 +493,6 @@ class RESTTokenFileIOTest(unittest.TestCase): file_io.refresh_token() - # Verify load_table_token was called with base table identifier (no $snapshots) called_identifier = mock_api.load_table_token.call_args[0][0] self.assertEqual(called_identifier.get_database_name(), "db") self.assertEqual(called_identifier.get_object_name(), "my_table") @@ -386,7 +505,7 @@ class RESTTokenFileIOTest(unittest.TestCase): mock_response = MagicMock() mock_response.token = {'ak': 'test-ak'} - mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000 + 
mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000 mock_api = MagicMock() mock_api.load_table_token.return_value = mock_response @@ -397,43 +516,6 @@ class RESTTokenFileIOTest(unittest.TestCase): called_identifier = mock_api.load_table_token.call_args[0][0] self.assertEqual(called_identifier.get_object_name(), "my_table") - def test_different_instances_do_not_share_token(self): - """Two instances with same identifier get independent tokens (no class-level cache).""" - same_identifier = Identifier.from_string("db.shared_table") - - file_io_a = RESTTokenFileIO( - same_identifier, self.warehouse_path, self.catalog_options) - file_io_b = RESTTokenFileIO( - same_identifier, self.warehouse_path, self.catalog_options) - - token_a = RESTToken({'ak': 'ak-A'}, int(time.time() * 1000) + 7200_000) - token_b = RESTToken({'ak': 'ak-B'}, int(time.time() * 1000) + 7200_000) - - mock_response_a = MagicMock() - mock_response_a.token = token_a.token - mock_response_a.expires_at_millis = token_a.expire_at_millis - - mock_response_b = MagicMock() - mock_response_b.token = token_b.token - mock_response_b.expires_at_millis = token_b.expire_at_millis - - mock_api_a = MagicMock() - mock_api_a.load_table_token.return_value = mock_response_a - file_io_a.api_instance = mock_api_a - - mock_api_b = MagicMock() - mock_api_b.load_table_token.return_value = mock_response_b - file_io_b.api_instance = mock_api_b - - # Refresh both - file_io_a.try_to_refresh_token() - file_io_b.try_to_refresh_token() - - # Each instance should hold its own token - self.assertEqual(file_io_a.token.token['ak'], 'ak-A') - self.assertEqual(file_io_b.token.token['ak'], 'ak-B') - self.assertIsNot(file_io_a.token, file_io_b.token) - if __name__ == '__main__': unittest.main()
