This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ae5c70c2789 [improve] Add secondary index support to MetadataStore (#25328)
ae5c70c2789 is described below

commit ae5c70c27899b64884bdeaaba87a2e27c7f9408c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 17:49:32 2026 -0700

    [improve] Add secondary index support to MetadataStore (#25328)
---
 .../apache/pulsar/metadata/api/MetadataStore.java  |  21 +++
 .../api/extended/MetadataStoreExtended.java        |  20 +++
 .../metadata/impl/AbstractMetadataStore.java       |  58 ++++++-
 .../pulsar/metadata/impl/DualMetadataStore.java    |  37 +++++
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  42 +++++
 .../metadata/MetadataStoreSecondaryIndexTest.java  | 170 +++++++++++++++++++++
 6 files changed, 344 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 8383be4d5b3..38ecf79fc09 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
@@ -266,6 +267,26 @@ public interface MetadataStore extends AutoCloseable {
         return getMetadataCache(serde, cacheConfig);
     }
 
+    /**
+     * Find all records matching a secondary index.
+     *
+     * <p>On stores that support secondary indexes natively (e.g. Oxia), this 
uses the index
+     * efficiently. On stores that don't (e.g. ZooKeeper), it falls back to 
listing all children
+     * under {@code scanPathPrefix}, fetching each record, and applying {@code 
fallbackFilter}.
+     *
+     * @param scanPathPrefix path prefix for fallback scan (used by stores 
without native index support)
+     * @param indexName      the secondary index name
+     * @param secondaryKey   the secondary key to look up
+     * @param fallbackFilter predicate to filter results during fallback scan; 
ignored by native implementations
+     * @return list of matching {@link GetResult} entries
+     */
+    default CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        return CompletableFuture.failedFuture(
+                new MetadataStoreException("Secondary index queries not 
supported by this store"));
+    }
+
     /**
      * Returns the default metadata cache config.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index 2bbd627c2fc..ad5c7a25300 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.api.extended;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -67,6 +68,25 @@ public interface MetadataStoreExtended extends MetadataStore {
     CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> 
expectedVersion,
             EnumSet<CreateOption> options);
 
+    /**
+     * Put a new value for a given key with secondary index associations.
+     *
+     * <p>Secondary indexes are hints: stores that don't support them simply 
ignore them
+     * and delegate to the regular {@link #put(String, byte[], Optional, 
EnumSet)} method.
+     *
+     * @param path              the path of the key
+     * @param value             the value to store
+     * @param expectedVersion   if present, the version will have to match for 
the operation to succeed
+     * @param options           a set of {@link CreateOption} to use if the 
key-value pair is being created
+     * @param secondaryIndexes  secondary indexes to associate with this 
record (index name to secondary key)
+     * @throws BadVersionException if the expected version doesn't match the 
actual version
+     * @return a future to track the async request
+     */
+    default CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
+            EnumSet<CreateOption> options, Map<String, String> 
secondaryIndexes) {
+        return put(path, value, expectedVersion, options);
+    }
+
     /**
      * Register a session listener that will get notified of changes in status 
of the current session.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 77b990c2be1..50e53038f3e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -49,6 +50,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -506,9 +508,50 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     protected abstract CompletableFuture<Stat> storePut(String path, byte[] 
data, Optional<Long> optExpectedVersion,
                                                         EnumSet<CreateOption> 
options);
 
+    protected CompletableFuture<Stat> storePut(String path, byte[] data, 
Optional<Long> optExpectedVersion,
+                                               EnumSet<CreateOption> options,
+                                               Map<String, String> 
secondaryIndexes) {
+        return storePut(path, data, optExpectedVersion, options);
+    }
+
+    @Override
+    public CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        if (isClosed()) {
+            return alreadyClosedFailedFuture();
+        }
+        return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter);
+    }
+
+    protected CompletableFuture<List<GetResult>> storeFindByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        // Default fallback: full scan under scanPathPrefix, applying 
fallbackFilter to each result.
+        return getChildrenFromStore(scanPathPrefix)
+                .thenCompose(children -> {
+                    List<CompletableFuture<Optional<GetResult>>> futures = 
children.stream()
+                            .map(child -> storeGet(scanPathPrefix + "/" + 
child))
+                            .toList();
+                    return FutureUtil.waitForAll(futures)
+                            .thenApply(__ -> futures.stream()
+                                    .map(CompletableFuture::join)
+                                    .filter(Optional::isPresent)
+                                    .map(Optional::get)
+                                    .filter(fallbackFilter)
+                                    .toList());
+                });
+    }
+
     @Override
     public final CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion,
             EnumSet<CreateOption> options) {
+        return put(path, data, optExpectedVersion, options, 
Collections.emptyMap());
+    }
+
+    @Override
+    public final CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion,
+            EnumSet<CreateOption> options, Map<String, String> 
secondaryIndexes) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -525,7 +568,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                     Instant.now().toEpochMilli(), 
getMetadataEventSynchronizer().get().getClusterName(),
                     NotificationType.Modified);
             return getMetadataEventSynchronizer().get().notify(event)
-                    .thenCompose(__ -> putInternal(path, data, 
optExpectedVersion, options))
+                    .thenCompose(__ -> putInternal(path, data, 
optExpectedVersion, options, secondaryIndexes))
                     .whenComplete((v, t) -> {
                         if (t != null) {
                             
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -535,7 +578,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                         }
                     });
         } else {
-            return putInternal(path, data, optExpectedVersion, options)
+            return putInternal(path, data, optExpectedVersion, options, 
secondaryIndexes)
                     .whenComplete((v, t) -> {
                         if (t != null) {
                             
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -547,11 +590,18 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         }
 
     }
+
     public final CompletableFuture<Stat> putInternal(String path, byte[] data, 
Optional<Long> optExpectedVersion,
             Set<CreateOption> options) {
+        return putInternal(path, data, optExpectedVersion, options, 
Collections.emptyMap());
+    }
+
+    public final CompletableFuture<Stat> putInternal(String path, byte[] data, 
Optional<Long> optExpectedVersion,
+            Set<CreateOption> options, Map<String, String> secondaryIndexes) {
+        var enumOptions =
+                (options != null && !options.isEmpty()) ? 
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class);
         // Ensure caches are invalidated before the operation is confirmed
-        return storePut(path, data, optExpectedVersion,
-                (options != null && !options.isEmpty()) ? 
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
+        return storePut(path, data, optExpectedVersion, enumOptions, 
secondaryIndexes)
                 .thenApply(stat -> {
                     NotificationType type = stat.isFirstVersion() ? 
NotificationType.Created
                             : NotificationType.Modified;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 5360d480868..24165007dbf 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -23,6 +23,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Duration;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.migration.MigrationPhase;
@@ -294,6 +296,41 @@ public class DualMetadataStore implements MetadataStoreExtended {
         return put(path, value, expectedVersion, 
EnumSet.noneOf(CreateOption.class));
     }
 
+    @Override
+    public CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        return switch (migrationState.getPhase()) {
+            case NOT_STARTED, PREPARATION, COPYING, FAILED ->
+                    sourceStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter);
+            case COMPLETED ->
+                    targetStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter);
+        };
+    }
+
+    @Override
+    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
+                                       EnumSet<CreateOption> options, 
Map<String, String> secondaryIndexes) {
+        switch (migrationState.getPhase()) {
+            case NOT_STARTED, FAILED -> {
+                if (options.contains(CreateOption.Ephemeral)) {
+                    localEphemeralPaths.add(path);
+                }
+                pendingSourceWrites.incrementAndGet();
+                var future = sourceStore.put(path, value, expectedVersion, 
options, secondaryIndexes);
+                future.whenComplete((result, e) -> 
pendingSourceWrites.decrementAndGet());
+                return future;
+            }
+            case PREPARATION, COPYING -> {
+                return 
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
+            }
+            case COMPLETED -> {
+                return targetStore.put(path, value, expectedVersion, options, 
secondaryIndexes);
+            }
+            default -> throw new IllegalStateException("Invalid phase " + 
migrationState.getPhase());
+        }
+    }
+
     @Override
     public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
                                        EnumSet<CreateOption> options) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..1bbec3be80c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -27,18 +27,21 @@ import io.oxia.client.api.Version;
 import io.oxia.client.api.exceptions.KeyAlreadyExistsException;
 import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
 import io.oxia.client.api.options.DeleteOption;
+import io.oxia.client.api.options.ListOption;
 import io.oxia.client.api.options.PutOption;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.function.Predicate;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -203,6 +206,39 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     @Override
     protected CompletableFuture<Stat> storePut(
             String path, byte[] data, Optional<Long> optExpectedVersion, 
EnumSet<CreateOption> options) {
+        return doStorePut(path, data, optExpectedVersion, options, 
Collections.emptyMap());
+    }
+
+    @Override
+    protected CompletableFuture<Stat> storePut(
+            String path, byte[] data, Optional<Long> optExpectedVersion, 
EnumSet<CreateOption> options,
+            Map<String, String> secondaryIndexes) {
+        return doStorePut(path, data, optExpectedVersion, options, 
secondaryIndexes);
+    }
+
+    @Override
+    protected CompletableFuture<List<GetResult>> storeFindByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        String scopedKey = scanPathPrefix + "/" + secondaryKey;
+        return client.list(scopedKey, scopedKey + "~", 
Set.of(ListOption.UseIndex(indexName)))
+                .thenCompose(primaryKeys -> {
+                    List<CompletableFuture<Optional<GetResult>>> futures = 
primaryKeys.stream()
+                            .map(this::storeGet)
+                            .toList();
+                    return FutureUtil.waitForAll(futures)
+                            .thenApply(__ -> futures.stream()
+                                    .map(CompletableFuture::join)
+                                    .filter(Optional::isPresent)
+                                    .map(Optional::get)
+                                    .toList());
+                })
+                .exceptionallyCompose(this::convertException);
+    }
+
+    private CompletableFuture<Stat> doStorePut(
+            String path, byte[] data, Optional<Long> optExpectedVersion, 
EnumSet<CreateOption> options,
+            Map<String, String> secondaryIndexes) {
         CompletableFuture<Void> parentsCreated = createParents(path);
         return parentsCreated.thenCompose(
                 __ -> {
@@ -242,6 +278,12 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
                     if (options.contains(CreateOption.Ephemeral)) {
                         putOptions.add(PutOption.AsEphemeralRecord);
                     }
+                    var parentPath = parent(path);
+                    var parentPrefix = parentPath == null ? "" : parentPath;
+                    secondaryIndexes.forEach((indexName, secondaryKey) ->
+                            putOptions.add(PutOption.SecondaryIndex(indexName,
+                                    parentPrefix + "/" + secondaryKey)));
+
                     return actualPath
                             .thenCompose(
                                     aPath ->
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
new file mode 100644
index 00000000000..8dd00543a8f
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+public class MetadataStoreSecondaryIndexTest extends BaseMetadataStoreTest {
+
+    @Test(dataProvider = "impl")
+    public void putWithSecondaryIndexesPreservesValue(String provider, 
Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+
+        store.put(path, value, Optional.of(-1L), 
EnumSet.noneOf(CreateOption.class),
+                Map.of("by-owner", "broker-1")).join();
+
+        var result = store.get(path).join();
+        assertTrue(result.isPresent());
+        assertEquals(result.get().getValue(), value);
+    }
+
+    @Test(dataProvider = "impl")
+    public void putWithMultipleSecondaryIndexes(String provider, 
Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+        byte[] value = "multi-index-value".getBytes(StandardCharsets.UTF_8);
+
+        store.put(path, value, Optional.of(-1L), 
EnumSet.noneOf(CreateOption.class),
+                Map.of("by-owner", "broker-1", "by-namespace", 
"tenant/ns1")).join();
+
+        var result = store.get(path).join();
+        assertTrue(result.isPresent());
+        assertEquals(result.get().getValue(), value);
+    }
+
+    @Test(dataProvider = "impl")
+    public void putWithEmptySecondaryIndexes(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+        byte[] value = "no-index-value".getBytes(StandardCharsets.UTF_8);
+
+        store.put(path, value, Optional.of(-1L), 
EnumSet.noneOf(CreateOption.class), Map.of()).join();
+
+        var result = store.get(path).join();
+        assertTrue(result.isPresent());
+        assertEquals(result.get().getValue(), value);
+    }
+
+    @Test(dataProvider = "impl")
+    public void findByIndexFallbackReturnsFilteredResults(String provider, 
Supplier<String> urlSupplier)
+            throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String basePath = newKey();
+
+        store.put(basePath + "/topic-1", 
"owned-by-broker-1".getBytes(StandardCharsets.UTF_8),
+                Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                Map.of("by-owner", "broker-1")).join();
+        store.put(basePath + "/topic-2", 
"owned-by-broker-2".getBytes(StandardCharsets.UTF_8),
+                Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                Map.of("by-owner", "broker-2")).join();
+        store.put(basePath + "/topic-3", 
"owned-by-broker-1".getBytes(StandardCharsets.UTF_8),
+                Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                Map.of("by-owner", "broker-1")).join();
+
+        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"broker-1",
+                r -> new String(r.getValue(), 
StandardCharsets.UTF_8).contains("broker-1")).join();
+
+        assertEquals(results.size(), 2);
+        Set<String> values = results.stream()
+                .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+                .collect(Collectors.toSet());
+        assertTrue(values.contains("owned-by-broker-1"));
+    }
+
+    @Test(dataProvider = "impl")
+    public void findByIndexFallbackWithNoMatches(String provider, 
Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String basePath = newKey();
+
+        store.put(basePath + "/topic-1", 
"value-1".getBytes(StandardCharsets.UTF_8),
+                Optional.of(-1L), EnumSet.noneOf(CreateOption.class)).join();
+
+        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"nonexistent",
+                r -> false).join();
+
+        assertEquals(results.size(), 0);
+    }
+
+    @Test(dataProvider = "impl")
+    public void findByIndexFallbackWithEmptyPrefix(String provider, 
Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String basePath = newKey();
+
+        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"broker-1",
+                r -> true).join();
+
+        assertEquals(results.size(), 0);
+    }
+
+    @Test(dataProvider = "impl")
+    public void updateWithSecondaryIndexes(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+
+        var stat = store.put(path, "v1".getBytes(StandardCharsets.UTF_8),
+                Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                Map.of("idx", "key-1")).join();
+
+        store.put(path, "v2".getBytes(StandardCharsets.UTF_8),
+                Optional.of(stat.getVersion()), 
EnumSet.noneOf(CreateOption.class),
+                Map.of("idx", "key-2")).join();
+
+        var result = store.get(path).join();
+        assertTrue(result.isPresent());
+        assertEquals(result.get().getValue(), 
"v2".getBytes(StandardCharsets.UTF_8));
+    }
+}

Reply via email to