Copilot commented on code in PR #25293:
URL: https://github.com/apache/pulsar/pull/25293#discussion_r2904185652
##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java:
##########
@@ -33,9 +33,9 @@ public class SimpleCache<K, V> {
private final long timeoutMs;
@RequiredArgsConstructor
- private class ExpirableValue<V> {
+ public class ExpirableValue<V> {
Review Comment:
The inner class `ExpirableValue` shadows its own outer class's type
parameter `V` by declaring `<V>` again. While this compiles, it means the inner
`V` is a different type parameter than the outer `V`, which is confusing and
could lead to subtle type mismatches. The type parameter on `ExpirableValue`
should be removed since it should use the enclosing class's `V`.
```suggestion
public class ExpirableValue {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java:
##########
@@ -48,7 +48,7 @@ boolean tryExpire() {
}
}
- void updateDeadline() {
+ public void updateDeadline() {
deadlineMs = System.currentTimeMillis() + timeoutMs;
}
Review Comment:
`updateDeadline()` is now public and called from `TableView.getReader()`
outside of any synchronized block (line 116 of `TableView.java`), but
`deadlineMs` is a plain field with no synchronization or volatile semantics.
The cache's `tryExpire()` reads `deadlineMs` from the scheduled cleanup thread.
This creates a data race — the cleanup thread may not see the updated deadline.
Either `deadlineMs` should be declared `volatile`, or `updateDeadline()` should
synchronize on the parent cache's lock.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TableViewTest {
+
+ @Test
+ public void testFailedCreateReader() throws Exception {
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ Function<TopicName,
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName
-> {
+ return FutureUtil.failedFuture(new PulsarClientException("test
failed to create reader"));
+ };
+ TableView<Object> tableView = new TableView<>(readerCreator, 60000,
executor);
+ try {
+ tableView.readLatest("public/default/tp");
Review Comment:
If `readLatest` does not throw an exception, the test silently passes
without verifying anything. This should use `Assert.assertThrows()` or add
`Assert.fail("Expected exception")` after the `readLatest` call inside the
`try` block to ensure the exception is actually thrown.
```suggestion
tableView.readLatest("public/default/tp");
Assert.fail("Expected exception when creating reader");
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TableViewTest {
+
+ @Test
+ public void testFailedCreateReader() throws Exception {
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ Function<TopicName,
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName
-> {
+ return FutureUtil.failedFuture(new PulsarClientException("test
failed to create reader"));
+ };
+ TableView<Object> tableView = new TableView<>(readerCreator, 60000,
executor);
+ try {
+ tableView.readLatest("public/default/tp");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Failed to create
reader"));
Review Comment:
The `ScheduledExecutorService` created here is never shut down after the
test completes. This leaks a thread. Add a try-finally or `@AfterMethod` to
call `executor.shutdownNow()`.
```suggestion
try {
Function<TopicName,
CompletableFuture<SystemTopicClient.Reader<Object>>> readerCreator = topicName
-> {
return FutureUtil.failedFuture(new
PulsarClientException("test failed to create reader"));
};
TableView<Object> tableView = new TableView<>(readerCreator,
60000, executor);
try {
tableView.readLatest("public/default/tp");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Failed to create
reader"));
}
} finally {
executor.shutdownNow();
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -89,16 +105,16 @@ private T internalReadLatest(String topic) throws
Exception {
@VisibleForTesting
protected Reader<T> getReader(String topic) {
final var topicName = TopicName.get(topic);
- return readers.get(topicName.getNamespaceObject(), () -> {
- try {
- return wait(readerCreator.apply(topicName), "create reader");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, __ -> __.closeAsync().exceptionally(e -> {
- log.warn("Failed to close reader {}", e.getMessage());
- return null;
- }));
+ NamespaceName ns = topicName.getNamespaceObject();
+ SimpleCache<NamespaceName,
CompletableFuture<Reader<T>>>.ExpirableValue<CompletableFuture<Reader<T>>>
+ cachedReaderFuture = readers.getWithCacheInfo(ns, () ->
readerCreator.apply(topicName), expiration);
+ try {
+ return wait(cachedReaderFuture.value, "create reader");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ cachedReaderFuture.updateDeadline();
+ }
Review Comment:
When `wait()` throws (reader creation fails), the failed `CompletableFuture`
remains in the cache with an updated deadline. Subsequent calls to
`getReader()` for the same namespace will retrieve this already-failed future
and immediately fail again, without ever retrying reader creation, until the
cache entry expires. The failed future should be removed from the cache on
exception so that the next caller gets a fresh attempt. Consider calling
`readers.remove(ns)` in the `catch` block before rethrowing.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -48,13 +49,28 @@ public class TableView<T> {
protected final Function<TopicName, CompletableFuture<Reader<T>>>
readerCreator;
private final Map<String, T> snapshots = new ConcurrentHashMap<>();
private final long clientOperationTimeoutMs;
- private final SimpleCache<NamespaceName, Reader<T>> readers;
+ private final SimpleCache<NamespaceName, CompletableFuture<Reader<T>>>
readers;
+ private final Consumer<CompletableFuture<Reader<T>>> expiration =
readerFuture -> {
+ if (!readerFuture.isDone()) {
+ readerFuture.handle((reader, t) -> {
+ if (reader != null) {
+ closeReader(reader);
+ }
+ return null;
+ });
+ } else if (!readerFuture.isCompletedExceptionally()) {
+ // Since the future has done, it will not wait anymore.
+ closeReader(readerFuture.join());
+ }
Review Comment:
There is a TOCTOU race between `isDone()` / `isCompletedExceptionally()`
checks and `join()`. Between line 54 checking `!isDone()` (false) and line 61
checking `!isCompletedExceptionally()`, the future's state could theoretically
change if another thread completes it exceptionally. More concretely, if
`isDone()` returns `true` but the future completed with a
`CancellationException`, `join()` on line 63 will throw. A safer approach is to
use a single `handle()` / `whenComplete()` call unconditionally, which covers
all terminal states (success, exception, cancellation) without race windows.
For example, always call `readerFuture.whenComplete((reader, t) -> { if (reader
!= null) closeReader(reader); })`.
```suggestion
readerFuture.whenComplete((reader, t) -> {
if (reader != null) {
closeReader(reader);
}
});
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]