sanpwc commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1777349331


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public interface MinimumRequiredTimeCollectorService {
+

Review Comment:
   Unnecessary blank line.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorServiceImpl.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public class MinimumRequiredTimeCollectorServiceImpl implements MinimumRequiredTimeCollectorService {
+
+    private final ConcurrentHashMap<TablePartitionId, Long> partitions = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void addPartition(TablePartitionId tablePartitionId) {
+        partitions.put(tablePartitionId, UNDEFINED_MIN_TIME);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp) {
+        Long time = partitions.get(tablePartitionId);
+        if (time == null) {
+            // Ignore removed partitions.
+            return;
+        }
+
+        if (time == UNDEFINED_MIN_TIME || timestamp > time) {
+            partitions.put(tablePartitionId, timestamp);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removePartition(TablePartitionId tablePartitionId) {

Review Comment:
   As mentioned above, I'd rather consider a different approach. However, if you
   stick with the given one, should we mark removePartition as synchronized?
   Otherwise you'll have a race with recordMinActiveTxTimestamp.
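
A minimal sketch of one way to avoid that race, assuming the map stays a ConcurrentHashMap: rely on its per-key atomic operations instead of mixing synchronized and unsynchronized methods. `CollectorSketch` and the `String` key are hypothetical stand-ins, not the PR's types; the update rule mirrors the `timestamp > time` check in the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the collector; String replaces TablePartitionId. */
class CollectorSketch {
    static final long UNDEFINED_MIN_TIME = 0;

    private final ConcurrentHashMap<String, Long> partitions = new ConcurrentHashMap<>();

    void addPartition(String partitionId) {
        partitions.put(partitionId, UNDEFINED_MIN_TIME);
    }

    void recordMinActiveTxTimestamp(String partitionId, long timestamp) {
        // Atomic per key: if the partition was removed concurrently,
        // the mapping is absent and nothing is re-inserted.
        partitions.computeIfPresent(partitionId,
                (id, time) -> time == UNDEFINED_MIN_TIME || timestamp > time ? timestamp : time);
    }

    void removePartition(String partitionId) {
        partitions.remove(partitionId);
    }

    Map<String, Long> minTimestampPerPartition() {
        // Immutable copy, so callers never observe later updates.
        return Map.copyOf(partitions);
    }
}
```

With `computeIfPresent` the check and the write happen as one step for a given key, so a late `recordMinActiveTxTimestamp` cannot resurrect a removed partition. Marking `removePartition` as synchronized, as suggested above, would also work.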



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public interface MinimumRequiredTimeCollectorService {
+
+    /** Undefined value of a min timestamp. */
+    long UNDEFINED_MIN_TIME = 0;
+
+    /** Registers a partition. */
+    void addPartition(TablePartitionId tablePartitionId);
+
+    /** Records the given timestamp .*/
+    void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp);
+
+    /** Remove timestamps associated with the given partition .*/
+    void removePartition(TablePartitionId tablePartitionId);
+
+    /** Returns a snapshot of collected timestamps. */
+    Map<TablePartitionId, Long> minTimestampPerPartition();
+
+    /** Closes this service. */
+    void close();

Review Comment:
   Should we mark the interface as ManuallyCloseable?
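
A rough sketch of what that could look like. `ManuallyCloseableSketch` is a stand-in whose shape (a single `close()` that may throw) is an assumption about Ignite's ManuallyCloseable, and the other names are hypothetical as well.

```java
import java.util.List;

/** Stand-in for ManuallyCloseable; the signature is an assumption. */
interface ManuallyCloseableSketch {
    void close() throws Exception;
}

/** Hypothetical service interface that inherits close() instead of declaring its own. */
interface CollectorServiceSketch extends ManuallyCloseableSketch {
    @Override
    void close(); // Narrowed: this service's close() throws no checked exception.
}

/** Trivial implementation used only to exercise the sketch. */
class NoopCollector implements CollectorServiceSketch {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

/** Generic shutdown helper that works on anything closeable in this sense. */
class CloseAllSketch {
    static void closeAll(List<? extends ManuallyCloseableSketch> resources) {
        for (ManuallyCloseableSketch resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close resource", e);
            }
        }
    }
}
```

The benefit would be that generic shutdown code can treat the service like any other closeable component.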



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}.
+ */
+public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest {
+

Review Comment:
   Same as above, unnecessary empty line.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public interface MinimumRequiredTimeCollectorService {
+
+    /** Undefined value of a min timestamp. */
+    long UNDEFINED_MIN_TIME = 0;
+
+    /** Registers a partition. */
+    void addPartition(TablePartitionId tablePartitionId);
+
+    /** Records the given timestamp .*/
+    void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp);
+
+    /** Remove timestamps associated with the given partition .*/
+    void removePartition(TablePartitionId tablePartitionId);

Review Comment:
   Don't you think that adjustWatermark(watermark) would be simpler?
   I mean that every partition may call recordMinActiveTxTimestamp without prior
   registration (addPartition()). On successful cleanup, CleanupService (I don't
   remember the real name) would call adjustWatermark(cleanupTimestamp), which
   removes all map entries with timestamp <= cleanupTimestamp. In that case
   removePartition is no longer needed either.

   Or is it somehow related to awaiting the readiness of all partitions?
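
To make the suggestion concrete, here is a minimal sketch of the watermark-based variant. Everything in it is hypothetical (`WatermarkCollectorSketch`, the `String` key standing in for the partition ID), and it assumes that keeping the larger of two recorded timestamps is the intended rule, as in the current implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical watermark-based collector; String replaces the partition ID type. */
class WatermarkCollectorSketch {
    private final ConcurrentHashMap<String, Long> timestamps = new ConcurrentHashMap<>();

    /** Records a timestamp without prior registration; the larger value wins. */
    void recordMinActiveTxTimestamp(String partitionId, long timestamp) {
        timestamps.merge(partitionId, timestamp, Long::max);
    }

    /** Drops every entry whose timestamp is <= the cleanup timestamp. */
    void adjustWatermark(long cleanupTimestamp) {
        timestamps.values().removeIf(ts -> ts <= cleanupTimestamp);
    }

    /** Returns an immutable snapshot of the collected timestamps. */
    Map<String, Long> minTimestampPerPartition() {
        return Map.copyOf(timestamps);
    }
}
```

In this shape neither addPartition nor removePartition is needed: entries appear on the first record call and disappear once cleanup has passed them.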



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}.
+ */
+public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest {
+
+    private static final long UNDEFINED_MIN_TIME = MinimumRequiredTimeCollectorService.UNDEFINED_MIN_TIME;
+
+    @Test
+    public void test() {
+        MinimumRequiredTimeCollectorServiceImpl collectorService = new MinimumRequiredTimeCollectorServiceImpl();
+
+        // No partitions
+        assertEquals(Map.of(), collectorService.minTimestampPerPartition());

Review Comment:
   It might be implementation specific. I'd rather assert that size == 0.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorServiceImpl.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public class MinimumRequiredTimeCollectorServiceImpl implements MinimumRequiredTimeCollectorService {
+

Review Comment:
   Unnecessary blank line.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/MinimumRequiredTimeCollectorService.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+
+/** Collects minimum required timestamp for each partition. */
+public interface MinimumRequiredTimeCollectorService {
+
+    /** Undefined value of a min timestamp. */
+    long UNDEFINED_MIN_TIME = 0;
+
+    /** Registers a partition. */
+    void addPartition(TablePartitionId tablePartitionId);
+
+    /** Records the given timestamp .*/
+    void recordMinActiveTxTimestamp(TablePartitionId tablePartitionId, long timestamp);
+
+    /** Remove timestamps associated with the given partition .*/
+    void removePartition(TablePartitionId tablePartitionId);
+
+    /** Returns a snapshot of collected timestamps. */
+    Map<TablePartitionId, Long> minTimestampPerPartition();

Review Comment:
   Is it really important that it's TablePartitionId, or can we use
   ReplicationGroupId here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -714,10 +713,18 @@ private void handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTi
         }
 
         long minActiveTxBeginTime0 = minActiveTxBeginTime;
+        long timestamp = cmd.timestamp();
 
-        assert minActiveTxBeginTime0 <= cmd.timestamp() : "maxTime=" + minActiveTxBeginTime0 + ", cmdTime=" + cmd.timestamp();
+        assert minActiveTxBeginTime0 <= timestamp : "maxTime=" + minActiveTxBeginTime0 + ", cmdTime=" + timestamp;
 
-        minActiveTxBeginTime = cmd.timestamp();
+        storage.flush(false).whenComplete((r, t) -> {

Review Comment:
   I'm talking about TxStateStorage here, not the table's other storage,
   PartitionDataStorage.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MinimumRequiredTimeCollectorServiceSelfTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link MinimumRequiredTimeCollectorServiceImpl}.
+ */
+public class MinimumRequiredTimeCollectorServiceSelfTest extends BaseIgniteAbstractTest {
+
+    private static final long UNDEFINED_MIN_TIME = MinimumRequiredTimeCollectorService.UNDEFINED_MIN_TIME;

Review Comment:
   Why not use a static import instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

