ibessonov commented on code in PR #4821: URL: https://github.com/apache/ignite-3/pull/4821#discussion_r1879985979
########## modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java: ########## @@ -17,10 +17,27 @@ package org.apache.ignite.internal.raft; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.NetworkMessage; +import org.jetbrains.annotations.Nullable; /** * A marker interface for replication group command. */ public interface Command extends NetworkMessage { + /** + * This is called before a command is submitted to replication pipeline. + * + * @param safeTs Safe timestamp. + */ + default void patch(HybridTimestamp safeTs) {} Review Comment: Shouldn't we only do this for write commands? ########## modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java: ########## @@ -313,6 +313,38 @@ public void writeInt(int val) { writeVarInt(val + 1); } + /** {@inheritDoc} */ + @Override + public void writeFixedInt(int val) { + lastFinished = remainingInternal() >= Integer.BYTES; + + if (lastFinished) { + int pos = buf.position(); + + GridUnsafe.putInt(heapArr, baseOff + pos, val); Review Comment: This call is system-dependent, while in our case I suppose that we want to always use Little Endian. A similar problem is solved, for example, in `org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput#putInt`; please change your implementation accordingly. 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java: ########## @@ -31,39 +32,51 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio super(serializationRegistry, cache); } + @Override + public void patch(ByteBuffer raw, HybridTimestamp safeTs) { + ByteBuffer dup = raw.duplicate().order(ORDER); + dup.putLong(4, safeTs.longValue()); Review Comment: Please introduce a constant for `4` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ########## @@ -241,30 +215,46 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) { storage.acquirePartitionSnapshotsReadLock(); try { + boolean[] applied = {false}; + if (command instanceof UpdateCommand) { - result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm); + result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof UpdateAllCommand) { - result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm); + result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof FinishTxCommand) { - result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm); + result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof WriteIntentSwitchCommand) { - handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm); + handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof SafeTimeSyncCommand) { - handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm); + handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm, 
applied); } else if (command instanceof BuildIndexCommand) { - handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm); + handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof PrimaryReplicaChangeCommand) { - handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm); + handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof VacuumTxStatesCommand) { - handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm); + handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm, applied); } else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) { - handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm); + handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm, + applied); } else { assert false : "Command was not found [cmd=" + command + ']'; } + + if (applied[0]) { + // Adjust safe time before completing update to reduce waiting. + if (command instanceof SafeTimePropagatingCommand) { + SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) command; + + assert safeTimePropagatingCommand.safeTime() != null; + + updateTrackerIgnoringTrackerClosedException(safeTime, safeTimePropagatingCommand.safeTime()); + } + + updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); Review Comment: This was outside of the `PartitionSnapshotsReadLock`, maybe for a reason. Please confirm that this change is safe. 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -1837,23 +1820,19 @@ private CompletableFuture<Object> applyFinishCommand( int catalogVersion, List<TablePartitionIdMessage> partitionIds ) { - synchronized (commandProcessingLinearizationMutex) { - FinishTxCommandBuilder finishTxCmdBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() - .txId(transactionId) - .commit(commit) - .safeTime(clockService.now()) - .requiredCatalogVersion(catalogVersion) - .partitionIds(partitionIds); - - if (commit) { - finishTxCmdBldr.commitTimestamp(commitTimestamp); - } - CompletableFuture<Object> resultFuture = new CompletableFuture<>(); - - applyCmdWithRetryOnSafeTimeReorderException(finishTxCmdBldr.build(), resultFuture); + HybridTimestamp now = clockService.now(); + FinishTxCommandBuilder finishTxCmdBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() + .txId(transactionId) + .commit(commit) + .initiatorTime(now) + .requiredCatalogVersion(catalogVersion) + .partitionIds(partitionIds); - return resultFuture; + if (commit) { + finishTxCmdBldr.commitTimestamp(commitTimestamp); Review Comment: Where can I read about our guarantees about `commitTimestamp` generation? How is it related to safe time? 
I don't know much about it, and I have a feeling that there could be issues with it. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java: ########## @@ -59,7 +60,8 @@ public CheckCatalogVersionOnAppendEntries(CatalogService catalogService) { Node node = (Node) service; - ByteBuffer allData = request.data().asReadOnlyBuffer(); + // TODO use from marshaller Review Comment: Please mention the corresponding `Jira` link. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java: ########## @@ -37,4 +37,6 @@ public interface PartitionCommandsMarshaller extends Marshaller { * @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is not required for the given command. */ int readRequiredCatalogVersion(ByteBuffer raw); + + long readSafeTimestamp(ByteBuffer raw); Review Comment: Please provide javadocs for new methods that you introduce to interfaces. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java: ########## @@ -31,39 +32,51 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio super(serializationRegistry, cache); } + @Override + public void patch(ByteBuffer raw, HybridTimestamp safeTs) { + ByteBuffer dup = raw.duplicate().order(ORDER); Review Comment: I think we should add an optimistic check in case `raw.order()` matches `ORDER`; this way we will avoid unnecessary allocations. But this is up to you. ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java: ########## @@ -70,20 +85,90 @@ protected int replicas() { @Override protected HybridClock createClock(ClusterNode node) { - // Client physical time is frozen in the past, server time advances normally. - return new TestHybridClock(() -> node.address().port() == CLIENT_PORT ? 
CLIENT_FROZEN_PHYSICAL_TIME : System.currentTimeMillis()); + int idx = NODE_PORT_BASE - node.address().port() + 1; + + // Physical time is frozen. + return new TestHybridClock( + () -> node.address().port() == CLIENT_PORT ? CLIENT_FROZEN_PHYSICAL_TIME : CLIENT_FROZEN_PHYSICAL_TIME + 1000L * idx); + } + + @Override + protected long getSafeTimePropagationTimeout() { + return 300_000; } @Test public void testImplicitObservableTimePropagation() { RecordView<Tuple> view = accounts.recordView(); view.upsert(null, makeValue(1, 100.0)); - TxManagerImpl clientTxManager = (TxManagerImpl) txTestCluster.clientTxManager; - Collection<TxStateMeta> states = clientTxManager.states(); + List<TxStateMeta> states = txTestCluster.states(); assertEquals(1, states.size()); - HybridTimestamp commitTs = states.iterator().next().commitTimestamp(); + HybridTimestamp commitTs = states.get(0).commitTimestamp(); + + LOG.info("commitTs={}", commitTs); + assertNotNull(commitTs); assertEquals(commitTs, timestampTracker.get()); - assertTrue(commitTs.getPhysical() != CLIENT_FROZEN_PHYSICAL_TIME, "Client time should be advanced to server time"); + + assertTrue(commitTs.compareTo(new HybridTimestamp(CLIENT_FROZEN_PHYSICAL_TIME, 0)) > 0, "Observable timestamp should be advanced"); + + TablePartitionId part = new TablePartitionId(accounts.tableId(), 0); + + NodeImpl[] handle = {null}; + NodeImpl[] leader = {null}; + + txTestCluster.raftServers().values().stream().map(Loza::server).forEach(s -> { + JraftServerImpl srv = (JraftServerImpl) s; + List<RaftGroupService> grps = srv.localNodes().stream().map(srv::raftGroupService).collect(toList()); Review Comment: You don't have to call `collect(toList())` if all you need is `forEach`; please remove the `collect` call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org