poorbarcode commented on code in PR #25293:
URL: https://github.com/apache/pulsar/pull/25293#discussion_r2940545752


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java:
##########
@@ -33,9 +33,9 @@ public class SimpleCache<K, V> {
     private final long timeoutMs;
 
     @RequiredArgsConstructor
-    private class ExpirableValue<V> {
+    public class ExpirableValue<V> {

Review Comment:
   Improved



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -89,16 +105,16 @@ private T internalReadLatest(String topic) throws 
Exception {
     @VisibleForTesting
     protected Reader<T> getReader(String topic) {
         final var topicName = TopicName.get(topic);
-        return readers.get(topicName.getNamespaceObject(), () -> {
-            try {
-                return wait(readerCreator.apply(topicName), "create reader");
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }, __ -> __.closeAsync().exceptionally(e -> {
-            log.warn("Failed to close reader {}", e.getMessage());
-            return null;
-        }));
+        NamespaceName ns = topicName.getNamespaceObject();
+        SimpleCache<NamespaceName, 
CompletableFuture<Reader<T>>>.ExpirableValue<CompletableFuture<Reader<T>>>
+                cachedReaderFuture = readers.getWithCacheInfo(ns, () -> 
readerCreator.apply(topicName), expiration);
+        try {
+            return wait(cachedReaderFuture.value, "create reader");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            cachedReaderFuture.updateDeadline();
+        }

Review Comment:
   Fixed



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -48,13 +49,28 @@ public class TableView<T> {
     protected final Function<TopicName, CompletableFuture<Reader<T>>> 
readerCreator;
     private final Map<String, T> snapshots = new ConcurrentHashMap<>();
     private final long clientOperationTimeoutMs;
-    private final SimpleCache<NamespaceName, Reader<T>> readers;
+    private final SimpleCache<NamespaceName, CompletableFuture<Reader<T>>> 
readers;
+    private final Consumer<CompletableFuture<Reader<T>>> expiration = 
readerFuture -> {
+        if (!readerFuture.isDone()) {
+            readerFuture.handle((reader, t) -> {
+                if (reader != null) {
+                    closeReader(reader);
+                }
+                return null;
+            });
+        } else if (!readerFuture.isCompletedExceptionally()) {
+            // Since the future has done, it will not wait anymore.
+            closeReader(readerFuture.join());
+        }

Review Comment:
   Improved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to