Kontinuation opened a new pull request, #91: https://github.com/apache/sedona-db/pull/91
## Background
We found that range queries such as the following run very slowly in
SedonaDB:
```sql
SELECT COUNT(*) AS trip_count_in_coconino_county
FROM trip t
WHERE ST_Intersects(
    ST_GeomFromWKB(t.t_pickuploc),
    (SELECT ST_GeomFromWKB(z.z_boundary)
     FROM zone z
     WHERE z.z_name = 'Coconino County'
     LIMIT 1)
);
```
This is a range query whose query window is computed by the scalar subquery
`SELECT ST_GeomFromWKB(z.z_boundary) FROM zone z WHERE z.z_name = 'Coconino County' LIMIT 1`.
The subquery is turned into a join, so the query is planned as a
NestedLoopJoin followed by a Filter:
```
logical_plan:
Projection: count(Int64(1)) AS count(*) AS trip_count_in_coconino_county
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: st_intersects(st_geomfromwkb(t.t_pickuploc), __scalar_sq_1.st_geomfromwkb(z.z_boundary))
        Left Join:
          SubqueryAlias: t
            SubqueryAlias: trip
              TableScan: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/*.parquet projection=[t_pickuploc]
          SubqueryAlias: __scalar_sq_1
            Projection: st_geomfromwkb(z.z_boundary)
              SubqueryAlias: z
                SubqueryAlias: zone
                  Projection: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_boundary
                    Limit: skip=0, fetch=1
                      Filter: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_name = Utf8View("Coconino County")
                        TableScan: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet projection=[z_name, z_boundary], partial_filters=[/Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_name = Utf8View("Coconino County")]

physical_plan:
ProjectionExec: expr=[count(Int64(1))@0 as trip_count_in_coconino_county]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        ProjectionExec: expr=[]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: st_intersects(st_geomfromwkb(t_pickuploc@0), st_geomfromwkb(z.z_boundary)@1)
              ProjectionExec: expr=[t_pickuploc@1 as t_pickuploc, st_geomfromwkb(z.z_boundary)@0 as st_geomfromwkb(z.z_boundary)]
                NestedLoopJoinExec: join_type=Right
                  CoalescePartitionsExec
                    ProjectionExec: expr=[st_geomfromwkb(z_boundary@0) as st_geomfromwkb(z.z_boundary)]
                      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                        CoalescePartitionsExec: fetch=1
                          ProjectionExec: expr=[z_boundary@1 as z_boundary]
                            CoalesceBatchesExec: target_batch_size=8192, fetch=1
                              FilterExec: z_name@0 = Coconino County
                                DataSourceExec: file_groups={10 groups: [[Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:0..141525345], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:141525345..283050690], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:283050690..424576035], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:424576035..566101380], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:566101380..707626725], ...]}, projection=[z_name, z_boundary], file_type=parquet
                  DataSourceExec: file_groups={10 groups: [[Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:0..36478017], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:36478017..72956034], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:72956034..109434051], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:109434051..145912068], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:145912068..182390085], ...]}, projection=[t_pickuploc], file_type=parquet
```
The problem with this plan is that it cannot exploit the fact that the query
window is a constant. We end up repeatedly decoding the WKB of the query window
and constructing geometry objects from it as the filter is evaluated against
the joined rows. This is wasted work, especially when the query window is a
complex shape.
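To make the cost concrete, here is a toy Python model of the plan shape above; it is not SedonaDB code. It assumes a minimal hand-written WKB parser (little-endian Points and single-ring Polygons only) and uses plain even-odd ray casting as a stand-in for `ST_Intersects`. The helpers `point_wkb`, `polygon_wkb`, `decode`, and `nested_loop_then_filter` are hypothetical names. The counter shows how often the constant window gets parsed.

```python
import struct
from collections import Counter

DECODES = Counter()  # how many times each WKB geometry type (1=Point, 3=Polygon) is parsed


def point_wkb(x, y):
    # Little-endian WKB Point: byte order (1), geometry type 1, then x and y as doubles.
    return struct.pack("<BIdd", 1, 1, x, y)


def polygon_wkb(ring):
    # Little-endian WKB Polygon with one ring: type 3, ring count 1, point count, points.
    header = struct.pack("<BIII", 1, 3, 1, len(ring))
    return header + b"".join(struct.pack("<dd", x, y) for x, y in ring)


def decode(wkb):
    # Minimal WKB parser for this sketch: little-endian Points and single-ring Polygons.
    order, gtype = struct.unpack_from("<BI", wkb, 0)
    if order != 1:
        raise ValueError("this sketch only handles little-endian WKB")
    DECODES[gtype] += 1
    if gtype == 1:
        return struct.unpack_from("<dd", wkb, 5)
    if gtype == 3:
        (n_points,) = struct.unpack_from("<I", wkb, 9)  # point count of the first ring
        return [struct.unpack_from("<dd", wkb, 13 + 16 * i) for i in range(n_points)]
    raise ValueError(f"unsupported geometry type {gtype}")


def point_in_ring(pt, ring):
    # Even-odd ray casting; exact boundary semantics are not handled here.
    x, y = pt
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:] + ring[:1]):
        if (y1 > y) != (y2 > y):
            if x < x1 + (y - y1) * (x2 - x1) / (y2 - y1):
                inside = not inside
    return inside


def nested_loop_then_filter(trip_wkbs, window_wkb):
    # Models the plan above: the join pairs every trip row with the single window
    # row, and the filter then decodes BOTH geometries for every pair.
    return sum(point_in_ring(decode(t), decode(window_wkb)) for t in trip_wkbs)


square = [(0, 0), (10, 0), (10, 10), (0, 10), (0, 0)]
trips = [point_wkb(x, 5) for x in range(-5, 15)]  # 20 pickups along y = 5
result = nested_loop_then_filter(trips, polygon_wkb(square))
print(result, DECODES[3])  # 10 20: the constant window was parsed once per trip row
```

In this model the window polygon is decoded 20 times for 20 trip rows; with a real, complex county boundary each of those decodes is proportionally more expensive.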
## Changes made by this PR
This PR optimizes the logical plan of the above query by merging the Filter
and Join operators into a single Join operator with a join filter (a
theta-join); in the optimized plan the Left Join also becomes an Inner Join.
This enables the physical optimizer to convert the NestedLoopJoin with a
spatial predicate into a SpatialJoin:
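A minimal sketch of this kind of rewrite on a toy plan representation. The classes and `merge_filter_into_join` are hypothetical; this is neither the PR's code nor DataFusion's optimizer API. It also illustrates why the `Left Join` can become an `Inner Join`: assuming `st_intersects` returns NULL when either input is NULL, the trip rows that the left join pads with a NULL window are discarded by the filter anyway.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Scan:
    name: str
    columns: frozenset  # qualified column names this input produces


@dataclass(frozen=True)
class Pred:
    text: str
    columns: frozenset  # columns the predicate references
    rejects_nulls: bool  # assumed: a NULL input can never make the predicate true


@dataclass(frozen=True)
class Join:
    left: "Plan"
    right: "Plan"
    join_type: str  # only "Inner" and "Left" are modelled in this sketch
    filter: Optional[Pred] = None


@dataclass(frozen=True)
class Filter:
    predicate: Pred
    input: "Plan"


Plan = Union[Scan, Join, Filter]


def output_columns(plan):
    if isinstance(plan, Scan):
        return plan.columns
    if isinstance(plan, Join):
        return output_columns(plan.left) | output_columns(plan.right)
    return output_columns(plan.input)


def merge_filter_into_join(plan):
    """Rewrite Filter(Join(l, r)) into the theta-join Join(l, r, filter=pred)."""
    if not (isinstance(plan, Filter) and isinstance(plan.input, Join)):
        return plan
    join, pred = plan.input, plan.predicate
    if join.filter is not None:
        return plan  # combining with an existing join filter is out of scope here
    touches_left = bool(pred.columns & output_columns(join.left))
    touches_right = bool(pred.columns & output_columns(join.right))
    if not (touches_left and touches_right):
        return plan  # one-sided predicates belong to filter pushdown instead
    if join.join_type == "Inner":
        return Join(join.left, join.right, "Inner", pred)
    if join.join_type == "Left" and pred.rejects_nulls:
        # Rows NULL-padded on the right can never pass a null-rejecting filter,
        # so the left join is equivalent to an inner join carrying the filter.
        return Join(join.left, join.right, "Inner", pred)
    return plan


trip = Scan("t", frozenset({"t.t_pickuploc"}))
window = Scan("__scalar_sq_1", frozenset({"__scalar_sq_1.st_geomfromwkb(z.z_boundary)"}))
pred = Pred(
    "st_intersects(st_geomfromwkb(t.t_pickuploc), __scalar_sq_1.st_geomfromwkb(z.z_boundary))",
    frozenset({"t.t_pickuploc", "__scalar_sq_1.st_geomfromwkb(z.z_boundary)"}),
    rejects_nulls=True,
)
after = merge_filter_into_join(Filter(pred, Join(trip, window, "Left")))
print(after.join_type, after.filter.text)
```

Once the predicate sits on the join itself, a physical planner can see that the join condition is a spatial predicate and choose a dedicated spatial join operator instead of a nested loop join.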
```
logical_plan:
Projection: count(Int64(1)) AS count(*) AS trip_count_in_coconino_county
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Inner Join: Filter: st_intersects(st_geomfromwkb(t.t_pickuploc), __scalar_sq_1.st_geomfromwkb(z.z_boundary))
        SubqueryAlias: t
          SubqueryAlias: trip
            TableScan: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/*.parquet projection=[t_pickuploc]
        SubqueryAlias: __scalar_sq_1
          Projection: st_geomfromwkb(z.z_boundary)
            SubqueryAlias: z
              SubqueryAlias: zone
                Projection: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_boundary
                  Limit: skip=0, fetch=1
                    Filter: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_name = Utf8View("Coconino County")
                      TableScan: /Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet projection=[z_name, z_boundary], partial_filters=[/Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/*.parquet.z_name = Utf8View("Coconino County")]

physical_plan:
ProjectionExec: expr=[count(Int64(1))@0 as trip_count_in_coconino_county]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        ProjectionExec: expr=[]
          SpatialJoinExec: join_type=Inner, on=ST_intersects(st_geomfromwkb(z.z_boundary)@0, st_geomfromwkb(t_pickuploc@0))
            ProjectionExec: expr=[st_geomfromwkb(z_boundary@0) as st_geomfromwkb(z.z_boundary)]
              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                CoalescePartitionsExec: fetch=1
                  ProjectionExec: expr=[z_boundary@1 as z_boundary]
                    CoalesceBatchesExec: target_batch_size=8192, fetch=1
                      FilterExec: z_name@0 = Coconino County
                        DataSourceExec: file_groups={10 groups: [[Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:0..141525345], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:141525345..283050690], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:283050690..424576035], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:424576035..566101380], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/zone/part-1.parquet:566101380..707626725], ...]}, projection=[z_name, z_boundary], file_type=parquet
            DataSourceExec: file_groups={10 groups: [[Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:0..36478017], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:36478017..72956034], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:72956034..109434051], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:109434051..145912068], [Users/bopeng/workspace/data/spatial-bench/SpatialBench_sf=1_format=parquet/trip/part-1.parquet:145912068..182390085], ...]}, projection=[t_pickuploc], file_type=parquet
```
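Why the SpatialJoin plan is cheaper can be sketched with a simplified build/probe model in Python. This is not SedonaDB's SpatialJoinExec: it assumes the small query-window input is the build side, prepares each build geometry once (here just a bounding box, standing in for WKB decoding and index construction), and then streams the trip points through a cheap bounding-box check followed by an exact ray-casting test. `prepare`, `PREPARED`, and `spatial_join_count` are hypothetical names.

```python
PREPARED = 0  # how many times a build-side geometry was prepared


def prepare(ring):
    # Build-side work done once per build geometry: a stand-in for decoding
    # the WKB and computing the bounding box that a spatial index would store.
    global PREPARED
    PREPARED += 1
    xs = [x for x, _ in ring]
    ys = [y for _, y in ring]
    return (min(xs), min(ys), max(xs), max(ys)), ring


def point_in_ring(pt, ring):
    # Even-odd ray casting; exact boundary semantics are not handled here.
    x, y = pt
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:] + ring[:1]):
        if (y1 > y) != (y2 > y):
            if x < x1 + (y - y1) * (x2 - x1) / (y2 - y1):
                inside = not inside
    return inside


def spatial_join_count(build_rings, probe_points):
    # Build phase: every build-side geometry is prepared exactly once.
    prepared = [prepare(ring) for ring in build_rings]
    matches = 0
    # Probe phase: stream the probe side; cheap bbox test first, exact test second.
    for x, y in probe_points:
        for (minx, miny, maxx, maxy), ring in prepared:
            if minx <= x <= maxx and miny <= y <= maxy and point_in_ring((x, y), ring):
                matches += 1
    return matches


square = [(0, 0), (10, 0), (10, 10), (0, 10), (0, 0)]
pickups = [(x, 5) for x in range(-5, 15)]  # 20 pickups along y = 5
matches = spatial_join_count([square], pickups)
print(matches, PREPARED)  # 10 1
```

With a single window the "index" is one bounding box and the inner loop runs once per probe row; with many build rows a real implementation would typically replace that linear scan with a spatial index lookup.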
## Performance Improvements
The query from the Background section took 21 seconds before this
optimization and 0.4 seconds after it, roughly a 50x speedup.
