Copilot commented on code in PR #25293:
URL: https://github.com/apache/pulsar/pull/25293#discussion_r2904185652


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java:
##########
@@ -33,9 +33,9 @@ public class SimpleCache<K, V> {
     private final long timeoutMs;
 
     @RequiredArgsConstructor
-    private class ExpirableValue<V> {
+    public class ExpirableValue<V> {

Review Comment:
   The inner class `ExpirableValue` shadows its own outer class's type 
parameter `V` by declaring `<V>` again. While this compiles, it means the inner 
`V` is a different type parameter than the outer `V`, which is confusing and 
could lead to subtle type mismatches. The type parameter on `ExpirableValue` 
should be removed since it should use the enclosing class's `V`.
   ```suggestion
       public class ExpirableValue {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java:
##########
@@ -48,7 +48,7 @@ boolean tryExpire() {
             }
         }
 
-        void updateDeadline() {
+        public void updateDeadline() {
             deadlineMs = System.currentTimeMillis() + timeoutMs;
         }

Review Comment:
   `updateDeadline()` is now public and called from `TableView.getReader()` 
outside of any synchronized block (line 116 of `TableView.java`), but 
`deadlineMs` is a plain field with no synchronization or volatile semantics. 
The cache's `tryExpire()` reads `deadlineMs` from the scheduled cleanup thread. 
This creates a data race — the cleanup thread may not see the updated deadline. 
Either `deadlineMs` should be declared `volatile`, or `updateDeadline()` should 
synchronize on the parent cache's lock.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TableViewTest {
+
+    @Test
+    public void testFailedCreateReader() throws Exception {
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        Function<TopicName, 
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName 
-> {
+            return FutureUtil.failedFuture(new PulsarClientException("test 
failed to create reader"));
+        };
+        TableView<Object> tableView = new TableView<>(readerCreator, 60000, 
executor);
+        try {
+            tableView.readLatest("public/default/tp");

Review Comment:
   If `readLatest` does not throw an exception, the test silently passes 
without verifying anything. This should use `Assert.assertThrows()` or add 
`Assert.fail("Expected exception")` after the `readLatest` call inside the 
`try` block to ensure the exception is actually thrown.
   ```suggestion
               tableView.readLatest("public/default/tp");
               Assert.fail("Expected exception when creating reader");
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TableViewTest {
+
+    @Test
+    public void testFailedCreateReader() throws Exception {
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        Function<TopicName, 
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName 
-> {
+            return FutureUtil.failedFuture(new PulsarClientException("test 
failed to create reader"));
+        };
+        TableView<Object> tableView = new TableView<>(readerCreator, 60000, 
executor);
+        try {
+            tableView.readLatest("public/default/tp");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Failed to create 
reader"));

Review Comment:
   The `ScheduledExecutorService` created here is never shut down after the 
test completes. This leaks a thread. Add a try-finally or `@AfterMethod` to 
call `executor.shutdownNow()`.
   ```suggestion
           try {
               Function<TopicName, 
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName 
-> {
                   return FutureUtil.failedFuture(new 
PulsarClientException("test failed to create reader"));
               };
               TableView<Object> tableView = new TableView<>(readerCreator, 
60000, executor);
               try {
                   tableView.readLatest("public/default/tp");
               } catch (Exception e) {
                   Assert.assertTrue(e.getMessage().contains("Failed to create 
reader"));
               }
           } finally {
               executor.shutdownNow();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -89,16 +105,16 @@ private T internalReadLatest(String topic) throws 
Exception {
     @VisibleForTesting
     protected Reader<T> getReader(String topic) {
         final var topicName = TopicName.get(topic);
-        return readers.get(topicName.getNamespaceObject(), () -> {
-            try {
-                return wait(readerCreator.apply(topicName), "create reader");
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }, __ -> __.closeAsync().exceptionally(e -> {
-            log.warn("Failed to close reader {}", e.getMessage());
-            return null;
-        }));
+        NamespaceName ns = topicName.getNamespaceObject();
+        SimpleCache<NamespaceName, 
CompletableFuture<Reader<T>>>.ExpirableValue<CompletableFuture<Reader<T>>>
+                cachedReaderFuture = readers.getWithCacheInfo(ns, () -> 
readerCreator.apply(topicName), expiration);
+        try {
+            return wait(cachedReaderFuture.value, "create reader");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            cachedReaderFuture.updateDeadline();
+        }

Review Comment:
   When `wait()` throws (reader creation fails), the failed `CompletableFuture` 
remains in the cache with an updated deadline. Subsequent calls to 
`getReader()` for the same namespace will retrieve this already-failed future 
and immediately fail again, without ever retrying reader creation, until the 
cache entry expires. The failed future should be removed from the cache on 
exception so that the next caller gets a fresh attempt. Consider calling 
`readers.remove(ns)` in the `catch` block before rethrowing.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -48,13 +49,28 @@ public class TableView<T> {
     protected final Function<TopicName, CompletableFuture<Reader<T>>> 
readerCreator;
     private final Map<String, T> snapshots = new ConcurrentHashMap<>();
     private final long clientOperationTimeoutMs;
-    private final SimpleCache<NamespaceName, Reader<T>> readers;
+    private final SimpleCache<NamespaceName, CompletableFuture<Reader<T>>> 
readers;
+    private final Consumer<CompletableFuture<Reader<T>>> expiration = 
readerFuture -> {
+        if (!readerFuture.isDone()) {
+            readerFuture.handle((reader, t) -> {
+                if (reader != null) {
+                    closeReader(reader);
+                }
+                return null;
+            });
+        } else if (!readerFuture.isCompletedExceptionally()) {
+            // Since the future has done, it will not wait anymore.
+            closeReader(readerFuture.join());
+        }

Review Comment:
   There is a TOCTOU race between `isDone()` / `isCompletedExceptionally()` 
checks and `join()`. Between line 54 checking `!isDone()` (false) and line 61 
checking `!isCompletedExceptionally()`, the future's state could theoretically 
change if another thread completes it exceptionally. More concretely, if 
`isDone()` returns `true` but the future completed with a 
`CancellationException`, `join()` on line 63 will throw. A safer approach is to 
use a single `handle()` / `whenComplete()` call unconditionally, which covers 
all terminal states (success, exception, cancellation) without race windows. 
For example, always call `readerFuture.whenComplete((reader, t) -> { if (reader 
!= null) closeReader(reader); })`.
   ```suggestion
           readerFuture.whenComplete((reader, t) -> {
               if (reader != null) {
                   closeReader(reader);
               }
           });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to