Csaba Ringhofer has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/21932 )
Change subject: IMPALA-13509: Copy rows directly to OutboundRowBatch during
hash partitioning
......................................................................
IMPALA-13509: Copy rows directly to OutboundRowBatch during hash partitioning
Before this patch all rows were copied twice during hash/Kudu
partitioning in KrpcDataStreamSender:
1. to the collector RowBatch of the given Channel
(KrpcDataStreamSender::Channel::AddRow())
2. to an OutboundRowBatch when the collector RowBatch is at capacity
(KrpcDataStreamSender::Channel::SendCurrentBatch())
The change removes the RowBatch from KrpcDataStreamSender::Channel
and simplifies its interface to only allow sending already serialized
OutboundRowBatches. The responsibility of collecting per-partition rows
is moved to a new struct KrpcDataStreamSender::PartitionRowCollector.
During partitioning each row is deep-copied into collector_batch_, the
OutboundRowBatch of the partition's PartitionRowCollector. This batch
has a dynamically growing internal buffer. Once collector_batch_
reaches its row count/size limit, it is finalized/compressed
and sent to its Channel.
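The collect-then-send flow described above can be illustrated with a minimal sketch. All names here (SketchOutboundBatch, SketchCollector, RunCollectorSketch) are hypothetical stand-ins and not Impala's classes; rows are plain strings, the Channel is reduced to a callback, and compression is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for OutboundRowBatch: serialized bytes plus a row count.
struct SketchOutboundBatch {
  std::vector<uint8_t> data;
  int num_rows = 0;
};

// Hypothetical stand-in for PartitionRowCollector. It owns one batch and hands
// it to a send callback (standing in for the Channel) when the row limit is hit.
class SketchCollector {
 public:
  using SendFn = std::function<void(SketchOutboundBatch&&)>;

  SketchCollector(int row_limit, SendFn send)
    : row_limit_(row_limit), send_(std::move(send)) {
    assert(row_limit_ > 0);
  }

  // Copies the row bytes into the batch and flushes once the limit is reached.
  void AppendRow(const std::string& row) {
    batch_.data.insert(batch_.data.end(), row.begin(), row.end());
    ++batch_.num_rows;
    if (batch_.num_rows >= row_limit_) Flush();
  }

  // Sends a non-empty batch and starts a fresh one.
  void Flush() {
    if (batch_.num_rows == 0) return;
    send_(std::move(batch_));
    batch_ = SketchOutboundBatch();
  }

 private:
  int row_limit_;
  SendFn send_;
  SketchOutboundBatch batch_;
};

// Feeds num_rows dummy rows through a collector and returns the row count of
// every batch that was sent, in order.
std::vector<int> RunCollectorSketch(int num_rows, int row_limit) {
  std::vector<int> sent_row_counts;
  SketchCollector collector(row_limit, [&](SketchOutboundBatch&& b) {
    sent_row_counts.push_back(b.num_rows);
  });
  for (int i = 0; i < num_rows; ++i) collector.AppendRow("row");
  collector.Flush();  // End of stream: send the partially filled batch.
  return sent_row_counts;
}
```

In this sketch each row is copied exactly once, straight into the batch that is later sent, which is the property the patch is after.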
The function responsible for deep copy (OutboundRowBatch::AppendRow())
is designed similarly to BufferedTupleStream::AddRow():
1. try a deep copy, assuming that there is enough space in the buffer
2. in the rarer case of hitting the end of the buffer, reallocate the
buffer and try again
The main reason for the similar design is to potentially merge the two
functions (and their codegen) in the future.
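The two-step pattern (optimistic copy first, reallocate and retry only on failure) can be sketched as follows. This is an illustration under simplifying assumptions and not the code of OutboundRowBatch::AppendRow(): SketchRowBuffer is a hypothetical class, a row is a length-prefixed string, and the required size is known before the copy starts.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical growable row buffer; each row is stored as a 4-byte length
// followed by its bytes.
class SketchRowBuffer {
 public:
  explicit SketchRowBuffer(size_t initial_capacity) : buf_(initial_capacity) {}

  void AppendRow(const std::string& row) {
    // 1. Fast path: assume there is enough space left in the buffer.
    if (TryAppend(row)) return;
    // 2. Rare path: grow the buffer, then retry. The retry cannot fail.
    Grow(used_ + RowBytes(row));
    const bool ok = TryAppend(row);
    assert(ok);
    (void)ok;  // Avoid an unused-variable warning when NDEBUG is defined.
  }

  // Returns a copy of the row at idx. Requires 0 <= idx < num_rows().
  std::string RowAt(int idx) const {
    size_t off = 0;
    for (int i = 0;; ++i) {
      uint32_t len;
      std::memcpy(&len, buf_.data() + off, sizeof(len));
      const char* bytes =
          reinterpret_cast<const char*>(buf_.data() + off + sizeof(len));
      if (i == idx) return std::string(bytes, len);
      off += sizeof(len) + len;
    }
  }

  int num_rows() const { return num_rows_; }
  int num_reallocs() const { return num_reallocs_; }
  size_t used_bytes() const { return used_; }

 private:
  static size_t RowBytes(const std::string& row) {
    return sizeof(uint32_t) + row.size();
  }

  // Copies the row if it fits; leaves the buffer untouched otherwise.
  bool TryAppend(const std::string& row) {
    const size_t needed = RowBytes(row);
    if (buf_.size() - used_ < needed) return false;
    const uint32_t len = static_cast<uint32_t>(row.size());
    std::memcpy(buf_.data() + used_, &len, sizeof(len));
    std::memcpy(buf_.data() + used_ + sizeof(len), row.data(), row.size());
    used_ += needed;
    ++num_rows_;
    return true;
  }

  // Doubles the capacity, or grows to min_capacity if that is larger.
  void Grow(size_t min_capacity) {
    buf_.resize(std::max(buf_.size() * 2, min_capacity));
    ++num_reallocs_;
  }

  std::vector<uint8_t> buf_;
  size_t used_ = 0;
  int num_rows_ = 0;
  int num_reallocs_ = 0;
};
```

Doubling the capacity keeps reallocations rare, so most rows take only the fast path.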
The new logic would make it easier to use a strict mem limit for
OutboundRowBatch. Currently the main limit is a row count
calculated from the fixed len row size and
FLAGS_data_stream_sender_buffer_size (16KB), which ignores the var len
size. There is also an enforced mem limit
(RowBatch::AT_CAPACITY_MEM_USAGE) of 8MB, but it should be hit only
with very large rows.
A strict 16KB limit is not enforced in this patch, as it would likely
cause a perf regression by making OutboundRowBatches smaller. Another
problem is that a single row can be >16KB (default max_row_size=512KB),
and it has to be ensured that at least one row at a time can always
be transmitted.
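The arithmetic behind the row count limit can be sketched as below. The function names are hypothetical and the formula is an assumption based only on the description above (buffer size divided by the fixed len row size, with at least one row always allowed), not a copy of the Impala code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Row count limit derived from the buffer size and the fixed len row size.
// Both clamps are assumptions: a zero-sized row is treated as one byte, and
// the result is never below one so a single large row can still be sent.
int64_t SketchRowCountLimit(int64_t buffer_size_bytes,
                            int64_t fixed_row_size_bytes) {
  assert(buffer_size_bytes >= 0 && fixed_row_size_bytes >= 0);
  const int64_t row_size = std::max<int64_t>(fixed_row_size_bytes, 1);
  return std::max<int64_t>(buffer_size_bytes / row_size, 1);
}

// Approximate payload of a batch once var len data is counted as well.
int64_t SketchBatchBytes(int64_t num_rows, int64_t fixed_row_size_bytes,
                         int64_t avg_var_len_bytes) {
  return num_rows * (fixed_row_size_bytes + avg_var_len_bytes);
}
```

With a 16KB buffer and 64-byte fixed len rows the limit is 256 rows. If each row also carries 1KB of var len data, such a batch holds 278528 bytes, 17 times the nominal buffer size, which is why a limit based on row count alone is not a strict mem limit.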
A future commit could enforce the limit and increase
data_stream_sender_buffer_size at the same time. This would allow
KrpcDataStreamSender to efficiently work within memory limits.
See IMPALA-12594 for more details about memory usage and estimates.
Another potential future improvement is adding tuple deduplication
for the partitioned case (IMPALA-13225). The dedup logic used
in the non-partitioned case (RowBatch::Serialize()) did not work
here because the first deep copy to the RowBatch gave every row a
unique address, so the pointer-based deduplication always failed.
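Why a deep copy defeats pointer-based deduplication can be shown with a small sketch. It is a simplified model, not RowBatch::Serialize(): tuples are plain strings, and the hypothetical helper only compares each tuple pointer with the previous one.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Counts tuples that a pointer comparison with the previous tuple would
// recognize as duplicates.
size_t CountAdjacentPointerDuplicates(
    const std::vector<const std::string*>& tuples) {
  size_t dups = 0;
  for (size_t i = 1; i < tuples.size(); ++i) {
    if (tuples[i] == tuples[i - 1]) ++dups;
  }
  return dups;
}

// Deep-copies every tuple, so each copy lives at its own address even when
// the contents are identical.
std::vector<std::unique_ptr<std::string>> DeepCopyAll(
    const std::vector<const std::string*>& tuples) {
  std::vector<std::unique_ptr<std::string>> copies;
  copies.reserve(tuples.size());
  for (const std::string* t : tuples) {
    assert(t != nullptr);
    copies.push_back(std::make_unique<std::string>(*t));
  }
  return copies;
}
```

Three rows that reference the same tuple yield two pointer duplicates before the copy and none after it, although the contents are still equal.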
Codegen of KrpcDataStreamSender::HashAndAddRows is slightly improved
by replacing some functions with constants and moving some code
to inline.h/ir.cc, but no "proper" codegen is done for the deep copy:
it is still done in an interpreted way using Row/TupleDescriptor.
Other changes:
- New profile counters were added to help analyze KrpcDataStreamSender
performance: RpcSuccess, TransmitDataTime
Performance:
TPCH benchmarks improved significantly:
+----------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+----------+-----------------------+---------+------------+------------+----------------+
| TPCH(42) | parquet / none / none | 2.60    | -5.19%     | 1.98       | -3.13%         |
+----------+-----------------------+---------+------------+------------+----------------+
In the summaries of improved queries some >1s EXCHANGE SENDERs became
15-40% faster.
Based on local benchmarks KrpcDataStreamSender's CPU usage seems to be
dominated by LZ4 compression after the change.
Change-Id: I81a16c2f0fcfc1f3adef7077b3932a29a0f15a8f
Reviewed-on: http://gerrit.cloudera.org:8080/21932
Reviewed-by: Csaba Ringhofer <[email protected]>
Tested-by: Csaba Ringhofer <[email protected]>
---
M be/src/runtime/descriptors.h
M be/src/runtime/krpc-data-stream-sender-ir.cc
M be/src/runtime/krpc-data-stream-sender.cc
M be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/outbound-row-batch.cc
M be/src/runtime/outbound-row-batch.h
A be/src/runtime/outbound-row-batch.inline.h
M be/src/runtime/tuple-ir.cc
M be/src/runtime/tuple-row.h
M be/src/runtime/tuple.h
10 files changed, 355 insertions(+), 81 deletions(-)
Approvals:
Csaba Ringhofer: Looks good to me, approved; Verified
--
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I81a16c2f0fcfc1f3adef7077b3932a29a0f15a8f
Gerrit-Change-Number: 21932
Gerrit-PatchSet: 16
Gerrit-Owner: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>