This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ae5c70c2789 [improve] Add secondary index support to MetadataStore
(#25328)
ae5c70c2789 is described below
commit ae5c70c27899b64884bdeaaba87a2e27c7f9408c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 17:49:32 2026 -0700
[improve] Add secondary index support to MetadataStore (#25328)
---
.../apache/pulsar/metadata/api/MetadataStore.java | 21 +++
.../api/extended/MetadataStoreExtended.java | 20 +++
.../metadata/impl/AbstractMetadataStore.java | 58 ++++++-
.../pulsar/metadata/impl/DualMetadataStore.java | 37 +++++
.../metadata/impl/oxia/OxiaMetadataStore.java | 42 +++++
.../metadata/MetadataStoreSecondaryIndexTest.java | 170 +++++++++++++++++++++
6 files changed, 344 insertions(+), 4 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 8383be4d5b3..38ecf79fc09 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
@@ -266,6 +267,26 @@ public interface MetadataStore extends AutoCloseable {
return getMetadataCache(serde, cacheConfig);
}
+    /**
+     * Look up all records associated with a given secondary key of a secondary index.
+     *
+     * <p>Stores with native secondary index support (e.g. Oxia) are expected to resolve the query
+     * through the index. Stores without it (e.g. ZooKeeper) are expected to fall back to listing the
+     * children of {@code scanPathPrefix}, fetching each record and keeping the ones accepted by
+     * {@code fallbackFilter}.
+     *
+     * @implSpec The default implementation in this interface performs no lookup at all: it returns a
+     * future that is already completed exceptionally with a {@link MetadataStoreException}.
+     *
+     * @param scanPathPrefix path prefix for the fallback scan (used by stores without native index support)
+     * @param indexName the secondary index name
+     * @param secondaryKey the secondary key to look up
+     * @param fallbackFilter predicate applied to each record during a fallback scan; ignored by native
+     *                       implementations
+     * @return a future holding the list of matching {@link GetResult} entries
+     */
+    default CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        MetadataStoreException unsupported =
+                new MetadataStoreException("Secondary index queries not supported by this store");
+        return CompletableFuture.failedFuture(unsupported);
+    }
+
/**
* Returns the default metadata cache config.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index 2bbd627c2fc..ad5c7a25300 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.metadata.api.extended;
import java.util.EnumSet;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -67,6 +68,25 @@ public interface MetadataStoreExtended extends MetadataStore
{
CompletableFuture<Stat> put(String path, byte[] value, Optional<Long>
expectedVersion,
EnumSet<CreateOption> options);
+ /**
+ * Put a new value for a given key, optionally associating the record with secondary indexes.
+ *
+ * <p>Secondary indexes are hints: stores that don't support them simply ignore them.
+ * This default implementation drops {@code secondaryIndexes} and delegates to the regular
+ * {@link #put(String, byte[], Optional, EnumSet)} method.
+ *
+ * @param path the path of the key
+ * @param value the value to store
+ * @param expectedVersion if present, the version will have to match for the operation to succeed
+ * @param options a set of {@link CreateOption} to use if the key-value pair is being created
+ * @param secondaryIndexes secondary indexes to associate with this record (index name to secondary key)
+ * @return a future to track the async request
+ * @throws BadVersionException if the expected version doesn't match the actual version
+ */
+ default CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
+ EnumSet<CreateOption> options, Map<String, String>
secondaryIndexes) {
+ return put(path, value, expectedVersion, options);
+ }
+
/**
* Register a session listener that will get notified of changes in status
of the current session.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 77b990c2be1..50e53038f3e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -49,6 +50,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -506,9 +508,50 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Stat> storePut(String path, byte[]
data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption>
options);
+ /**
+ * Store-level put that additionally receives the secondary indexes of the record.
+ *
+ * <p>This base implementation ignores {@code secondaryIndexes} and forwards to the
+ * four-argument {@code storePut}; a subclass that can persist secondary indexes
+ * overrides this overload instead.
+ *
+ * @param path the path of the key
+ * @param data the value to store
+ * @param optExpectedVersion if present, the version expected by the caller
+ * @param options create options for the record
+ * @param secondaryIndexes index name to secondary key associations (unused here)
+ * @return a future completed with the {@link Stat} of the stored record
+ */
+ protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption> options,
+ Map<String, String>
secondaryIndexes) {
+ return storePut(path, data, optExpectedVersion, options);
+ }
+
+    /**
+     * Entry point for secondary index lookups: rejects the call when the store is already closed,
+     * otherwise hands the query over to {@code storeFindByIndex}.
+     *
+     * @param scanPathPrefix path prefix used when the lookup has to scan records
+     * @param indexName the secondary index name
+     * @param secondaryKey the secondary key to look up
+     * @param fallbackFilter predicate used to select records during a scan
+     * @return a future holding the matching records, or the "already closed" failed future
+     */
+    @Override
+    public CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        return isClosed()
+                ? alreadyClosedFailedFuture()
+                : storeFindByIndex(scanPathPrefix, indexName, secondaryKey, fallbackFilter);
+    }
+
+    /**
+     * Store-level secondary index lookup.
+     *
+     * <p>This default implementation does not use any index: it lists the children of
+     * {@code scanPathPrefix}, reads every child record and keeps the ones accepted by
+     * {@code fallbackFilter}. {@code indexName} and {@code secondaryKey} are not consulted here.
+     * Children for which the read returns an empty result are skipped.
+     *
+     * @param scanPathPrefix parent path whose direct children are scanned
+     * @param indexName the secondary index name (unused by the scan)
+     * @param secondaryKey the secondary key (unused by the scan)
+     * @param fallbackFilter predicate deciding which scanned records are returned
+     * @return a future holding the records accepted by {@code fallbackFilter}
+     */
+    protected CompletableFuture<List<GetResult>> storeFindByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        // Build child paths without doubling the separator: scanning the root ("/") or a prefix
+        // that already ends with '/' must produce "/child", not "//child".
+        String childPathPrefix = scanPathPrefix.endsWith("/") ? scanPathPrefix : scanPathPrefix + "/";
+        return getChildrenFromStore(scanPathPrefix)
+                .thenCompose(children -> {
+                    List<CompletableFuture<Optional<GetResult>>> futures = children.stream()
+                            .map(child -> storeGet(childPathPrefix + child))
+                            .toList();
+                    // All reads have completed once waitForAll is done, so join() does not block.
+                    return FutureUtil.waitForAll(futures)
+                            .thenApply(__ -> futures.stream()
+                                    .map(CompletableFuture::join)
+                                    .flatMap(Optional::stream)
+                                    .filter(fallbackFilter)
+                                    .toList());
+                });
+    }
+
@Override
public final CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
+ return put(path, data, optExpectedVersion, options,
Collections.emptyMap());
+ }
+
+ @Override
+ public final CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption> options, Map<String, String>
secondaryIndexes) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -525,7 +568,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
Instant.now().toEpochMilli(),
getMetadataEventSynchronizer().get().getClusterName(),
NotificationType.Modified);
return getMetadataEventSynchronizer().get().notify(event)
- .thenCompose(__ -> putInternal(path, data,
optExpectedVersion, options))
+ .thenCompose(__ -> putInternal(path, data,
optExpectedVersion, options, secondaryIndexes))
.whenComplete((v, t) -> {
if (t != null) {
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -535,7 +578,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
});
} else {
- return putInternal(path, data, optExpectedVersion, options)
+ return putInternal(path, data, optExpectedVersion, options,
secondaryIndexes)
.whenComplete((v, t) -> {
if (t != null) {
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -547,11 +590,18 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
+
public final CompletableFuture<Stat> putInternal(String path, byte[] data,
Optional<Long> optExpectedVersion,
Set<CreateOption> options) {
+ return putInternal(path, data, optExpectedVersion, options,
Collections.emptyMap());
+ }
+
+ public final CompletableFuture<Stat> putInternal(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ Set<CreateOption> options, Map<String, String> secondaryIndexes) {
+ var enumOptions =
+ (options != null && !options.isEmpty()) ?
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class);
// Ensure caches are invalidated before the operation is confirmed
- return storePut(path, data, optExpectedVersion,
- (options != null && !options.isEmpty()) ?
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
+ return storePut(path, data, optExpectedVersion, enumOptions,
secondaryIndexes)
.thenApply(stat -> {
NotificationType type = stat.isFirstVersion() ?
NotificationType.Created
: NotificationType.Modified;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 5360d480868..24165007dbf 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -23,6 +23,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.migration.MigrationPhase;
@@ -294,6 +296,41 @@ public class DualMetadataStore implements
MetadataStoreExtended {
return put(path, value, expectedVersion,
EnumSet.noneOf(CreateOption.class));
}
+    /**
+     * Route a secondary index lookup to the store that is authoritative for the current
+     * migration phase: the target store once the phase is {@code COMPLETED}, the source store
+     * in every other phase.
+     *
+     * @param scanPathPrefix path prefix passed through to the selected store
+     * @param indexName the secondary index name
+     * @param secondaryKey the secondary key to look up
+     * @param fallbackFilter predicate passed through to the selected store
+     * @return the future returned by the selected store
+     */
+    @Override
+    public CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        MetadataStoreExtended activeStore = switch (migrationState.getPhase()) {
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> sourceStore;
+            case COMPLETED -> targetStore;
+        };
+        return activeStore.findByIndex(scanPathPrefix, indexName, secondaryKey, fallbackFilter);
+    }
+
+    /**
+     * Put a value together with its secondary indexes, routed by migration phase.
+     *
+     * <ul>
+     *   <li>{@code NOT_STARTED} / {@code FAILED}: write to the source store; ephemeral paths are
+     *       remembered in {@code localEphemeralPaths} and the write is counted in
+     *       {@code pendingSourceWrites} until it completes.</li>
+     *   <li>{@code PREPARATION} / {@code COPYING}: the write is rejected with
+     *       {@code READ_ONLY_STATE_EXCEPTION}.</li>
+     *   <li>{@code COMPLETED}: write to the target store.</li>
+     * </ul>
+     *
+     * @param path the path of the key
+     * @param value the value to store
+     * @param expectedVersion if present, the version expected by the caller
+     * @param options create options for the record
+     * @param secondaryIndexes index name to secondary key associations, forwarded to the chosen store
+     * @return a future tracking the write
+     */
+    @Override
+    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
+                                       EnumSet<CreateOption> options, Map<String, String> secondaryIndexes) {
+        return switch (migrationState.getPhase()) {
+            case NOT_STARTED, FAILED -> {
+                if (options.contains(CreateOption.Ephemeral)) {
+                    localEphemeralPaths.add(path);
+                }
+                // Count the write as pending for as long as the source store has not answered.
+                pendingSourceWrites.incrementAndGet();
+                var sourceWrite = sourceStore.put(path, value, expectedVersion, options, secondaryIndexes);
+                sourceWrite.whenComplete((result, e) -> pendingSourceWrites.decrementAndGet());
+                yield sourceWrite;
+            }
+            case PREPARATION, COPYING -> CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
+            case COMPLETED -> targetStore.put(path, value, expectedVersion, options, secondaryIndexes);
+            default -> throw new IllegalStateException("Invalid phase " + migrationState.getPhase());
+        };
+    }
+
@Override
public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
EnumSet<CreateOption> options) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..1bbec3be80c 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -27,18 +27,21 @@ import io.oxia.client.api.Version;
import io.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.oxia.client.api.options.DeleteOption;
+import io.oxia.client.api.options.ListOption;
import io.oxia.client.api.options.PutOption;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.function.Predicate;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -203,6 +206,39 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
@Override
protected CompletableFuture<Stat> storePut(
String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
+ return doStorePut(path, data, optExpectedVersion, options,
Collections.emptyMap());
+ }
+
+ /**
+ * Index-aware store-level put: forwards the record and its secondary indexes
+ * to {@code doStorePut}.
+ *
+ * @param path the path of the key
+ * @param data the value to store
+ * @param optExpectedVersion if present, the version expected by the caller
+ * @param options create options for the record
+ * @param secondaryIndexes index name to secondary key associations
+ * @return the future returned by {@code doStorePut}
+ */
+ @Override
+ protected CompletableFuture<Stat> storePut(
+ String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options,
+ Map<String, String> secondaryIndexes) {
+ return doStorePut(path, data, optExpectedVersion, options,
secondaryIndexes);
+ }
+
+    /**
+     * Resolve a secondary index lookup through the Oxia index instead of scanning.
+     *
+     * <p>The index is listed over the range {@code [prefix/secondaryKey, prefix/secondaryKey~)},
+     * and every primary key returned by the listing is then read with {@code storeGet}; keys whose
+     * read comes back empty are skipped. {@code fallbackFilter} is not applied.
+     *
+     * @param scanPathPrefix path prefix used to scope the secondary key
+     * @param indexName the secondary index to query
+     * @param secondaryKey the secondary key to look up
+     * @param fallbackFilter unused by this implementation
+     * @return a future holding the records found through the index
+     */
+    @Override
+    protected CompletableFuture<List<GetResult>> storeFindByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        // Index entries are written as "<parent>/<secondaryKey>", with "" used when the record has
+        // no parent path. Drop a trailing '/' so that a root prefix ("/") produces
+        // "/<secondaryKey>" like the write side, rather than "//<secondaryKey>".
+        String indexPrefix = scanPathPrefix.endsWith("/")
+                ? scanPathPrefix.substring(0, scanPathPrefix.length() - 1)
+                : scanPathPrefix;
+        String scopedKey = indexPrefix + "/" + secondaryKey;
+        // NOTE(review): the upper bound "<scopedKey>~" looks like it also covers secondary keys that
+        // merely start with secondaryKey (e.g. "broker-1" vs "broker-10") - confirm whether prefix
+        // matching is intended or an exact-match bound is needed.
+        return client.list(scopedKey, scopedKey + "~", Set.of(ListOption.UseIndex(indexName)))
+                .thenCompose(primaryKeys -> {
+                    List<CompletableFuture<Optional<GetResult>>> futures = primaryKeys.stream()
+                            .map(this::storeGet)
+                            .toList();
+                    // All reads have completed once waitForAll is done, so join() does not block.
+                    return FutureUtil.waitForAll(futures)
+                            .thenApply(__ -> futures.stream()
+                                    .map(CompletableFuture::join)
+                                    .flatMap(Optional::stream)
+                                    .toList());
+                })
+                .exceptionallyCompose(this::convertException);
+    }
+
+ private CompletableFuture<Stat> doStorePut(
+ String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options,
+ Map<String, String> secondaryIndexes) {
CompletableFuture<Void> parentsCreated = createParents(path);
return parentsCreated.thenCompose(
__ -> {
@@ -242,6 +278,12 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
if (options.contains(CreateOption.Ephemeral)) {
putOptions.add(PutOption.AsEphemeralRecord);
}
+ var parentPath = parent(path);
+ var parentPrefix = parentPath == null ? "" : parentPath;
+ secondaryIndexes.forEach((indexName, secondaryKey) ->
+ putOptions.add(PutOption.SecondaryIndex(indexName,
+ parentPrefix + "/" + secondaryKey)));
+
return actualPath
.thenCompose(
aPath ->
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
new file mode 100644
index 00000000000..8dd00543a8f
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+public class MetadataStoreSecondaryIndexTest extends BaseMetadataStoreTest {
+
+ @Test(dataProvider = "impl")
+ public void putWithSecondaryIndexesPreservesValue(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+ byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+
+ store.put(path, value, Optional.of(-1L),
EnumSet.noneOf(CreateOption.class),
+ Map.of("by-owner", "broker-1")).join();
+
+ var result = store.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getValue(), value);
+ }
+
+ @Test(dataProvider = "impl")
+ public void putWithMultipleSecondaryIndexes(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+ byte[] value = "multi-index-value".getBytes(StandardCharsets.UTF_8);
+
+ store.put(path, value, Optional.of(-1L),
EnumSet.noneOf(CreateOption.class),
+ Map.of("by-owner", "broker-1", "by-namespace",
"tenant/ns1")).join();
+
+ var result = store.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getValue(), value);
+ }
+
+ @Test(dataProvider = "impl")
+ public void putWithEmptySecondaryIndexes(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+ byte[] value = "no-index-value".getBytes(StandardCharsets.UTF_8);
+
+ store.put(path, value, Optional.of(-1L),
EnumSet.noneOf(CreateOption.class), Map.of()).join();
+
+ var result = store.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getValue(), value);
+ }
+
+ @Test(dataProvider = "impl")
+ public void findByIndexFallbackReturnsFilteredResults(String provider,
Supplier<String> urlSupplier)
+ throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String basePath = newKey();
+
+ store.put(basePath + "/topic-1",
"owned-by-broker-1".getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("by-owner", "broker-1")).join();
+ store.put(basePath + "/topic-2",
"owned-by-broker-2".getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("by-owner", "broker-2")).join();
+ store.put(basePath + "/topic-3",
"owned-by-broker-1".getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("by-owner", "broker-1")).join();
+
+ List<GetResult> results = store.findByIndex(basePath, "by-owner",
"broker-1",
+ r -> new String(r.getValue(),
StandardCharsets.UTF_8).contains("broker-1")).join();
+
+ assertEquals(results.size(), 2);
+ Set<String> values = results.stream()
+ .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+ .collect(Collectors.toSet());
+ assertTrue(values.contains("owned-by-broker-1"));
+ }
+
+ @Test(dataProvider = "impl")
+ public void findByIndexFallbackWithNoMatches(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String basePath = newKey();
+
+ store.put(basePath + "/topic-1",
"value-1".getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class)).join();
+
+ List<GetResult> results = store.findByIndex(basePath, "by-owner",
"nonexistent",
+ r -> false).join();
+
+ assertEquals(results.size(), 0);
+ }
+
+ @Test(dataProvider = "impl")
+ public void findByIndexFallbackWithEmptyPrefix(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String basePath = newKey();
+
+ List<GetResult> results = store.findByIndex(basePath, "by-owner",
"broker-1",
+ r -> true).join();
+
+ assertEquals(results.size(), 0);
+ }
+
+ @Test(dataProvider = "impl")
+ public void updateWithSecondaryIndexes(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+
+ var stat = store.put(path, "v1".getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("idx", "key-1")).join();
+
+ store.put(path, "v2".getBytes(StandardCharsets.UTF_8),
+ Optional.of(stat.getVersion()),
EnumSet.noneOf(CreateOption.class),
+ Map.of("idx", "key-2")).join();
+
+ var result = store.get(path).join();
+ assertTrue(result.isPresent());
+ assertEquals(result.get().getValue(),
"v2".getBytes(StandardCharsets.UTF_8));
+ }
+}