This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53bbd4aeff0 [improve][broker] Reduce unnecessary MessageMetadata
parsing by caching the parsed instance in the broker cache (#24682)
53bbd4aeff0 is described below
commit 53bbd4aeff003275d9c6bafbc6ef6c5523014ae2
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 10 00:23:17 2025 +0300
[improve][broker] Reduce unnecessary MessageMetadata parsing by caching the
parsed instance in the broker cache (#24682)
---
.../java/org/apache/bookkeeper/mledger/Entry.java | 25 +++
.../apache/bookkeeper/mledger/impl/EntryImpl.java | 24 +++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 +-
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 30 +--
.../mledger/impl/ShadowManagedLedgerImpl.java | 5 +
.../mledger/impl/cache/EntryCacheDisabled.java | 3 +-
.../bookkeeper/mledger/impl/cache/RangeCache.java | 17 +-
.../mledger/impl/cache/RangeCacheEntryWrapper.java | 34 +++-
.../mledger/impl/cache/RangeEntryCacheImpl.java | 1 +
.../broker/admin/impl/PersistentTopicsBase.java | 28 ++-
.../intercept/ManagedLedgerInterceptorImpl.java | 12 +-
.../broker/service/AbstractBaseDispatcher.java | 28 ++-
.../pulsar/broker/service/EntryAndMetadata.java | 12 +-
.../apache/pulsar/broker/service/ServerCnx.java | 5 +-
.../NonPersistentDispatcherMultipleConsumers.java | 3 +-
...onPersistentDispatcherSingleActiveConsumer.java | 3 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 4 +-
.../PersistentDispatcherMultipleConsumers.java | 6 +-
.../PersistentDispatcherSingleActiveConsumer.java | 2 +-
.../persistent/PersistentMessageFinder.java | 3 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 6 +-
...tickyKeyDispatcherMultipleConsumersClassic.java | 2 +-
.../service/persistent/PersistentSubscription.java | 12 +-
.../broker/service/persistent/PersistentTopic.java | 24 ++-
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +-
.../buffer/impl/TopicTransactionBuffer.java | 7 +-
.../service/PersistentMessageFinderTest.java | 2 +-
.../apache/pulsar/client/impl/MessageImplTest.java | 3 +-
.../apache/pulsar/common/protocol/Commands.java | 208 +++++++++++++++------
29 files changed, 382 insertions(+), 140 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
index 6b8f8a95773..24ea5c17c0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
/**
* An Entry represent a ledger entry data and its associated position.
@@ -98,4 +100,27 @@ public interface Entry {
default boolean matchesPosition(Position position) {
return position != null && position.compareTo(getLedgerId(),
getEntryId()) == 0;
}
+
+ default MessageMetadata getMessageMetadata() {
+ return null;
+ }
+
+ /**
+ * Returns the timestamp of the entry.
+ * @return
+ */
+ default long getEntryTimestamp() {
+ // get broker timestamp first if BrokerEntryMetadata is enabled with
AppendBrokerTimestampMetadataInterceptor
+ return Commands.peekBrokerEntryMetadataToLong(getDataBuffer(),
brokerEntryMetadata -> {
+ if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasBrokerTimestamp()) {
+ return brokerEntryMetadata.getBrokerTimestamp();
+ }
+ // otherwise get the publish_time
+ MessageMetadata messageMetadata = getMessageMetadata();
+ if (messageMetadata == null) {
+ messageMetadata =
Commands.peekMessageMetadata(getDataBuffer(), null, -1);
+ }
+ return messageMetadata.getPublishTime();
+ });
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index b85a1bc45fe..070a0fc1bea 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -24,6 +24,9 @@ import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.Entry;
@@ -33,7 +36,10 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+@Slf4j
public final class EntryImpl extends AbstractCASReferenceCounted
implements ReferenceCountedEntry, Comparable<EntryImpl> {
@@ -51,6 +57,8 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
ByteBuf data;
private EntryReadCountHandler readCountHandler;
private boolean decreaseReadCountOnRelease = true;
+ @Getter @Setter
+ private MessageMetadata messageMetadata;
private Runnable onDeallocate;
@@ -161,6 +169,7 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
entry.entryId = other.entryId;
entry.data = other.data.retainedDuplicate();
entry.readCountHandler = other.readCountHandler;
+ entry.messageMetadata = other.messageMetadata;
entry.setRefCnt(1);
return entry;
}
@@ -172,6 +181,7 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
entry.entryId = other.getEntryId();
entry.data = other.getDataBuffer().retainedDuplicate();
entry.readCountHandler = other.getReadCountHandler();
+ entry.messageMetadata = other.getMessageMetadata();
entry.setRefCnt(1);
return entry;
}
@@ -277,6 +287,7 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
position = null;
readCountHandler = null;
decreaseReadCountOnRelease = true;
+ messageMetadata = null;
recyclerHandle.recycle(this);
}
@@ -294,6 +305,19 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
decreaseReadCountOnRelease = enabled;
}
+ public void initializeMessageMetadataIfNeeded(String managedLedgerName) {
+ if (messageMetadata == null) {
+ try {
+ MessageMetadata msgMetadata = new MessageMetadata();
+ Commands.peekMessageMetadata(data, msgMetadata);
+ this.messageMetadata = msgMetadata;
+ } catch (Throwable t) {
+ log.warn("[{}] Failed to parse message metadata for entry
{}:{}", managedLedgerName, ledgerId, entryId,
+ t);
+ }
+ }
+ }
+
@Override
public String toString() {
return getClass().getName() + "@" + System.identityHashCode(this)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0be740e1bda..a4527261c8b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -31,7 +31,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
@@ -144,7 +143,6 @@ import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.LazyLoadableValue;
@@ -1315,9 +1313,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
future.complete(entryTimestamp);
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Error deserializing message for message
position {}", nextPos, e);
future.completeExceptionally(e);
} finally {
@@ -4991,4 +4989,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
throw new RuntimeException(e);
}
}
+
+ boolean shouldCacheAddedEntry() {
+ // Avoid caching entries if no cursor has been created
+ return getActiveCursors().shouldCacheAddedEntry();
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 49bc3f309d4..3af5618064f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -257,23 +257,23 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
+ // ctx will contain a Position instance only in the case of
ShadowManagedLedgerImpl
long ledgerId = ledger != null ? ledger.getId() : ((Position)
ctx).getLedgerId();
- // Don't insert to the entry cache for the ShadowManagedLedger
- if (!(ml instanceof ShadowManagedLedgerImpl)) {
- // Avoid caching entries if no cursor has been created
- if (ml.getActiveCursors().shouldCacheAddedEntry()) {
- int expectedReadCount = 0;
- // only use expectedReadCount if cache eviction is enabled by
expected read count
- if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
- expectedReadCount = ml.getActiveCursors().size();
- }
- EntryImpl entry = EntryImpl.create(ledgerId, entryId, data,
expectedReadCount);
- entry.setDecreaseReadCountOnRelease(false);
- // EntryCache.insert: duplicates entry by allocating new entry
and data. so, recycle entry after calling
- // insert
- ml.entryCache.insert(entry);
- entry.release();
+
+ // Handle caching for tailing reads
+ if (ml.shouldCacheAddedEntry()) {
+ int expectedReadCount = 0;
+ // only use expectedReadCount if cache eviction is enabled by
expected read count
+ if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
+ // use the number of active cursors as the expected read count
+ expectedReadCount = ml.getActiveCursors().size();
}
+ EntryImpl entry = EntryImpl.create(ledgerId, entryId, data,
expectedReadCount);
+ entry.setDecreaseReadCountOnRelease(false);
+ // EntryCache.insert: duplicates entry by allocating new entry and
data. so, recycle entry after calling
+ // insert
+ ml.entryCache.insert(entry);
+ entry.release();
}
Position lastEntry = PositionFactory.create(ledgerId, entryId);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 4b03cad8e0a..46cd1335a17 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -407,4 +407,9 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
}
+
+ @Override
+ boolean shouldCacheAddedEntry() {
+ return false;
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index 9cd63d99f4c..b5a45415a4f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -78,6 +78,7 @@ public class EntryCacheDisabled implements EntryCache {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they
will be unsorted for now)
EntryImpl entry = EntryImpl.create(e, interceptor,
0);
+
entry.initializeMessageMetadataIfNeeded(ml.getName());
entries.add(entry);
totalSize += entry.getLength();
}
@@ -111,7 +112,7 @@ public class EntryCacheDisabled implements EntryCache {
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry =
EntryImpl.create(ledgerEntry, interceptor, 0);
-
+
returnEntry.initializeMessageMetadataIfNeeded(ml.getName());
ml.getMbean().recordReadEntriesOpsCacheMisses(1,
returnEntry.getLength());
ml.getFactory().getMbean().recordCacheMiss(1,
returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1,
returnEntry.getLength());
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
index a5592f7098e..858b7d3c07c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
@@ -46,13 +46,23 @@ import org.apache.commons.lang3.tuple.Pair;
class RangeCache {
private final ConcurrentNavigableMap<Position, RangeCacheEntryWrapper>
entries;
private final RangeCacheRemovalQueue removalQueue;
- private AtomicLong size; // Total size of values stored in cache
+ private final AtomicLong size; // Total size of values stored in cache
+ private final String managedLedgerName;
/**
* Construct a new RangeCache.
*/
public RangeCache(RangeCacheRemovalQueue removalQueue) {
+ this(removalQueue, null);
+ }
+
+ /**
+ * Construct a new RangeCache.
+ * @param managedLedgerName the name of the managed ledger this cache
belongs to
+ */
+ public RangeCache(RangeCacheRemovalQueue removalQueue, String
managedLedgerName) {
this.removalQueue = removalQueue;
+ this.managedLedgerName = managedLedgerName;
this.entries = new ConcurrentSkipListMap<>();
this.size = new AtomicLong(0);
}
@@ -115,7 +125,7 @@ class RangeCache {
if (valueWrapper == null) {
return null;
} else {
- ReferenceCountedEntry value = valueWrapper.getValue(key);
+ ReferenceCountedEntry value = valueWrapper.getValue(key,
managedLedgerName);
return getRetainedValueMatchingKey(key, value);
}
}
@@ -124,7 +134,8 @@ class RangeCache {
* @apiNote the returned value must be released if it's not null
*/
private ReferenceCountedEntry getValueMatchingEntry(Map.Entry<Position,
RangeCacheEntryWrapper> entry) {
- ReferenceCountedEntry valueMatchingEntry =
RangeCacheEntryWrapper.getValueMatchingMapEntry(entry);
+ ReferenceCountedEntry valueMatchingEntry =
+ RangeCacheEntryWrapper.getValueMatchingMapEntry(entry,
managedLedgerName);
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
index db5ceaefd38..5c82207e1c7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
@@ -22,14 +22,17 @@ import io.netty.util.Recycler;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
/**
* Wrapper around the value to store in Map. This is needed to ensure that a
specific instance can be removed from
* the map by calling the {@link Map#remove(Object, Object)} method. Certain
race conditions could result in the
* wrong value being removed from the map. The instances of this class are
recycled to avoid creating new objects.
*/
+@Slf4j
class RangeCacheEntryWrapper {
private final Recycler.Handle<RangeCacheEntryWrapper> recyclerHandle;
private static final Recycler<RangeCacheEntryWrapper> RECYCLER = new
Recycler<RangeCacheEntryWrapper>() {
@@ -73,12 +76,13 @@ class RangeCacheEntryWrapper {
/**
* Get the value associated with the key. Returns null if the key does not
match the key.
*
- * @param key the key to match
+ * @param key the key to match
+ * @param managedLedgerName
* @return the value associated with the key, or null if the value has
already been recycled or the key does not
* match
*/
- ReferenceCountedEntry getValue(Position key) {
- return getValueInternal(key, false);
+ ReferenceCountedEntry getValue(Position key, String managedLedgerName) {
+ return getValueInternal(key, false, managedLedgerName);
}
/**
@@ -88,8 +92,9 @@ class RangeCacheEntryWrapper {
* @return the value associated with the key, or null if the value has
already been recycled or the key does not
* exactly match the same instance
*/
- static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position,
RangeCacheEntryWrapper> entry) {
- return entry.getValue().getValueInternal(entry.getKey(), true);
+ static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position,
RangeCacheEntryWrapper> entry,
+ String
managedLedgerName) {
+ return entry.getValue().getValueInternal(entry.getKey(), true,
managedLedgerName);
}
/**
@@ -101,16 +106,20 @@ class RangeCacheEntryWrapper {
* key as the one stored in the wrapper.
This is used to avoid any races
* when retrieving or removing the entries
from the cache when the key and value
* instances are available.
+ * @param managedLedgerName
* @return the value associated with the key, or null if the key does not
match
*/
- private ReferenceCountedEntry getValueInternal(Position key, boolean
requireSameKeyInstance) {
+ private ReferenceCountedEntry getValueInternal(Position key, boolean
requireSameKeyInstance,
+ String managedLedgerName) {
long stamp = lock.tryOptimisticRead();
Position localKey = this.key;
ReferenceCountedEntry localValue = this.value;
+ boolean messageMetadataInitialized = localValue != null &&
localValue.getMessageMetadata() != null;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
localKey = this.key;
localValue = this.value;
+ messageMetadataInitialized = localValue != null &&
localValue.getMessageMetadata() != null;
lock.unlockRead(stamp);
}
// check that the given key matches the key associated with the value
in the entry
@@ -120,6 +129,19 @@ class RangeCacheEntryWrapper {
if (localKey != key && (requireSameKeyInstance || localKey == null ||
!localKey.equals(key))) {
return null;
}
+ // Initialize the metadata if it's not already initialized
+ if (localValue != null && !messageMetadataInitialized) {
+ localValue = withWriteLock(wrapper -> {
+ // ensure that the key still matches
+ if (wrapper.key != key && (requireSameKeyInstance ||
wrapper.key == null || !wrapper.key.equals(key))) {
+ return null;
+ }
+ if (wrapper.value instanceof EntryImpl entry) {
+ entry.initializeMessageMetadataIfNeeded(managedLedgerName);
+ }
+ return wrapper.value;
+ });
+ }
accessed = true;
return localValue;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 04de8dde0ad..fd391ba2bf6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -532,6 +532,7 @@ public class RangeEntryCacheImpl implements EntryCache {
final List<Entry> entriesToReturn = new
ArrayList<>(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = EntryImpl.create(e,
interceptor, expectedReadCountVal);
+
entry.initializeMessageMetadataIfNeeded(ml.getName());
entriesToReturn.add(entry);
totalSize += entry.getLength();
if (expectedReadCountVal > 0) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a16be37bb38..ec61d58d2af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2659,8 +2659,10 @@ public class PersistentTopicsBase extends AdminResource {
if (entry == null) {
batchSizeFuture.complete(0);
} else {
- MessageMetadata metadata =
-
Commands.parseMessageMetadata(entry.getDataBuffer());
+ MessageMetadata metadata =
entry.getMessageMetadata();
+ if (metadata == null) {
+ metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ }
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
}
} catch (Exception e) {
@@ -2839,7 +2841,7 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<MessageId> findMessageIdByPublishTime(long
timestamp, ManagedLedger managedLedger) {
return managedLedger.asyncFindPosition(entry -> {
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp,
timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message
position find",
@@ -3009,7 +3011,12 @@ public class PersistentTopicsBase extends AdminResource {
long totalSize = metadataAndPayload.readableBytes();
BrokerEntryMetadata brokerEntryMetadata =
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
- MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ MessageMetadata metadata = entry.getMessageMetadata();
+ if (metadata == null) {
+ metadata = Commands.parseMessageMetadata(metadataAndPayload);
+ } else {
+ Commands.skipMessageMetadata(metadataAndPayload);
+ }
ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
@@ -5451,11 +5458,12 @@ public class PersistentTopicsBase extends AdminResource
{
private static Long getIndexFromEntry(Entry entry) {
- final var brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
- if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
- return brokerEntryMetadata.getIndex();
- } else {
- return null;
- }
+ return Commands.peekBrokerEntryMetadataToObject(entry.getDataBuffer(),
brokerEntryMetadata -> {
+ if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex())
{
+ return brokerEntryMetadata.getIndex();
+ } else {
+ return null;
+ }
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index db138989a8e..a86b49d627f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
@@ -117,12 +116,11 @@ public class ManagedLedgerInterceptorImpl implements
ManagedLedgerInterceptor {
if (lastEntryOptional.isPresent()) {
Entry lastEntry = lastEntryOptional.get();
try {
- BrokerEntryMetadata brokerEntryMetadata =
-
Commands.parseBrokerEntryMetadataIfExist(lastEntry.getDataBuffer());
- if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasIndex()) {
- appendIndexMetadataInterceptor.recoveryIndexGenerator(
- brokerEntryMetadata.getIndex());
- }
+
Commands.peekBrokerEntryMetadataAndConsume(lastEntry.getDataBuffer(),
brokerEntryMetadata -> {
+ if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasIndex()) {
+
appendIndexMetadataInterceptor.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+ }
+ });
} finally {
lastEntry.release();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index f074f234b87..c5e001692f2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -149,6 +149,8 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
msgMetadata = metadataArray[metadataIndex];
} else if (entry instanceof EntryAndMetadata) {
msgMetadata = ((EntryAndMetadata) entry).getMetadata();
+ } else if (entry.getMessageMetadata() != null) {
+ msgMetadata = entry.getMessageMetadata();
} else {
msgMetadata =
Commands.peekAndCopyMessageMetadata(metadataAndPayload,
subscription.toString(), -1);
}
@@ -454,8 +456,18 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
return true;
}
- protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
- return Commands.peekStickyKey(metadataAndPayload,
subscription.getTopicName(), subscription.getName());
+ protected byte[] peekStickyKey(Entry entry) {
+ if (entry instanceof EntryAndMetadata entryAndMetadata) {
+ return entryAndMetadata.getStickyKey();
+ }
+ MessageMetadata metadata = entry.getMessageMetadata();
+ if (metadata == null) {
+ metadata = Commands.peekMessageMetadata(entry.getDataBuffer(),
subscription.toString(), -1);
+ }
+ if (metadata == null) {
+ return Commands.NONE_KEY;
+ }
+ return Commands.resolveStickyKey(metadata);
}
protected String getSubscriptionName() {
@@ -528,4 +540,16 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
protected final void updatePendingBytesToDispatch(long size) {
PENDING_BYTES_TO_DISPATCH.inc(size);
}
+
+ protected int getNumberOfMessagesInBatch(Entry entry) {
+ MessageMetadata msgMetadata = entry.getMessageMetadata();
+ if (msgMetadata == null) {
+ msgMetadata = Commands.peekMessageMetadata(entry.getDataBuffer(),
subscription.toString(), -1);
+ }
+ if (msgMetadata == null) {
+ return -1;
+ } else {
+ return msgMetadata.getNumMessagesInBatch();
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
index 33abddc300b..117f0115064 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
@@ -43,12 +43,22 @@ public class EntryAndMetadata implements Entry {
}
public static EntryAndMetadata create(final Entry entry, final
MessageMetadata metadata) {
+ if (entry instanceof EntryAndMetadata entryAndMetadata) {
+ return entryAndMetadata;
+ }
return new EntryAndMetadata(entry, metadata);
}
@VisibleForTesting
public static EntryAndMetadata create(final Entry entry) {
- return create(entry,
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1));
+ if (entry instanceof EntryAndMetadata entryAndMetadata) {
+ return entryAndMetadata;
+ }
+ MessageMetadata msgMetadata = entry.getMessageMetadata();
+ if (msgMetadata == null) {
+ msgMetadata =
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1);
+ }
+ return create(entry, msgMetadata);
}
public byte[] getStickyKey() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d1bc4953d11..44927c375b5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2365,7 +2365,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}, null);
CompletableFuture<Integer> batchSizeFuture =
entryFuture.thenApply(entry -> {
- MessageMetadata metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ MessageMetadata metadata = entry.getMessageMetadata();
+ if (metadata == null) {
+ metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ }
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5941093e71c..a4fba30cc9b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -205,7 +204,7 @@ public class NonPersistentDispatcherMultipleConsumers
extends AbstractDispatcher
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-sendMessageInfo.getTotalMessages());
} else {
entries.forEach(entry -> {
- int totalMsgs =
Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(),
subscription.toString(), -1);
+ int totalMsgs = getNumberOfMessagesInBatch(entry);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 414e9235418..26bb5a791cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -30,7 +30,6 @@ import
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
@Slf4j
@@ -66,7 +65,7 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
entries.forEach(entry -> {
- int totalMsgs =
Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(),
subscription.toString(), -1);
+ int totalMsgs = getNumberOfMessagesInBatch(entry);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 915f9c0f925..2e884eb0ea1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -151,7 +151,7 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
consumerStickyKeyHashesMap.clear();
for (Entry entry : entries) {
- byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+ byte[] stickyKey = peekStickyKey(entry);
int stickyKeyHash = selector.makeStickyKeyHash(stickyKey);
Consumer consumer = selector.select(stickyKeyHash);
@@ -182,7 +182,7 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-sendMessageInfo.getTotalMessages());
} else {
entriesForConsumer.forEach(e -> {
- int totalMsgs =
Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(),
-1);
+ int totalMsgs = getNumberOfMessagesInBatch(e);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c35d802f43d..097ab9cd0fe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -72,7 +72,6 @@ import
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
@@ -798,10 +797,11 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractPersistentDis
if (entry instanceof EntryAndMetadata) {
metadata = ((EntryAndMetadata) entry).getMetadata();
} else {
- metadata =
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(),
subscription.toString(), -1);
// cache the metadata in the entry with EntryAndMetadata for
later use to avoid re-parsing the metadata
// and to carry the metadata and calculated stickyKeyHash with
the entry
- entries.set(i, EntryAndMetadata.create(entry, metadata));
+ EntryAndMetadata entryAndMetadata =
EntryAndMetadata.create(entry);
+ metadata = entryAndMetadata.getMetadata();
+ entries.set(i, entryAndMetadata);
}
if (metadata != null) {
remainingMessages += metadata.getNumMessagesInBatch();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 99a09d3a5d7..e809d984ae3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -189,7 +189,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
- byte[] key = peekStickyKey(entry.getDataBuffer());
+ byte[] key = peekStickyKey(entry);
Consumer consumer = stickyKeyConsumerSelector.select(key);
// Skip the entry if it's not for current active consumer.
if (consumer == null || currentConsumer != consumer) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index e8bc9fdc3ea..991e6a06029 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -30,7 +30,6 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +72,7 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
try {
// Find the latest entry that is earlier than the target
timestamp.
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
return
MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for
message position find", topicName, subName, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 080f5acbf16..e0c74f9043c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -419,8 +418,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
} else {
// replace the input entry with EntryAndMetadata instance. In
addition to the entry and metadata,
// it will also carry the calculated sticky key hash
- entry = EntryAndMetadata.create(inputEntry,
-
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(),
getSubscriptionName(), -1));
+ entry = EntryAndMetadata.create(inputEntry);
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = null;
@@ -610,7 +608,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// use the cached sticky key hash if available, otherwise
calculate the sticky key hash and cache it
return
entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash);
}
- return
selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+ return selector.makeStickyKeyHash(peekStickyKey(entry));
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index c3b246fe9ba..1f29cb73626 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -652,7 +652,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassic
// use the cached sticky key hash if available, otherwise
calculate the sticky key hash and cache it
return
entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash);
}
- return
selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+ return selector.makeStickyKeyHash(peekStickyKey(entry));
}
private static final Logger log =
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ac54957f0c6..056e9a25fda 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -23,7 +23,6 @@ import static
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
-import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -644,7 +643,12 @@ public class PersistentSubscription extends
AbstractSubscription {
firstPosition.compareAndSet(null, entryPosition);
lastPosition.set(entryPosition);
ByteBuf metadataAndPayload = entry.getDataBuffer();
- MessageMetadata messageMetadata =
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+ MessageMetadata messageMetadata;
+ if (entry.getMessageMetadata() != null) {
+ messageMetadata = entry.getMessageMetadata();
+ } else {
+ messageMetadata =
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+ }
int numMessages = 1;
if (messageMetadata.hasNumMessagesInBatch()) {
numMessages = messageMetadata.getNumMessagesInBatch();
@@ -1438,9 +1442,9 @@ public class PersistentSubscription extends
AbstractSubscription {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
future.complete(entryTimestamp);
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Error deserializing message for message
position {}", nextPos, e);
future.completeExceptionally(e);
} finally {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 657f03d6b38..3bbd8b00777 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
-import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
@@ -3801,7 +3800,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void readEntryComplete(Entry entry, Object ctx)
{
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp =
entry.getEntryTimestamp();
updateResultIfNewer(
new OldestPositionInfo(
oldestMarkDeleteCursorInfo.getPosition(),
@@ -3979,7 +3978,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
isOldestMessageExpired = MessageImpl.isEntryExpired(
(int) (messageTTLInSeconds *
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
}
@@ -4009,10 +4008,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public void readEntryComplete(Entry entry, Object ctx) {
long entryTimestamp = 0;
try {
- entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ entryTimestamp = entry.getEntryTimestamp();
res.complete(MessageImpl.isEntryExpired(
(int) (messageTTLInSeconds *
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp));
- } catch (IOException e) {
+ } catch (Exception e) {
log.warn("[{}] [{}] Error while getting the oldest
message", topic, cursor.toString(), e);
res.complete(false);
}
@@ -4110,7 +4109,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return CompletableFuture.completedFuture(lastDispatchablePosition);
}
return ledger.getLastDispatchablePosition(entry -> {
- MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ MessageMetadata md = entry.getMessageMetadata();
+ if (md == null) {
+ md = Commands.parseMessageMetadata(entry.getDataBuffer());
+ }
// If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
@@ -4179,7 +4181,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
- MessageMetadata metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ MessageMetadata metadata = entry.getMessageMetadata();
+ if (metadata == null) {
+ metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ }
if (metadata.hasNumMessagesInBatch()) {
completableFuture.complete(new
BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(),
partitionIndex,
metadata.getNumMessagesInBatch() - 1));
@@ -4797,7 +4802,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public void readEntryComplete(Entry entry, Object ctx) {
try {
ByteBuf metadataAndPayload = entry.getDataBuffer();
- MessageMetadata msgMetadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ MessageMetadata msgMetadata =
entry.getMessageMetadata();
+ if (msgMetadata == null) {
+ msgMetadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ }
long publishTime = msgMetadata.getPublishTime();
future.complete(publishTime);
} catch (Exception e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 98d8a40eb36..38b902dc520 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -408,7 +408,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
//decode snapshot from entry
ByteBuf headersAndPayload = entry.getDataBuffer();
//skip metadata
- Commands.parseMessageMetadata(headersAndPayload);
+ Commands.skipMessageMetadata(headersAndPayload);
TransactionBufferSnapshotSegment snapshotSegment =
Schema.AVRO(TransactionBufferSnapshotSegment.class)
.decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c43f0ed7fb9..c3961dca6d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -184,8 +184,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
public void handleTxnEntry(Entry entry) {
ByteBuf metadataAndPayload = entry.getDataBuffer();
- MessageMetadata msgMetadata =
Commands.peekMessageMetadata(metadataAndPayload,
-
TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1);
+ MessageMetadata msgMetadata =
entry.getMessageMetadata();
+ if (msgMetadata == null) {
+ msgMetadata =
Commands.peekMessageMetadata(metadataAndPayload,
+
TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1);
+ }
if (msgMetadata != null &&
msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
TxnID txnID = new
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
Position position =
PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 4dc10671e6a..5707a6cca60 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -635,7 +635,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
private CompletableFuture<MessageId> findMessageIdByPublishTime(long
timestamp, ManagedLedger managedLedger) {
return managedLedger.asyncFindPosition(entry -> {
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ long entryTimestamp = entry.getEntryTimestamp();
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp,
timestamp);
} catch (Exception e) {
log.error("Error deserializing message for message position
find", e);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index b152eda4689..f3eb7b434a9 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -28,7 +28,6 @@ import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@@ -480,7 +479,7 @@ public class MessageImplTest {
entryTimestamp = Commands.getEntryTimestamp(compositeByteBuf);
assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
assertEquals(entryTimestamp, brokerEntryTimestamp);
- } catch (IOException e) {
+ } catch (Exception e) {
fail();
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7aaa010c11e..cab4dc8bcab 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -22,6 +22,7 @@ import static
com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -36,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.ToLongFunction;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -457,7 +460,7 @@ public class Commands {
public static void skipChecksumIfPresent(ByteBuf buffer) {
if (hasChecksum(buffer)) {
- readChecksum(buffer);
+ buffer.skipBytes(Short.BYTES + Integer.BYTES);
}
}
@@ -488,15 +491,21 @@ public class Commands {
buffer.skipBytes(metadataSize);
}
- public static long getEntryTimestamp(ByteBuf
headersAndPayloadWithBrokerEntryMetadata) throws IOException {
+ /**
+ * Gets the entry timestamp from either broker metadata broker timestamp
or the message metadata publish time.
+ * Prefer using Managed Ledger's Entry's getEntryTimestamp() method over
this method.
+ * @param headersAndPayloadWithBrokerEntryMetadata headers and payload for
the message
+ * @return the entry timestamp
+ */
+ public static long getEntryTimestamp(ByteBuf
headersAndPayloadWithBrokerEntryMetadata) {
// get broker timestamp first if BrokerEntryMetadata is enabled with
AppendBrokerTimestampMetadataInterceptor
- BrokerEntryMetadata brokerEntryMetadata =
-
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
- if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasBrokerTimestamp()) {
- return brokerEntryMetadata.getBrokerTimestamp();
- }
- // otherwise get the publish_time
- return
parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+ return
peekBrokerEntryMetadataToLong(headersAndPayloadWithBrokerEntryMetadata,
brokerEntryMetadata -> {
+ if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasBrokerTimestamp()) {
+ return brokerEntryMetadata.getBrokerTimestamp();
+ }
+ // otherwise get the publish_time
+ return
parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+ });
}
public static BaseCommand newMessageCommand(long consumerId, long
ledgerId, long entryId, int partition,
@@ -1761,39 +1770,126 @@ public class Commands {
return compositeByteBuf;
}
- public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf
headerAndPayloadWithBrokerEntryMetadata) {
- int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
- if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
- int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
-
headerAndPayloadWithBrokerEntryMetadata.readerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
- + brokerEntryMetadataSize);
- } else {
- headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+ /**
+ * Moves the readerIndex ahead skipping possible BrokerEntryMetadata if it
exists in the header and payload
+ * buffer.
+ * @param headerAndPayload the header and payload buffer
+ * @return the header and payload buffer passed as parameter
+ */
+ public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf
headerAndPayload) {
+ int readerIndex = headerAndPayload.readerIndex();
+ if (headerAndPayload.getShort(readerIndex) ==
magicBrokerEntryMetadata) {
+ headerAndPayload.skipBytes(Short.BYTES);
+ int brokerEntryMetadataSize = headerAndPayload.readInt();
+ headerAndPayload.skipBytes(brokerEntryMetadataSize);
}
- return headerAndPayloadWithBrokerEntryMetadata;
+ return headerAndPayload;
+ }
+
+ /**
+ * Parses the broker entry metadata from the header and payload buffer and
returns a new BrokerEntryMetadata
+ * instance if the broker entry metadata exists in the header and payload
buffer. Null is returned if the
+ * broker entry metadata does not exist in the header and payload buffer.
+ * The readerIndex of the headerAndPayload buffer is advanced.
+ *
+ * @param headerAndPayload the header and payload buffer
+ * @return broker entry metadata or null
+ */
+ public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(ByteBuf
headerAndPayload) {
+ return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null,
false);
+ }
+
+ /**
+ * Parses the broker entry metadata from the header and payload buffer and
returns a new BrokerEntryMetadata
+ * instance if the broker entry metadata exists in the header and payload
buffer. Null is returned if the
+ * broker entry metadata does not exist in the header and payload buffer.
+ * The readerIndex of the headerAndPayload buffer is not advanced.
+ *
+ * @param headerAndPayload the header and payload buffer
+ * @return broker entry metadata or null
+ */
+ public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
+ ByteBuf headerAndPayload) {
+ return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null,
true);
}
- public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(
- ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
- int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
- if (headerAndPayloadWithBrokerEntryMetadata.getShort(readerIndex) ==
magicBrokerEntryMetadata) {
- headerAndPayloadWithBrokerEntryMetadata.skipBytes(2);
- int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
- BrokerEntryMetadata brokerEntryMetadata = new
BrokerEntryMetadata();
-
brokerEntryMetadata.parseFrom(headerAndPayloadWithBrokerEntryMetadata,
brokerEntryMetadataSize);
- return brokerEntryMetadata;
+ /**
+ * Internal method for parsing and peeking broker entry metadata.
+ * @param headerAndPayload header and payload buffer
+ * @param brokerEntryMetadata the broker entry metadata instance to reuse,
null if a new instance should be created
+ * @param peek when true, the readerIndex of the headerAndPayload buffer
is resetted to the original
+ * @return the broker entry metadata instance or null
+ */
+ private static BrokerEntryMetadata parseOrPeekBrokerEntryMetadataIfExist(
+ ByteBuf headerAndPayload, BrokerEntryMetadata brokerEntryMetadata,
boolean peek) {
+ int readerIndex = headerAndPayload.readerIndex();
+ if (headerAndPayload.getShort(readerIndex) ==
magicBrokerEntryMetadata) {
+ headerAndPayload.skipBytes(Short.BYTES);
+ try {
+ int brokerEntryMetadataSize = headerAndPayload.readInt();
+ if (brokerEntryMetadata == null) {
+ brokerEntryMetadata = new BrokerEntryMetadata();
+ }
+ brokerEntryMetadata.parseFrom(headerAndPayload,
brokerEntryMetadataSize);
+ return brokerEntryMetadata;
+ } finally {
+ if (peek) {
+ headerAndPayload.readerIndex(readerIndex);
+ }
+ }
} else {
return null;
}
}
- public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
- ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
- final int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
- BrokerEntryMetadata entryMetadata =
-
parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
- headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
- return entryMetadata;
+ /**
+ * Peeks the BrokerEntryMetadata from the given payload and applies the
function to the result.
+ * null will be passed to the function if no BrokerEntryMetadata is found.
+ * The function shouldn't return the BrokerEntryMetadata instance or
reference it after the function completes
+ * since it's a ThreadLocal instance that is reused.
+ *
+ * @param headerAndPayload the header and payload of the message
+ * @param function the function to apply to the BrokerEntryMetadata
+ * @param <T> the return type of the function
+ * @return the result of the function
+ */
+ public static <T> T peekBrokerEntryMetadataToObject(ByteBuf
headerAndPayload,
+
Function<BrokerEntryMetadata, T> function) {
+ BrokerEntryMetadata brokerEntryMetadata =
+ parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload,
BROKER_ENTRY_METADATA.get(), true);
+ return function.apply(brokerEntryMetadata);
+ }
+
+ /**
+ * Peeks the BrokerEntryMetadata from the given payload and applies a
function returning a long value to the result.
+ * null will be passed to the function if no BrokerEntryMetadata is found.
The function shouldn't reference the
+ * BrokerEntryMetadata instance after the function completes since it's a
ThreadLocal instance that is reused.
+ *
+ * @param headerAndPayload the header and payload of the message
+ * @param function the function to apply to the BrokerEntryMetadata
+ * @return the result of the function
+ */
+ public static long peekBrokerEntryMetadataToLong(ByteBuf headerAndPayload,
+
ToLongFunction<BrokerEntryMetadata> function) {
+ BrokerEntryMetadata brokerEntryMetadata =
+ parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload,
BROKER_ENTRY_METADATA.get(), true);
+ return function.applyAsLong(brokerEntryMetadata);
+ }
+
+ /**
+ * Peeks the BrokerEntryMetadata from the given payload and consumes the
value using a function.
+ * null will be passed to the function if no BrokerEntryMetadata is found.
+ * The function shouldn't keep a reference to the BrokerEntryMetadata
instance after the call completes
+ * since it's a ThreadLocal instance that is reused.
+ *
+ * @param headerAndPayload the header and payload of the message
+ * @param function the function to apply to the BrokerEntryMetadata
+ */
+ public static void peekBrokerEntryMetadataAndConsume(ByteBuf
headerAndPayload,
+
Consumer<BrokerEntryMetadata> function) {
+ BrokerEntryMetadata brokerEntryMetadata =
+ parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload,
BROKER_ENTRY_METADATA.get(), true);
+ function.accept(brokerEntryMetadata);
}
public static ByteBuf serializeMetadataAndPayload(ChecksumType
checksumType,
@@ -1958,28 +2054,28 @@ public class Commands {
return ByteBufPair.get(headers, metadataAndPayload);
}
- public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload,
String subscription,
- long consumerId) {
- MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload,
subscription, consumerId);
- if (msgMetadata == null) {
- return -1;
- } else {
- return msgMetadata.getNumMessagesInBatch();
- }
- }
-
public static MessageMetadata peekMessageMetadata(ByteBuf
metadataAndPayload, String subscription,
long consumerId) {
+ // save the reader index and restore after parsing
+ int readerIdx = metadataAndPayload.readerIndex();
try {
- // save the reader index and restore after parsing
- int readerIdx = metadataAndPayload.readerIndex();
- MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
- metadataAndPayload.readerIndex(readerIdx);
-
+ MessageMetadata metadata =
parseMessageMetadata(metadataAndPayload);
return metadata;
} catch (Throwable t) {
log.error("[{}] [{}] Failed to parse message metadata",
subscription, consumerId, t);
return null;
+ } finally {
+ metadataAndPayload.readerIndex(readerIdx);
+ }
+ }
+
+ public static void peekMessageMetadata(ByteBuf metadataAndPayload,
MessageMetadata msgMetadata) {
+ // save the reader index and restore after parsing
+ int readerIdx = metadataAndPayload.readerIndex();
+ try {
+ parseMessageMetadata(metadataAndPayload, msgMetadata);
+ } finally {
+ metadataAndPayload.readerIndex(readerIdx);
}
}
@@ -1992,26 +2088,28 @@ public class Commands {
*/
public static MessageMetadata peekAndCopyMessageMetadata(
ByteBuf metadataAndPayload, String subscription, long consumerId) {
- final MessageMetadata localMetadata =
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
- if (localMetadata == null) {
+ final MessageMetadata metadata = new MessageMetadata();
+ try {
+ peekMessageMetadata(metadataAndPayload, metadata);
+ } catch (Throwable t) {
+ log.error("[{}] [{}] Failed to parse message metadata",
subscription, consumerId, t);
return null;
}
- final MessageMetadata metadata = new MessageMetadata();
- metadata.copyFrom(localMetadata);
return metadata;
}
public static final byte[] NONE_KEY =
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String
topic, String subscription) {
+ int readerIdx = metadataAndPayload.readerIndex();
try {
- int readerIdx = metadataAndPayload.readerIndex();
MessageMetadata metadata =
parseMessageMetadata(metadataAndPayload);
- metadataAndPayload.readerIndex(readerIdx);
return resolveStickyKey(metadata);
} catch (Throwable t) {
log.error("[{}] [{}] Failed to peek sticky key from the message
metadata", topic, subscription, t);
return NONE_KEY;
+ } finally {
+ metadataAndPayload.readerIndex(readerIdx);
}
}