This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new b1b34b14334 [fix][ml] Negative backlog & acked positions does not 
exist & message lost when concurrently occupying topic owner (#24722)
b1b34b14334 is described below

commit b1b34b14334743d21572d91a344097a9f8d0b99f
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 16 11:19:55 2025 +0800

    [fix][ml] Negative backlog & acked positions does not exist & message lost 
when concurrently occupying topic owner (#24722)
    
    (cherry picked from commit e417aca957e7abe3f33525beaeea4fc3ee42f560)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  44 +++++++-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  14 ++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  74 ++++++++++++
 .../broker/service/BrokerBkEnsemblesTest.java      | 125 +++++++++++++++++++++
 4 files changed, 251 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a4527261c8b..d157677d210 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -136,6 +136,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -261,6 +262,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
     private static final String MIGRATION_STATE_PROPERTY = "migrated";
+    // If "currentLedger" was closed by the component "auto-replication", 
the managed ledger will determine the LAC from
+    // the stored ledger metadata. This variable records the result of the 
most recent re-check.
+    private Pair<Long, CompletableFuture<Integer>> ledgerRecheckInProgress = 
new ImmutablePair<>(-1L,
+            CompletableFuture.completedFuture(Code.OK));
 
     public enum State {
         None, // Uninitialized
@@ -1878,10 +1883,43 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
+    synchronized void addEntryFailedDueToConcurrentlyModified(final 
LedgerHandle currentLedger, int errorCode) {
+        // The method "addEntryFailedDueToConcurrentlyModified" will be 
triggered for a certain ledger at most once.
+        if (ledgerRecheckInProgress.getLeft() == currentLedger.getId()) {
+            return;
+        }
+        ledgerRecheckInProgress = new ImmutablePair<>(currentLedger.getId(), 
new CompletableFuture<>());
+        bookKeeper.asyncOpenLedger(currentLedger.getId(), digestType, 
config.getPassword(), (rc, lh, ctx) -> {
+            ledgerRecheckInProgress.getRight().complete(rc);
+            if (rc == Code.OK) {
+                log.info("[{}] Successfully opened ledger {} to check the last 
add confirmed position when the ledger"
+                        + " was concurrently modified(the ledger may be closed 
by auto-replication)."
+                        + " The last add confirmed position in memory is {}, 
and the value"
+                        + " stored in metadata store is {}.", name, 
lh.getId(), currentLedger.getLastAddConfirmed(),
+                        lh.getLastAddConfirmed());
+                ledgerClosed(currentLedger, lh.getLastAddConfirmed());
+            } else {
+                log.error("[{}] Fencing the topic to ensure durability and 
consistency(the current ledger was"
+                    + " concurrent modified by a other bookie client, which is 
not expected)."
+                    + " Current ledger: {}, lastAddConfirmed: {} (the value 
stored may be larger), error coder: {}.",
+                    name, currentLedger.getId(), 
currentLedger.getLastAddConfirmed(), rc);
+                // Stop switching ledgers and writing topic metadata, to avoid 
losing messages. The doc of
+                // LedgerHandle also mentioned this: 
https://github.com/apache/bookkeeper/blob/release-4.17.2/
+                // 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L2047-L2048.
+                handleBadVersion(new BadVersionException("the current ledger " 
+ currentLedger.getId()
+                    + " was concurrent modified by a other bookie client. The 
error code is: " + errorCode));
+            }
+        }, null);
+    }
+
+    synchronized void ledgerClosed(final LedgerHandle lh) {
+        ledgerClosed(lh, null);
+    }
+
     // //////////////////////////////////////////////////////////////////////
     // Private helpers
 
-    synchronized void ledgerClosed(final LedgerHandle lh) {
+    synchronized void ledgerClosed(final LedgerHandle lh, Long 
lastAddConfirmed) {
         final State state = STATE_UPDATER.get(this);
         LedgerHandle currentLedger = this.currentLedger;
         if (currentLedger == lh && (state == State.ClosingLedger || state == 
State.LedgerOpened)) {
@@ -1896,7 +1934,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             return;
         }
 
-        long entriesInLedger = lh.getLastAddConfirmed() + 1;
+        long entriesInLedger = lastAddConfirmed != null ? lastAddConfirmed + 1 
: lh.getLastAddConfirmed() + 1;
         if (log.isDebugEnabled()) {
             log.debug("[{}] Ledger has been closed id={} entries={}", name, 
lh.getId(), entriesInLedger);
         }
@@ -4501,7 +4539,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         opAddEntry.ledger != null ? opAddEntry.ledger.getId() 
: -1,
                         opAddEntry.entryId, timeoutSec);
                 currentLedgerTimeoutTriggered.set(true);
-                opAddEntry.handleAddFailure(opAddEntry.ledger);
+                opAddEntry.handleAddFailure(opAddEntry.ledger, null);
             }
         }
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 3af5618064f..bcd87e62629 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -231,7 +231,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
         }
 
         if (rc != BKException.Code.OK || timeoutTriggered.get()) {
-            handleAddFailure(lh);
+            handleAddFailure(lh, rc);
         } else {
             // Trigger addComplete callback in a thread hashed on the managed 
ledger name
             ml.getExecutor().execute(this);
@@ -351,7 +351,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
      *
      * @param lh
      */
-    void handleAddFailure(final LedgerHandle lh) {
+    void handleAddFailure(final LedgerHandle lh, Integer rc) {
         // If we get a write error, we will try to create a new ledger and 
re-submit the pending writes. If the
         // ledger creation fails (persistent bk failure, another instance 
owning the ML, ...), then the writes will
         // be marked as failed.
@@ -361,7 +361,15 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
         finalMl.getExecutor().execute(() -> {
             // Force the creation of a new ledger. Doing it in a background 
thread to avoid acquiring ML lock
             // from a BK callback.
-            finalMl.ledgerClosed(lh);
+            // If we received a "MetadataVersionException" or a 
"LedgerFencedException", we should tell the ML that
+            // the ledger has been closed by others, and the entries count in 
the ledger may not be correct. The ML
+            // will handle it.
+            if (rc != null && (rc.intValue() == 
BKException.Code.MetadataVersionException
+                    || rc.intValue() == 
BKException.Code.LedgerFencedException)) {
+                finalMl.addEntryFailedDueToConcurrentlyModified(lh, rc);
+            } else {
+                finalMl.ledgerClosed(lh);
+            }
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index bbdc2ec0d32..15ca6cda72d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -19,17 +19,36 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.Closeable;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -131,4 +150,59 @@ public abstract class BkEnsemblesTestBase extends 
TestRetrySupport {
         }
     }
 
+    /***
+     * Inject a delay for the following requests to the specified bookie in 
the ensemble of the current ledger of
+     * the topic.
+     * @param bkIndexOfEnsemble the bk index of the ensemble.
+     * @return a cancellation of the injection.
+     */
+    protected Closeable injectBKServerDelayForCurrentLedger(String topic, long 
delayTime, TimeUnit unit,
+                                                            int 
bkIndexOfEnsemble) throws Exception {
+        // Make an injection to let the next publishing delay.
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory();
+        BookieClientImpl bookieClient =
+                (BookieClientImpl) 
mlFactory.getBookKeeper().get().getClientCtx().getBookieClient();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        LedgerHandle ledgerHandle = ml.getCurrentLedger();
+        BookieId bookieId1 = 
ledgerHandle.getLedgerMetadata().getEnsembleAt(bkIndexOfEnsemble).get(0);
+        PerChannelBookieClientPool perChannelBookieClientPool = 
bookieClient.lookupClient(bookieId1);
+        CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+        perChannelBookieClientPool.obtain(new 
BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+            @Override
+            public void operationComplete(int rc, PerChannelBookieClient 
result) {
+                channelFuture.complete(WhiteboxImpl.getInternalState(result, 
"channel"));
+            }
+        }, ledgerHandle.getId());
+        Channel channel = channelFuture.get();
+
+        ScheduledExecutorService bkServerIoMock =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("bk-server-io-mock"));
+        channel.eventLoop().execute(() -> {
+            // To avoid client IO thread being stuck, we use a separate event 
loop to mock the server delay. And the
+            // response handling is still in the client IO thread.
+            channel.pipeline().addAfter("bookieProtoDecoder", 
"delayInjection", new ChannelInboundHandlerAdapter(){
+                public void channelRead(ChannelHandlerContext ctx, Object msg) 
throws Exception {
+                    bkServerIoMock.schedule(() -> {
+                        // Call the real client io thread to handle the 
requests after a delayed time.
+                        ctx.executor().execute(() -> {
+                            ctx.fireChannelRead(msg);
+                        });
+                    }, delayTime, unit);
+                }
+            });
+        });
+        return () -> {
+            CompletableFuture<Void> removing = new CompletableFuture<>();
+            channel.eventLoop().execute(() -> {
+                channel.pipeline().remove("delayInjection");
+                removing.complete(null);
+            });
+            removing.whenComplete((__, ex) -> {
+                bkServerIoMock.shutdown();
+            });
+        };
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
index 5e5292fb30c..8b84041524b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTest.java
@@ -21,10 +21,17 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.Closeable;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Optional;
@@ -36,6 +43,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -48,13 +56,18 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -522,4 +535,116 @@ public class BrokerBkEnsemblesTest extends 
BkEnsemblesTestBase {
         assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), 
Optional.empty());
     }
 
+    @DataProvider
+    public Object[][] doReloadTopicAfterLedgerFenced() {
+        return new Object[][] {
+                {true},
+                {false}
+        };
+    }
+
+    @Test(timeOut = 60_000, dataProvider = "doReloadTopicAfterLedgerFenced")
+    public void testConcurrentlyModifyCurrentLedger(boolean 
doReloadTopicAfterLedgerFenced) throws Exception {
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
+                config.isEnableBusyWait(), new 
DefaultThreadFactory("pulsar-io-test-1"));
+        BookKeeper bkClient2 = 
pulsar.getBkClientFactory().create(pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                eventLoopGroup,
+                Optional.empty(),
+                null).get();
+
+        final String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final String subscription = "s1";
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+        MessageIdAdv msgId = (MessageIdAdv) producer.send("1");
+        long currentLedgerId = msgId.getLedgerId();
+
+        // Make an injection to let the next publishing delay.
+        Closeable cancellation = injectBKServerDelayForCurrentLedger(topic, 
10, TimeUnit.SECONDS, 0);
+
+        // Publish 3 new messages.
+        // Since we injected a delay, the 2nd message will complete after the 
ledger is closed.
+        // After the ledger is closed, the 5th message will be written to a 
new ledger.
+        // Verify: the entries in topic and stored are consistent.
+        CompletableFuture<MessageId> send2 = producer.sendAsync("2");
+        CompletableFuture<MessageId> send3 = producer.sendAsync("3");
+        CompletableFuture<MessageId> send4 = producer.sendAsync("4");
+        // Wait 1s to make sure the messages are written to the Bookie server.
+        Thread.sleep(1000);
+        LedgerHandle readOnlyLedger = bkClient2.openLedger(currentLedgerId,
+                
BookKeeper.DigestType.fromApiDigestType(ml.getConfig().getDigestType()),
+                ml.getConfig().getPassword());
+        cancellation.close();
+        if (doReloadTopicAfterLedgerFenced) {
+            admin.topics().unload(topic);
+        }
+        producer.send("5");
+
+        // Verify: the entries in topic and stored are consistent.
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topic).getLedgers();
+        ManagedLedgerInternalStats.LedgerInfo ledgerInfo = null;
+        for (ManagedLedgerInternalStats.LedgerInfo li : ledgers) {
+            if (li.ledgerId == currentLedgerId) {
+                ledgerInfo = li;
+                break;
+            }
+        }
+        assertNotNull(ledgerInfo);
+        long entriesStored = 
readOnlyLedger.getLedgerMetadata().getLastEntryId() + 1;
+        assertEquals(ledgerInfo.entries, entriesStored);
+
+        // Verify: the messages that got a successful response are persisted 
into the topic.
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+        Awaitility.await().until(() -> send4.isDone());
+        List<String> messagesReceived = new ArrayList<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg != null) {
+                messagesReceived.add(msg.getValue());
+                consumer.acknowledge(msg);
+            } else {
+                break;
+            }
+        }
+        assertTrue(messagesReceived.contains("1"));
+        if (send2.isDone() && !send2.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 1);
+            assertTrue(messagesReceived.contains("2"));
+        }
+        if (send3.isDone() && !send3.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 2);
+            assertTrue(messagesReceived.contains("3"));
+        }
+        if (send4.isDone() && !send4.isCompletedExceptionally()) {
+            assertTrue(ledgerInfo.entries >= 2);
+            assertTrue(messagesReceived.contains("4"));
+        }
+        assertTrue(messagesReceived.contains("5"));
+
+        // cleanup.
+        producer.close();
+        admin.topics().unload(topic);
+        admin.topics().delete(topic);
+        bkClient2.close();
+    }
+
 }

Reply via email to