This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e417aca957e [fix][ml] Negative backlog & acked positions that do not
exist & message loss when concurrently occupying topic owner (#24722)
e417aca957e is described below
commit e417aca957e7abe3f33525beaeea4fc3ee42f560
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 16 11:19:55 2025 +0800
[fix][ml] Negative backlog & acked positions that do not exist & message loss
when concurrently occupying topic owner (#24722)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 44 +++++++-
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 14 ++-
.../pulsar/broker/service/BkEnsemblesTestBase.java | 74 ++++++++++++
.../broker/service/BrokerBkEnsemblesTest.java | 125 +++++++++++++++++++++
4 files changed, 251 insertions(+), 6 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a4527261c8b..d157677d210 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -136,6 +136,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -261,6 +262,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";
+    // When "currentLedger" has been closed by another component (for example auto-replication), the managed
+    // ledger re-reads the last add confirmed (LAC) position from the ledger metadata. This field records the
+    // latest such re-check: the id of the ledger being re-checked, paired with a future that is completed with
+    // the BookKeeper return code of re-opening that ledger. The initial pair (-1, already completed with
+    // Code.OK) acts as the "nothing re-checked yet" sentinel.
+    private Pair<Long, CompletableFuture<Integer>> ledgerRecheckInProgress =
+            ImmutablePair.of(-1L, CompletableFuture.completedFuture(Code.OK));
public enum State {
None, // Uninitialized
@@ -1878,10 +1883,43 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
+ synchronized void addEntryFailedDueToConcurrentlyModified(final
LedgerHandle currentLedger, int errorCode) {
+ // The method "addEntryFailedDueToConcurrentlyModified" will be
triggered for a certain ledger at most once.
+ if (ledgerRecheckInProgress.getLeft() == currentLedger.getId()) {
+ return;
+ }
+ ledgerRecheckInProgress = new ImmutablePair<>(currentLedger.getId(),
new CompletableFuture<>());
+ bookKeeper.asyncOpenLedger(currentLedger.getId(), digestType,
config.getPassword(), (rc, lh, ctx) -> {
+ ledgerRecheckInProgress.getRight().complete(rc);
+ if (rc == Code.OK) {
+ log.info("[{}] Successfully opened ledger {} to check the last
add confirmed position when the ledger"
+ + " was concurrently modified(the ledger may be closed
by auto-replication)."
+ + " The last add confirmed position in memory is {},
and the value"
+ + " stored in metadata store is {}.", name,
lh.getId(), currentLedger.getLastAddConfirmed(),
+ lh.getLastAddConfirmed());
+ ledgerClosed(currentLedger, lh.getLastAddConfirmed());
+ } else {
+ log.error("[{}] Fencing the topic to ensure durability and
consistency(the current ledger was"
+ + " concurrent modified by a other bookie client, which is
not expected)."
+ + " Current ledger: {}, lastAddConfirmed: {} (the value
stored may be larger), error coder: {}.",
+ name, currentLedger.getId(),
currentLedger.getLastAddConfirmed(), rc);
+ // Stop switching ledger and write topic metadata, to avoid
messages lost. The doc of
+ // LedgerHandle also mentioned this:
https://github.com/apache/bookkeeper/blob/release-4.17.2/
+ //
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L2047-L2048.
+ handleBadVersion(new BadVersionException("the current ledger "
+ currentLedger.getId()
+ + " was concurrent modified by a other bookie client. The
error code is: " + errorCode));
+ }
+ }, null);
+ }
+
+ synchronized void ledgerClosed(final LedgerHandle lh) {
+ ledgerClosed(lh, null);
+ }
+
// //////////////////////////////////////////////////////////////////////
// Private helpers
- synchronized void ledgerClosed(final LedgerHandle lh) {
+ synchronized void ledgerClosed(final LedgerHandle lh, Long
lastAddConfirmed) {
final State state = STATE_UPDATER.get(this);
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger == lh && (state == State.ClosingLedger || state ==
State.LedgerOpened)) {
@@ -1896,7 +1934,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
- long entriesInLedger = lh.getLastAddConfirmed() + 1;
+ long entriesInLedger = lastAddConfirmed != null ? lastAddConfirmed + 1
: lh.getLastAddConfirmed() + 1;
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger has been closed id={} entries={}", name,
lh.getId(), entriesInLedger);
}
@@ -4501,7 +4539,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
opAddEntry.ledger != null ? opAddEntry.ledger.getId()
: -1,
opAddEntry.entryId, timeoutSec);
currentLedgerTimeoutTriggered.set(true);
- opAddEntry.handleAddFailure(opAddEntry.ledger);
+ opAddEntry.handleAddFailure(opAddEntry.ledger, null);
}
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 3af5618064f..bcd87e62629 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -231,7 +231,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
}
if (rc != BKException.Code.OK || timeoutTriggered.get()) {
- handleAddFailure(lh);
+ handleAddFailure(lh, rc);
} else {
// Trigger addComplete callback in a thread hashed on the managed
ledger name
ml.getExecutor().execute(this);
@@ -351,7 +351,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
*
* @param lh
*/
- void handleAddFailure(final LedgerHandle lh) {
+ void handleAddFailure(final LedgerHandle lh, Integer rc) {
// If we get a write error, we will try to create a new ledger and
re-submit the pending writes. If the
// ledger creation fails (persistent bk failure, another instance
owning the ML, ...), then the writes will
// be marked as failed.
@@ -361,7 +361,15 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
finalMl.getExecutor().execute(() -> {
// Force the creation of a new ledger. Doing it in a background
thread to avoid acquiring ML lock
// from a BK callback.
- finalMl.ledgerClosed(lh);
+ // If we received a "MetadataVersionException" or a
"LedgerFencedException", we should tell the ML that
+ // the ledger has been closed by others, and the entries count in
the ledger may is not correct. The ML
+ // will handle it.
+ if (rc != null && (rc.intValue() ==
BKException.Code.MetadataVersionException
+ || rc.intValue() ==
BKException.Code.LedgerFencedException)) {
+ finalMl.addEntryFailedDueToConcurrentlyModified(lh, rc);
+ } else {
+ finalMl.ledgerClosed(lh);
+ }
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index bbdc2ec0d32..15ca6cda72d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -19,17 +19,36 @@
package org.apache.pulsar.broker.service;
import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.Closeable;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -131,4 +150,59 @@ public abstract class BkEnsemblesTestBase extends
TestRetrySupport {
}
}
+    /**
+     * Injects a delay into the responses that this broker's BookKeeper client receives from one bookie of the
+     * current ledger of {@code topic}. Only the inbound (response) path is intercepted, so requests still leave
+     * the client immediately; responses are held back for {@code delayTime} and then handled on the client IO
+     * thread as usual.
+     *
+     * @param topic the topic whose current ledger is targeted; it must already be loaded on this broker.
+     * @param delayTime how long each response is held back.
+     * @param unit the unit of {@code delayTime}.
+     * @param bkIndexOfEnsemble the index of the bookie within the latest ensemble of the current ledger.
+     * @return a handle whose {@code close()} removes the injection and then shuts down the delay scheduler.
+     */
+    protected Closeable injectBKServerDelayForCurrentLedger(String topic, long delayTime, TimeUnit unit,
+                                                            int bkIndexOfEnsemble) throws Exception {
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory();
+        BookieClientImpl bookieClient =
+                (BookieClientImpl) mlFactory.getBookKeeper().get().getClientCtx().getBookieClient();
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        LedgerHandle ledgerHandle = ml.getCurrentLedger();
+        // getEnsembleAt(..) expects an entry id, not a bookie index. New writes go to the latest ensemble, so
+        // pick the requested bookie from it.
+        BookieId targetBookie = ledgerHandle.getLedgerMetadata().getAllEnsembles().lastEntry().getValue()
+                .get(bkIndexOfEnsemble);
+        PerChannelBookieClientPool perChannelBookieClientPool = bookieClient.lookupClient(targetBookie);
+        CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+        perChannelBookieClientPool.obtain(new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+            @Override
+            public void operationComplete(int rc, PerChannelBookieClient result) {
+                if (rc != org.apache.bookkeeper.client.BKException.Code.OK) {
+                    // Fail fast instead of leaving the caller blocked on a future that never completes.
+                    channelFuture.completeExceptionally(new IllegalStateException(
+                            "Failed to obtain a client for bookie " + targetBookie + ", rc=" + rc));
+                    return;
+                }
+                channelFuture.complete(WhiteboxImpl.getInternalState(result, "channel"));
+            }
+        }, ledgerHandle.getId());
+        Channel channel = channelFuture.get(30, TimeUnit.SECONDS);
+
+        ScheduledExecutorService bkServerIoMock =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("bk-server-io-mock"));
+        CompletableFuture<Void> installed = new CompletableFuture<>();
+        channel.eventLoop().execute(() -> {
+            try {
+                // To avoid blocking the client IO thread, the delay is timed on a separate scheduler; the
+                // delayed response is then handed back to the client IO thread for the actual handling.
+                channel.pipeline().addAfter("bookieProtoDecoder", "delayInjection", new ChannelInboundHandlerAdapter() {
+                    @Override
+                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                        bkServerIoMock.schedule(() -> {
+                            ctx.executor().execute(() -> ctx.fireChannelRead(msg));
+                        }, delayTime, unit);
+                    }
+                });
+                installed.complete(null);
+            } catch (Throwable t) {
+                installed.completeExceptionally(t);
+            }
+        });
+        try {
+            // Wait for the handler, so that writes issued right after this method returns are delayed as well.
+            installed.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            bkServerIoMock.shutdown();
+            throw e;
+        }
+        return () -> {
+            CompletableFuture<Void> removing = new CompletableFuture<>();
+            channel.eventLoop().execute(() -> {
+                try {
+                    channel.pipeline().remove("delayInjection");
+                    removing.complete(null);
+                } catch (Throwable t) {
+                    removing.completeExceptionally(t);
+                }
+            });
+            // Shut the scheduler down only once the handler is gone (successfully or not), so that it is
+            // never leaked.
+            removing.whenComplete((__, ex) -> bkServerIoMock.shutdown());
+        };
+    }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
index 5e5292fb30c..8b84041524b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
@@ -21,10 +21,17 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.Closeable;
import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
@@ -36,6 +43,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -48,13 +56,18 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -522,4 +535,116 @@ public class BrokerBkEnsemblesTest extends
BkEnsemblesTestBase {
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(),
Optional.empty());
}
+    /**
+     * Runs the test once with and once without unloading the topic after its ledger has been fenced.
+     */
+    @DataProvider
+    public Object[][] doReloadTopicAfterLedgerFenced() {
+        return new Object[][] {{Boolean.TRUE}, {Boolean.FALSE}};
+    }
+
+    @Test(timeOut = 60_000, dataProvider = "doReloadTopicAfterLedgerFenced")
+    public void testConcurrentlyModifyCurrentLedger(boolean doReloadTopicAfterLedgerFenced) throws Exception {
+        // A second BookKeeper client, on its own event loop, opens the topic's current ledger from outside the
+        // broker to simulate a concurrent modification. @Cleanup releases both even if an assertion fails, so
+        // the data-provider runs do not leak IO threads.
+        @Cleanup("shutdownGracefully")
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
+                config.isEnableBusyWait(), new DefaultThreadFactory("pulsar-io-test-1"));
+        @Cleanup
+        BookKeeper bkClient2 = pulsar.getBkClientFactory().create(pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                eventLoopGroup,
+                Optional.empty(),
+                null).get();
+
+        final String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
+        final String subscription = "s1";
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+        MessageIdAdv msgId = (MessageIdAdv) producer.send("1");
+        long currentLedgerId = msgId.getLedgerId();
+
+        // Make an injection to delay the responses for the next publishes.
+        Closeable cancellation = injectBKServerDelayForCurrentLedger(topic, 10, TimeUnit.SECONDS, 0);
+
+        // Publish 3 more messages.
+        // Because of the injected delay, the 2nd message completes only after the ledger is closed.
+        // After the ledger is closed, the 5th message is written to a new ledger.
+        CompletableFuture<MessageId> send2 = producer.sendAsync("2");
+        CompletableFuture<MessageId> send3 = producer.sendAsync("3");
+        CompletableFuture<MessageId> send4 = producer.sendAsync("4");
+        // Wait 1s to make sure the messages are written to the Bookie server.
+        Thread.sleep(1000);
+        LedgerHandle readOnlyLedger = bkClient2.openLedger(currentLedgerId,
+                BookKeeper.DigestType.fromApiDigestType(ml.getConfig().getDigestType()),
+                ml.getConfig().getPassword());
+        cancellation.close();
+        if (doReloadTopicAfterLedgerFenced) {
+            admin.topics().unload(topic);
+        }
+        producer.send("5");
+
+        // Verify: the entry count recorded by the topic matches the count stored in the ledger metadata.
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).getLedgers();
+        ManagedLedgerInternalStats.LedgerInfo ledgerInfo = null;
+        for (ManagedLedgerInternalStats.LedgerInfo li : ledgers) {
+            if (li.ledgerId == currentLedgerId) {
+                ledgerInfo = li;
+                break;
+            }
+        }
+        assertNotNull(ledgerInfo);
+        long entriesStored = readOnlyLedger.getLedgerMetadata().getLastEntryId() + 1;
+        // The handle is only needed for its metadata.
+        readOnlyLedger.close();
+        assertEquals(ledgerInfo.entries, entriesStored);
+
+        // Verify: the messages that got a successful response are persisted into the topic.
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+        Awaitility.await().until(() -> send4.isDone());
+        List<String> messagesReceived = new ArrayList<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg != null) {
+                messagesReceived.add(msg.getValue());
+                consumer.acknowledge(msg);
+            } else {
+                break;
+            }
+        }
+        assertTrue(messagesReceived.contains("1"));
+        if (send2.isDone() && !send2.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 1);
+            assertTrue(messagesReceived.contains("2"));
+        }
+        if (send3.isDone() && !send3.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 2);
+            assertTrue(messagesReceived.contains("3"));
+        }
+        if (send4.isDone() && !send4.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 2);
+            assertTrue(messagesReceived.contains("4"));
+        }
+        assertTrue(messagesReceived.contains("5"));
+
+        // cleanup: close the producer and the consumer first, so no client is attached when the topic is deleted.
+        // bkClient2 and eventLoopGroup are released by @Cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().unload(topic);
+        admin.topics().delete(topic);
+    }
+
}