This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 50aa0f131e [python] Fix token cache isolation by adding user identity to cache key (#7571)
50aa0f131e is described below
commit 50aa0f131e8a0ff3874634a540d1468a59f24762
Author: shyjsarah <[email protected]>
AuthorDate: Wed Apr 1 01:44:22 2026 -0700
[python] Fix token cache isolation by adding user identity to cache key (#7571)
Revert #7562 and fix token cache pollution with a minimal approach.
**Problem:**
In #7562, the class-level `_TOKEN_CACHE` was completely removed to fix
token pollution across different catalog instances. However, this also
removed the ability to reuse tokens for the same user on the same table,
causing unnecessary token refresh requests.
The root cause of the original pollution was that `_TOKEN_CACHE` was
keyed only by table identifier (`str(self.identifier)`), so different
users (different AK/SK credentials) operating on the same table within
one process would share the same cached data token.
**Fix:**
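Instead of removing the cache entirely, this PR adds **user identity**
to the cache key to isolate tokens across different credentials while
preserving token reuse for the same user. The identity is the access key
ID for DLF authentication, the bearer token for bearer-token
authentication, and `anonymous` when neither is available.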
---
.../pypaimon/catalog/rest/rest_token_file_io.py | 76 +++++-
.../pypaimon/tests/rest/rest_token_file_io_test.py | 254 ++++++++++++++-------
2 files changed, 233 insertions(+), 97 deletions(-)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 16b16e6972..7d4553175b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -27,6 +27,8 @@ from pypaimon.api.rest_util import RESTUtil
from pypaimon.catalog.rest.rest_token import RESTToken
from pypaimon.common.file_io import FileIO
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+from pypaimon.api.auth.bearer import BearTokenAuthProvider
+from pypaimon.api.auth.dlf_provider import DLFAuthProvider
from pypaimon.common.identifier import Identifier, SYSTEM_TABLE_SPLITTER
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions
@@ -44,6 +46,10 @@ class RESTTokenFileIO(FileIO):
_FILE_IO_CACHE: TTLCache = None
_FILE_IO_CACHE_LOCK = threading.Lock()
+ _TOKEN_CACHE: dict = {}
+ _TOKEN_LOCKS: dict = {}
+ _TOKEN_LOCKS_LOCK = threading.Lock()
+
@classmethod
def _get_file_io_cache(cls) -> TTLCache:
if cls._FILE_IO_CACHE is None:
@@ -69,14 +75,12 @@ class RESTTokenFileIO(FileIO):
self.properties = self.catalog_options or Options({}) # For
compatibility with refresh_token()
self.token: Optional[RESTToken] = None
self.api_instance: Optional[RESTApi] = None
- self.lock = threading.Lock()
self.log = logging.getLogger(__name__)
self._uri_reader_factory_cache: Optional[UriReaderFactory] = None
def __getstate__(self):
state = self.__dict__.copy()
# Remove non-serializable objects
- state.pop('lock', None)
state.pop('api_instance', None)
state.pop('_uri_reader_factory_cache', None)
# token can be serialized, but we'll refresh it on deserialization
@@ -84,8 +88,6 @@ class RESTTokenFileIO(FileIO):
def __setstate__(self, state):
self.__dict__.update(state)
- # Recreate lock after deserialization
- self.lock = threading.Lock()
self._uri_reader_factory_cache = None
# api_instance will be recreated when needed
self.api_instance = None
@@ -201,17 +203,69 @@ class RESTTokenFileIO(FileIO):
def filesystem(self):
return self.file_io().filesystem
+ def _build_cache_key(self) -> str:
+ """Build cache key with user identity to isolate tokens across
different credentials."""
+ return f"{self._get_user_identity()}:{self.identifier}"
+
+ def _get_user_identity(self) -> str:
+ """Get user identity for token cache isolation.
+
+ For DLF auth (AK/SK, STS, ECS Role): returns access_key_id via
get_token().
+ For Bear Token auth: returns the bearer token string.
+ For unknown auth: returns 'anonymous'.
+ """
+ if self.api_instance is None:
+ self.api_instance = RESTApi(self.properties, False)
+ auth_provider = self.api_instance.rest_auth_function.auth_provider
+ if isinstance(auth_provider, DLFAuthProvider):
+ return auth_provider.get_token().access_key_id or 'anonymous'
+ if isinstance(auth_provider, BearTokenAuthProvider):
+ return auth_provider.token or 'anonymous'
+ return 'anonymous'
+
def try_to_refresh_token(self):
- if self._should_refresh():
- with self.lock:
- if self._should_refresh():
- self.refresh_token()
+ identifier_str = self._build_cache_key()
- def _should_refresh(self):
- if self.token is None:
+ if self.token is not None and not self._is_token_expired(self.token):
+ return
+
+ cached_token = self._get_cached_token(identifier_str)
+ if cached_token and not self._is_token_expired(cached_token):
+ self.token = cached_token
+ return
+
+ global_lock = self._get_global_token_lock(identifier_str)
+
+ with global_lock:
+ cached_token = self._get_cached_token(identifier_str)
+ if cached_token and not self._is_token_expired(cached_token):
+ self.token = cached_token
+ return
+
+ token_to_check = cached_token if cached_token else self.token
+ if token_to_check is None or
self._is_token_expired(token_to_check):
+ self.refresh_token()
+ self._set_cached_token(identifier_str, self.token)
+
+ def _get_cached_token(self, identifier_str: str) -> Optional[RESTToken]:
+ with self._TOKEN_LOCKS_LOCK:
+ return self._TOKEN_CACHE.get(identifier_str)
+
+ def _set_cached_token(self, identifier_str: str, token: RESTToken):
+ with self._TOKEN_LOCKS_LOCK:
+ self._TOKEN_CACHE[identifier_str] = token
+
+ def _is_token_expired(self, token: Optional[RESTToken]) -> bool:
+ if token is None:
return True
current_time = int(time.time() * 1000)
- return (self.token.expire_at_millis - current_time) <
RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+ return (token.expire_at_millis - current_time) <
RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+
+ def _get_global_token_lock(self, identifier_str: str) -> threading.Lock:
+ with self._TOKEN_LOCKS_LOCK:
+ if identifier_str not in self._TOKEN_LOCKS:
+ self._TOKEN_LOCKS[identifier_str] = threading.Lock()
+ return self._TOKEN_LOCKS[identifier_str]
def refresh_token(self):
self.log.info(f"begin refresh data token for identifier
[{self.identifier}]")
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index cdcd5ed36c..b94ad58b24 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -148,9 +148,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
self.catalog_options
)
- self.assertTrue(hasattr(original_file_io, 'lock'))
- self.assertIsNotNone(original_file_io.lock)
-
pickled = pickle.dumps(original_file_io)
deserialized_file_io = pickle.loads(pickled)
@@ -159,10 +156,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
self.assertEqual(deserialized_file_io.path, original_file_io.path)
self.assertEqual(deserialized_file_io.properties.data,
original_file_io.properties.data)
- self.assertTrue(hasattr(deserialized_file_io, 'lock'))
- self.assertIsNotNone(deserialized_file_io.lock)
- self.assertIsNot(deserialized_file_io.lock, original_file_io.lock)
-
self.assertIsNone(deserialized_file_io.api_instance)
test_file_path = f"file://{self.temp_dir}/pickle_test.txt"
@@ -306,56 +299,183 @@ class RESTTokenFileIOTest(unittest.TestCase):
self.assertIsNotNone(restored_file_io.uri_reader_factory,
"uri_reader_factory should work after
deserialization")
- def test_should_refresh_when_token_is_none(self):
- """_should_refresh() returns True when token is None."""
- with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
- file_io = RESTTokenFileIO(
- self.identifier, self.warehouse_path, self.catalog_options)
- self.assertIsNone(file_io.token)
- self.assertTrue(file_io._should_refresh())
+ def test_build_cache_key_with_dlf_auth(self):
+ """Test that _build_cache_key includes DLF access_key_id for cache
isolation."""
+ from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+ from pypaimon.api.token_loader import DLFToken
+
+ catalog_options = Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+ CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+ CatalogOptions.DLF_ACCESS_KEY_ID.key(): "test-ak-123",
+ CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "test-sk-456",
+ })
- def test_should_refresh_when_token_not_expired(self):
- """_should_refresh() returns False when token is far from expiry."""
with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
file_io = RESTTokenFileIO(
- self.identifier, self.warehouse_path, self.catalog_options)
- # Token that expires 2 hours from now (well beyond the 1-hour safe
margin)
- future_millis = int(time.time() * 1000) + 7200_000
- file_io.token = RESTToken({'ak': 'v'}, future_millis)
- self.assertFalse(file_io._should_refresh())
-
- def test_should_refresh_when_token_expired(self):
- """_should_refresh() returns True when token is already expired."""
+ self.identifier,
+ self.warehouse_path,
+ catalog_options
+ )
+
+ mock_api = MagicMock()
+ mock_dlf_provider = MagicMock(spec=DLFAuthProvider)
+ mock_dlf_provider.get_token.return_value = DLFToken(
+ access_key_id="test-ak-123",
+ access_key_secret="test-sk-456",
+ security_token=None
+ )
+ mock_api.rest_auth_function.auth_provider = mock_dlf_provider
+ file_io.api_instance = mock_api
+
+ cache_key = file_io._build_cache_key()
+ self.assertTrue(cache_key.startswith("test-ak-123:"),
+ f"Cache key should start with access_key_id, got:
{cache_key}")
+ self.assertIn(str(self.identifier), cache_key)
+
+ def test_build_cache_key_with_bear_auth(self):
+ """Test that _build_cache_key includes bearer token for cache
isolation."""
+ from pypaimon.api.auth.bearer import BearTokenAuthProvider
+
+ catalog_options = Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ CatalogOptions.TOKEN_PROVIDER.key(): "bear",
+ CatalogOptions.TOKEN.key(): "my-bearer-token",
+ })
+
with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
file_io = RESTTokenFileIO(
- self.identifier, self.warehouse_path, self.catalog_options)
- # Token that expired 1 second ago
- past_millis = int(time.time() * 1000) - 1000
- file_io.token = RESTToken({'ak': 'v'}, past_millis)
- self.assertTrue(file_io._should_refresh())
-
- def test_try_to_refresh_token_calls_refresh_once(self):
- """try_to_refresh_token() calls refresh_token() exactly once via
double-check."""
- file_io = RESTTokenFileIO(
- self.identifier, self.warehouse_path, self.catalog_options)
- self.assertIsNone(file_io.token)
-
- mock_response = MagicMock()
- mock_response.token = {'ak': 'test-ak'}
- mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+ self.identifier,
+ self.warehouse_path,
+ catalog_options
+ )
mock_api = MagicMock()
- mock_api.load_table_token.return_value = mock_response
+ mock_bear_provider = MagicMock(spec=BearTokenAuthProvider)
+ mock_bear_provider.token = "my-bearer-token"
+ mock_api.rest_auth_function.auth_provider = mock_bear_provider
file_io.api_instance = mock_api
- file_io.try_to_refresh_token()
+ cache_key = file_io._build_cache_key()
+ self.assertTrue(cache_key.startswith("my-bearer-token:"),
+ f"Cache key should start with bearer token, got:
{cache_key}")
+ self.assertIn(str(self.identifier), cache_key)
+
+ def test_different_ak_same_table_token_isolation(self):
+ """Test that two RESTTokenFileIO instances with different AKs on the
same table
+ do not share cached tokens."""
+ from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+ from pypaimon.api.token_loader import DLFToken
+
+ # Clear class-level token cache before test
+ RESTTokenFileIO._TOKEN_CACHE.clear()
+ RESTTokenFileIO._TOKEN_LOCKS.clear()
- mock_api.load_table_token.assert_called_once()
- self.assertIsNotNone(file_io.token)
+ identifier = Identifier.from_string("db.same_table")
+ future_expiry = int(time.time() * 1000) + 7_200_000 # 2 hours from now
- # Second call should NOT trigger refresh again (token is valid)
- file_io.try_to_refresh_token()
- mock_api.load_table_token.assert_called_once()
+ # Create file_io_1 with AK "read-ak"
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io_1 = RESTTokenFileIO(
+ identifier, self.warehouse_path,
+ Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+ CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+ CatalogOptions.DLF_ACCESS_KEY_ID.key(): "read-ak",
+ CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "read-sk",
+ })
+ )
+
+ mock_api_1 = MagicMock()
+ mock_dlf_1 = MagicMock(spec=DLFAuthProvider)
+ mock_dlf_1.get_token.return_value = DLFToken("read-ak", "read-sk",
None)
+ mock_api_1.rest_auth_function.auth_provider = mock_dlf_1
+ file_io_1.api_instance = mock_api_1
+
+ read_token = RESTToken({"fs.oss.accessKeyId": "read-data-ak"},
future_expiry)
+ file_io_1.token = read_token
+ cache_key_1 = file_io_1._build_cache_key()
+ file_io_1._set_cached_token(cache_key_1, read_token)
+
+ # Create file_io_2 with AK "write-ak"
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io_2 = RESTTokenFileIO(
+ identifier, self.warehouse_path,
+ Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+ CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+ CatalogOptions.DLF_ACCESS_KEY_ID.key(): "write-ak",
+ CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "write-sk",
+ })
+ )
+
+ mock_api_2 = MagicMock()
+ mock_dlf_2 = MagicMock(spec=DLFAuthProvider)
+ mock_dlf_2.get_token.return_value = DLFToken("write-ak", "write-sk",
None)
+ mock_api_2.rest_auth_function.auth_provider = mock_dlf_2
+ file_io_2.api_instance = mock_api_2
+
+ cache_key_2 = file_io_2._build_cache_key()
+
+ # Cache keys should be different
+ self.assertNotEqual(cache_key_1, cache_key_2,
+ "Different AKs on the same table should produce
different cache keys")
+
+ # file_io_2 should NOT get file_io_1's cached token
+ cached_for_2 = file_io_2._get_cached_token(cache_key_2)
+ self.assertIsNone(cached_for_2,
+ "file_io_2 should not see file_io_1's cached token")
+
+ def test_same_ak_same_table_token_reuse(self):
+ """Test that two RESTTokenFileIO instances with the same AK on the
same table
+ can share cached tokens."""
+ from pypaimon.api.auth.dlf_provider import DLFAuthProvider
+ from pypaimon.api.token_loader import DLFToken
+
+ # Clear class-level token cache before test
+ RESTTokenFileIO._TOKEN_CACHE.clear()
+ RESTTokenFileIO._TOKEN_LOCKS.clear()
+
+ identifier = Identifier.from_string("db.same_table")
+ future_expiry = int(time.time() * 1000) + 7_200_000
+
+ def make_file_io():
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ fio = RESTTokenFileIO(
+ identifier, self.warehouse_path,
+ Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ CatalogOptions.TOKEN_PROVIDER.key(): "dlf",
+ CatalogOptions.DLF_REGION.key(): "cn-hangzhou",
+ CatalogOptions.DLF_ACCESS_KEY_ID.key(): "same-ak",
+ CatalogOptions.DLF_ACCESS_KEY_SECRET.key(): "same-sk",
+ })
+ )
+ mock_api = MagicMock()
+ mock_dlf = MagicMock(spec=DLFAuthProvider)
+ mock_dlf.get_token.return_value = DLFToken("same-ak", "same-sk",
None)
+ mock_api.rest_auth_function.auth_provider = mock_dlf
+ fio.api_instance = mock_api
+ return fio
+
+ file_io_1 = make_file_io()
+ file_io_2 = make_file_io()
+
+ # Cache keys should be the same
+ self.assertEqual(file_io_1._build_cache_key(),
file_io_2._build_cache_key(),
+ "Same AK on the same table should produce the same
cache key")
+
+ # Set token via file_io_1
+ shared_token = RESTToken({"fs.oss.accessKeyId": "shared-data-ak"},
future_expiry)
+ file_io_1._set_cached_token(file_io_1._build_cache_key(), shared_token)
+
+ # file_io_2 should see the same token
+ cached = file_io_2._get_cached_token(file_io_2._build_cache_key())
+ self.assertIsNotNone(cached, "file_io_2 should see file_io_1's cached
token")
+ self.assertEqual(cached.token.get("fs.oss.accessKeyId"),
"shared-data-ak")
def test_refresh_token_strips_system_table_suffix(self):
"""refresh_token() strips $snapshots suffix before requesting token."""
@@ -365,7 +485,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
mock_response = MagicMock()
mock_response.token = {'ak': 'test-ak'}
- mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+ mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000
mock_api = MagicMock()
mock_api.load_table_token.return_value = mock_response
@@ -373,7 +493,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
file_io.refresh_token()
- # Verify load_table_token was called with base table identifier (no
$snapshots)
called_identifier = mock_api.load_table_token.call_args[0][0]
self.assertEqual(called_identifier.get_database_name(), "db")
self.assertEqual(called_identifier.get_object_name(), "my_table")
@@ -386,7 +505,7 @@ class RESTTokenFileIOTest(unittest.TestCase):
mock_response = MagicMock()
mock_response.token = {'ak': 'test-ak'}
- mock_response.expires_at_millis = int(time.time() * 1000) + 7200_000
+ mock_response.expires_at_millis = int(time.time() * 1000) + 7_200_000
mock_api = MagicMock()
mock_api.load_table_token.return_value = mock_response
@@ -397,43 +516,6 @@ class RESTTokenFileIOTest(unittest.TestCase):
called_identifier = mock_api.load_table_token.call_args[0][0]
self.assertEqual(called_identifier.get_object_name(), "my_table")
- def test_different_instances_do_not_share_token(self):
- """Two instances with same identifier get independent tokens (no
class-level cache)."""
- same_identifier = Identifier.from_string("db.shared_table")
-
- file_io_a = RESTTokenFileIO(
- same_identifier, self.warehouse_path, self.catalog_options)
- file_io_b = RESTTokenFileIO(
- same_identifier, self.warehouse_path, self.catalog_options)
-
- token_a = RESTToken({'ak': 'ak-A'}, int(time.time() * 1000) + 7200_000)
- token_b = RESTToken({'ak': 'ak-B'}, int(time.time() * 1000) + 7200_000)
-
- mock_response_a = MagicMock()
- mock_response_a.token = token_a.token
- mock_response_a.expires_at_millis = token_a.expire_at_millis
-
- mock_response_b = MagicMock()
- mock_response_b.token = token_b.token
- mock_response_b.expires_at_millis = token_b.expire_at_millis
-
- mock_api_a = MagicMock()
- mock_api_a.load_table_token.return_value = mock_response_a
- file_io_a.api_instance = mock_api_a
-
- mock_api_b = MagicMock()
- mock_api_b.load_table_token.return_value = mock_response_b
- file_io_b.api_instance = mock_api_b
-
- # Refresh both
- file_io_a.try_to_refresh_token()
- file_io_b.try_to_refresh_token()
-
- # Each instance should hold its own token
- self.assertEqual(file_io_a.token.token['ak'], 'ak-A')
- self.assertEqual(file_io_b.token.token['ak'], 'ak-B')
- self.assertIsNot(file_io_a.token, file_io_b.token)
-
if __name__ == '__main__':
unittest.main()