Hi Hackers,
The idea of achieving Postgres scaling via sharding using postgres_fdw +
partitioning got a lot of attention last years. Many optimisations have
been done in this direction: partition pruning, partition-wise
aggregates / joins, postgres_fdw push-down of LIMIT, GROUP BY, etc. In
many cases they work really nice.
However, still there is a vast case, where postgres_fdw + native
partitioning doesn't perform so good — Multi-tenant architecture. From
the database perspective it is presented well in this Citus tutorial
[1]. The main idea is that there is a number of tables and all of them
are sharded / partitioned by the same key, e.g. company_id. That way, if
every company mostly works within its own data, then every query may be
effectively executed on a single node without a need for an internode
communication.
I built a simple two node multi-tenant schema for tests, which can be
easily set up with attached scripts. It creates three tables (companies,
users, documents) distributed over two nodes. Everything can be found in
this Gist [2] as well.
Some real-life test queries show, that all single-node queries aren't
pushed-down to the required node. For example:
SELECT
*
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5;
executed as following
QUERY PLAN
-------------------------------------------------------
Nested Loop
Join Filter: (documents.user_id = users.id)
-> Foreign Scan on users_node2 users
-> Materialize
-> Foreign Scan on documents_node2 documents
i.e. it uses two foreign scans and does the final join locally. However,
once I specify target partitions explicitly, then the entire query is
pushed down to the foreign node:
QUERY PLAN
---------------------------------------------------------
Foreign Scan
Relations: (documents_node2) INNER JOIN (users_node2)
Execution time is dropped significantly as well — by more than 3 times
even for this small test database. Situation for simple queries with
aggregates or joins and aggregates followed by the sharding key filter
is the same. Something similar was briefly discussed in this thread [3].
IIUC, it means that push-down of queries through the postgres_fdw works
perfectly well, the problem is with partition-wise operation detection
at the planning time. Currently, partition-wise aggregate routines,
e.g., looks for a GROUP BY and checks whether sharding key exists there
or not. After that PARTITIONWISE_AGGREGATE_* flag is set. However, it
doesn't look for a content of WHERE clause, so frankly speaking it isn't
a problem, this functionality is not yet implemented.
Actually, sometimes I was able to push down queries with aggregate
simply by adding an additional GROUP BY with sharding key, like this:
SELECT
count(*)
FROM
documents
WHERE
company_id = 5
GROUP BY company_id;
where this GROUP BY obviously doesn't change a results, it just allows
planner to choose from more possible paths.
Also, I have tried to hack it a bit and forcedly set
PARTITIONWISE_AGGREGATE_FULL for this particular query. Everything
executed fine and returned result was correct, which means that all
underlying machinery is ready.
That way, I propose a change to the planner, which will check whether
partitioning key exist in the WHERE clause and will set
PARTITIONWISE_AGGREGATE_* flags if appropriate. The whole logic may look
like:
1. If the only one condition by partitioning key is used (like above),
then it is PARTITIONWISE_AGGREGATE_FULL.
2. If several conditions are used, then it should be
PARTITIONWISE_AGGREGATE_PARTIAL.
I'm aware that WHERE clause may be extremely complex in general, but we
could narrow this possible optimisation to the same restrictions as
postgres_fdw push-down "only WHERE clauses using built-in operators and
functions will be considered for execution on the remote server".
Although it seems that it will be easier to start with aggregates,
probably we should initially plan a more general solution? For example,
check that all involved tables are filtered by partitioning key and push
down the entire query if all of them target the same foreign server.
Any thoughts?
[1]
https://docs.citusdata.com/en/v9.3/get_started/tutorial_multi_tenant.html
[2] https://gist.github.com/ololobus/8fba33241f68be2e3765d27bf04882a3
[3]
https://www.postgresql.org/message-id/flat/CAFT%2BaqL1Tt0qfYqjHH%2BshwPoW8qdFjpJ8vBR5ABoXJDUcHyN1w%40mail.gmail.com
Regards
--
Alexey Kondratov
Postgres Professional https://www.postgrespro.com
Russian Postgres Company
DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node2 CASCADE;
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER node2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5433');
CREATE USER MAPPING FOR current_user SERVER node2;
CREATE TABLE companies (
company_id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE users (
company_id int not null,
id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE documents (
company_id int not null,
id int not null,
user_id int not null,
created_at timestamp without time zone default current_timestamp,
text text
) PARTITION BY HASH (company_id);
CREATE TABLE companies_node1 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE companies_node2 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
CREATE TABLE users_node1 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE users_node2 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
CREATE TABLE documents_node1 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE documents_node2 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 1)
SERVER node2;
ALTER TABLE companies_node1 ADD CONSTRAINT companies_pk PRIMARY KEY
(company_id);
ALTER TABLE users_node1 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node1 ADD CONSTRAINT documents_pk PRIMARY KEY
(company_id, id);
DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node1 CASCADE;
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5432');
CREATE USER MAPPING FOR current_user SERVER node1;
CREATE TABLE companies (
company_id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE users (
company_id int not null,
id int not null,
created_at timestamp without time zone default current_timestamp,
name text
) PARTITION BY HASH (company_id);
CREATE TABLE documents (
company_id int not null,
id int not null,
user_id int not null,
created_at timestamp without time zone default current_timestamp,
text text
) PARTITION BY HASH (company_id);
CREATE TABLE companies_node2 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE companies_node1 PARTITION OF companies
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
CREATE TABLE users_node2 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE users_node1 PARTITION OF users
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
CREATE TABLE documents_node2 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE documents_node1 PARTITION OF documents
FOR VALUES WITH (MODULUS 2, REMAINDER 0)
SERVER node1;
ALTER TABLE companies_node2 ADD CONSTRAINT companies_pk PRIMARY KEY
(company_id);
ALTER TABLE users_node2 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node2 ADD CONSTRAINT documents_pk PRIMARY KEY
(company_id, id);
DELETE FROM companies;
DELETE FROM users;
DELETE FROM documents;
INSERT INTO companies(company_id, name)
SELECT id, md5(id::text) FROM generate_series(1, 10) c(id);
DO
$do$
BEGIN
FOR cid IN 1..10 LOOP
INSERT INTO users(id, company_id, name)
SELECT id, cid, md5(id::text) FROM generate_series(1, 100) u(id);
END LOOP;
END
$do$;
INSERT INTO documents(id, company_id, user_id, text)
SELECT id, trunc(random() * 10 + 1)::int, trunc(random() * 100 + 1)::int,
md5(id::text)
FROM generate_series(1, 200000) p(id);
-- Basic aggregate + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
WHERE
company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents_node2
WHERE
company_id = 5;
-- Basic aggregate + filter by sharding key + fake group by -> then it is
pushdown sometimes
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
WHERE
company_id = 5
GROUP BY company_id;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
sum(id)
FROM
documents
WHERE
company_id = 5
GROUP BY company_id;
-- Basic join by secondary key + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
*
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
*
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5;
-- Join by secondary key + aggregate + filter by sharding key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
count(*)
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5;
-- Join by secondary key + aggregate + group by secondary key
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
user_id,
count(*) AS documents_count
FROM
documents
INNER JOIN users ON documents.user_id = users.id
WHERE
documents.company_id = 5
AND users.company_id = 5
GROUP BY
user_id;
EXPLAIN (
ANALYZE,
VERBOSE
)
SELECT
user_id,
count(*) AS documents_count
FROM
documents_node2
INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
documents_node2.company_id = 5
AND users_node2.company_id = 5
GROUP BY
user_id;
#!/usr/bin/env sh
pg_ctl -D node1 stop > /dev/null
pg_ctl -D node2 stop > /dev/null
rm -rf node1 node2
rm node1.log node2.log
initdb -D node1
initdb -D node2
echo "port = 5433" >> node2/postgresql.conf
echo "enable_partitionwise_join = 'on'" >> node1/postgresql.conf
echo "enable_partitionwise_aggregate = 'on'" >> node1/postgresql.conf
echo "postgres_fdw.use_remote_estimate = true" >> node1/postgresql.conf
echo "enable_partitionwise_join = 'on'" >> node2/postgresql.conf
echo "enable_partitionwise_aggregate = 'on'" >> node2/postgresql.conf
echo "postgres_fdw.use_remote_estimate = true" >> node2/postgresql.conf
pg_ctl -D node1 -l node1.log start
pg_ctl -D node2 -l node2.log start
createdb
createdb -p5433
psql -f init1.sql
psql -p5433 -f init2.sql
psql -f load.sql
psql -c "ANALYSE"
psql -p5433 -c "ANALYSE"