sanpwc commented on code in PR #4256: URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1777349331
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public interface MinimumRequiredTimeCollectorService { + Review Comment: Unnecessary blank line. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorServiceImpl.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public class MinimumRequiredTimeCollectorServiceImpl implements MinimumRequiredTimeCollectorService { + + private final ConcurrentHashMap<TablePartitionId, Long> partitions = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override + public synchronized void addPartition(TablePartitionId tablePartitionId) { + partitions.put(tablePartitionId, UNDEFINED_MIN_TIME); + } + + /** {@inheritDoc} */ + @Override + public synchronized void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp) { + Long time = partitions.get(tablePartitionId); + if (time == null) { + // Ignore removed partitions. + return; + } + + if (time == UNDEFINED_MIN_TIME || timestamp > time) { + partitions.put(tablePartitionId, timestamp); + } + } + + /** {@inheritDoc} */ + @Override + public void removePartition(TablePartitionId tablePartitionId) { Review Comment: As mentioned above, I'd rather consider different approach. However, if you will stick with given on - should we mark removePartition as synchronized? Otherwise you'll have a race with recordMinActiveTxTimestamp. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public interface MinimumRequiredTimeCollectorService { + + /** Undefined value of a min timestamp. */ + long UNDEFINED_MIN_TIME = 0; + + /** Registers a partition. */ + void addPartition(TablePartitionId tablePartitionId); + + /** Records the given timestamp .*/ + void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp); + + /** Remove timestamps associated with the given partition .*/ + void removePartition(TablePartitionId tablePartitionId); + + /** Returns a snapshot of collected timestamps. */ + Map<TablePartitionId, Long> minTimestampPerPartition(); + + /** Closes this service. */ + void close(); Review Comment: Should we mark the interface as ManuallyCloseable? ########## modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}. + */ +public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest { + Review Comment: Same as above, unnecessary empty line. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public interface MinimumRequiredTimeCollectorService { + + /** Undefined value of a min timestamp. */ + long UNDEFINED_MIN_TIME = 0; + + /** Registers a partition. */ + void addPartition(TablePartitionId tablePartitionId); + + /** Records the given timestamp .*/ + void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp); + + /** Remove timestamps associated with the given partition .*/ + void removePartition(TablePartitionId tablePartitionId); Review Comment: Don't you think that adjustWatermark(watermark) will be simper? I mean that every partition may recordMinActiveTxTimestamp without previous partition registration (addPartition()). CleanupService (I don't remember real name) on successful cleanup will call adjustWatermark(cleanupTimestamp) that will remove all map entires with timestamp <=cleanupTimestamp. In that case removePartition is also no longer needed. Or it's somehow related to awaiting all partitions readiness? ########## modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}. + */ +public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest { + + private static final long UNDEFINED_MIN_TIME = MinimumRequiredTimeCollectorService.UNDEFINED_MIN_TIME; + + @Test + public void test() { + MinimumRequiredTimeCollectorServiceImpl collectorService = new MinimumRequiredTimeCollectorServiceImpl(); + + // No partitions + assertEquals(Map.of(), collectorService.minTimestampPerPartition()); Review Comment: It might be implementation specific. I'd rather assert that size == 0. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorServiceImpl.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public class MinimumRequiredTimeCollectorServiceImpl implements MinimumRequiredTimeCollectorService { + Review Comment: Unnecessary blank line. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; + +/** Collects minimum required timestamp for each partition. */ +public interface MinimumRequiredTimeCollectorService { + + /** Undefined value of a min timestamp. */ + long UNDEFINED_MIN_TIME = 0; + + /** Registers a partition. */ + void addPartition(TablePartitionId tablePartitionId); + + /** Records the given timestamp .*/ + void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp); + + /** Remove timestamps associated with the given partition .*/ + void removePartition(TablePartitionId tablePartitionId); + + /** Returns a snapshot of collected timestamps. */ + Map<TablePartitionId, Long> minTimestampPerPartition(); Review Comment: Is that really important that it's TablePartitionId or we can use ReplicationGroupId here? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java: ########## @@ -714,10 +713,18 @@ private void handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTi } long minActiveTxBeginTime0 = minActiveTxBeginTime; + long timestamp = cmd.timestamp(); - assert minActiveTxBeginTime0 <= cmd.timestamp() : "maxTime=" + minActiveTxBeginTime0 + ", cmdTime=" + cmd.timestamp(); + assert minActiveTxBeginTime0 <= timestamp : "maxTime=" + minActiveTxBeginTime0 + ", cmdTime=" + timestamp; - minActiveTxBeginTime = cmd.timestamp(); + storage.flush(false).whenComplete((r, t) -> { Review Comment: I'm talking about TxStateStorage and not other's table PartitionDataStorage here. ########## modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}. + */ +public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest { + + private static final long UNDEFINED_MIN_TIME = MinimumRequiredTimeCollectorService.UNDEFINED_MIN_TIME; Review Comment: Why not to use static import instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org