[ 
https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boglarka Egyed updated IMPALA-13404:
------------------------------------
    Priority: Minor  (was: Major)

> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
>                 Key: IMPALA-13404
>                 URL: https://issues.apache.org/jira/browse/IMPALA-13404
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Zoltán Borók-Nagy
>            Priority: Minor
>              Labels: impala-iceberg, performance
>
> IcebergDeleteBuilder uses a single thread to build the data structures for 
> IcebergDeleteNode.
> In most cases it is OK as it can run in parallel with the SCAN operators of 
> the data files that don't have associated delete files:
> {noformat}
>     UNION ALL
>    /         \
>   /           \
>  /             \
> SCAN all    ANTI JOIN
> datafiles  /        BUILD
> without   /           \
> deletes  SCAN         SCAN
>   ^      datafiles    deletes
>   |      with deletes.  ^
>   |          ^          |
>   |          |          |
>  RUNS   NOT STARTED    RUNS{noformat}
> But if that SCAN operator has already finished (with cached IO, SCANs can 
> finish quite quickly) and the BUILD is not ready, we are potentially 
> blocking #MT_DOP threads:
> {noformat}
>     UNION ALL
>    /         \
>   /           \
>  /             \
> SCAN all    ANTI JOIN
> datafiles  /        BUILD
> without   /           \
> deletes  SCAN         SCAN
>   ^      datafiles    deletes
>   |      with deletes.  ^
>   |          ^          |
>   |          |          |
> FINISHED   BLOCKED     RUNS
> {noformat}
> Or, for plain count( * ) queries the situation is even worse, as the UNION 
> ALL and its left side are optimized out:
> {noformat}
>       ArithmeticExpr(ADD)
>       /             \
>      /               \
>     /                 \
> record_count       AGGREGATE
> of all             COUNT(*)
> datafiles              |
> without            ANTI JOIN
> deletes           /        BUILD
>    ^             /           \
>    |            SCAN        SCAN
>    |            datafiles   deletes
>    |            with deletes    ^
> compile-time       ^            |
>  constant          |            |
>                 BLOCKED        RUNS
>            (MT_DOP threads)  (single thread){noformat}
> The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are 
> being blocked, and dynamically increase its thread count. Some enhancements 
> to Impala's ThreadPool may be required to allow increasing the number of 
> worker threads.
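
A minimal sketch of the "grow the worker count while running" idea, using only the C++ standard library. It is not Impala's ThreadPool: the class GrowableWorkerPool, its methods, and the policy in AdaptTo() (one extra worker per blocked probe-side thread, up to a cap) are all hypothetical and only illustrate the shape of the enhancement.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool whose worker count can grow at runtime.
// This is an illustration only, not Impala's ThreadPool API.
class GrowableWorkerPool {
 public:
  GrowableWorkerPool(std::size_t initial_workers, std::size_t max_workers)
      : max_workers_(max_workers) {
    for (std::size_t i = 0; i < initial_workers; ++i) AddWorker();
  }

  ~GrowableWorkerPool() { Shutdown(); }

  // Queues one unit of build work.
  void Offer(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(mu_);
      queue_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Starts one more worker unless the cap is reached or the pool is shut down.
  bool AddWorker() {
    std::lock_guard<std::mutex> l(mu_);
    if (shutdown_ || workers_.size() >= max_workers_) return false;
    workers_.emplace_back([this] { WorkLoop(); });
    return true;
  }

  // Assumed policy: keep one base worker plus one per blocked probe thread.
  void AdaptTo(std::size_t blocked_probe_threads) {
    const std::size_t target = 1 + blocked_probe_threads;
    while (NumWorkers() < target && AddWorker()) {
    }
  }

  std::size_t NumWorkers() {
    std::lock_guard<std::mutex> l(mu_);
    return workers_.size();
  }

  // Lets the workers drain the queue, then joins them.
  void Shutdown() {
    std::vector<std::thread> to_join;
    {
      std::lock_guard<std::mutex> l(mu_);
      shutdown_ = true;
      to_join.swap(workers_);
    }
    cv_.notify_all();
    for (std::thread& t : to_join) t.join();
  }

 private:
  void WorkLoop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
        if (queue_.empty()) return;  // Shut down and nothing left to do.
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();
    }
  }

  const std::size_t max_workers_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  std::vector<std::thread> workers_;
  bool shutdown_ = false;
};
```

In this sketch the builder would call AdaptTo() whenever it learns how many LHS SCAN threads are waiting on it. With the cap set to MT_DOP, the builder never uses more threads than the probe side would have used anyway.
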
> IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It 
> cannot directly work on the RowBatch object that it gets from the 
> FragmentInstanceState's ExecInternal() function, as it is being released 
> immediately after IcebergDeleteBuilder's Send() returns. Unfortunately we 
> cannot use RowBatch's AcquireState() either due to a bug, see:
> [https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205]
> Also, row batches can reference memory of other row batches, which means we 
> cannot process and free them in parallel without deep copying them. Since 
> deep copying is expensive and needs to happen on the "main" thread of the 
> builder, we cannot gain anything from that.
> We need to find a way to quickly extract and copy the necessary information 
> from the incoming row batches; then we could insert into the different 
> roaring bitmaps in parallel.
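
One possible shape of that "extract first, insert in parallel" split, as a hedged sketch rather than Impala code. DeleteRow, ExtractPositions() and BuildInParallel() are hypothetical names, a plain std::vector stands in for a RowBatch, and std::set<uint64_t> stands in for a roaring bitmap so that the example needs only the standard library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one row of a position delete file.
struct DeleteRow {
  std::string file_path;
  uint64_t pos;
};

// Stand-in for a 64-bit roaring bitmap (assumption: any set type with
// range insertion works for the purpose of this sketch).
using PositionSet = std::set<uint64_t>;

// Positions copied out of the batches, grouped by data file.
using Extracted = std::map<std::string, std::vector<uint64_t>>;

// Step 1, on the builder's main thread: copy only the positions, so the
// incoming batch can be released as soon as this returns. Copying the
// path string per row is a simplification made for brevity.
void ExtractPositions(const std::vector<DeleteRow>& batch, Extracted* out) {
  for (const DeleteRow& row : batch) (*out)[row.file_path].push_back(row.pos);
}

// Step 2, on worker threads: each data file's set is filled by exactly one
// worker, so the sets themselves need no locking.
std::map<std::string, PositionSet> BuildInParallel(const Extracted& extracted,
                                                   std::size_t num_workers) {
  if (num_workers == 0) num_workers = 1;
  std::map<std::string, PositionSet> result;
  // Create all map entries up front; std::map nodes stay at stable addresses.
  std::vector<std::pair<const std::vector<uint64_t>*, PositionSet*>> work;
  for (const auto& kv : extracted) {
    work.emplace_back(&kv.second, &result[kv.first]);
  }
  std::vector<std::thread> workers;
  for (std::size_t w = 0; w < num_workers; ++w) {
    workers.emplace_back([&work, w, num_workers] {
      // Static striping: worker w handles items w, w + n, w + 2n, ...
      for (std::size_t i = w; i < work.size(); i += num_workers) {
        work[i].second->insert(work[i].first->begin(), work[i].first->end());
      }
    });
  }
  for (std::thread& t : workers) t.join();
  return result;
}
```

Partitioning by data file keeps the parallel phase free of shared mutable state. Whether the extraction step is cheap enough compared to a deep copy of the row batch is the open question raised above, and this sketch does not answer it.
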



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
