[ https://issues.apache.org/jira/browse/IGNITE-19700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Denis Chudov updated IGNITE-19700: ---------------------------------- Description: Motivation: Prerequisites: RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random tombstones, 100 values with prefix "tables", 5000 tombstones with prefix "tables". Load profile: * Thread1: performs storage.invoke with the same key and value of size 500k bytes, once per 100 ms. * Thread2: performs storage.range for prefix "tables" once per 200 ms, collects all entries from cursor. * Thread3: performs storage.get with random key once per 3 ms. Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes this time spikes up to hundreds of milliseconds (or even seconds): and this lasts for some time (I observed up to half of a minute), after that the time returns to normal values: {code:java} 2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size 100 2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size 100 2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size 100 2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size 100 2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size 100 2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size 100 2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size 100 2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size 100 2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size 100 2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size 100 2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size 100 2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size 100{code} On teamcity it was even over 6 seconds: [https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware] Reproducer: {code:java} package org.apache.ignite.internal.metastorage; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; import java.io.ObjectStreamException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.ConditionType; import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.metastorage.server.AndCondition; import org.apache.ignite.internal.metastorage.server.Condition; import org.apache.ignite.internal.metastorage.server.ExistenceCondition; import org.apache.ignite.internal.metastorage.server.OrCondition; import org.apache.ignite.internal.metastorage.server.RevisionCondition; import org.apache.ignite.internal.metastorage.server.TombstoneCondition; import org.apache.ignite.internal.metastorage.server.ValueCondition; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.ByteArray; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) public class RocksDBLoadTest { private static final IgniteLogger LOG = Loggers.forClass(RocksDBLoadTest.class); private byte[] randomBytes() { return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); } private byte[] randomBytes(String prefix) { return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8); } /** * Increments the last character of the given string. */ private static String incrementLastChar(String str) { char lastChar = str.charAt(str.length() - 1); return str.substring(0, str.length() - 1) + (char) (lastChar + 1); } @Test public void test(@WorkDirectory Path path) throws InterruptedException { System.out.println("start"); HybridClock clock = new HybridClockImpl(); RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd", path.resolve("rocksdbtest")); storage.start(); for (int i = 0; i < 1000; i++) { storage.put(randomBytes(), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes(), TOMBSTONE, clock.now()); } for (int i = 0; i < 100; i++) { storage.put(randomBytes("tables"), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes("tables"), TOMBSTONE, clock.now()); } ByteArray leaseKey = ByteArray.fromString("placementdriver.leases"); AtomicBoolean leasesStopped = new AtomicBoolean(); AtomicBoolean rangeStopped = new AtomicBoolean(); Thread leases = new Thread(() -> { byte[] leaseRaw = new byte[500_000]; byte a = 0; while (!leasesStopped.get()) { byte[] renewedLease = new byte[500_000]; renewedLease[0] = ++a; storage.invoke( toCondition(or(notExists(leaseKey), value(leaseKey).eq(leaseRaw))), List.of(put(leaseKey, renewedLease)), List.of(noop()), clock.now() ); leaseRaw = renewedLease; try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); leases.start(); Thread range = new Thread(() -> { while (!rangeStopped.get()) { long start = System.currentTimeMillis(); Cursor<Entry> cursor = storage.range("tables".getBytes(StandardCharsets.UTF_8), incrementLastChar("tables").getBytes(StandardCharsets.UTF_8)); List<Object> list = new ArrayList<>(); for(Entry e : cursor) { if (!e.tombstone()) { list.add(e.value()); } } LOG.info("time " + (System.currentTimeMillis() - start) + ", size " + list.size()); cursor.close(); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); range.start(); for (int i = 0; i < 180_000; i++) { storage.get(randomBytes()); Thread.sleep(3); } leasesStopped.set(true); rangeStopped.set(true); leases.join(); range.join(); } private static Condition toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) { if (condition instanceof SimpleCondition.ValueCondition) { var valueCondition = (SimpleCondition.ValueCondition) condition; return new ValueCondition( toValueConditionType(valueCondition.type()), valueCondition.key(), valueCondition.value() ); } else if (condition instanceof SimpleCondition.RevisionCondition) { var revisionCondition = (SimpleCondition.RevisionCondition) condition; return new RevisionCondition( toRevisionConditionType(revisionCondition.type()), revisionCondition.key(), revisionCondition.revision() ); } else if (condition instanceof SimpleCondition) { var simpleCondition = (SimpleCondition) condition; switch (simpleCondition.type()) { case KEY_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key()); case KEY_NOT_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key()); case TOMBSTONE: return new TombstoneCondition(simpleCondition.key()); default: throw new IllegalArgumentException("Unexpected simple condition type " + simpleCondition.type()); } } else if (condition instanceof CompoundCondition) { CompoundCondition compoundCondition = (CompoundCondition) condition; Condition leftCondition = toCondition(compoundCondition.leftCondition()); Condition rightCondition = toCondition(compoundCondition.rightCondition()); switch (compoundCondition.type()) { case AND: return new AndCondition(leftCondition, rightCondition); case OR: return new OrCondition(leftCondition, rightCondition); default: throw new IllegalArgumentException("Unexpected compound condition type " + compoundCondition.type()); } } else { throw new IllegalArgumentException("Unknown condition " + condition); } } private static ValueCondition.Type toValueConditionType(ConditionType type) { switch (type) { case VAL_EQUAL: return ValueCondition.Type.EQUAL; case VAL_NOT_EQUAL: return ValueCondition.Type.NOT_EQUAL; case VAL_GREATER: return ValueCondition.Type.GREATER; case VAL_GREATER_OR_EQUAL: return ValueCondition.Type.GREATER_OR_EQUAL; case VAL_LESS: return ValueCondition.Type.LESS; case VAL_LESS_OR_EQUAL: return ValueCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected value condition type " + type); } } private static RevisionCondition.Type toRevisionConditionType(ConditionType type) { switch (type) { case REV_EQUAL: return RevisionCondition.Type.EQUAL; case REV_NOT_EQUAL: return RevisionCondition.Type.NOT_EQUAL; case REV_GREATER: return RevisionCondition.Type.GREATER; case REV_GREATER_OR_EQUAL: return RevisionCondition.Type.GREATER_OR_EQUAL; case REV_LESS: return RevisionCondition.Type.LESS; case REV_LESS_OR_EQUAL: return RevisionCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected revision condition type " + type); } } } {code} was: Prerequisites: RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random tombstones, 100 values with prefix "tables", 5000 tombstones with prefix "tables". Load profile: * Thread1: performs storage.invoke with the same key and value of size 500k bytes, once per 100 ms. * Thread2: performs storage.range for prefix "tables" once per 200 ms, collects all entries from cursor. * Thread3: performs storage.get with random key once per 3 ms. Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes this time spikes up to hundreds of milliseconds (or even seconds): and this lasts for some time (I observed up to half of a minute), after that the time returns to normal values: {code:java} 2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size 100 2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size 100 2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size 100 2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size 100 2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, size 100 2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, size 100 2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, size 100 2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, size 100 2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, size 100 2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, size 100 2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, size 100 2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, size 100{code} On teamcity it was even over 6 seconds: [https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware] Reproducer: {code:java} package org.apache.ignite.internal.metastorage; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; import java.io.ObjectStreamException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.ConditionType; import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.metastorage.server.AndCondition; import org.apache.ignite.internal.metastorage.server.Condition; import org.apache.ignite.internal.metastorage.server.ExistenceCondition; import org.apache.ignite.internal.metastorage.server.OrCondition; import org.apache.ignite.internal.metastorage.server.RevisionCondition; import org.apache.ignite.internal.metastorage.server.TombstoneCondition; import org.apache.ignite.internal.metastorage.server.ValueCondition; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.lang.ByteArray; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) public class RocksDBLoadTest { private static final IgniteLogger LOG = Loggers.forClass(RocksDBLoadTest.class); private byte[] randomBytes() { return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); } private byte[] randomBytes(String prefix) { return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8); } /** * Increments the last character of the given string. */ private static String incrementLastChar(String str) { char lastChar = str.charAt(str.length() - 1); return str.substring(0, str.length() - 1) + (char) (lastChar + 1); } @Test public void test(@WorkDirectory Path path) throws InterruptedException { System.out.println("start"); HybridClock clock = new HybridClockImpl(); RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd", path.resolve("rocksdbtest")); storage.start(); for (int i = 0; i < 1000; i++) { storage.put(randomBytes(), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes(), TOMBSTONE, clock.now()); } for (int i = 0; i < 100; i++) { storage.put(randomBytes("tables"), randomBytes(), clock.now()); } for (int i = 0; i < 5000; i++) { storage.put(randomBytes("tables"), TOMBSTONE, clock.now()); } ByteArray leaseKey = ByteArray.fromString("placementdriver.leases"); AtomicBoolean leasesStopped = new AtomicBoolean(); AtomicBoolean rangeStopped = new AtomicBoolean(); Thread leases = new Thread(() -> { byte[] leaseRaw = new byte[500_000]; byte a = 0; while (!leasesStopped.get()) { byte[] renewedLease = new byte[500_000]; renewedLease[0] = ++a; storage.invoke( toCondition(or(notExists(leaseKey), value(leaseKey).eq(leaseRaw))), List.of(put(leaseKey, renewedLease)), List.of(noop()), clock.now() ); leaseRaw = renewedLease; try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); leases.start(); Thread range = new Thread(() -> { while (!rangeStopped.get()) { long start = System.currentTimeMillis(); Cursor<Entry> cursor = storage.range("tables".getBytes(StandardCharsets.UTF_8), incrementLastChar("tables").getBytes(StandardCharsets.UTF_8)); List<Object> list = new ArrayList<>(); for(Entry e : cursor) { if (!e.tombstone()) { list.add(e.value()); } } LOG.info("time " + (System.currentTimeMillis() - start) + ", size " + list.size()); cursor.close(); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); range.start(); for (int i = 0; i < 180_000; i++) { storage.get(randomBytes()); Thread.sleep(3); } leasesStopped.set(true); rangeStopped.set(true); leases.join(); range.join(); } private static Condition toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) { if (condition instanceof SimpleCondition.ValueCondition) { var valueCondition = (SimpleCondition.ValueCondition) condition; return new ValueCondition( toValueConditionType(valueCondition.type()), valueCondition.key(), valueCondition.value() ); } else if (condition instanceof SimpleCondition.RevisionCondition) { var revisionCondition = (SimpleCondition.RevisionCondition) condition; return new RevisionCondition( toRevisionConditionType(revisionCondition.type()), revisionCondition.key(), revisionCondition.revision() ); } else if (condition instanceof SimpleCondition) { var simpleCondition = (SimpleCondition) condition; switch (simpleCondition.type()) { case KEY_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key()); case KEY_NOT_EXISTS: return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key()); case TOMBSTONE: return new TombstoneCondition(simpleCondition.key()); default: throw new IllegalArgumentException("Unexpected simple condition type " + simpleCondition.type()); } } else if (condition instanceof CompoundCondition) { CompoundCondition compoundCondition = (CompoundCondition) condition; Condition leftCondition = toCondition(compoundCondition.leftCondition()); Condition rightCondition = toCondition(compoundCondition.rightCondition()); switch (compoundCondition.type()) { case AND: return new AndCondition(leftCondition, rightCondition); case OR: return new OrCondition(leftCondition, rightCondition); default: throw new IllegalArgumentException("Unexpected compound condition type " + compoundCondition.type()); } } else { throw new IllegalArgumentException("Unknown condition " + condition); } } private static ValueCondition.Type toValueConditionType(ConditionType type) { switch (type) { case VAL_EQUAL: return ValueCondition.Type.EQUAL; case VAL_NOT_EQUAL: return ValueCondition.Type.NOT_EQUAL; case VAL_GREATER: return ValueCondition.Type.GREATER; case VAL_GREATER_OR_EQUAL: return ValueCondition.Type.GREATER_OR_EQUAL; case VAL_LESS: return ValueCondition.Type.LESS; case VAL_LESS_OR_EQUAL: return ValueCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected value condition type " + type); } } private static RevisionCondition.Type toRevisionConditionType(ConditionType type) { switch (type) { case REV_EQUAL: return RevisionCondition.Type.EQUAL; case REV_NOT_EQUAL: return RevisionCondition.Type.NOT_EQUAL; case REV_GREATER: return RevisionCondition.Type.GREATER; case REV_GREATER_OR_EQUAL: return RevisionCondition.Type.GREATER_OR_EQUAL; case REV_LESS: return RevisionCondition.Type.LESS; case REV_LESS_OR_EQUAL: return RevisionCondition.Type.LESS_OR_EQUAL; default: throw new IllegalArgumentException("Unexpected revision condition type " + type); } } } {code} > RocksDB scan time spikes up at some moments > ------------------------------------------- > > Key: IGNITE-19700 > URL: https://issues.apache.org/jira/browse/IGNITE-19700 > Project: Ignite > Issue Type: Task > Reporter: Denis Chudov > Priority: Major > Labels: ignite-3 > > Motivation: > > Prerequisites: > RocksDBKeyValueStorage filled with 1000 random keys and values, 5000 random > tombstones, 100 values with prefix "tables", 5000 tombstones with prefix > "tables". > Load profile: > * Thread1: performs storage.invoke with the same key and value of size 500k > bytes, once per 100 ms. > * Thread2: performs storage.range for prefix "tables" once per 200 ms, > collects all entries from cursor. > * Thread3: performs storage.get with random key once per 3 ms. > Each operation performed by Thread2 mostly takes 20-50 ms, but sometimes this > time spikes up to hundreds of milliseconds (or even seconds): and this lasts > for some time (I observed up to half of a minute), after that the time > returns to normal values: > {code:java} > 2023-06-09 17:09:05:971 +0300 [INFO][Thread-5][RocksDBLoadTest] time 31, size > 100 > 2023-06-09 17:09:06:223 +0300 [INFO][Thread-5][RocksDBLoadTest] time 50, size > 100 > 2023-06-09 17:09:06:471 +0300 [INFO][Thread-5][RocksDBLoadTest] time 47, size > 100 > 2023-06-09 17:09:06:715 +0300 [INFO][Thread-5][RocksDBLoadTest] time 44, size > 100 > 2023-06-09 17:09:07:483 +0300 [INFO][Thread-5][RocksDBLoadTest] time 566, > size 100 > 2023-06-09 17:09:08:228 +0300 [INFO][Thread-5][RocksDBLoadTest] time 543, > size 100 > 2023-06-09 17:09:09:000 +0300 [INFO][Thread-5][RocksDBLoadTest] time 571, > size 100 > 2023-06-09 17:09:09:774 +0300 [INFO][Thread-5][RocksDBLoadTest] time 572, > size 100 > 2023-06-09 17:09:10:570 +0300 [INFO][Thread-5][RocksDBLoadTest] time 596, > size 100 > 2023-06-09 17:09:11:323 +0300 [INFO][Thread-5][RocksDBLoadTest] time 552, > size 100 > 2023-06-09 17:09:12:103 +0300 [INFO][Thread-5][RocksDBLoadTest] time 579, > size 100 > 2023-06-09 17:09:12:861 +0300 [INFO][Thread-5][RocksDBLoadTest] time 556, > size 100{code} > On teamcity it was even over 6 seconds: > [https://ci.ignite.apache.org/buildConfiguration/ApacheIgnite3xGradle_Test_RunUnitTests/7283421?buildTab=log&focusLine=46540&expandAll=true&logFilter=debug&logView=flowAware] > > Reproducer: > {code:java} > package org.apache.ignite.internal.metastorage; > import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; > import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; > import static org.apache.ignite.internal.metastorage.dsl.Conditions.value; > import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; > import static org.apache.ignite.internal.metastorage.dsl.Operations.put; > import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; > import java.io.ObjectStreamException; > import java.nio.charset.StandardCharsets; > import java.nio.file.Path; > import java.util.ArrayList; > import java.util.List; > import java.util.UUID; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.logging.Logger; > import org.apache.ignite.internal.hlc.HybridClock; > import org.apache.ignite.internal.hlc.HybridClockImpl; > import org.apache.ignite.internal.logger.IgniteLogger; > import org.apache.ignite.internal.logger.Loggers; > import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; > import org.apache.ignite.internal.metastorage.dsl.ConditionType; > import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; > import org.apache.ignite.internal.metastorage.server.AndCondition; > import org.apache.ignite.internal.metastorage.server.Condition; > import org.apache.ignite.internal.metastorage.server.ExistenceCondition; > import org.apache.ignite.internal.metastorage.server.OrCondition; > import org.apache.ignite.internal.metastorage.server.RevisionCondition; > import org.apache.ignite.internal.metastorage.server.TombstoneCondition; > import org.apache.ignite.internal.metastorage.server.ValueCondition; > import > org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; > import org.apache.ignite.internal.testframework.WorkDirectory; > import org.apache.ignite.internal.testframework.WorkDirectoryExtension; > import org.apache.ignite.internal.util.Cursor; > import org.apache.ignite.lang.ByteArray; > import org.junit.jupiter.api.Test; > import org.junit.jupiter.api.extension.ExtendWith; > @ExtendWith(WorkDirectoryExtension.class) > public class RocksDBLoadTest { > private static final IgniteLogger LOG = > Loggers.forClass(RocksDBLoadTest.class); > private byte[] randomBytes() { > return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); > } > private byte[] randomBytes(String prefix) { > return (prefix + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8); > } > /** > * Increments the last character of the given string. > */ > private static String incrementLastChar(String str) { > char lastChar = str.charAt(str.length() - 1); > return str.substring(0, str.length() - 1) + (char) (lastChar + 1); > } > @Test > public void test(@WorkDirectory Path path) throws InterruptedException { > System.out.println("start"); > HybridClock clock = new HybridClockImpl(); > RocksDbKeyValueStorage storage = new RocksDbKeyValueStorage("asd", > path.resolve("rocksdbtest")); > storage.start(); > for (int i = 0; i < 1000; i++) { > storage.put(randomBytes(), randomBytes(), clock.now()); > } > for (int i = 0; i < 5000; i++) { > storage.put(randomBytes(), TOMBSTONE, clock.now()); > } > for (int i = 0; i < 100; i++) { > storage.put(randomBytes("tables"), randomBytes(), clock.now()); > } > for (int i = 0; i < 5000; i++) { > storage.put(randomBytes("tables"), TOMBSTONE, clock.now()); > } > ByteArray leaseKey = ByteArray.fromString("placementdriver.leases"); > AtomicBoolean leasesStopped = new AtomicBoolean(); > AtomicBoolean rangeStopped = new AtomicBoolean(); > Thread leases = new Thread(() -> { > byte[] leaseRaw = new byte[500_000]; > byte a = 0; > while (!leasesStopped.get()) { > byte[] renewedLease = new byte[500_000]; > renewedLease[0] = ++a; > storage.invoke( > toCondition(or(notExists(leaseKey), > value(leaseKey).eq(leaseRaw))), > List.of(put(leaseKey, renewedLease)), > List.of(noop()), > clock.now() > ); > leaseRaw = renewedLease; > try { > Thread.sleep(100); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > } > }); > leases.start(); > Thread range = new Thread(() -> { > while (!rangeStopped.get()) { > long start = System.currentTimeMillis(); > Cursor<Entry> cursor = > > storage.range("tables".getBytes(StandardCharsets.UTF_8), > incrementLastChar("tables").getBytes(StandardCharsets.UTF_8)); > List<Object> list = new ArrayList<>(); > for(Entry e : cursor) { > if (!e.tombstone()) { > list.add(e.value()); > } > } > LOG.info("time " + (System.currentTimeMillis() - start) + ", > size " + list.size()); > cursor.close(); > try { > Thread.sleep(200); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > } > }); > range.start(); > for (int i = 0; i < 180_000; i++) { > storage.get(randomBytes()); > Thread.sleep(3); > } > leasesStopped.set(true); > rangeStopped.set(true); > leases.join(); > range.join(); > } > private static Condition > toCondition(org.apache.ignite.internal.metastorage.dsl.Condition condition) { > if (condition instanceof SimpleCondition.ValueCondition) { > var valueCondition = (SimpleCondition.ValueCondition) condition; > return new ValueCondition( > toValueConditionType(valueCondition.type()), > valueCondition.key(), > valueCondition.value() > ); > } else if (condition instanceof SimpleCondition.RevisionCondition) { > var revisionCondition = (SimpleCondition.RevisionCondition) > condition; > return new RevisionCondition( > toRevisionConditionType(revisionCondition.type()), > revisionCondition.key(), > revisionCondition.revision() > ); > } else if (condition instanceof SimpleCondition) { > var simpleCondition = (SimpleCondition) condition; > switch (simpleCondition.type()) { > case KEY_EXISTS: > return new > ExistenceCondition(ExistenceCondition.Type.EXISTS, simpleCondition.key()); > case KEY_NOT_EXISTS: > return new > ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, simpleCondition.key()); > case TOMBSTONE: > return new TombstoneCondition(simpleCondition.key()); > default: > throw new IllegalArgumentException("Unexpected simple > condition type " + simpleCondition.type()); > } > } else if (condition instanceof CompoundCondition) { > CompoundCondition compoundCondition = (CompoundCondition) > condition; > Condition leftCondition = > toCondition(compoundCondition.leftCondition()); > Condition rightCondition = > toCondition(compoundCondition.rightCondition()); > switch (compoundCondition.type()) { > case AND: > return new AndCondition(leftCondition, rightCondition); > case OR: > return new OrCondition(leftCondition, rightCondition); > default: > throw new IllegalArgumentException("Unexpected compound > condition type " + compoundCondition.type()); > } > } else { > throw new IllegalArgumentException("Unknown condition " + > condition); > } > } > private static ValueCondition.Type toValueConditionType(ConditionType > type) { > switch (type) { > case VAL_EQUAL: > return ValueCondition.Type.EQUAL; > case VAL_NOT_EQUAL: > return ValueCondition.Type.NOT_EQUAL; > case VAL_GREATER: > return ValueCondition.Type.GREATER; > case VAL_GREATER_OR_EQUAL: > return ValueCondition.Type.GREATER_OR_EQUAL; > case VAL_LESS: > return ValueCondition.Type.LESS; > case VAL_LESS_OR_EQUAL: > return ValueCondition.Type.LESS_OR_EQUAL; > default: > throw new IllegalArgumentException("Unexpected value > condition type " + type); > } > } > private static RevisionCondition.Type > toRevisionConditionType(ConditionType type) { > switch (type) { > case REV_EQUAL: > return RevisionCondition.Type.EQUAL; > case REV_NOT_EQUAL: > return RevisionCondition.Type.NOT_EQUAL; > case REV_GREATER: > return RevisionCondition.Type.GREATER; > case REV_GREATER_OR_EQUAL: > return RevisionCondition.Type.GREATER_OR_EQUAL; > case REV_LESS: > return RevisionCondition.Type.LESS; > case REV_LESS_OR_EQUAL: > return RevisionCondition.Type.LESS_OR_EQUAL; > default: > throw new IllegalArgumentException("Unexpected revision > condition type " + type); > } > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)