niebayes commented on issue #5173: URL: https://github.com/apache/datafusion/issues/5173#issuecomment-2670953355
@Blizzara Thanks for your reply. I initially chose the physical plan because it lets more of the computation be distributed to executors in a distributed query engine. Consider this SQL query:

```
select avg(value) from sx1 group by sid having sid > 1;
```

The corresponding logical plan might be:

```
Projection: avg(sx1.value)
  Aggregate: groupBy=[[sx1.sid]], aggr=[[avg(CAST(sx1.value AS Float64))]]
    Filter: sx1.sid > Int8(1)
      TableScan: sx1 projection=[sid, value], partial_filters=[sx1.sid > Int8(1)]
```

And the physical plan might look like:

```
ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([sid@0], 8), input_partitions=8
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]
```

From studying the datafusion-ballista project, I know we can split the execution plan at pipeline breakers (such as RepartitionExec, SortPreservingMergeExec, and CoalescePartitionsExec). The plan above would therefore be split into two parts (a.k.a. pipelines):

```
ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      MergePipelinesExec: pipeline_ids = [...]
```

```
AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
  CoalesceBatchesExec: target_batch_size=8192
    FilterExec: sid@0 > 1
      ParquetExec: file_groups = [..]
```

As you can see, the first stage of the parallel aggregation algorithm (the partial aggregation) can be distributed across multiple executors, which improves resource utilization. If we split the logical plan instead, it seems we can't distribute the aggregation, not even part of it. I'm not sure whether my understanding is right.
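To make the two-pipeline split concrete, here is a minimal, std-only Rust sketch of the same query. It does not use DataFusion's API: `Row`, `AvgState`, `partial_aggregate`, and `final_aggregate` are hypothetical names for illustration. Each executor runs the lower pipeline (filter `sid > 1`, then a partial aggregation into a `(sum, count)` state per `sid`), and a merging step runs the upper pipeline (combine the partial states per `sid`, then divide). This works because AVG decomposes into SUM and COUNT, which can be merged in any order.

```rust
use std::collections::BTreeMap;

/// Hypothetical input row for table `sx1`.
#[derive(Clone, Copy)]
struct Row {
    sid: i64,
    value: f64,
}

/// Partial AVG state for one group: running sum and row count.
#[derive(Clone, Copy, Default, Debug, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

/// Lower pipeline, run on each executor:
/// the filter (sid > 1) followed by a partial aggregation.
fn partial_aggregate(rows: &[Row]) -> BTreeMap<i64, AvgState> {
    let mut groups: BTreeMap<i64, AvgState> = BTreeMap::new();
    for row in rows.iter().filter(|r| r.sid > 1) {
        let state = groups.entry(row.sid).or_default();
        state.sum += row.value;
        state.count += 1;
    }
    groups
}

/// Upper pipeline: merge the partial states from every executor
/// (the role of the FinalPartitioned aggregation) and finish AVG.
fn final_aggregate(partials: &[BTreeMap<i64, AvgState>]) -> BTreeMap<i64, f64> {
    let mut merged: BTreeMap<i64, AvgState> = BTreeMap::new();
    for partial in partials {
        for (sid, state) in partial {
            let acc = merged.entry(*sid).or_default();
            acc.sum += state.sum;
            acc.count += state.count;
        }
    }
    // count is never zero: a group only exists once a row was added to it.
    merged
        .into_iter()
        .map(|(sid, s)| (sid, s.sum / s.count as f64))
        .collect()
}

fn main() {
    // Two executors, each holding part of sx1.
    let executor_a = vec![
        Row { sid: 1, value: 100.0 }, // removed by the sid > 1 filter
        Row { sid: 2, value: 1.0 },
        Row { sid: 3, value: 10.0 },
    ];
    let executor_b = vec![
        Row { sid: 2, value: 3.0 },
        Row { sid: 3, value: 20.0 },
    ];

    // Only the small per-group states leave the executors, not the raw rows.
    let partials = vec![partial_aggregate(&executor_a), partial_aggregate(&executor_b)];
    let result = final_aggregate(&partials);
    for (sid, avg) in &result {
        println!("sid={sid} avg={avg}");
    }
}
```

In the real plan, `RepartitionExec` hash-partitions the partial states by `sid` (into 8 partitions in the example) so that each `FinalPartitioned` aggregation merges a disjoint set of groups in parallel; the sketch collapses that into a single merge.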
By the way, datafusion-ballista works well for OLAP workloads, and it assumes executors are stateless. In my scenario, however, executors are stateful: each executor maintains an in-memory buffer containing the most recently written data, while historical data is stored in shared object storage. So when the scheduler is about to construct a physical plan, it has to query each executor for the latest statistics, which are required for query optimization. I wonder whether this is the standard approach to building distributed queries on DataFusion, since the implementation seems complicated. I'd really appreciate any recommendations from the DataFusion community on building a distributed query engine on top of DataFusion.
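One way to picture the statistics step is a scheduler that collects a summary of each executor's in-memory buffer and folds those summaries into the statistics already known for the data in object storage. The sketch below is hypothetical: `PartStats` and `merge_stats` are illustrative names, not DataFusion types, and it assumes each summary carries an exact row count plus the min/max of `sid`. Merging sums the row counts and widens the `sid` range; it does not deduplicate rows that might exist both in a buffer and in storage.

```rust
/// Hypothetical summary for one data source:
/// an executor's in-memory buffer, or the files already in object storage.
#[derive(Clone, Copy, Debug, PartialEq)]
struct PartStats {
    num_rows: u64,
    /// (min, max) of `sid`, or None if this source holds no rows.
    sid_range: Option<(i64, i64)>,
}

/// Fold several summaries into table-level statistics for the planner.
fn merge_stats(parts: &[PartStats]) -> PartStats {
    parts.iter().fold(
        PartStats { num_rows: 0, sid_range: None },
        |acc, p| PartStats {
            num_rows: acc.num_rows + p.num_rows,
            sid_range: match (acc.sid_range, p.sid_range) {
                (Some((lo1, hi1)), Some((lo2, hi2))) => Some((lo1.min(lo2), hi1.max(hi2))),
                (Some(r), None) | (None, Some(r)) => Some(r),
                (None, None) => None,
            },
        },
    )
}

fn main() {
    // Statistics for historical data in shared object storage.
    let storage = PartStats { num_rows: 1_000_000, sid_range: Some((0, 500)) };
    // Fresh summaries the scheduler would request from each executor
    // right before planning (the request itself is out of scope here).
    let buffers = [
        PartStats { num_rows: 2_000, sid_range: Some((450, 520)) },
        PartStats { num_rows: 0, sid_range: None },
    ];

    let mut all = vec![storage];
    all.extend_from_slice(&buffers);
    let table = merge_stats(&all);
    println!("{:?}", table);
}
```

A planner could then use the merged `sid_range` to estimate how selective a predicate such as `sid > 1` is. The cost of this design is a round trip to every executor before each plan is built, which is where the extra complexity comes from.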