[ 
https://issues.apache.org/jira/browse/CASSANDRA-18773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17759595#comment-17759595
 ] 

Branimir Lambov edited comment on CASSANDRA-18773 at 8/28/23 2:11 PM:
----------------------------------------------------------------------

The crux of the issue here is likely in this comment in 
{{UnfilteredPartitionIterators.merge}}:
{code:java}
                // Note that because the MergeListener cares about it, we want to preserve the index of the iterator.
                // Non-present iterator will thus be set to empty in getReduced.
{code}
The need for this is, indeed, a serious problem in merges of many sstables in 
key-value tables (i.e. ones containing only one row per partition) that we have 
not yet tried to address.

I expect that simply changing the code of {{merge}} to only present partitions 
selected by the partition merger, for example by doing
{code:java}
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java    (revision 052a26474108febad545d6528bb203ecf19b22e5)
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java    (date 1693230551402)
@@ -113,16 +113,11 @@
             private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
 
             private DecoratedKey partitionKey;
-            private boolean isReverseOrder;
 
             public void reduce(int idx, UnfilteredRowIterator current)
             {
                 partitionKey = current.partitionKey();
-                isReverseOrder = current.isReverseOrder();
-
-                // Note that because the MergeListener cares about it, we want to preserve the index of the iterator.
-                // Non-present iterator will thus be set to empty in getReduced.
-                toMerge.set(idx, current);
+                toMerge.add(current);
             }
 
             @SuppressWarnings("resource")
@@ -132,20 +127,6 @@
                                                                  ? null
                                                                  : listener.getRowMergeListener(partitionKey, toMerge);
 
-                // Make a single empty iterator object to merge, we don't need toMerge.size() copiess
-                UnfilteredRowIterator empty = null;
-
-                // Replace nulls by empty iterators
-                for (int i = 0; i < toMerge.size(); i++)
-                {
-                    if (toMerge.get(i) == null)
-                    {
-                        if (null == empty)
-                            empty = EmptyIterators.unfilteredRow(metadata, partitionKey, isReverseOrder);
-                        toMerge.set(i, empty);
-                    }
-                }
-
                 return UnfilteredRowIterators.merge(toMerge, rowListener);
             }
{code}
will give a higher performance boost. The problem is that doing this causes the 
merge listener (which can be e.g. a secondary index implementation) to lose 
information about the sources of a merged value. At some point this was crucial 
for secondary indexes, but I'm not sure it still is, and I don't think anyone 
has invested the time to understand whether it is still necessary in Cassandra 
4. It's even less likely to matter for Cassandra 5, whose storage-attached 
indexes don't need merge listeners.

Nevertheless, until that investigation has been done, this behaviour needs to 
be preserved, but I believe we can still get an improvement for the cases where 
there is no index. To get a more complete solution to the problem, how about 
changing the {{merge}} code to get the {{rowListener}} on the first call to 
{{reduce}}, and switching between the two methods of constructing the 
{{toMerge}} list based on whether or not it is {{null}}?
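
As a rough illustration of that suggestion, the sketch below models the two ways of building {{toMerge}} in a deliberately simplified reducer. It is not Cassandra code: the class {{ToMergeSketch}}, the {{rowListenerFactory}} function, and the use of plain strings in place of {{UnfilteredRowIterator}} are all assumptions made for the example. It also does not model how the real {{getRowMergeListener}} call consumes the list it is given.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of the reducer in UnfilteredPartitionIterators.merge.
// Strings stand in for UnfilteredRowIterator; none of these names are Cassandra APIs.
final class ToMergeSketch
{
    static final String EMPTY = "<empty>";

    private final int sourceCount;
    // Hypothetical: returns a row listener for a partition key, or null when nobody listens.
    private final Function<String, Object> rowListenerFactory;
    private final List<String> toMerge;
    private Object rowListener;
    private boolean firstReduce = true;

    ToMergeSketch(int sourceCount, Function<String, Object> rowListenerFactory)
    {
        this.sourceCount = sourceCount;
        this.rowListenerFactory = rowListenerFactory;
        this.toMerge = new ArrayList<>(sourceCount);
    }

    void reduce(int idx, String partitionKey, String current)
    {
        if (firstReduce)
        {
            // Decide the list layout once per partition, on the first source seen.
            rowListener = rowListenerFactory == null ? null : rowListenerFactory.apply(partitionKey);
            if (rowListener != null)
                for (int i = 0; i < sourceCount; i++)
                    toMerge.add(null); // one slot per source so indexes are preserved
            firstReduce = false;
        }

        if (rowListener != null)
            toMerge.set(idx, current); // listener present: keep source positions
        else
            toMerge.add(current);      // no listener: keep only the sources that are present
    }

    List<String> getReduced()
    {
        if (rowListener != null)
            Collections.replaceAll(toMerge, null, EMPTY); // pad absent sources with a shared placeholder
        return new ArrayList<>(toMerge);
    }

    void onKeyChange()
    {
        // Reset per-partition state before the next key.
        toMerge.clear();
        rowListener = null;
        firstReduce = true;
    }

    public static void main(String[] args)
    {
        // Five sources; the partition is present only in sources 1 and 3.
        ToMergeSketch withListener = new ToMergeSketch(5, key -> new Object());
        withListener.reduce(1, "k", "a");
        withListener.reduce(3, "k", "b");
        System.out.println(withListener.getReduced()); // [<empty>, a, <empty>, b, <empty>]

        ToMergeSketch noListener = new ToMergeSketch(5, key -> null);
        noListener.reduce(1, "k", "a");
        noListener.reduce(3, "k", "b");
        System.out.println(noListener.getReduced());   // [a, b]
    }
}
```

In this toy model the no-listener path hands the row merge two inputs instead of five. With thousands of sstables and one row per partition, the gap between the two layouts would presumably be what dominates the cost described in the issue.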


> Compactions are slow
> --------------------
>
>                 Key: CASSANDRA-18773
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-18773
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Local/Compaction
>            Reporter: Cameron Zemek
>            Priority: Normal
>             Fix For: 4.0.x, 4.1.x, 5.0.x, 5.x
>
>         Attachments: compact-poc.patch, flamegraph.png, stress.yaml
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I have noticed that compactions involving a lot of sstables are very slow 
> (for example major compactions). I have attached a cassandra-stress profile 
> that can generate such a dataset under ccm. In my local test I have 2567 
> sstables at 4Mb each.
> I added code to track the wall clock time of various parts of the code. One 
> problematic part is the ManyToOne constructor. Tracing through the code, a 
> ManyToOne is created over all the sstable iterators for every partition. In 
> my local test I get a measly 60Kb/sec read speed, bottlenecked on a single 
> CPU core (since this code is single threaded), with 85% of the wall clock 
> time spent in the ManyToOne constructor.
> As another data point showing that the merge iterator is the slow part of 
> the code, the cfstats tool from 
> [https://github.com/instaclustr/cassandra-sstable-tools/], which reads all 
> the sstables but does no merging, gets 26Mb/sec read speed.
> Tracking back from the ManyToOne call I see this in 
> UnfilteredPartitionIterators::merge
> {code:java}
>                 for (int i = 0; i < toMerge.size(); i++)
>                 {
>                     if (toMerge.get(i) == null)
>                     {
>                         if (null == empty)
>                             empty = EmptyIterators.unfilteredRow(metadata, partitionKey, isReverseOrder);
>                         toMerge.set(i, empty);
>                     }
>                 }
>  {code}
> I am not sure what the purpose of creating these empty rows is. But on a 
> whim I removed all these empty iterators before passing the list to 
> ManyToOne, and then all the wall clock time shifted to 
> CompactionIterator::hasNext() and read speed increased to 1.5Mb/s.
> So there seem to be further bottlenecks in this code path, but the first is 
> this ManyToOne and having to build it for every partition read.


