ibessonov commented on code in PR #4679:
URL: https://github.com/apache/ignite-3/pull/4679#discussion_r1829357242
##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java:
##########
@@ -1080,15 +1080,28 @@ class Node {
                     cmgRaftConfigurer
             );
-            LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+            var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+            var metastorageWorkDir = new ComponentWorkingDir(dir.resolve("metastorage"));
+
+            msLogStorageFactory = SharedLogStorageFactoryUtils.create(clusterService.nodeName(), metastorageWorkDir.raftLogPath());
+
+            LogSyncer logSyncer = () -> {

Review Comment:
   Was this necessary for this test?

##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java:
##########
@@ -41,8 +43,8 @@ import org.rocksdb.RocksDBException;
 /**
- * Helper class to deal with RocksDB flushes. Provides an ability to wait until current state of data is flushed to the storage.
- * Requires enabled {@link Options#setAtomicFlush(boolean)} option to work properly.
+ * Helper class to deal with RocksDB flushes. Provides an ability to wait until current state of data is flushed to the storage. Requires
+ * enabled {@link Options#setAtomicFlush(boolean)} option to work properly.

Review Comment:
   It was fine before. Why did you reformat the comment?
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java:
##########
@@ -1213,6 +1213,19 @@ private class Node {
             LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+            ComponentWorkingDir metastorageWorkDir = metastoragePath(systemConfiguration, dir);
+
+            msLogStorageFactory =
+                    SharedLogStorageFactoryUtils.create(clusterService.nodeName(), metastorageWorkDir.raftLogPath());
+
+            LogSyncer logSyncer = () -> {

Review Comment:
   Same question: I don't understand why we need it in these tests.

##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java:
##########
@@ -279,6 +281,18 @@ public void stop() {
      * @return Future that completes when the {@code onFlushCompleted} callback finishes.
      */
     CompletableFuture<Void> onFlushCompleted() {
-        return CompletableFuture.runAsync(onFlushCompleted, threadPool);
+        return inBusyLockSafeAsync(() -> runAsync(onFlushCompleted, threadPool));
+    }
+
+    private CompletableFuture<Void> inBusyLockSafeAsync(Supplier<CompletableFuture<Void>> supplier) {

Review Comment:
   Off-topic: I truly believe that the `Async` suffix in these methods was a big mistake. I would like us to rename all the "inBusy...Async" methods in the future. Are you OK with that?

##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java:
##########
@@ -176,15 +179,14 @@ public void removeColumnFamily(ColumnFamilyHandle handle) {
     /**
      * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
-     * achieve this, by fixing its value at the time of invocation. Storage is considered flushed when at least one persisted column
-     * family has its latest sequence number greater or equal to the one that we fixed. This is enough to guarantee that all column families
-     * have up-to-data state as well, because flusher expects its users to also have {@link Options#setAtomicFlush(boolean)} option
-     * enabled.
-     *
-     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future. Please refer
-     *      to {@link RocksDbFlusher#RocksDbFlusher(String, IgniteSpinBusyLock, ScheduledExecutorService, ExecutorService, IntSupplier,
-     *      LogSyncer, Runnable)} parameters description to see what's really happening in this case.
+     * achieve this, by fixing its value at the time of invocation. Storage is considered flushed when at least one persisted column family

Review Comment:
   And here. What's the difference?

##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -279,8 +301,16 @@ public RocksDbKeyValueStorage(
         );
         this.dbPath = dbPath;
+        this.logSyncer = logSyncer;
-        snapshotExecutor = Executors.newFixedThreadPool(2, NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", log));
+        executor = Executors.newFixedThreadPool(
+                2,
+                NamedThreadFactory.create(nodeName, "metastorage-rocksdb-kv-storage-executor", log)
+        );
+
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(

Review Comment:
   Please add a TODO that will remind us to use `ThreadPoolsManager#commonScheduler` here, and probably in other such places too.

##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java:
##########
@@ -477,24 +477,33 @@ private PartialNode startPartialNode(
                 raftGroupEventsClientListener
         );
+        var metastorageWorkDir = new ComponentWorkingDir(workDir.resolve("metastorage"));
+
+        LogStorageFactory msLogStorageFactory = SharedLogStorageFactoryUtils.create(
+                clusterSvc.nodeName(),
+                metastorageWorkDir.raftLogPath()
+        );
+
+        LogSyncer logSyncer = () -> {

Review Comment:
   Again, it's not obvious why we can't use a no-op implementation here.

##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -688,13 +688,22 @@ public class IgniteImpl implements Ignite {
         RaftGroupOptionsConfigurer msRaftConfigurer =
                 RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory, metastorageWorkDir.metaPath());
+        LogSyncer logSyncer = () -> {

Review Comment:
   I know that someone else did it first, but I guess we might fix it. Does meta-storage flushing really require log syncing for partitions? I doubt it. I believe we need different syncer instances for different RocksDB instances. The current code makes no sense to me. Who's the author?

##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java:
##########
@@ -103,13 +105,13 @@ public class RocksDbFlusher {
      * @param busyLock Busy lock.
      * @param scheduledPool Scheduled pool the schedule flushes.
      * @param threadPool Thread pool to run flush completion closure, provided by {@code onFlushCompleted} parameter.
-     * @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is called with
-     *      {@code true} parameter, the flusher waits given number of milliseconds (using {@code scheduledPool}) and then executes flush
-     *      only if there were no other {@code awaitFlush(true)} calls. Otherwise, it does nothing after the timeout. This guarantees that
-     *      either the last one wins, or automatic flush wins if there's an endless stream of {@code awaitFlush(true)} calls with very small
-     *      time-intervals between them. Such behavior allows to save on unnecessary flushes when multiple await flush calls appear at
-     *      roughly the same time from different threads. For example, several partitions might be flushed at the same time, because they
-     *      started at the same time and their flush frequency is also the same.
+     * @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is

Review Comment:
   Same question here, I don't get it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org