denis-chudov commented on code in PR #4608:
URL: https://github.com/apache/ignite-3/pull/4608#discussion_r1810709902


##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>The following optimizations are applied to minimize the number of bytes a batch is serialized to:</p>
+ * <ul>
+ *     <li>Java Serialization is not used to serialize components (it's pretty verbose)</li>
+ *     <li>Varints are used extensively</li>
+ *     <li>A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+ *     per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+ *     and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+ *     very small).
+ *     </li>
+ *     <li>Leases are grouped per table/zone ID (aka object ID), so an object ID is only written once per object</li>
+ * </ul>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+    /*
+     * The following optimizations are applied to minimize the number of bytes 
a batch is serialized to:
+     *
+     * - Java Serialization is not used to serialize components (it's pretty 
verbose)
+     * - Varints are used extensively
+     * - A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+     * per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+     * and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+     * very small).
+     * - Leases are grouped per table/zone ID (aka object ID), so an object ID 
is only written once per object
+     * - Object IDs are represented as deltas with respect to the previous 
object; as leases are sorted by object IDs, the deltas are
+     * encoded effectively as varints
+     * - Partition IDs are not written, instead their index represents 
partition ID. Holes are rare (if possible), so we just write
+     * special markers to denote holes
+     * - Expiration timestamps are always computed from Start timestamps. To 
account for a possibility of this duration being changed
+     * on the go, we pass the most frequent duration once per batch (in the 
header), and then we only write lease duration for leases with
+     * uncommon duration (that is, duration different from the most frequent 
one). We use a flag to distinguish such leases.
+     * - Expiration timestamps almost always have their logical part equal to 0, so it is only written (guarded by a flag) when
+     * it is not 0.
+     * - Physical parts of Start timestamps are coded as deltas with respect to the least timestamp present in the batch; this
+     * keeps those deltas small, so they are encoded effectively as varints.
+     */

Review Comment:
   Why not put it all into the Javadoc?



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>The following optimizations are applied to minimize the number of bytes a batch is serialized to:</p>
+ * <ul>
+ *     <li>Java Serialization is not used to serialize components (it's pretty verbose)</li>
+ *     <li>Varints are used extensively</li>
+ *     <li>A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+ *     per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+ *     and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+ *     very small).
+ *     </li>
+ *     <li>Leases are grouped per table/zone ID (aka object ID), so an object ID is only written once per object</li>
+ * </ul>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+    /*
+     * The following optimizations are applied to minimize the number of bytes 
a batch is serialized to:
+     *
+     * - Java Serialization is not used to serialize components (it's pretty 
verbose)
+     * - Varints are used extensively
+     * - A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+     * per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+     * and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+     * very small).
+     * - Leases are grouped per table/zone ID (aka object ID), so an object ID 
is only written once per object
+     * - Object IDs are represented as deltas with respect to the previous 
object; as leases are sorted by object IDs, the deltas are
+     * encoded effectively as varints
+     * - Partition IDs are not written, instead their index represents 
partition ID. Holes are rare (if possible), so we just write
+     * special markers to denote holes
+     * - Expiration timestamps are always computed from Start timestamps. To 
account for a possibility of this duration being changed
+     * on the go, we pass the most frequent duration once per batch (in the 
header), and then we only write lease duration for leases with
+     * uncommon duration (that is, duration different from the most frequent 
one). We use a flag to distinguish such leases.
+     * - Expiration timestamps almost always have their logical part equal to 0, so it is only written (guarded by a flag) when
+     * it is not 0.
+     * - Physical parts of Start timestamps are coded as deltas with respect to the least timestamp present in the batch; this
+     * keeps those deltas small, so they are encoded effectively as varints.
+     */
+
+    /** Serializer instance. */
+    public static final LeaseBatchSerializer INSTANCE = new LeaseBatchSerializer();
+
+    // The *_MASK constants below are bits of the per-lease flags byte. They are part of the serialized format,
+    // so their values must never change.
+
+    /** Flag bit: set when {@link Lease#isAccepted()} is {@code true}. */
+    @SuppressWarnings("PointlessBitwiseExpression")
+    private static final int ACCEPTED_MASK = 1 << 0;
+
+    /** Flag bit: set when {@link Lease#isProlongable()} is {@code true}. */
+    private static final int PROLONGABLE_MASK = 1 << 1;
+
+    /** Flag bit: set when the lease has a non-null {@link Lease#proposedCandidate()} (its name index is then written). */
+    private static final int HAS_PROPOSED_CANDIDATE_MASK = 1 << 2;
+
+    /**
+     * Flag bit: set when the lease period (expiration physical time minus start physical time) differs from the most common
+     * period in the batch; the period is then written explicitly.
+     */
+    private static final int HAS_UNCOMMON_PERIOD_MASK = 1 << 3;
+
+    /** Flag bit: set when the expiration timestamp logical part is not zero (this is uncommon); it is then written. */
+    private static final int HAS_EXPIRATION_LOGICAL_PART_MASK = 1 << 4;
+
+    /** Flag bit: this is not a real lease, but a hole in partitionId sequence. Having this flag allows us to omit partitionId. */
+    private static final int DUMMY_LEASE_MASK = 1 << 5;
+
+    // When the nodes dictionary holds no more than 8 names, the lease holder node index and the proposed candidate name index
+    // fit in 3 bits each, so together (6 bits) they fit in the 7 payload bits of a single varint byte. This 'compact mode'
+    // saves 1 byte per lease that has a proposed candidate (without a candidate, only the holder index is written anyway).
+
+    /** Number of bits per index (holder node index / candidate name index) in compact mode; 3, i.e. less than half a byte. */
+    private static final int BIT_WIDTH_TO_FIT_IN_HALF_BYTE = 3;
+
+    /** Maximum dictionary size (number of names) for which compact mode is used. */
+    private static final int MAX_NODES_FOR_COMPACT_MODE = 1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+    /** Mask to extract lease holder index from compact representation (the low {@link #BIT_WIDTH_TO_FIT_IN_HALF_BYTE} bits). */
+    private static final int COMPACT_HOLDER_INDEX_MASK = (1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+
+    /**
+     * Writes the batch: the header (minimal timestamp physical part, most common lease period, nodes dictionary) followed by
+     * table leases grouped per object ID.
+     */
+    @Override
+    protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) throws IOException {
+        // Header.
+        long minTimestampPhysical = minTimestampPhysicalPart(batch);
+        long commonLeasePeriod = mostFrequentLeasePeriod(batch);
+        out.writeVarInt(minTimestampPhysical);
+        out.writeVarInt(commonLeasePeriod);
+
+        NodesDictionary nodesDictionary = buildNodesDictionary(batch);
+        nodesDictionary.writeTo(out);
+
+        // Split leases by the kind of their replication group in one pass.
+        List<Lease> tableLeases = new ArrayList<>();
+        List<Lease> zoneLeases = new ArrayList<>();
+        for (Lease lease : batch.leases()) {
+            if (lease.replicationGroupId() instanceof TablePartitionId) {
+                tableLeases.add(lease);
+            }
+            if (lease.replicationGroupId() instanceof ZonePartitionId) {
+                zoneLeases.add(lease);
+            }
+        }
+
+        int totalLeases = batch.leases().size();
+        int unclassifiedLeases = totalLeases - tableLeases.size() - zoneLeases.size();
+        assert unclassifiedLeases == 0 : "There are " + totalLeases
+                + " leases in total, "
+                + tableLeases.size() + " of them are table leases, " + zoneLeases.size() + " are zone leases, but "
+                + unclassifiedLeases + " are neither";
+
+        writePartitionedGroupLeases(tableLeases, minTimestampPhysical, commonLeasePeriod, nodesDictionary, out);
+
+        // Zone leases are not serialized yet.
+        // NOTE(review): with assertions disabled, zone leases would be silently dropped from the serialized batch.
+        assert zoneLeases.isEmpty() : "There are zone leases which are not supported yet";
+    }
+
+    private static long minTimestampPhysicalPart(LeaseBatch batch) {
+        long min = HybridTimestamp.MAX_VALUE.getPhysical();
+
+        for (Lease lease : batch.leases()) {
+            min = Math.min(min, lease.getStartTime().getPhysical());
+            min = Math.min(min, lease.getExpirationTime().getPhysical());

Review Comment:
   Does this make sense? The start time is always less than the expiration time, so I would recommend an assertion instead.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>The following optimizations are applied to minimize the number of bytes a batch is serialized to:</p>
+ * <ul>
+ *     <li>Java Serialization is not used to serialize components (it's pretty verbose)</li>
+ *     <li>Varints are used extensively</li>
+ *     <li>A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+ *     per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+ *     and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+ *     very small).
+ *     </li>
+ *     <li>Leases are grouped per table/zone ID (aka object ID), so an object ID is only written once per object</li>
+ * </ul>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+    /*
+     * The following optimizations are applied to minimize the number of bytes 
a batch is serialized to:
+     *
+     * - Java Serialization is not used to serialize components (it's pretty 
verbose)
+     * - Varints are used extensively
+     * - A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+     * per batch. This is beneficial as we usually have many more leases than nodes in the cluster, so we can just represent nodes
+     * and their names as indices in the dictionary (this is especially effective given we use varints, as indices are usually
+     * very small).
+     * - Leases are grouped per table/zone ID (aka object ID), so an object ID 
is only written once per object
+     * - Object IDs are represented as deltas with respect to the previous 
object; as leases are sorted by object IDs, the deltas are
+     * encoded effectively as varints
+     * - Partition IDs are not written, instead their index represents 
partition ID. Holes are rare (if possible), so we just write
+     * special markers to denote holes
+     * - Expiration timestamps are always computed from Start timestamps. To 
account for a possibility of this duration being changed
+     * on the go, we pass the most frequent duration once per batch (in the 
header), and then we only write lease duration for leases with
+     * uncommon duration (that is, duration different from the most frequent 
one). We use a flag to distinguish such leases.
+     * - Expiration timestamps almost always have their logical part equal to 0, so it is only written (guarded by a flag) when
+     * it is not 0.
+     * - Physical parts of Start timestamps are coded as deltas with respect to the least timestamp present in the batch; this
+     * keeps those deltas small, so they are encoded effectively as varints.
+     */
+
+    /** Serializer instance. */
+    public static final LeaseBatchSerializer INSTANCE = new LeaseBatchSerializer();
+
+    // The *_MASK constants below are bits of the per-lease flags byte. They are part of the serialized format,
+    // so their values must never change.
+
+    /** Flag bit: set when {@link Lease#isAccepted()} is {@code true}. */
+    @SuppressWarnings("PointlessBitwiseExpression")
+    private static final int ACCEPTED_MASK = 1 << 0;
+
+    /** Flag bit: set when {@link Lease#isProlongable()} is {@code true}. */
+    private static final int PROLONGABLE_MASK = 1 << 1;
+
+    /** Flag bit: set when the lease has a non-null {@link Lease#proposedCandidate()} (its name index is then written). */
+    private static final int HAS_PROPOSED_CANDIDATE_MASK = 1 << 2;
+
+    /**
+     * Flag bit: set when the lease period (expiration physical time minus start physical time) differs from the most common
+     * period in the batch; the period is then written explicitly.
+     */
+    private static final int HAS_UNCOMMON_PERIOD_MASK = 1 << 3;
+
+    /** Flag bit: set when the expiration timestamp logical part is not zero (this is uncommon); it is then written. */
+    private static final int HAS_EXPIRATION_LOGICAL_PART_MASK = 1 << 4;
+
+    /** Flag bit: this is not a real lease, but a hole in partitionId sequence. Having this flag allows us to omit partitionId. */
+    private static final int DUMMY_LEASE_MASK = 1 << 5;
+
+    // When the nodes dictionary holds no more than 8 names, the lease holder node index and the proposed candidate name index
+    // fit in 3 bits each, so together (6 bits) they fit in the 7 payload bits of a single varint byte. This 'compact mode'
+    // saves 1 byte per lease that has a proposed candidate (without a candidate, only the holder index is written anyway).
+
+    /** Number of bits per index (holder node index / candidate name index) in compact mode; 3, i.e. less than half a byte. */
+    private static final int BIT_WIDTH_TO_FIT_IN_HALF_BYTE = 3;
+
+    /** Maximum dictionary size (number of names) for which compact mode is used. */
+    private static final int MAX_NODES_FOR_COMPACT_MODE = 1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+    /** Mask to extract lease holder index from compact representation (the low {@link #BIT_WIDTH_TO_FIT_IN_HALF_BYTE} bits). */
+    private static final int COMPACT_HOLDER_INDEX_MASK = (1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+
+    /**
+     * Writes the batch: the header (minimal timestamp physical part, most common lease period, nodes dictionary) followed by
+     * table leases grouped per object ID.
+     */
+    @Override
+    protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) throws IOException {
+        // Header.
+        long minTimestampPhysical = minTimestampPhysicalPart(batch);
+        long commonLeasePeriod = mostFrequentLeasePeriod(batch);
+        out.writeVarInt(minTimestampPhysical);
+        out.writeVarInt(commonLeasePeriod);
+
+        NodesDictionary nodesDictionary = buildNodesDictionary(batch);
+        nodesDictionary.writeTo(out);
+
+        // Split leases by the kind of their replication group in one pass.
+        List<Lease> tableLeases = new ArrayList<>();
+        List<Lease> zoneLeases = new ArrayList<>();
+        for (Lease lease : batch.leases()) {
+            if (lease.replicationGroupId() instanceof TablePartitionId) {
+                tableLeases.add(lease);
+            }
+            if (lease.replicationGroupId() instanceof ZonePartitionId) {
+                zoneLeases.add(lease);
+            }
+        }
+
+        int totalLeases = batch.leases().size();
+        int unclassifiedLeases = totalLeases - tableLeases.size() - zoneLeases.size();
+        assert unclassifiedLeases == 0 : "There are " + totalLeases
+                + " leases in total, "
+                + tableLeases.size() + " of them are table leases, " + zoneLeases.size() + " are zone leases, but "
+                + unclassifiedLeases + " are neither";
+
+        writePartitionedGroupLeases(tableLeases, minTimestampPhysical, commonLeasePeriod, nodesDictionary, out);
+
+        // Zone leases are not serialized yet.
+        // NOTE(review): with assertions disabled, zone leases would be silently dropped from the serialized batch.
+        assert zoneLeases.isEmpty() : "There are zone leases which are not supported yet";
+    }
+
+    /**
+     * Returns the smallest physical part of a lease start time in the batch; it is the base for start time deltas.
+     *
+     * <p>Expiration times are not scanned: a lease never expires before it starts, so the minimum over start times is also the
+     * minimum over all timestamps of the batch. This invariant is checked with an assertion instead.
+     *
+     * @param batch Batch of leases.
+     * @return Minimal start time physical part, or {@code HybridTimestamp.MAX_VALUE.getPhysical()} for an empty batch.
+     */
+    private static long minTimestampPhysicalPart(LeaseBatch batch) {
+        long min = HybridTimestamp.MAX_VALUE.getPhysical();
+
+        for (Lease lease : batch.leases()) {
+            long startPhysical = lease.getStartTime().getPhysical();
+
+            // A negative period would break the delta encoding of expiration times.
+            assert startPhysical <= lease.getExpirationTime().getPhysical() : "Lease expires before it starts: " + lease;
+
+            min = Math.min(min, startPhysical);
+        }
+
+        return min;
+    }
+
+    /**
+     * Returns the lease period (expiration physical time minus start physical time) shared by the largest number of leases in
+     * the batch. It is written once in the header, so only leases with a different period need to write their own.
+     *
+     * @param batch Batch of leases.
+     * @return Most frequent period, or {@code 0} if the batch is empty. On ties, the period met first while iterating the
+     *     counts map wins.
+     */
+    private static long mostFrequentLeasePeriod(LeaseBatch batch) {
+        if (batch.leases().isEmpty()) {
+            return 0;
+        }
+
+        Long2IntMap occurrencesByPeriod = new Long2IntOpenHashMap();
+        for (Lease lease : batch.leases()) {
+            occurrencesByPeriod.mergeInt(
+                    lease.getExpirationTime().getPhysical() - lease.getStartTime().getPhysical(),
+                    1,
+                    Integer::sum
+            );
+        }
+
+        long winningPeriod = -1;
+        int winningCount = -1;
+        for (Long2IntMap.Entry periodAndCount : occurrencesByPeriod.long2IntEntrySet()) {
+            // Strictly greater: an equally frequent period found later does not replace the current winner.
+            if (periodAndCount.getIntValue() > winningCount) {
+                winningPeriod = periodAndCount.getLongKey();
+                winningCount = periodAndCount.getIntValue();
+            }
+        }
+
+        return winningPeriod;
+    }
+
+    /**
+     * Collects every node mentioned in the batch. Lease holders are registered as (ID, name) pairs; proposed candidates are
+     * registered by name only.
+     */
+    private static NodesDictionary buildNodesDictionary(LeaseBatch batch) {
+        NodesDictionary dictionary = new NodesDictionary();
+
+        for (Lease lease : batch.leases()) {
+            UUID holderId = lease.getLeaseholderId();
+            if (holderId != null) {
+                assert lease.getLeaseholder() != null : lease;
+
+                dictionary.putNode(holderId, lease.getLeaseholder());
+            }
+
+            String candidate = lease.proposedCandidate();
+            if (candidate != null) {
+                dictionary.putName(candidate);
+            }
+        }
+
+        return dictionary;
+    }
+
+    /**
+     * Writes leases of partitioned replication groups. The format is the number of distinct object IDs, followed by the leases
+     * of each object in ascending object ID order. Each object ID is written as a delta from the previous one (the first from 0).
+     */
+    private static void writePartitionedGroupLeases(
+            List<Lease> leases,
+            long minTsPhysical,
+            long commonLeasePeriod,
+            NodesDictionary nodesDictionary,
+            IgniteDataOutput out
+    ) throws IOException {
+        // A TreeMap keeps object IDs ascending, so deltas between consecutive IDs are non-negative.
+        Map<Integer, List<Lease>> leasesByObjectId = leases.stream()
+                .collect(groupingBy(lease -> partitionedGroupIdFrom(lease).objectId(), TreeMap::new, toList()));
+
+        out.writeVarInt(leasesByObjectId.size());
+
+        int previousObjectId = 0;
+        for (Entry<Integer, List<Lease>> objectGroup : leasesByObjectId.entrySet()) {
+            previousObjectId = writeLeasesForObject(
+                    objectGroup.getKey(),
+                    objectGroup.getValue(),
+                    minTsPhysical,
+                    commonLeasePeriod,
+                    nodesDictionary,
+                    out,
+                    previousObjectId
+            );
+        }
+    }
+
+    /** Returns the replication group ID of the lease, cast to {@link PartitionGroupId}. */
+    private static PartitionGroupId partitionedGroupIdFrom(Lease lease) {
+        Object groupId = lease.replicationGroupId();
+
+        return (PartitionGroupId) groupId;
+    }
+
+    /**
+     * Writes all leases of one object (table or zone). The format is the object ID delta from {@code objectIdBase}, the
+     * partition count (highest partition ID + 1), then one entry per partition ID, with dummy entries for holes.
+     *
+     * <p>Note: sorts {@code objectLeases} in place.
+     *
+     * @return {@code objectId}, to be used as the base for the next object's delta.
+     */
+    private static int writeLeasesForObject(
+            int objectId,
+            List<Lease> objectLeases,
+            long minTsPhysical,
+            long commonLeasePeriod,
+            NodesDictionary nodesDictionary,
+            IgniteDataOutput out,
+            int objectIdBase
+    ) throws IOException {
+        // Partition IDs are implied by position in the stream, so leases must go in ascending partition ID order.
+        objectLeases.sort(comparing(lease -> partitionedGroupIdFrom(lease).partitionId()));
+
+        out.writeVarInt(objectId - objectIdBase);
+
+        Lease lastLease = objectLeases.get(objectLeases.size() - 1);
+        int maxPartitionId = partitionedGroupIdFrom(lastLease).partitionId();
+        out.writeVarInt(maxPartitionId + 1);
+
+        int nextPartitionId = 0;
+        for (Lease lease : objectLeases) {
+            nextPartitionId = writeLease(lease, nextPartitionId, minTsPhysical, commonLeasePeriod, nodesDictionary, out);
+        }
+
+        return objectId;
+    }
+
+    /**
+     * Writes a single lease at stream position {@code partitionId} of the current object. For every partition ID skipped
+     * before the lease's own partition, a dummy entry (a lone {@link #DUMMY_LEASE_MASK} flags byte) is written first.
+     *
+     * <p>Layout of a real lease entry:
+     * <pre>
+     * flags                                          1 byte (see the *_MASK constants)
+     * nodes info, compact mode:                      varint(holderNodeIndex | candidateNameIndex << 3), candidate index is 0
+     *                                                if there is no candidate
+     * nodes info, otherwise:                         varint(holderNodeIndex) [varint(candidateNameIndex) if HAS_PROPOSED_CANDIDATE]
+     * start physical delta                           varint(startTime.physical - minTsPhysical)
+     * start logical                                  varint(startTime.logical)
+     * [period]            if HAS_UNCOMMON_PERIOD     varint(expirationTime.physical - startTime.physical)
+     * [expiration logical] if HAS_EXPIRATION_LOGICAL_PART varint(expirationTime.logical)
+     * </pre>
+     *
+     * @param lease Lease to write.
+     * @param partitionId Partition ID expected next in the stream.
+     * @param minTsPhysical Base for the start time physical delta.
+     * @param commonLeasePeriod Most common period in the batch; it is not written for leases that have it.
+     * @param nodesDictionary Dictionary used to encode nodes and names as indices.
+     * @param out Output.
+     * @return Partition ID following the one of the written lease.
+     * @throws IOException If writing fails.
+     */
+    private static int writeLease(
+            Lease lease,
+            int partitionId,
+            long minTsPhysical,
+            long commonLeasePeriod,
+            NodesDictionary nodesDictionary,
+            IgniteDataOutput out
+    ) throws IOException {
+        PartitionGroupId groupId = partitionedGroupIdFrom(lease);
+
+        while (partitionId < groupId.partitionId()) {
+            // It's a hole in partitionId sequence, let's write a 'dummy lease'.
+            out.write(DUMMY_LEASE_MASK);
+            partitionId++;
+        }
+
+        assert partitionId == groupId.partitionId() : "Duplicate partitionId in " + lease;
+
+        HybridTimestamp startTime = lease.getStartTime();
+        HybridTimestamp expirationTime = lease.getExpirationTime();
+
+        assert lease.getLeaseholder() != null && lease.getLeaseholderId() != null : lease + " doesn't have a leaseholder";
+        // Compare by value: a timestamp equal to MIN_VALUE is illegal even if it is not the MIN_VALUE instance itself.
+        assert !HybridTimestamp.MIN_VALUE.equals(startTime) : lease + " has illegal start time";
+        assert !HybridTimestamp.MIN_VALUE.equals(expirationTime) : lease + " has illegal expiration time";
+
+        UUID leaseHolderId = lease.getLeaseholderId();
+        String proposedCandidate = lease.proposedCandidate();
+        boolean hasProposedCandidate = proposedCandidate != null;
+
+        long periodAbsolutePart = expirationTime.getPhysical() - startTime.getPhysical();
+        boolean hasUncommonPeriod = periodAbsolutePart != commonLeasePeriod;
+        boolean hasExpirationLogicalPart = expirationTime.getLogical() != 0;
+
+        out.write(flags(
+                lease.isAccepted(),
+                lease.isProlongable(),
+                hasProposedCandidate,
+                hasUncommonPeriod,
+                hasExpirationLogicalPart
+        ));
+
+        if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
+            // Compact mode: both indices share one varint byte; HAS_PROPOSED_CANDIDATE_MASK tells whether the candidate part
+            // is meaningful.
+            int nodesInfo = packNodesInfo(
+                    nodesDictionary.getNodeIndex(leaseHolderId),
+                    hasProposedCandidate ? nodesDictionary.getNameIndex(proposedCandidate) : 0
+            );
+            out.writeVarInt(nodesInfo);
+        } else {
+            out.writeVarInt(nodesDictionary.getNodeIndex(leaseHolderId));
+            if (hasProposedCandidate) {
+                out.writeVarInt(nodesDictionary.getNameIndex(proposedCandidate));
+            }
+        }
+
+        out.writeVarInt(startTime.getPhysical() - minTsPhysical);
+        out.writeVarInt(startTime.getLogical());
+
+        if (hasUncommonPeriod) {
+            out.writeVarInt(periodAbsolutePart);
+        }
+        if (hasExpirationLogicalPart) {
+            out.writeVarInt(expirationTime.getLogical());
+        }
+
+        return partitionId + 1;
+    }
+
+    /**
+     * Packs the lease holder node index (low {@link #BIT_WIDTH_TO_FIT_IN_HALF_BYTE} bits) and the proposed candidate name
+     * index (the bits right above them) into one int for compact mode.
+     */
+    private static int packNodesInfo(int holderNodeIndex, int proposedCandidateNameIndex) {
+        assert holderNodeIndex < MAX_NODES_FOR_COMPACT_MODE : holderNodeIndex;
+        assert proposedCandidateNameIndex < MAX_NODES_FOR_COMPACT_MODE : proposedCandidateNameIndex;
+
+        int candidateBits = proposedCandidateNameIndex << BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+        return candidateBits | holderNodeIndex;
+    }
+
+    /**
+     * Tells whether compact mode is used for the batch; this is decided by the number of names in the dictionary.
+     *
+     * <p>NOTE(review): only the name count is checked, while {@link #packNodesInfo} also requires the holder node index to be
+     * below {@link #MAX_NODES_FOR_COMPACT_MODE}. Confirm that the node count can never exceed the name count.
+     */
+    private static boolean holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) {
+        // Up to 8 names need 3 bits per index; two indices take 6 bits, which fits the 7 payload bits of one varint byte.
+        int nameCount = dictionary.nameCount();
+
+        return nameCount <= MAX_NODES_FOR_COMPACT_MODE;
+    }
+
+    /** Combines per-lease boolean properties into the value of the flags byte (see the {@code *_MASK} constants). */
+    private static int flags(
+            boolean accepted,
+            boolean prolongable,
+            boolean hasProposedCandidate,
+            boolean hasUncommonPeriod,
+            boolean hasExpirationLogicalPart
+    ) {
+        int bits = 0;
+
+        if (accepted) {
+            bits |= ACCEPTED_MASK;
+        }
+        if (prolongable) {
+            bits |= PROLONGABLE_MASK;
+        }
+        if (hasProposedCandidate) {
+            bits |= HAS_PROPOSED_CANDIDATE_MASK;
+        }
+        if (hasUncommonPeriod) {
+            bits |= HAS_UNCOMMON_PERIOD_MASK;
+        }
+        if (hasExpirationLogicalPart) {
+            bits |= HAS_EXPIRATION_LOGICAL_PART_MASK;
+        }
+
+        return bits;
+    }
+
+    /**
+     * Reads a batch written by {@link #writeExternalData}: the header first, then table leases grouped per object ID.
+     * NOTE(review): {@code protoVer} is not consulted here.
+     */
+    @Override
+    protected LeaseBatch readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        // Header, in the same order it is written.
+        long minTimestampPhysical = in.readVarInt();
+        long commonLeasePeriod = in.readVarInt();
+        NodesDictionary nodesDictionary = NodesDictionary.readFrom(in);
+
+        // Only table leases are written at the moment, so group IDs are restored as table partition IDs.
+        List<Lease> result = new ArrayList<>();
+        readPartitionedGroupLeases(minTimestampPhysical, commonLeasePeriod, nodesDictionary, result, in, TablePartitionId::new);
+
+        return new LeaseBatch(result);
+    }
+
+    /**
+     * Reads leases of partitioned replication groups: the number of objects, followed by the leases of each object. Object IDs
+     * are restored from deltas (the first one relative to 0), and the leases are appended to {@code leases}.
+     */
+    private static void readPartitionedGroupLeases(
+            long minTimestampPhysical,
+            long commonLeasePeriod,
+            NodesDictionary nodesDictionary,
+            List<Lease> leases,
+            IgniteDataInput in,
+            GroupIdFactory groupIdFactory
+    ) throws IOException {
+        int objectsLeft = in.readVarIntAsInt();
+
+        int previousObjectId = 0;
+        while (objectsLeft-- > 0) {
+            previousObjectId = readLeasesForObject(
+                    minTimestampPhysical,
+                    commonLeasePeriod,
+                    nodesDictionary,
+                    leases,
+                    in,
+                    groupIdFactory,
+                    previousObjectId
+            );
+        }
+    }
+
+    /**
+     * Reads the leases of one object (table or zone) and appends the non-null ones to {@code leases}. The format is the object
+     * ID delta from {@code objectIdBase}, the partition count, then one entry per partition ID.
+     *
+     * @return ID of the object that was read; it is the base for the next object's delta.
+     */
+    private static int readLeasesForObject(
+            long minTimestampPhysical,
+            long commonLeasePeriod,
+            NodesDictionary nodesDictionary,
+            List<Lease> leases,
+            IgniteDataInput in,
+            GroupIdFactory groupIdFactory,
+            int objectIdBase
+    ) throws IOException {
+        int objectIdDelta = in.readVarIntAsInt();
+        int objectId = objectIdBase + objectIdDelta;
+
+        int partitionCount = in.readVarIntAsInt();
+        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+            Lease lease = readLeaseForPartition(
+                    partitionId,
+                    objectId,
+                    minTimestampPhysical,
+                    commonLeasePeriod,
+                    nodesDictionary,
+                    in,
+                    groupIdFactory
+            );
+
+            // A null result (presumably a dummy entry written for a hole) is skipped.
+            if (lease == null) {
+                continue;
+            }
+
+            leases.add(lease);
+        }
+
+        return objectId;
+    }
+
+    private static @Nullable Lease readLeaseForPartition(

Review Comment:
   Could you please describe the structure of the data input and how a lease is stored, in text (or, better, as an ASCII diagram)? You may also do this in the class header.



##########
modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java:
##########
@@ -65,7 +43,7 @@ public void testLeaseBatchSerialization() {
                     i % 2 == 0,
                     i % 2 == 1,
                     i % 2 == 0 ? null : "node" + i,
-                    groupId
+                    new TablePartitionId(1, i)

Review Comment:
   Could you please add tests with different table IDs and with holes in the partition list?



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link LeaseBatch} instances.
+ *
+ * <p>The following optimizations are applied to minimize the number of bytes a batch is serialized to:</p>
+ * <ul>
+ *     <li>Java Serialization is not used to serialize components (it's pretty verbose)</li>
+ *     <li>Varints are used extensively</li>
+ *     <li>A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+ *     per batch. This is beneficial because we usually have a lot more leases than nodes in the cluster, so we can just represent
+ *     nodes and their names as indices in the dictionary (this is especially effective given that we use varints, as indices are
+ *     usually very small).
+ *     </li>
+ *     <li>Leases are grouped per table/zone ID (aka object ID), so an object ID is only written once per object</li>
+ * </ul>
+ */
+public class LeaseBatchSerializer extends VersionedSerializer<LeaseBatch> {
+    /*
+     * The following optimizations are applied to minimize the number of bytes a batch is serialized to:
+     *
+     * - Java Serialization is not used to serialize components (it's pretty verbose)
+     * - Varints are used extensively
+     * - A dictionary of all nodes mentioned as lease holders and proposed candidates is collected and written in the header once
+     * per batch. This is beneficial because we usually have a lot more leases than nodes in the cluster, so we can just represent
+     * nodes and their names as indices in the dictionary (this is especially effective given that we use varints, as indices are
+     * usually very small).
+     * - Leases are grouped per table/zone ID (aka object ID), so an object ID is only written once per object
+     * - Object IDs are represented as deltas with respect to the previous object; as leases are sorted by object IDs, the deltas are
+     * small and are encoded efficiently as varints
+     * - Partition IDs are not written; instead, an entry's index represents its partition ID. Holes are rare (if they occur at all),
+     * so we just write special markers to denote them
+     * - Expiration timestamps are always computed from Start timestamps plus the lease period. To account for the possibility of this
+     * period being changed on the go, we write the most frequent period once per batch (in the header), and then we only write the
+     * period for leases with an uncommon one (that is, a period different from the most frequent one). A flag distinguishes such
+     * leases.
+     * - Expiration timestamps almost always have their logical part equal to 0, so it is only written when it is non-zero (this case
+     * is marked with a dedicated flag).
+     * - Physical parts of Start timestamps are encoded as deltas with respect to the least timestamp present in the batch; this keeps
+     * those deltas small, so they are encoded efficiently as varints
+     */
+
+    /** Serializer instance. */
+    public static final LeaseBatchSerializer INSTANCE = new LeaseBatchSerializer();
+
+    // Per-lease flag bits (bits 0..5). NOTE(review): presumably OR-ed into a single per-lease flags value by the write path; confirm.
+
+    /** Flag bit carrying {@link Lease#isAccepted()}. */
+    @SuppressWarnings("PointlessBitwiseExpression")
+    private static final int ACCEPTED_MASK = 1 << 0;
+
+    /** Flag bit carrying {@link Lease#isProlongable()}. */
+    private static final int PROLONGABLE_MASK = 1 << 1;
+
+    /** Flag bit telling whether the lease has a non-null {@link Lease#proposedCandidate()}. */
+    private static final int HAS_PROPOSED_CANDIDATE_MASK = 1 << 2;
+
+    /** Flag bit telling whether the lease period (in absolute time) differs from the most common period in the batch. */
+    private static final int HAS_UNCOMMON_PERIOD_MASK = 1 << 3;
+
+    /** Flag bit telling whether the logical part of the expiration timestamp is not zero (this is uncommon). */
+    private static final int HAS_EXPIRATION_LOGICAL_PART_MASK = 1 << 4;
+
+    /**
+     * Flag bit telling whether this is not a real lease but a hole in the partition ID sequence. Having this flag allows us to omit
+     * partition IDs.
+     */
+    private static final int DUMMY_LEASE_MASK = 1 << 5;
+
+    // When there are no more than 8 nodes in the cluster, every index is in [0, 7] and fits in 3 bits, so the node name index and
+    // the node index together (6 bits) are guaranteed to fit in the 7 payload bits of a single varint byte. This allows us to enable
+    // 'compact mode' to save 1 byte per lease.
+
+    /** Number of bits each of the name index/node index must fit in to enable compact mode (3 bits, i.e. indices 0..7). */
+    private static final int BIT_WIDTH_TO_FIT_IN_HALF_BYTE = 3;
+
+    /** Maximum number of nodes in the cluster that still allows compact mode (8). */
+    private static final int MAX_NODES_FOR_COMPACT_MODE = 1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE;
+
+    /** Mask (0b111) to extract the lease holder index from the compact representation. */
+    private static final int COMPACT_HOLDER_INDEX_MASK = (1 << BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+
+    @Override
+    protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) 
throws IOException {
+        long minTimestampPhysical = minTimestampPhysicalPart(batch);
+        long commonLeasePeriod = mostFrequentLeasePeriod(batch);
+
+        out.writeVarInt(minTimestampPhysical);
+        out.writeVarInt(commonLeasePeriod);
+
+        NodesDictionary nodesDictionary = buildNodesDictionary(batch);
+        nodesDictionary.writeTo(out);
+
+        List<Lease> tableLeases = batch.leases().stream()
+                .filter(lease -> lease.replicationGroupId() instanceof 
TablePartitionId)
+                .collect(toList());
+        List<Lease> zoneLeases = batch.leases().stream()
+                .filter(lease -> lease.replicationGroupId() instanceof 
ZonePartitionId)
+                .collect(toList());
+        assert tableLeases.size() + zoneLeases.size() == batch.leases().size() 
: "There are " + batch.leases().size()
+                + " leases in total, "
+                + tableLeases.size() + " of them are table leases, " + 
zoneLeases.size() + " are zone leases, but "
+                + (batch.leases().size() - tableLeases.size() - 
zoneLeases.size()) + " are neither";
+
+        writePartitionedGroupLeases(tableLeases, minTimestampPhysical, 
commonLeasePeriod, nodesDictionary, out);
+
+        assert zoneLeases.isEmpty() : "There are zone leases which are not 
supported yet";
+    }
+
+    /**
+     * Returns the smallest physical part among the start and expiration timestamps of all leases in the batch, or the physical part
+     * of {@link HybridTimestamp#MAX_VALUE} if the batch has no leases.
+     */
+    private static long minTimestampPhysicalPart(LeaseBatch batch) {
+        return batch.leases().stream()
+                .mapToLong(lease -> Math.min(lease.getStartTime().getPhysical(), lease.getExpirationTime().getPhysical()))
+                .min()
+                .orElse(HybridTimestamp.MAX_VALUE.getPhysical());
+    }
+
+    private static long mostFrequentLeasePeriod(LeaseBatch batch) {
+        if (batch.leases().isEmpty()) {
+            return 0;
+        }
+
+        Long2IntMap counts = new Long2IntOpenHashMap();
+
+        for (Lease lease : batch.leases()) {
+            long period = lease.getExpirationTime().getPhysical() - 
lease.getStartTime().getPhysical();

Review Comment:
   Please add an assertion that there is no logical part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to