In this email, I would like to discuss allowing streamed logical transactions (large in-progress transactions) to be applied by background workers, and parallel apply in general. The goal of this work is to improve the performance of apply in logical replication.
Currently, for large transactions, the publisher sends the data in multiple streams (changes divided into chunks depending upon logical_decoding_work_mem), and then on the subscriber side, the apply worker writes the changes into temporary files; once it receives the commit, it reads from the file and applies the entire transaction. To improve the performance of such transactions, we can instead allow them to be applied via background workers. There could be multiple ways to achieve this:

Approach-1: Assign a new bgworker (if available) as soon as the xact's first stream arrives, and have the main apply worker send changes to this new worker via shared memory. We keep this worker assigned until the transaction's commit arrives and also wait for the worker to finish at commit. This preserves commit ordering and avoids writing to and reading from a file in most cases. We still need to spill if no worker is available. We also need to wait for the background worker to finish applying the current stream at stream_stop to avoid deadlocks, because T-1's current stream of changes can update rows in a conflicting order with T-2's next stream of changes.

Approach-2: Assign another worker to spill the changes and only allow them to be applied at commit time by the same or another worker. Now, to preserve the commit order, we need to wait at commit so that the respective assigned workers can finish. This won't avoid spilling to disk and reading back at commit time, but it can help in receiving and processing more data than we do currently. I am not sure it can win over Approach-1, though, because we still need to write to and read from the file, and we would probably need a shared memory queue to send the data to the other background workers for processing.

We need to change error handling to allow the above parallelization. The current model for apply is such that if any error occurs while applying, we simply report the error in the server logs and the apply worker exits.
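The worker-assignment decision in Approach-1 might be modeled as follows. This is an illustrative Python sketch of the idea only, not PostgreSQL code; names such as ApplyRouter and MAX_BG_WORKERS are hypothetical:

```python
# Illustrative model of Approach-1's routing: assign a bgworker on an
# xact's first stream, keep the assignment until commit, and fall back
# to spilling to a temporary file when no worker is free.

MAX_BG_WORKERS = 2  # hypothetical pool size

class ApplyRouter:
    def __init__(self):
        self.free_workers = list(range(MAX_BG_WORKERS))
        self.assigned = {}        # xid -> worker id
        self.spilled = set()      # xids being written to temp files

    def on_stream_start(self, xid):
        if xid in self.assigned or xid in self.spilled:
            return  # later streams of the xact reuse the first-stream decision
        if self.free_workers:
            self.assigned[xid] = self.free_workers.pop()
        else:
            self.spilled.add(xid)  # no worker available: spill to disk

    def on_commit(self, xid):
        # The main apply worker waits here for the assigned worker to
        # finish (preserving commit order), then releases it; spilled
        # xacts are applied by the main apply worker itself.
        worker = self.assigned.pop(xid, None)
        if worker is not None:
            self.free_workers.append(worker)
        else:
            self.spilled.discard(xid)
```

With a pool of two workers, a third concurrent streamed xact would be spilled, and committing one of the first two frees a worker for the next xact.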
On restart, it will again receive the transaction data that previously failed and will try to apply it again. Now, in the new approach (say Approach-1), we need to ensure that all the active workers that are applying in-progress transactions also exit before the main apply worker exits, to allow rollback of the currently applied transactions and re-apply them when we get the data again. This is required to avoid losing transactions: if any later transaction got committed and updated the replication origin, the earlier transactions won't be resent. This won't be much different from what we do now, where, say, two transactions t-1 and t-2 have multiple overlapping streams. If an error happens before one of them completes via commit or rollback, all the data needs to be resent by the server and processed again by the apply worker.

The next step in this area is to parallelize the apply of all possible transactions. I think the main things we need to care about to allow this are:

1. Transaction dependency: We can't simply allow dependent transactions to run in parallel, as that can lead to inconsistency. Say we insert a row in the first transaction and update it in the second; if both transactions are applied in parallel, the insert may be applied later and the update will fail.

2. Deadlocks: These can happen because the transactions will now be applied in parallel. Say transaction T-1 updates row-2 and row-3 and transaction T-2 updates row-3 and row-2; if we allow them in parallel, there is a chance of deadlock, whereas there is no such risk in serial execution where the commit order is preserved.

We can solve both problems if we allow only independent xacts to be parallelized. Transactions would be considered dependent if they operate on the same set of rows from the same table.
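The row-overlap rule above could be sketched as follows. This is a simplified Python illustration under the assumption that each change can be reduced to a (table, row id) pair; the function names are hypothetical:

```python
# Two transactions are treated as dependent (and therefore serialized)
# if the sets of (table, row id) pairs they modify intersect.

def touched_rows(tx):
    """Return the set of (table, row_id) pairs a transaction modifies."""
    return {(change["table"], change["row"]) for change in tx}

def can_parallelize(tx_a, tx_b):
    # Independent only when the row sets are disjoint.
    return touched_rows(tx_a).isdisjoint(touched_rows(tx_b))

# The deadlock example from above: T-1 updates row-2 and row-3,
# T-2 updates row-3 and row-2, so they must not run in parallel.
t1 = [{"table": "orders", "row": 2}, {"table": "orders", "row": 3}]
t2 = [{"table": "orders", "row": 3}, {"table": "orders", "row": 2}]
t3 = [{"table": "items", "row": 7}]   # disjoint: safe to parallelize
```

Note that this disjointness test resolves both problems at once: overlapping row sets are exactly the situations that can produce an update-before-insert inconsistency or a lock-ordering deadlock.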
Now, apart from this, there could be other cases where determining transaction dependency won't be straightforward, so we can disallow those transactions from participating in parallel apply. Those are the cases where functions are used in the table definition expressions. We can think of identifying safe functions, such as all built-in functions and any immutable functions (and probably stable functions). We need to check safety for cases such as (a) trigger functions, (b) column default value expressions (as those can call functions), (c) constraint expressions, (d) foreign keys, and (e) operations on partitioned tables (especially those performed via the publish_via_partition_root option), as we need to check expressions on all partitions. Transactions that operate on the same set of tables and perform truncate can lead to deadlock, so we need to consider such transactions as dependent.

The basic idea is that for each running xact we maintain the table oid, row id (pkey or replica identity), and xid in a hash table in the apply worker. For any new xact, we check that it doesn't conflict with one of the previously running xacts, and only then allow it to be applied in parallel. We can collect all the changes of a transaction in an in-memory buffer while checking its dependencies and then hand it to one of the available workers to apply at commit. If the rows for a particular transaction exceed a certain threshold, we need to escalate to a table-level strategy, which means any other transaction operating on the same table will be considered dependent. For very large transactions that don't fit in the in-memory buffer, we either need to spill them to disk or simply decide not to parallelize them. We need to remove rows from the hash table once the transaction is applied completely.

The other thing we need to ensure while parallelizing independent transactions is to preserve the commit order of transactions.
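One way to picture the commit-order requirement: workers may finish applying out of order, but the commits (and the replication-origin advance) are released strictly in publisher commit order. A minimal Python sketch, assuming a hypothetical CommitOrderGate driven by the main apply worker:

```python
# Workers report completion in any order; commits are released only
# when every earlier (in publisher commit order) xact has also finished.

class CommitOrderGate:
    def __init__(self, commit_order):
        self.pending = list(commit_order)  # xids in publisher commit order
        self.done = set()                  # xids whose apply has finished
        self.committed = []                # xids actually committed so far

    def worker_finished(self, xid):
        self.done.add(xid)
        # Release every head-of-line xact whose apply is complete; a
        # finished but out-of-order xact waits until its turn.
        while self.pending and self.pending[0] in self.done:
            self.committed.append(self.pending.pop(0))
```

So if xact 3 finishes first, nothing is committed until xacts 1 and 2 also finish, at which point all three commit in order 1, 2, 3.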
Preserving the commit order ensures that, in case of errors, we won't get replicas out of sync. Say we allow the commit order to change; then it is possible that some later transaction has updated the replication_origin LSN to a later value than the transaction for which the apply is in progress. Now, if an error occurs for such an in-progress transaction, the server won't resend its changes, as the replication_origin's LSN would have moved ahead. Even though we are preserving the commit order, there will still be a benefit from parallel apply, as we should be able to parallelize most of the writes in the transactions.

Thoughts?

Thanks to Hou-San and Shi-San for helping me to investigate these ideas.

--
With Regards,
Amit Kapila.