Hi Hackers,

The idea of achieving Postgres scaling via sharding with postgres_fdw + partitioning has received a lot of attention in recent years. Many optimisations have been done in this direction: partition pruning, partition-wise aggregates / joins, postgres_fdw push-down of LIMIT, GROUP BY, etc. In many cases they work really nicely.

However, there is still a broad class of workloads where postgres_fdw + native partitioning doesn't perform so well: multi-tenant architectures. From the database perspective, this is presented well in the Citus tutorial [1]. The main idea is that there is a number of tables, all of them sharded / partitioned by the same key, e.g. company_id. That way, if every company mostly works within its own data, then every query can be executed efficiently on a single node, without any inter-node communication.

I built a simple two-node multi-tenant schema for tests, which can be set up easily with the attached scripts. They create three tables (companies, users, documents) distributed over two nodes. Everything can be found in this Gist [2] as well.

Some real-life test queries show that such single-node queries are not pushed down to the required node. For example:

SELECT
    *
FROM
    documents
    INNER JOIN users ON documents.user_id = users.id
WHERE
    documents.company_id = 5
    AND users.company_id = 5;

is executed as follows:

                      QUERY PLAN
-------------------------------------------------------
 Nested Loop
   Join Filter: (documents.user_id = users.id)
   ->  Foreign Scan on users_node2 users
   ->  Materialize
         ->  Foreign Scan on documents_node2 documents

i.e., it performs two separate foreign scans and does the final join locally. However, once I specify the target partitions explicitly, the entire query is pushed down to the foreign node:

                       QUERY PLAN
---------------------------------------------------------
 Foreign Scan
   Relations: (documents_node2) INNER JOIN (users_node2)

Execution time drops significantly as well, by more than 3x even for this small test database. The situation is the same for simple queries with aggregates, or with joins plus aggregates, filtered by the sharding key. Something similar was briefly discussed in this thread [3].

IIUC, this means that push-down of queries through postgres_fdw works perfectly well; the problem is with partition-wise operation detection at planning time. Currently, the partition-wise aggregate code, for example, looks at the GROUP BY clause and checks whether the sharding key is present there; based on that, the PARTITIONWISE_AGGREGATE_* flag is set. However, it does not examine the content of the WHERE clause, so frankly speaking it isn't a bug: this functionality is simply not implemented yet.

Actually, I was sometimes able to push down queries with aggregates simply by adding an extra GROUP BY on the sharding key, like this:

SELECT
    count(*)
FROM
    documents
WHERE
    company_id = 5
GROUP BY company_id;

where this GROUP BY obviously doesn't change the result; it just allows the planner to choose from more possible paths.

Also, I have tried to hack it a bit and forcibly set PARTITIONWISE_AGGREGATE_FULL for this particular query. Everything executed fine and the returned result was correct, which means that all the underlying machinery is ready.

That way, I propose a change to the planner that checks whether the partitioning key is present in the WHERE clause and sets the PARTITIONWISE_AGGREGATE_* flags if appropriate. The whole logic may look like this:

1. If only a single condition on the partitioning key is used (as above), then it is PARTITIONWISE_AGGREGATE_FULL.
2. If several such conditions are used, then it should be PARTITIONWISE_AGGREGATE_PARTIAL.

I'm aware that the WHERE clause may be extremely complex in general, but we could narrow this optimisation down to the same restrictions as the postgres_fdw push-down: "only WHERE clauses using built-in operators and functions will be considered for execution on the remote server".

Although it seems easier to start with aggregates, perhaps we should plan for a more general solution from the beginning? For example, check that all the involved tables are filtered by the partitioning key, and push down the entire query if all of them target the same foreign server.

Any thoughts?


[1] https://docs.citusdata.com/en/v9.3/get_started/tutorial_multi_tenant.html
[2] https://gist.github.com/ololobus/8fba33241f68be2e3765d27bf04882a3
[3] https://www.postgresql.org/message-id/flat/CAFT%2BaqL1Tt0qfYqjHH%2BshwPoW8qdFjpJ8vBR5ABoXJDUcHyN1w%40mail.gmail.com

Regards
--
Alexey Kondratov

Postgres Professional https://www.postgrespro.com
Russian Postgres Company
-- init1.sql: schema for node1
DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node2 CASCADE;

CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER node2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5433');
CREATE USER MAPPING FOR current_user SERVER node2;

CREATE TABLE companies (
    company_id      int not null,
    created_at      timestamp without time zone default current_timestamp,
    name            text
) PARTITION BY HASH (company_id);

CREATE TABLE users (
    company_id      int not null,
    id              int not null,
    created_at      timestamp without time zone default current_timestamp,
    name            text
) PARTITION BY HASH (company_id);

CREATE TABLE documents (
    company_id      int not null,
    id              int not null,
    user_id         int not null,
    created_at      timestamp without time zone default current_timestamp,
    text            text
) PARTITION BY HASH (company_id);

CREATE TABLE companies_node1 PARTITION OF companies
    FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE companies_node2 PARTITION OF companies
    FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    SERVER node2;

CREATE TABLE users_node1 PARTITION OF users
    FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE users_node2 PARTITION OF users
    FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    SERVER node2;

CREATE TABLE documents_node1 PARTITION OF documents
    FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE FOREIGN TABLE documents_node2 PARTITION OF documents
    FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    SERVER node2;

ALTER TABLE companies_node1 ADD CONSTRAINT companies_pk PRIMARY KEY 
(company_id);
ALTER TABLE users_node1 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node1 ADD CONSTRAINT documents_pk PRIMARY KEY 
(company_id, id);
-- init2.sql: schema for node2
DROP TABLE IF EXISTS companies CASCADE;
DROP TABLE IF EXISTS users CASCADE;
DROP TABLE IF EXISTS documents CASCADE;
DROP SERVER IF EXISTS node1 CASCADE;

CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '5432');
CREATE USER MAPPING FOR current_user SERVER node1;


CREATE TABLE companies (
    company_id      int not null,
    created_at      timestamp without time zone default current_timestamp,
    name            text
) PARTITION BY HASH (company_id);

CREATE TABLE users (
    company_id      int not null,
    id              int not null,
    created_at      timestamp without time zone default current_timestamp,
    name            text
) PARTITION BY HASH (company_id);

CREATE TABLE documents (
    company_id      int not null,
    id              int not null,
    user_id         int not null,
    created_at      timestamp without time zone default current_timestamp,
    text            text
) PARTITION BY HASH (company_id);

CREATE TABLE companies_node2 PARTITION OF companies
    FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE companies_node1 PARTITION OF companies
    FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    SERVER node1;

CREATE TABLE users_node2 PARTITION OF users
    FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE users_node1 PARTITION OF users
    FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    SERVER node1;

CREATE TABLE documents_node2 PARTITION OF documents
    FOR VALUES WITH (MODULUS 2, REMAINDER 1);
CREATE FOREIGN TABLE documents_node1 PARTITION OF documents
    FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    SERVER node1;

ALTER TABLE companies_node2 ADD CONSTRAINT companies_pk PRIMARY KEY 
(company_id);
ALTER TABLE users_node2 ADD CONSTRAINT users_pk PRIMARY KEY (company_id, id);
ALTER TABLE documents_node2 ADD CONSTRAINT documents_pk PRIMARY KEY 
(company_id, id);
-- load.sql: test data
DELETE FROM companies;
DELETE FROM users;
DELETE FROM documents;

INSERT INTO companies(company_id, name)
SELECT id, md5(id::text) FROM generate_series(1, 10) c(id);

DO
$do$
BEGIN 
    FOR cid IN 1..10 LOOP
        INSERT INTO users(id, company_id, name)
        SELECT id, cid, md5(id::text) FROM generate_series(1, 100) u(id);
    END LOOP;
END
$do$;

INSERT INTO documents(id, company_id, user_id, text)
SELECT id, trunc(random() * 10 + 1)::int, trunc(random() * 100 + 1)::int, 
md5(id::text)
FROM generate_series(1, 200000) p(id);
-- Basic aggregate + filter by sharding key
EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    count(*)
FROM
    documents
WHERE
    company_id = 5;

EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    count(*)
FROM
    documents_node2
WHERE
    company_id = 5;

-- Basic aggregate + filter by sharding key + fake GROUP BY ->
-- then it is sometimes pushed down
EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    count(*)
FROM
    documents
WHERE
    company_id = 5
GROUP BY company_id;

EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    sum(id)
FROM
    documents
WHERE
    company_id = 5
GROUP BY company_id;

-- Basic join by secondary key + filter by sharding key
EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    *
FROM
    documents
    INNER JOIN users ON documents.user_id = users.id
WHERE
    documents.company_id = 5
    AND users.company_id = 5;

EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    *
FROM
    documents_node2
    INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
    documents_node2.company_id = 5
    AND users_node2.company_id = 5;

-- Join by secondary key + aggregate + filter by sharding key
EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    count(*)
FROM
    documents
    INNER JOIN users ON documents.user_id = users.id
WHERE
    documents.company_id = 5
    AND users.company_id = 5;

EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    count(*)
FROM
    documents_node2
    INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
    documents_node2.company_id = 5
    AND users_node2.company_id = 5;

-- Join by secondary key + aggregate + group by secondary key
EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    user_id,
    count(*) AS documents_count
FROM
    documents
    INNER JOIN users ON documents.user_id = users.id
WHERE
    documents.company_id = 5
    AND users.company_id = 5
GROUP BY
    user_id;

EXPLAIN (
    ANALYZE,
    VERBOSE
)
SELECT
    user_id,
    count(*) AS documents_count
FROM
    documents_node2
    INNER JOIN users_node2 ON documents_node2.user_id = users_node2.id
WHERE
    documents_node2.company_id = 5
    AND users_node2.company_id = 5
GROUP BY
    user_id;
#!/usr/bin/env sh

pg_ctl -D node1 stop > /dev/null
pg_ctl -D node2 stop > /dev/null

rm -rf node1 node2
rm -f node1.log node2.log

initdb -D node1
initdb -D node2

echo "port = 5433" >> node2/postgresql.conf

echo "enable_partitionwise_join = 'on'" >> node1/postgresql.conf
echo "enable_partitionwise_aggregate = 'on'" >> node1/postgresql.conf
echo "postgres_fdw.use_remote_estimate = true" >> node1/postgresql.conf

echo "enable_partitionwise_join = 'on'" >> node2/postgresql.conf
echo "enable_partitionwise_aggregate = 'on'" >> node2/postgresql.conf
echo "postgres_fdw.use_remote_estimate = true" >> node2/postgresql.conf

pg_ctl -D node1 -l node1.log start
pg_ctl -D node2 -l node2.log start

createdb
createdb -p5433

psql -f init1.sql
psql -p5433 -f init2.sql

psql -f load.sql

psql -c "ANALYSE"
psql -p5433 -c "ANALYSE"
