This is another try of [1].

BACKGROUND
========================================

We want to realize parallel INSERT SELECT in the following steps:
1) INSERT + parallel SELECT
2) Parallel INSERT + parallel SELECT

Below are example use cases.  We don't expect high concurrency or an empty data 
source.
* Data loading (ETL or ELT) into an analytics database, typically a data ware 
house.
* Batch processing in an OLTP database.


PROBLEMS
========================================

(1) The overhead of checking parallel-safety could be large
We have to check the target table and its child partitions for parallel safety. 
 That is, we make sure those relations don't have parallel-unsafe domains, 
constraints, indexes, or triggers.

What we should check is the relations into which the statement actually inserts 
data.  However, the planner does not know which relations will be actually 
inserted into.  So, the planner has to check all descendant partitions of a 
target table.  When the target table has many partitions, this overhead could 
be unacceptable when compared to the benefit gained from parallelism.


(2) There's no mechanism for parallel workers to assign an XID
Parallel workers need an XID of the current (sub)transaction when actually 
inserting a tuple (i.e., calling heap_insert()).  When the leader has not got 
the XID yet, the worker may have to assign a new XID and communicate it to the 
leader and other workers so that all parallel processes use the same XID.


SOLUTION TO (1)
========================================

The candidate ideas are:

1) Caching the result of parallel-safety check
The planner stores the result of checking parallel safety for each relation in 
relcache, or some purpose-built hash table in shared memory.

The problems are:

* Even if the target relation turns out to be parallel safe by looking at those 
data structures, we cannot assume it remains true until the SQL statement 
finishes.  For instance, other sessions might add a parallel-unsafe index to 
its descendant relations.  Other examples include that when the user changes 
the parallel safety of indexes or triggers by running ALTER FUNCTION on the 
underlying index AM function or trigger function, the relcache entry of the 
table or index is not invalidated, so the correct parallel safety is not 
maintained in the cache.
In that case, when the executor encounters a parallel-unsafe object, it can 
change the cached state as being parallel-unsafe and error out.

* Can't ensure fast access.  With relcache, the first access in each session 
has to undergo the overhead of parallel-safety check.  With a hash table in 
shared memory, the number of relations stored there would be limited, so the 
first access after database startup or the hash table entry eviction similarly 
experiences slowness.

* With a new hash table, some lwlock for concurrent access must be added, which 
can have an adverse effect on performance.


2) Enabling users to declare that the table allows parallel data modification
Add a table property that represents parallel safety of the table for DML 
statement execution.  Users specify it as follows:
    
CREATE TABLE table_name (...) PARALLEL { UNSAFE | RESTRICTED | SAFE };
    ALTER TABLE table_name PARALLEL { UNSAFE | RESTRICTED | SAFE };

This property is recorded in pg_class's relparallel column as 'u', 'r', or 's', 
just like pg_proc's proparallel.  The default is UNSAFE.

The planner assumes that all of the table, its descendant partitions, and their 
ancillary objects have the specified parallel safety or safer one.  The user is 
responsible for its correctness.  If the parallel processes find an object that 
is less safer than the assumed parallel safety during statement execution, it 
throws an ERROR and abort the statement execution.

The objects that relate to the parallel safety of a DML target table are as 
follows:

    * Column default expression
    * DOMAIN type CHECK expression
    * CHECK constraints on column
    * Partition key
    * Partition key support function
    * Index expression
    * Index predicate
    * Index AM function
    * Operator function
    * Trigger function

When the parallel safety of some of these objects is changed, it's costly to 
reflect it on the parallel safety of tables that depend on them.  So, we don't 
do it.  Instead, we provide a utility function 
pg_get_parallel_safety('table_name') that returns records of (objid, classid, 
parallel_safety) that represent the parallel safety of objects that determine 
the parallel safety of the specified table.  The function only outputs objects 
that are not parallel safe.  Otherwise, it will consume excessive memory while 
accumulating the output.  The user can use this function to identify 
problematic objects when a parallel DML fails or is not parallelized in an 
expected manner.

How does the executor detect parallel unsafe objects?  There are two ways:

1) At loading time
When the executor loads the definition of objects (tables, constraints, index, 
triggers, etc.) during the first access to them after session start or their 
eviction by sinval message, it checks the parallel safety.

This is a legitimate way, but may need much code.  Also, it might overlook 
necessary code changes without careful inspection.


2) At function execution time
All related objects come down to some function execution.  So, add a parallel 
safety check there when in a parallel worker.  If the current process is a 
parallel worker and the function is parallel unsafe, error out with 
ereport(ERROR).  This approach eliminates the oversight of parallel safety 
check with the additional bonus of tiny code change!

The place would be FunctionCallInvoke().  It's a macro in fmgr.h now.  Perhaps 
we should make it a function in fmgr.c, so that fmgr.h does not have to include 
header files for parallelism-related definitions.

We have to evaluate the performance effect of converting FunctionCallInvoke() 
into a function and adding an if statement there, because it's a relatively 
low-level function.




SOLUTION TO (2)
========================================

1) Make it possible for workers to assign an XID and share it among the 
parallel processes
The problems are:

* Tuple visibility
If the worker that acquires the XID writes some row and another worker reads 
that row before it gets to see the XID information, the latter worker won't 
treat such a row is written by its own transaction.

For instance, the worker (w-1) that acquires the XID (501) deletes the tuple 
(CTID: 0, 2).  Now, another worker (w-2) reads that tuple (CTID: 0, 2), it 
would consider that the tuple is still visible to its snapshot but if the w-2 
knows that 501 is its own XID, it would have been considered it as 
(not-visible) deleted.  I think this can happen when multiple updates to the 
same row happen and new rows get added to the new page.

* The implementation seems complex
When the DML is run inside a deeply nested subtransaction and the parent 
transactions have not allocated their XIDs yet, the worker needs to allocate 
the XIDs for its parents.  That indeterminate number of XIDs must be stored in 
shared memory.  The stack of TransactionState structures must also be passed.

Also, TransactionIdIsCurrentTransactionId() uses an array ParallelCurrentXids 
where parallel workers receive sub-committed XIDs from the leader.  This needs 
to be reconsidered.


2) The leader assigns an XID before entering parallel mode and passes it to 
workers
This is what was done in [1].

The problem is that the XID would not be used if the data source (SELECT query) 
returns no valid rows.  This is a waste of XID.

However, the data source should be rarely empty when this feature is used.  As 
the following Oracle manual says, parallel DML will be used in data analytics 
and OLTP batch jobs.  There should be plenty of source data in those scenarios.

When to Use Parallel DML
https://docs.oracle.com/en/database/oracle/oracle-database/21/vldbg/types-parallelism.html#GUID-18B2AF09-C548-48DE-A794-86224111549F
--------------------------------------------------
Several scenarios where parallel DML is used include:

Refreshing Tables in a Data Warehouse System

Creating Intermediate Summary Tables

Using Scoring Tables

Updating Historical Tables

Running Batch Jobs
--------------------------------------------------



CONCLUSION
========================================

(1) The overhead of checking parallel-safety could be large
We're inclined to go with solution 2, because it doesn't have a big problem.  
However, we'd like to try to present some more analysis on solution 1 in this 
thread.

Regarding how to check parallel safety in executor, I prefer the simpler way of 
adding a check in function execution.  If it turns out to have an untolerable 
performance problem, we can choose the other approach.

(2) There's no mechanism for parallel workers to assign an XID
We'd like to adopt solution 2 because it will really not have a big issue in 
the assumed use cases.  The implementation is very easy and does not look 
strange.


Of course, any better-looking idea would be much appreciated.  (But simple, or 
not unnecessarily complex,  one is desired.)



[1]
Parallel INSERT (INTO ... SELECT ...)
https://www.postgresql.org/message-id/flat/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpfjrr3acrt...@mail.gmail.com


Regards
Takayuki Tsunakawa




Reply via email to