rpuch commented on code in PR #4700:
URL: https://github.com/apache/ignite-3/pull/4700#discussion_r1848078340
########## modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java: ##########
@@ -38,45 +37,34 @@ public class HybridClockImpl implements HybridClock {
     /**
      * Var handle for {@link #latestTime}.
      */
-    private static final VarHandle LATEST_TIME;
-
-    static {
-        try {
-            LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", long.class);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
+    private static final AtomicLongFieldUpdater<HybridClockImpl> LATEST_TIME = AtomicLongFieldUpdater.newUpdater(HybridClockImpl.class,
+            "latestTime");

     private volatile long latestTime;

     private final List<ClockUpdateListener> updateListeners = new CopyOnWriteArrayList<>();

     /**
-     * The constructor which initializes the latest time to current time by system clock.
-     */
-    public HybridClockImpl() {
-        this.latestTime = currentTime();
-    }
-
-    /**
-     * System current time in milliseconds shifting left to free insignificant bytes.
-     * This method is marked with a public modifier to mock in tests because there is no way to mock currentTimeMillis.
+     * Returns current physical time in milliseconds.
      *
-     * @return Current time in milliseconds shifted right on two bytes.
+     * @return Current time.
      */
-    public static long currentTime() {
-        return System.currentTimeMillis() << LOGICAL_TIME_BITS_SIZE;
+    protected long physicalTime() {
+        return System.currentTimeMillis();
     }

     @Override
-    public long nowLong() {
+    public final long nowLong() {
         while (true) {
-            long now = currentTime();
+            long now = physicalTime() << LOGICAL_TIME_BITS_SIZE;

             // Read the latest time after accessing UTC time to reduce contention.
             long oldLatestTime = latestTime;
+            if (oldLatestTime >= now) {
+                return LATEST_TIME.incrementAndGet(this);
+            }
+
             long newLatestTime = max(oldLatestTime + 1, now);

Review Comment:
```suggestion
            long newLatestTime = now;
```

########## modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java: ##########
@@ -107,33 +95,34 @@ private void notifyUpdateListeners(long newTs) {
     }

     @Override
-    public HybridTimestamp now() {
+    public final HybridTimestamp now() {
         return hybridTimestamp(nowLong());
     }

     @Override
-    public HybridTimestamp current() {
+    public final HybridTimestamp current() {
         return hybridTimestamp(currentLong());
     }

     /**
-     * Updates the clock in accordance with an external event timestamp. If the supplied timestamp is ahead of the
-     * current clock timestamp, the clock gets adjusted to make sure it never returns any timestamp before (or equal to)
-     * the supplied external timestamp.
+     * Updates the clock in accordance with an external event timestamp. If the supplied timestamp is ahead of the current clock timestamp,
+     * the clock gets adjusted to make sure it never returns any timestamp before (or equal to) the supplied external timestamp.
      *
      * @param requestTime Timestamp from request.
      * @return The resulting timestamp (guaranteed to exceed both previous clock 'currentTs' and the supplied external ts).
      */
     @Override
-    public HybridTimestamp update(HybridTimestamp requestTime) {
+    public final HybridTimestamp update(HybridTimestamp requestTime) {
         while (true) {
-            long now = currentTime();
+            long now = physicalTime() << LOGICAL_TIME_BITS_SIZE;

             // Read the latest time after accessing UTC time to reduce contention.
-            long oldLatestTime = this.latestTime;
+            long oldLatestTime = latestTime;

             long newLatestTime = max(requestTime.longValue() + 1, max(now, oldLatestTime + 1));

+            // TODO avoid CAS on lower value.
Review Comment:
Please mention a Jira issue here.

########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ApplyResult.java: ##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Replication command application result.
+ */
+public final class ApplyResult {

Review Comment:
Should it be called `ApplicationResult`? 'Apply' seems to be used only as a verb, never as a noun. This is especially confusing when a method is called `applyResult`: it reads like a command to apply some result, not like a getter for the result of an application.

########## modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java: ##########
@@ -86,8 +74,8 @@ public long nowLong() {
     }

     @Override
-    public long currentLong() {
-        long current = currentTime();

Review Comment:
Is it worth inlining `currentTime()` by hand? C2 (the JIT compiler) will inline it anyway, but now the same code is repeated in at least three places.

########## modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java: ##########
@@ -118,18 +119,20 @@ public void update(T newValue, @Nullable R futureResult) {
      * @param valueToWait Value to wait.
      */
     public CompletableFuture<R> waitFor(T valueToWait) {
-        if (!busyLock.enterBusy()) {
+        if (!busyLock.readLock().tryLock()) {
             return failedFuture(new TrackerClosedException());
         }

         try {
-            if (current.getKey().compareTo(valueToWait) >= 0) {
-                return completedFuture(current.getValue());
+            Entry<T, @Nullable R> tmp = current;

Review Comment:
```suggestion
            Entry<T, @Nullable R> currentKeyValue = current;
```

########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java: ##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.TxInfrastructureTest;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests if commit timestamp is propagated to observable time correctly.
+ */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "1000000")
+public class ItTxObservableTimePropagationTest extends TxInfrastructureTest {
+    private static final long CLIENT_FROZEN_PHYSICAL_TIME = 3000;
+
+    /**
+     * The constructor.
+     *
+     * @param testInfo Test info.
+     */
+    public ItTxObservableTimePropagationTest(TestInfo testInfo) {
+        super(testInfo);
+    }
+
+    @Override
+    protected int nodes() {
+        return 3;
+    }
+
+    @Override
+    protected int replicas() {
+        return 3;
+    }
+
+    @Override
+    protected HybridClock createClocks(ClusterNode node) {
+        // Client physical time is frozen in the past, server time advances normally.
+        return new TestHybridClock(() -> node.address().port() == 19999 ?
+                CLIENT_FROZEN_PHYSICAL_TIME : System.currentTimeMillis());

Review Comment:
Please introduce a constant for the port number 19999.

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ##########
@@ -588,7 +588,7 @@ public boolean onBeforeApply(Command command) {
             if (maxObservableSafeTime == -1) {
                 maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();

-                LOG.info("maxObservableSafeTime is initialized with [" + maxObservableSafeTime + "].");
+                LOG.info("maxObservableSafeTime is initialized with [" + HybridTimestamp.hybridTimestamp(maxObservableSafeTime) + "].");

Review Comment:
```suggestion
                if (LOG.isInfoEnabled()) {
                    LOG.info("maxObservableSafeTime is initialized with [{}].", HybridTimestamp.hybridTimestamp(maxObservableSafeTime));
                }
```

########## modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java: ##########
@@ -549,6 +546,14 @@ public void prepareCluster() throws Exception {
         assertNotNull(clientTxManager);
     }

+    protected HybridClock createClocks(ClusterNode node) {

Review Comment:
```suggestion
    protected HybridClock createClock(ClusterNode node) {
```

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ##########
@@ -2735,24 +2738,20 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
                 );
             }

-            CompletableFuture<UUID> fut = applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
-                    .thenApply(res -> cmd.txId());
+            CompletableFuture<UUID> repFut = applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());

-            return completedFuture(fut);
+            return completedFuture(new ApplyResult(null, repFut));
         } else {
-            CompletableFuture<ApplyCommandResult<Object>> resultFuture = new CompletableFuture<>();
-
-            applyCmdWithExceptionHandling(cmd, resultFuture);
-
-            return resultFuture.thenCompose(res -> {
+            return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                 UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult();

                 if (updateCommandResult != null && !updateCommandResult.isPrimaryReplicaMatch()) {
                     throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
                 }

                 if (updateCommandResult != null && updateCommandResult.isPrimaryInPeersAndLearners()) {
-                    return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime()).thenApply(ignored -> null);
+                    return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime()).thenApply(ignored -> null)
+                            .thenApply(ret -> new ApplyResult(((UpdateCommand) res.getCommand()).safeTime(), null));

Review Comment:
```suggestion
                    return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime())
                            .thenApply(ret -> new ApplyResult(((UpdateCommand) res.getCommand()).safeTime(), null));
```

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ##########
@@ -1107,7 +1116,7 @@ public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransac
                 .transactionId(txo.id())
                 .enlistmentConsistencyToken(enlistmentConsistencyToken)
                 .requestType(RW_UPSERT)
-                .timestamp(clockService.now())
+                .timestamp(txo.startTimestamp()) // TODO replace everythere

Review Comment:
Also, could you please add a comment somewhere (at the top of this class, maybe?) explaining why it's OK to pass the transaction begin timestamp instead of the real `HLC.now()`?
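Back to the `long newLatestTime = now;` suggestion on `nowLong()`: once the new early return handles `oldLatestTime >= now`, every path that reaches `max(oldLatestTime + 1, now)` has `oldLatestTime < now`, i.e. `oldLatestTime + 1 <= now`, so the `max` always yields `now`. Below is a minimal standalone sketch of that loop, not the Ignite class. `SketchHybridClock` is hypothetical, it injects a `LongSupplier` instead of overriding the protected `physicalTime()` hook from the diff, and the 16 logical bits are an assumption based on the "two bytes" in the removed Javadoc. It keeps the `AtomicLongFieldUpdater` approach used in the PR.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified hybrid clock used only to illustrate the nowLong() loop.
 * Upper bits: physical milliseconds; low LOGICAL_BITS bits: logical counter.
 */
final class SketchHybridClock {
    /** Assumption: 16 logical bits ("two bytes"); not taken from the Ignite source. */
    static final int LOGICAL_BITS = 16;

    private static final AtomicLongFieldUpdater<SketchHybridClock> LATEST_TIME =
            AtomicLongFieldUpdater.newUpdater(SketchHybridClock.class, "latestTime");

    private volatile long latestTime;

    /** Injected physical time source (hypothetical), so a test can freeze it. */
    private final LongSupplier physicalTime;

    SketchHybridClock(LongSupplier physicalTime) {
        this.physicalTime = physicalTime;
    }

    long nowLong() {
        while (true) {
            long now = physicalTime.getAsLong() << LOGICAL_BITS;
            long oldLatestTime = latestTime;

            // Physical time has not passed the latest issued timestamp: tick the logical part only.
            if (oldLatestTime >= now) {
                return LATEST_TIME.incrementAndGet(this);
            }

            // Here oldLatestTime < now, hence oldLatestTime + 1 <= now and
            // max(oldLatestTime + 1, now) == now, so the max() is redundant.
            long newLatestTime = now;

            if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
                return newLatestTime;
            }
            // Lost a race with a concurrent caller: re-read and retry.
        }
    }

    /** Moves the clock past an external timestamp, using the same formula as update() in the diff. */
    long update(long requestTime) {
        while (true) {
            long now = physicalTime.getAsLong() << LOGICAL_BITS;
            long oldLatestTime = latestTime;
            long newLatestTime = Math.max(requestTime + 1, Math.max(now, oldLatestTime + 1));

            if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
                return newLatestTime;
            }
        }
    }
}
```

With a frozen physical clock (as the client has in `ItTxObservableTimePropagationTest`), repeated `nowLong()` calls advance only the logical part, and `update()` with a timestamp from a node that is ahead moves the clock just past that timestamp.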
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ##########
@@ -642,11 +643,11 @@ private CompletableFuture<Void> triggerTxRecovery(UUID txId, UUID senderId) {
         HybridTimestamp opStartTs;

         if (request instanceof ReadWriteReplicaRequest) {
-            opStartTs = clockService.now();
+            opStartTs = clockService.current();

Review Comment:
It seems that now we only trigger ticking with `HLC.now()` (on the transaction operation processing path) in the following cases:

1. Tx begin
2. Write operation in a tx
3. Commit

Is this true? In any case, could you please add a comment somewhere that defines the principle we use to choose between `now()` and `current()`?

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java: ##########
@@ -66,6 +66,14 @@ public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark {
     @Param({"8"})
     private int partitionCount;

+    private static final AtomicInteger counter = new AtomicInteger();
+
+    private static ThreadLocal<Integer> gen = ThreadLocal.withInitial(() -> {
+        int id = counter.getAndIncrement();
+        return id * 20000000;

Review Comment:
```suggestion
        return id * 20_000_000;
```

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ##########
@@ -1107,7 +1116,7 @@ public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransac
                 .transactionId(txo.id())
                 .enlistmentConsistencyToken(enlistmentConsistencyToken)
                 .requestType(RW_UPSERT)
-                .timestamp(clockService.now())
+                .timestamp(txo.startTimestamp()) // TODO replace everythere

Review Comment:
Please mention a Jira issue in the TODO.

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ##########
@@ -633,7 +634,19 @@ private <R> CompletableFuture<R> trackingInvoke(
                 || request instanceof SwapRowReplicaRequest;

         if (full) {
             // Full transaction retries are handled in postEnlist.
-            return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request);
+            return replicaSvc.invokeRaw(primaryReplicaAndConsistencyToken.get1(), request).handle((r, e) -> {
+                boolean hasError = e != null;
+                assert hasError || r instanceof TimestampAware;
+
+                // timestamp is set to commit timestamp for full transactions.

Review Comment:
```suggestion
                // Timestamp is set to commit timestamp for full transactions.
```

########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ##########
@@ -1040,17 +1041,18 @@ private void sendAwaitReplicaResponse(String senderConsistentId, long correlatio
     /**
      * Prepares replica response.
      */
-    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, Object result) {
+    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, ReplicaResult result) {
         if (sendTimestamp) {
+            HybridTimestamp commitTs = result.applyResult().getCommitTimestamp();
             return REPLICA_MESSAGES_FACTORY
                     .timestampAwareReplicaResponse()
-                    .result(result)
-                    .timestamp(clockService.now())
+                    .result(result.result())
+                    .timestamp(commitTs == null ? clockService.current() : commitTs)

Review Comment:
If I'm not mistaken, this timestamp is used to update the HLC on the recipient node. If we send it a `commitTs` (which may be very stale) instead of the real clock value, we might make the recipient lag behind our clock. Is this OK?

########## modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java: ##########
@@ -256,7 +257,8 @@ void testClose(PendingComparableValuesTracker trackerParam) {

         CompletableFuture<Void> future0 = tracker.waitFor(2);

-        tracker.close();
+        // Close is called from dedicated stop worker.
+        IgniteTestUtils.runAsync(tracker::close).join();

Review Comment:
Why is this needed?
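To make the `prepareReplicaResponse()` question concrete, here is a toy model. It assumes the recipient feeds the response timestamp into the same rule as `HybridClockImpl.update()` in this PR (`max(requestTime + 1, max(now, oldLatestTime + 1))`), and that `current()` returns the larger of the latest issued time and physical time without ticking. `ToyClock` and its methods are hypothetical, not Ignite API, and the 16 logical bits are an assumption. A stale `commitTs` cannot move the recipient's clock backwards (the update rule is monotonic), but it can leave the recipient behind the sender.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical toy HLC, not the Ignite API.
 * Timestamps are (physical ms << LOGICAL_BITS) plus a logical counter.
 */
final class ToyClock {
    /** Assumption: 16 logical bits. */
    static final int LOGICAL_BITS = 16;

    private final LongSupplier physicalMillis;
    private long latest;

    ToyClock(LongSupplier physicalMillis) {
        this.physicalMillis = physicalMillis;
    }

    /** Assumed receiver-side rule, mirroring update(): max(request + 1, physical now, latest + 1). */
    synchronized long update(long requestTime) {
        long now = physicalMillis.getAsLong() << LOGICAL_BITS;
        latest = Math.max(requestTime + 1, Math.max(now, latest + 1));
        return latest;
    }

    /** Reads the clock without ticking, in the spirit of current() (assumed semantics). */
    synchronized long current() {
        return Math.max(latest, physicalMillis.getAsLong() << LOGICAL_BITS);
    }

    /** Extracts the physical milliseconds from a hybrid timestamp. */
    static long physicalPart(long hybridTime) {
        return hybridTime >>> LOGICAL_BITS;
    }
}
```

For example, with the sender's physical clock at 5000 ms and a recipient frozen at 3000 ms, a response carrying a commit timestamp from 3500 ms advances the recipient only to 3500 ms, while a response carrying the sender's current time brings it to 5000 ms.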
########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java: ##########
@@ -2243,10 +2243,6 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
                 .success(false)
                 .term(this.currTerm);

-            if (request.timestamp() != null) {

Review Comment:
If I'm not mistaken, transaction processing will cause other nodes' clocks to be adjusted to the sender's clock (due to the clock updates that happen when we process replica messages), and that's all.

1. Are there any other sources of such adjustments?
2. Will we adjust clocks if there is no transactional traffic? Will partition safe time propagation cause such adjustments?
3. If there are no partitions in the cluster at all, will the clocks be adjusted?

Should we keep the 'through Raft' adjustment for the Metastorage?

-- 
This is an automated message from the Apache Git Service.