Hello. Thanks for the comment.

# Sorry in advance for possibly breaking the thread.

> MarkBufferDirtyHint() writes WAL even when rd_firstRelfilenodeSubid or
> rd_createSubid is set; see attached test case.  It needs to skip WAL whenever
> RelationNeedsWAL() returns false.

Thanks for pointing that out. And the test patch helped me very much.

Most callers can tell that to the function, but SetHintBits()
cannot easily do so. Rather, I think we shouldn't even try to do
that. Instead, in the attached, MarkBufferDirtyHint() asks storage.c
for the sync-pending state of the relfilenode for the buffer. In the
attached patch (0003), RelFileNodeSkippingWAL loops over pendingSyncs,
but it is called only at the time an FPW is added, so I believe it
doesn't affect performance much. However, we could use a hash for
pendingSyncs instead of a linked list. Anyway, the change is in its own
file, v21-0003-Fix-MarkBufferDirtyHint.patch, which will be merged into
0002.

AFAICS all XLogInsert() calls are guarded by RelationNeedsWAL() or are
in the non-wal_minimal code paths.

> Cylinder and track sizes are obsolete as user-visible concepts.  (They're not
> constant for a given drive, and I think modern disks provide no way to read
> the relevant parameters.)  I like the name "wal_skip_threshold", and my second

I strongly agree. Thanks for the draft. I used it as-is. I couldn't come
up with an appropriate second description of the GUC, so I just removed
it.

# It was "For rotating magnetic disks, it is around the size of a
# track or cylinder."

> the relevant parameters.)  I like the name "wal_skip_threshold", and
> my second choice would be "wal_skip_min_size".  Possibly documented
> as follows:
..
> Any other opinions on the GUC name?

I prefer the first candidate. I already used the terminology in
storage.c, and the name fits the context better.

> * We emit newpage WAL records for smaller size of relations.
> *
> * Small WAL records have a chance to be emitted at once along with
> * other backends' WAL records. We emit WAL records instead of syncing
> * for files that are smaller than a certain threshold expecting faster
- * commit. The threshold is defined by the GUC effective_io_block_size.
+ * commit. The threshold is defined by the GUC wal_skip_threshold.

The attached are:

- v21-0001-TAP-test-for-copy-truncation-optimization.patch
  same as v20

- v21-0002-Fix-WAL-skipping-feature.patch
  GUC name changed.

- v21-0003-Fix-MarkBufferDirtyHint.patch
  PoC of fixing the function. will be merged into 0002. (New)

- v21-0004-Documentation-for-wal_skip_threshold.patch
  GUC name and description changed. (Previous 0003)

- v21-0005-Additional-test-for-new-GUC-setting.patch
  including adjusted version of wal-optimize-noah-tests-v3.patch
  Maybe test names need further adjustment. (Previous 0004)

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 34149545942480d8dcc1cc587f40091b19b5aa39 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 11 Oct 2018 10:03:21 +0900
Subject: [PATCH v21 1/5] TAP test for copy-truncation optimization.

---
 src/test/recovery/t/018_wal_optimize.pl | 312 ++++++++++++++++++++++++
 1 file changed, 312 insertions(+)
 create mode 100644 src/test/recovery/t/018_wal_optimize.pl

diff --git a/src/test/recovery/t/018_wal_optimize.pl b/src/test/recovery/t/018_wal_optimize.pl
new file mode 100644
index 0000000000..b041121745
--- /dev/null
+++ b/src/test/recovery/t/018_wal_optimize.pl
@@ -0,0 +1,312 @@
+# Test WAL replay for optimized TRUNCATE and COPY records
+#
+# WAL truncation is optimized in some cases with TRUNCATE and COPY queries
+# which sometimes interact badly with the other optimizations in line with
+# several setting values of wal_level, particularly when using "minimal" or
+# "replica".  The optimization may be enabled or disabled depending on the
+# scenarios dealt with here, and should never result in any type of failure
+# or data loss.
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 26;
+
+sub check_orphan_relfilenodes
+{
+	# Numeric-named files under base/<db oid>/ must match the catalog paths.
+	my ($node, $test_name) = @_;
+	my $database_oid = $node->safe_psql('postgres',
+	   "SELECT oid FROM pg_database WHERE datname = 'postgres'");
+	my $base_dir = "base/$database_oid/";
+	my $catalog_paths = $node->safe_psql('postgres', "
+	   SELECT pg_relation_filepath(oid) FROM pg_class
+	   WHERE reltablespace = 0 and relpersistence <> 't' and
+	   pg_relation_filepath(oid) IS NOT NULL;");
+	my @on_disk = sort(map { "$base_dir$_" }
+		  grep { /^[0-9]+$/ } slurp_dir($node->data_dir . "/$base_dir"));
+	my @in_catalog = sort split /\n/, $catalog_paths;
+
+	is_deeply(\@on_disk, \@in_catalog, $test_name);
+	return;
+}
+
+# Run every scenario below on a fresh node configured with the wal_level
+# given as the first argument.  Each scenario runs its SQL, stops the node
+# with stop('immediate'), restarts it, and then checks the resulting state.
+sub run_wal_optimize
+{
+	my $wal_level = shift;
+
+	# Node uses the requested wal_level; 2PC is enabled for the test2a case.
+	my $node = get_new_node("node_$wal_level");
+	$node->init;
+	$node->append_conf('postgresql.conf', qq(
+wal_level = $wal_level
+max_prepared_transactions = 1
+));
+	$node->start;
+
+	# Setup
+	my $tablespace_dir = $node->basedir . '/tablespace_other';
+	mkdir($tablespace_dir) or die "could not create \"$tablespace_dir\": $!";
+	$tablespace_dir = TestLib::perl2host($tablespace_dir);
+	$node->safe_psql('postgres',
+	   "CREATE TABLESPACE other LOCATION '$tablespace_dir';");
+
+	# Test direct truncation optimization.  No tuples
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test1 (id serial PRIMARY KEY);
+		TRUNCATE test1;
+		COMMIT;");
+
+	$node->stop('immediate');
+	$node->start;
+
+	my $result = $node->safe_psql('postgres', "SELECT count(*) FROM test1;");
+	is($result, qq(0),
+	   "wal_level = $wal_level, optimized truncation with empty table");
+
+	# Test truncation with inserted tuples within the same transaction.
+	# Tuples inserted after the truncation should be seen.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test2 (id serial PRIMARY KEY);
+		INSERT INTO test2 VALUES (DEFAULT);
+		TRUNCATE test2;
+		INSERT INTO test2 VALUES (DEFAULT);
+		COMMIT;");
+
+	$node->stop('immediate');
+	$node->start;
+
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test2;");
+	is($result, qq(1),
+	   "wal_level = $wal_level, optimized truncation with inserted table");
+
+	# Same for prepared transaction
+	# Tuples inserted after the truncation should be seen.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test2a (id serial PRIMARY KEY);
+		INSERT INTO test2a VALUES (DEFAULT);
+		TRUNCATE test2a;
+		INSERT INTO test2a VALUES (DEFAULT);
+		PREPARE TRANSACTION 't';
+		COMMIT PREPARED 't';");
+
+	$node->stop('immediate');
+	$node->start;
+
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test2a;");
+	is($result, qq(1),
+	   "wal_level = $wal_level, optimized truncation with prepared transaction");
+
+	# Data file for COPY query in follow-up tests.
+	my $basedir = $node->basedir;
+	my $copy_file = "$basedir/copy_data.txt";
+	TestLib::append_to_file($copy_file, qq(20000,30000
+20001,30001
+20002,30002));
+
+	# Test truncation with inserted tuples using COPY.  Tuples copied after the
+	# truncation should be seen.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3 (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test3 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+		TRUNCATE test3;
+		COPY test3 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test3;");
+	is($result, qq(3),
+	   "wal_level = $wal_level, optimized truncation with copied table");
+
+	# Like previous test, but rollback SET TABLESPACE in a subtransaction.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3a (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test3a (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+		TRUNCATE test3a;
+		SAVEPOINT s; ALTER TABLE test3a SET TABLESPACE other; ROLLBACK TO s;
+		COPY test3a FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a;");
+	is($result, qq(3),
+	   "wal_level = $wal_level, SET TABLESPACE in subtransaction");
+
+	# Same, but the subtransaction doing SET TABLESPACE is released.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3a2 (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test3a2 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+		TRUNCATE test3a2;
+		SAVEPOINT s; ALTER TABLE test3a2 SET TABLESPACE other; RELEASE s;
+		COPY test3a2 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a2;");
+	is($result, qq(3),
+	   "wal_level = $wal_level, SET TABLESPACE in released subtransaction");
+
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3a3 (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test3a3 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+		TRUNCATE test3a3;
+		SAVEPOINT s;
+			ALTER TABLE test3a3 SET TABLESPACE other;
+			SAVEPOINT s2;
+				ALTER TABLE test3a3 SET TABLESPACE pg_default;
+			ROLLBACK TO s2;
+			SAVEPOINT s2;
+				ALTER TABLE test3a3 SET TABLESPACE pg_default;
+			RELEASE s2;
+		ROLLBACK TO s;
+		COPY test3a3 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a3;");
+	is($result, qq(3),
+	   "wal_level = $wal_level, SET TABLESPACE in nested subtransactions");
+
+	# UPDATE touches two buffers; one is BufferNeedsWAL(); the other is not.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3b (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test3b (id, id2) VALUES (DEFAULT, generate_series(1,10000));
+		COPY test3b FROM '$copy_file' DELIMITER ',';  -- set sync_above
+		UPDATE test3b SET id2 = id2 + 1;
+		DELETE FROM test3b;
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test3b;");
+	is($result, qq(0),
+	   "wal_level = $wal_level, UPDATE of logged page extends relation");
+
+	# Test truncation with inserted tuples using both INSERT and COPY. Tuples
+	# inserted after the truncation should be seen.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test4 (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test4 (id, id2) VALUES (DEFAULT, generate_series(1,10000));
+		TRUNCATE test4;
+		INSERT INTO test4 (id, id2) VALUES (DEFAULT, 10000);
+		COPY test4 FROM '$copy_file' DELIMITER ',';
+		INSERT INTO test4 (id, id2) VALUES (DEFAULT, 10000);
+		COMMIT;");
+
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test4;");
+	is($result, qq(5),
+	   "wal_level = $wal_level, optimized truncation with inserted/copied table");
+
+	# Test consistency of COPY with INSERT for table created in the same
+	# transaction.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test5 (id serial PRIMARY KEY, id2 int);
+		INSERT INTO test5 VALUES (DEFAULT, 1);
+		COPY test5 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test5;");
+	is($result, qq(4),
+	   "wal_level = $wal_level, replay of optimized copy with inserted table");
+
+	# Test consistency of COPY that inserts more to the same table using
+	# triggers.  If the INSERTS from the trigger go to the same block data
+	# is copied to, and the INSERTs are WAL-logged, WAL replay will fail when
+	# it tries to replay the WAL record but the "before" image doesn't match,
+	# because not all changes were WAL-logged.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test6 (id serial PRIMARY KEY, id2 text);
+		CREATE FUNCTION test6_before_row_trig() RETURNS trigger
+		  LANGUAGE plpgsql as \$\$
+		  BEGIN
+		    IF new.id2 NOT LIKE 'triggered%' THEN
+		      INSERT INTO test6 VALUES (DEFAULT, 'triggered row before' || NEW.id2);
+		    END IF;
+		    RETURN NEW;
+		  END; \$\$;
+		CREATE FUNCTION test6_after_row_trig() RETURNS trigger
+		  LANGUAGE plpgsql as \$\$
+		  BEGIN
+		    IF new.id2 NOT LIKE 'triggered%' THEN
+		      INSERT INTO test6 VALUES (DEFAULT, 'triggered row after' || NEW.id2);
+		    END IF;
+		    RETURN NEW;
+		  END; \$\$;
+		CREATE TRIGGER test6_before_row_insert
+		  BEFORE INSERT ON test6
+		  FOR EACH ROW EXECUTE PROCEDURE test6_before_row_trig();
+		CREATE TRIGGER test6_after_row_insert
+		  AFTER INSERT ON test6
+		  FOR EACH ROW EXECUTE PROCEDURE test6_after_row_trig();
+		COPY test6 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test6;");
+	is($result, qq(9),
+	   "wal_level = $wal_level, replay of optimized copy with row triggers");
+
+	# Test consistency of INSERT, COPY and TRUNCATE in same transaction block
+	# with TRUNCATE triggers.
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test7 (id serial PRIMARY KEY, id2 text);
+		CREATE FUNCTION test7_before_stat_trig() RETURNS trigger
+		  LANGUAGE plpgsql as \$\$
+		  BEGIN
+		    INSERT INTO test7 VALUES (DEFAULT, 'triggered stat before');
+		    RETURN NULL;
+		  END; \$\$;
+		CREATE FUNCTION test7_after_stat_trig() RETURNS trigger
+		  LANGUAGE plpgsql as \$\$
+		  BEGIN
+		    INSERT INTO test7 VALUES (DEFAULT, 'triggered stat after');
+		    RETURN NULL;
+		  END; \$\$;
+		CREATE TRIGGER test7_before_stat_truncate
+		  BEFORE TRUNCATE ON test7
+		  FOR EACH STATEMENT EXECUTE PROCEDURE test7_before_stat_trig();
+		CREATE TRIGGER test7_after_stat_truncate
+		  AFTER TRUNCATE ON test7
+		  FOR EACH STATEMENT EXECUTE PROCEDURE test7_after_stat_trig();
+		INSERT INTO test7 VALUES (DEFAULT, 1);
+		TRUNCATE test7;
+		COPY test7 FROM '$copy_file' DELIMITER ',';
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test7;");
+	is($result, qq(4),
+	   "wal_level = $wal_level, replay of optimized copy with truncate triggers");
+
+	# Test redo of temp table creation.
+	$node->safe_psql('postgres', "
+		CREATE TEMP TABLE test8 (id serial PRIMARY KEY, id2 text);");
+	$node->stop('immediate');
+	$node->start;
+
+	check_orphan_relfilenodes($node, "wal_level = $wal_level, no orphan relfilenode remains");
+
+	return;
+}
+
+# Exercise the identical set of scenarios once per wal_level of interest.
+run_wal_optimize($_)
+  foreach ("minimal", "replica");
-- 
2.23.0

>From e297f55d0d9215d9e828ec32dc0ebadb8e04bb2c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Fri, 25 Oct 2019 12:07:09 +0900
Subject: [PATCH v21 2/5] Fix WAL skipping feature

WAL-skipping operations mixed with WAL-logged operations can lead to
database corruption after a crash. This patch changes the WAL-skipping
feature so that no data modification of such relations is WAL-logged at
all; instead, those relations are synced at commit.
---
 src/backend/access/heap/heapam.c         |   4 +-
 src/backend/access/heap/heapam_handler.c |  22 +-
 src/backend/access/heap/rewriteheap.c    |  13 +-
 src/backend/access/transam/xact.c        |  17 ++
 src/backend/access/transam/xlogutils.c   |  11 +-
 src/backend/catalog/storage.c            | 294 ++++++++++++++++++++---
 src/backend/commands/cluster.c           |  24 ++
 src/backend/commands/copy.c              |  39 +--
 src/backend/commands/createas.c          |   5 +-
 src/backend/commands/matview.c           |   4 -
 src/backend/commands/tablecmds.c         |  10 +-
 src/backend/storage/buffer/bufmgr.c      |  41 ++--
 src/backend/storage/smgr/md.c            |  30 +++
 src/backend/utils/cache/relcache.c       |  28 ++-
 src/backend/utils/misc/guc.c             |  13 +
 src/include/access/heapam.h              |   1 -
 src/include/access/rewriteheap.h         |   2 +-
 src/include/access/tableam.h             |  40 +--
 src/include/catalog/storage.h            |  12 +
 src/include/storage/bufmgr.h             |   1 +
 src/include/storage/md.h                 |   1 +
 src/include/utils/rel.h                  |  19 +-
 22 files changed, 455 insertions(+), 176 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0128bb34ef..a7ead9405a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1936,7 +1936,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+	if (RelationNeedsWAL(relation))
 	{
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
@@ -2119,7 +2119,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	/* currently not needed (thus unsupported) for heap_multi_insert() */
 	AssertArg(!(options & HEAP_INSERT_NO_LOGICAL));
 
-	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+	needwal = RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2dd8821fac..0871df7730 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -558,18 +558,6 @@ tuple_lock_retry:
 	return result;
 }
 
-static void
-heapam_finish_bulk_insert(Relation relation, int options)
-{
-	/*
-	 * If we skipped writing WAL, then we need to sync the heap (but not
-	 * indexes since those use WAL anyway / don't go through tableam)
-	 */
-	if (options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(relation);
-}
-
-
 /* ------------------------------------------------------------------------
  * DDL related callbacks for heap AM.
  * ------------------------------------------------------------------------
@@ -701,7 +689,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	IndexScanDesc indexScan;
 	TableScanDesc tableScan;
 	HeapScanDesc heapScan;
-	bool		use_wal;
 	bool		is_system_catalog;
 	Tuplesortstate *tuplesort;
 	TupleDesc	oldTupDesc = RelationGetDescr(OldHeap);
@@ -716,12 +703,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	is_system_catalog = IsSystemRelation(OldHeap);
 
 	/*
-	 * We need to log the copied data in WAL iff WAL archiving/streaming is
-	 * enabled AND it's a WAL-logged rel.
+	 * smgr_targblock must be initially invalid if we are to skip WAL logging
 	 */
-	use_wal = XLogIsNeeded() && RelationNeedsWAL(NewHeap);
-
-	/* use_wal off requires smgr_targblock be initially invalid */
 	Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber);
 
 	/* Preallocate values/isnull arrays */
@@ -731,7 +714,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
 	/* Initialize the rewrite operation */
 	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-								 *multi_cutoff, use_wal);
+								 *multi_cutoff);
 
 
 	/* Set up sorting if wanted */
@@ -2519,7 +2502,6 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
-	.finish_bulk_insert = heapam_finish_bulk_insert,
 
 	.tuple_fetch_row_version = heapam_fetch_row_version,
 	.tuple_get_latest_tid = heap_get_latest_tid,
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d41dbcf5f7..9b757cacf4 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -144,7 +144,6 @@ typedef struct RewriteStateData
 	Page		rs_buffer;		/* page currently being built */
 	BlockNumber rs_blockno;		/* block where page will go */
 	bool		rs_buffer_valid;	/* T if any tuples in buffer */
-	bool		rs_use_wal;		/* must we WAL-log inserts? */
 	bool		rs_logical_rewrite; /* do we need to do logical rewriting */
 	TransactionId rs_oldest_xmin;	/* oldest xmin used by caller to determine
 									 * tuple visibility */
@@ -238,15 +237,13 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin	xid used by the caller to determine which tuples are dead
  * freeze_xid	xid before which tuples will be frozen
  * cutoff_multi	multixact before which multis will be removed
- * use_wal		should the inserts to the new heap be WAL-logged?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-				   TransactionId freeze_xid, MultiXactId cutoff_multi,
-				   bool use_wal)
+				   TransactionId freeze_xid, MultiXactId cutoff_multi)
 {
 	RewriteState state;
 	MemoryContext rw_cxt;
@@ -271,7 +268,6 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	/* new_heap needn't be empty, just locked */
 	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
 	state->rs_buffer_valid = false;
-	state->rs_use_wal = use_wal;
 	state->rs_oldest_xmin = oldest_xmin;
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
@@ -330,7 +326,7 @@ end_heap_rewrite(RewriteState state)
 	/* Write the last page, if any */
 	if (state->rs_buffer_valid)
 	{
-		if (state->rs_use_wal)
+		if (RelationNeedsWAL(state->rs_new_rel))
 			log_newpage(&state->rs_new_rel->rd_node,
 						MAIN_FORKNUM,
 						state->rs_blockno,
@@ -654,9 +650,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 	{
 		int			options = HEAP_INSERT_SKIP_FSM;
 
-		if (!state->rs_use_wal)
-			options |= HEAP_INSERT_SKIP_WAL;
-
 		/*
 		 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
 		 * for the TOAST table are not logically decoded.  The main heap is
@@ -695,7 +688,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			/* Doesn't fit, so write out the existing page */
 
 			/* XLOG stuff */
-			if (state->rs_use_wal)
+			if (RelationNeedsWAL(state->rs_new_rel))
 				log_newpage(&state->rs_new_rel->rd_node,
 							MAIN_FORKNUM,
 							state->rs_blockno,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index fc55fa6d53..59d65bc214 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2107,6 +2107,13 @@ CommitTransaction(void)
 	 */
 	PreCommit_on_commit_actions();
 
+	/*
+	 * Synchronize files that are created and not WAL-logged during this
+	 * transaction. This must happen before emitting commit record so that we
+	 * don't see committed-but-broken files after a crash.
+	 */
+	smgrDoPendingSyncs(true, false);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2339,6 +2346,14 @@ PrepareTransaction(void)
 	 */
 	PreCommit_on_commit_actions();
 
+	/*
+	 * Sync all WAL-skipped files now. Some of them may be deleted at
+	 * transaction end, but we don't bother to store that information in the
+	 * PREPARE record or two-phase files. Like commit, we should sync
+	 * WAL-skipped files before emitting the PREPARE record. See CommitTransaction().
+	 */
+	smgrDoPendingSyncs(true, true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2657,6 +2672,7 @@ AbortTransaction(void)
 	 */
 	AfterTriggerEndXact(false); /* 'false' means it's abort */
 	AtAbort_Portals();
+	smgrDoPendingSyncs(false, false);
 	AtEOXact_LargeObject(false);
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false, is_parallel_worker);
@@ -4964,6 +4980,7 @@ AbortSubTransaction(void)
 						   s->parent->curTransactionOwner);
 		AtEOSubXact_LargeObject(false, s->subTransactionId,
 								s->parent->subTransactionId);
+		smgrDoPendingSyncs(false, false);
 		AtSubAbort_Notify();
 
 		/* Advertise the fact that we aborted in pg_xact. */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d..fc296abf91 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -544,6 +544,8 @@ typedef FakeRelCacheEntryData *FakeRelCacheEntry;
  * fields related to physical storage, like rd_rel, are initialized, so the
  * fake entry is only usable in low-level operations like ReadBuffer().
  *
+ * This is also used for syncing WAL-skipped files.
+ *
  * Caller must free the returned entry with FreeFakeRelcacheEntry().
  */
 Relation
@@ -552,18 +554,19 @@ CreateFakeRelcacheEntry(RelFileNode rnode)
 	FakeRelCacheEntry fakeentry;
 	Relation	rel;
 
-	Assert(InRecovery);
-
 	/* Allocate the Relation struct and all related space in one block. */
 	fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
 	rel = (Relation) fakeentry;
 
 	rel->rd_rel = &fakeentry->pgc;
 	rel->rd_node = rnode;
-	/* We will never be working with temp rels during recovery */
+	/*
+	 * We will never be working with temp rels during recovery or syncing
+	 * WAL-skipped files.
+	 */
 	rel->rd_backend = InvalidBackendId;
 
-	/* It must be a permanent table if we're in recovery. */
+	/* It must be a permanent table here */
 	rel->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
 
 	/* We don't know the name of the relation; use relfilenode instead */
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 625af8d49a..806f235a24 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -30,9 +30,13 @@
 #include "catalog/storage_xlog.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/* GUC variables */
+int	wal_skip_threshold = 64; /* threshold of WAL-skipping in kilobytes */
+
 /*
  * We keep a list of all relations (represented as RelFileNode values)
  * that have been created or deleted in the current transaction.  When
@@ -53,16 +57,17 @@
  * but I'm being paranoid.
  */
 
-typedef struct PendingRelDelete
+typedef struct PendingRelOp
 {
 	RelFileNode relnode;		/* relation that may need to be deleted */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
-	bool		atCommit;		/* T=delete at commit; F=delete at abort */
+	bool		atCommit;		/* T=work at commit; F=work at abort */
 	int			nestLevel;		/* xact nesting level of request */
-	struct PendingRelDelete *next;	/* linked-list link */
-} PendingRelDelete;
+	struct PendingRelOp *next;	/* linked-list link */
+} PendingRelOp;
 
-static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingRelOp *pendingDeletes = NULL; /* head of linked list */
+static PendingRelOp *pendingSyncs = NULL; /* head of linked list */
 
 /*
  * RelationCreateStorage
@@ -78,7 +83,7 @@ static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 SMgrRelation
 RelationCreateStorage(RelFileNode rnode, char relpersistence)
 {
-	PendingRelDelete *pending;
+	PendingRelOp *pending;
 	SMgrRelation srel;
 	BackendId	backend;
 	bool		needs_wal;
@@ -109,8 +114,8 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 		log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
 	/* Add the relation to the list of stuff to delete at abort */
-	pending = (PendingRelDelete *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending = (PendingRelOp *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelOp));
 	pending->relnode = rnode;
 	pending->backend = backend;
 	pending->atCommit = false;	/* delete if abort */
@@ -118,6 +123,25 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	pending->next = pendingDeletes;
 	pendingDeletes = pending;
 
+	/*
+	 * When wal_level = minimal, we are going to skip WAL-logging for storage
+	 * of persistent relations created in the current transaction. The
+	 * relation needs to be synced at commit.
+	 */
+	if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
+	{
+		int nestLevel = GetCurrentTransactionNestLevel();
+
+		pending = (PendingRelOp *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelOp));
+		pending->relnode = rnode;
+		pending->backend = backend;
+		pending->atCommit = true;
+		pending->nestLevel = nestLevel;
+		pending->next = pendingSyncs;
+		pendingSyncs = pending;
+	}
+
 	return srel;
 }
 
@@ -147,11 +171,11 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
 void
 RelationDropStorage(Relation rel)
 {
-	PendingRelDelete *pending;
+	PendingRelOp *pending;
 
 	/* Add the relation to the list of stuff to delete at commit */
-	pending = (PendingRelDelete *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending = (PendingRelOp *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelOp));
 	pending->relnode = rel->rd_node;
 	pending->backend = rel->rd_backend;
 	pending->atCommit = true;	/* delete if commit */
@@ -192,9 +216,9 @@ RelationDropStorage(Relation rel)
 void
 RelationPreserveStorage(RelFileNode rnode, bool atCommit)
 {
-	PendingRelDelete *pending;
-	PendingRelDelete *prev;
-	PendingRelDelete *next;
+	PendingRelOp *pending;
+	PendingRelOp *prev;
+	PendingRelOp *next;
 
 	prev = NULL;
 	for (pending = pendingDeletes; pending != NULL; pending = next)
@@ -431,9 +455,9 @@ void
 smgrDoPendingDeletes(bool isCommit)
 {
 	int			nestLevel = GetCurrentTransactionNestLevel();
-	PendingRelDelete *pending;
-	PendingRelDelete *prev;
-	PendingRelDelete *next;
+	PendingRelOp *pending;
+	PendingRelOp *prev;
+	PendingRelOp *next;
 	int			nrels = 0,
 				i = 0,
 				maxrels = 0;
@@ -494,11 +518,194 @@ smgrDoPendingDeletes(bool isCommit)
 }
 
 /*
- * smgrGetPendingDeletes() -- Get a list of non-temp relations to be deleted.
+ *	smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
+ *
+ * This should be called before smgrDoPendingDeletes() at every subtransaction
+ * end. Also this should be called before emitting WAL record so that sync
+ * failure prevents commit.
+ *
+ * If sync_all is true, syncs all files including those that are scheduled to
+ * be deleted.
+ */
+void
+smgrDoPendingSyncs(bool isCommit, bool sync_all)
+{
+	int			nestLevel = GetCurrentTransactionNestLevel();
+	PendingRelOp *pending;
+	PendingRelOp *prev;
+	PendingRelOp *next;
+	SMgrRelation srel = NULL;
+	ForkNumber fork;
+	BlockNumber nblocks[MAX_FORKNUM + 1];
+	HTAB	*delhash = NULL;
+
+	/* Return if nothing to be synced in this nestlevel */
+	if (!pendingSyncs || pendingSyncs->nestLevel < nestLevel)
+		return;
+
+	Assert (pendingSyncs->nestLevel <= nestLevel);
+	Assert (pendingSyncs->backend == InvalidBackendId);
+
+	/*
+	 * If sync_all is false, pending syncs on the relation that are to be
+	 * deleted in this transaction-end should be ignored. Collect pending
+	 * deletes that will happen in the following call to
+	 * smgrDoPendingDeletes().
+	 */
+	if (!sync_all)
+	{
+		for (pending = pendingDeletes; pending != NULL; pending = pending->next)
+		{
+			bool found PG_USED_FOR_ASSERTS_ONLY;
+
+			if (pending->nestLevel < pendingSyncs->nestLevel ||
+				pending->atCommit != isCommit)
+				continue;
+
+			/* create the hash if not yet */
+			if (delhash == NULL)
+			{
+				HASHCTL hash_ctl;
+
+				memset(&hash_ctl, 0, sizeof(hash_ctl));
+				hash_ctl.keysize = sizeof(RelFileNode);
+				hash_ctl.entrysize = sizeof(RelFileNode);
+				hash_ctl.hcxt = CurrentMemoryContext;
+				delhash =
+					hash_create("pending del temporary hash", 8, &hash_ctl,
+								HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+			}
+
+			(void) hash_search(delhash, (void *) &(pending->relnode),
+							   HASH_ENTER, &found);
+			Assert(!found);
+		}
+	}
+
+	/* Loop over pendingSyncs */
+	prev = NULL;
+	for (pending = pendingSyncs; pending != NULL; pending = next)
+	{
+		bool to_be_removed = (!isCommit); /* don't sync if aborted */
+		uint64		total_blocks = 0;	/* size of this relation only */
+
+		next = pending->next;
+
+		/* outer-level entries should not be processed yet */
+		if (pending->nestLevel < nestLevel)
+		{
+			prev = pending;
+			continue;
+		}
+
+		/* don't sync relnodes that are being deleted */
+		if (delhash && !to_be_removed)
+			hash_search(delhash, (void *) &pending->relnode,
+						HASH_FIND, &to_be_removed);
+
+		/* remove the entry if no longer useful */
+		if (to_be_removed)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingSyncs = next;
+			pfree(pending);
+			continue;
+		}
+
+		/* actual sync happens at the end of top transaction */
+		if (nestLevel > 1)
+		{
+			prev = pending;
+			continue;
+		}
+
+		/* Now it's time to sync (or WAL-log) the entry being processed */
+		srel = smgropen(pending->relnode, pending->backend);
+
+		/*
+		 * We emit newpage WAL records for smaller size of relations.
+		 *
+		 * Small WAL records have a chance to be emitted at once along with
+		 * other backends' WAL records. We emit WAL records instead of syncing
+		 * for files that are smaller than a certain threshold expecting faster
+		 * commit. The threshold is defined by the GUC wal_skip_threshold.
+		 */
+		for (fork = 0 ; fork <= MAX_FORKNUM ; fork++)
+		{
+			/* FSM doesn't need WAL nor sync */
+			if (fork != FSM_FORKNUM && smgrexists(srel, fork))
+			{
+				BlockNumber n = smgrnblocks(srel, fork);
+
+				/* we shouldn't come here for unlogged relations */
+				Assert(fork != INIT_FORKNUM);
+
+				nblocks[fork] = n;
+				total_blocks += n;
+			}
+			else
+				nblocks[fork] = InvalidBlockNumber;
+		}
+
+		/*
+		 * Sync file or emit WAL record for the file according to the total
+		 * size. Compare in 64 bits so that neither side can overflow.
+		 */
+		if (total_blocks * BLCKSZ >= (uint64) wal_skip_threshold * 1024)
+		{
+			/* Flush all buffers then sync the file */
+			FlushRelationBuffersWithoutRelcache(srel->smgr_rnode.node, false);
+
+			for (fork = 0; fork <= MAX_FORKNUM; fork++)
+			{
+				if (smgrexists(srel, fork))
+					smgrimmedsync(srel, fork);
+			}
+		}
+		else
+		{
+			/*
+			 * Emit WAL records for all blocks. Some of the blocks might have
+			 * been synced or evicted, but we don't bother checking that. The
+			 * file is small enough.
+			 */
+			for (fork = 0 ; fork <= MAX_FORKNUM ; fork++)
+			{
+				bool   page_std = (fork == MAIN_FORKNUM);
+				BlockNumber n   = nblocks[fork];
+				Relation rel;
+
+				if (!BlockNumberIsValid(n))
+					continue;
+
+				/* Emit WAL for the whole file */
+				rel = CreateFakeRelcacheEntry(srel->smgr_rnode.node);
+				log_newpage_range(rel, fork, 0, n, page_std);
+				FreeFakeRelcacheEntry(rel);
+			}
+		}
+
+		/* done; remove the entry from the list */
+		if (prev)
+			prev->next = next;
+		else
+			pendingSyncs = next;
+		pfree(pending);
+	}
+
+	if (delhash)
+		hash_destroy(delhash);
+}
+
+/*
+ * smgrGetPendingOperations() -- Get a list of non-temp relations to be
+ *								 deleted or synced.
  *
- * The return value is the number of relations scheduled for termination.
- * *ptr is set to point to a freshly-palloc'd array of RelFileNodes.
- * If there are no relations to be deleted, *ptr is set to NULL.
+ * The return value is the number of relations scheduled in the given
+ * list. *ptr is set to point to a freshly-palloc'd array of RelFileNodes.  If
+ * there are no matching relations, *ptr is set to NULL.
  *
  * Only non-temporary relations are included in the returned list.  This is OK
  * because the list is used only in contexts where temporary relations don't
@@ -507,19 +714,19 @@ smgrDoPendingDeletes(bool isCommit)
  * (and all temporary files will be zapped if we restart anyway, so no need
  * for redo to do it also).
  *
- * Note that the list does not include anything scheduled for termination
- * by upper-level transactions.
+ * Note that the list does not include anything scheduled by upper-level
+ * transactions.
  */
-int
-smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
+static inline int
+smgrGetPendingOperations(PendingRelOp *list, bool forCommit, RelFileNode **ptr)
 {
 	int			nestLevel = GetCurrentTransactionNestLevel();
 	int			nrels;
 	RelFileNode *rptr;
-	PendingRelDelete *pending;
+	PendingRelOp *pending;
 
 	nrels = 0;
-	for (pending = pendingDeletes; pending != NULL; pending = pending->next)
+	for (pending = list; pending != NULL; pending = pending->next)
 	{
 		if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
 			&& pending->backend == InvalidBackendId)
@@ -532,7 +739,7 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	}
 	rptr = (RelFileNode *) palloc(nrels * sizeof(RelFileNode));
 	*ptr = rptr;
-	for (pending = pendingDeletes; pending != NULL; pending = pending->next)
+	for (pending = list; pending != NULL; pending = pending->next)
 	{
 		if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
 			&& pending->backend == InvalidBackendId)
@@ -544,6 +751,20 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	return nrels;
 }
 
+/* Pending deletes: returns count, fills *ptr; see smgrGetPendingOperations */
+int
+smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
+{
+	return smgrGetPendingOperations(pendingDeletes, forCommit, ptr);
+}
+
+/* Pending syncs: returns count, fills *ptr; see smgrGetPendingOperations */
+int
+smgrGetPendingSyncs(bool forCommit, RelFileNode **ptr)
+{
+	return smgrGetPendingOperations(pendingSyncs, forCommit, ptr);
+}
+
 /*
  *	PostPrepare_smgr -- Clean up after a successful PREPARE
  *
@@ -554,8 +775,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 void
 PostPrepare_smgr(void)
 {
-	PendingRelDelete *pending;
-	PendingRelDelete *next;
+	PendingRelOp *pending;
+	PendingRelOp *next;
 
 	for (pending = pendingDeletes; pending != NULL; pending = next)
 	{
@@ -564,25 +785,34 @@ PostPrepare_smgr(void)
 		/* must explicitly free the list entry */
 		pfree(pending);
 	}
+
+	/* We shouldn't have an entry in pendingSyncs */
+	Assert(pendingSyncs == NULL);
 }
 
 
 /*
  * AtSubCommit_smgr() --- Take care of subtransaction commit.
  *
- * Reassign all items in the pending-deletes list to the parent transaction.
+ * Reassign all items in the pending-operations list to the parent transaction.
  */
 void
 AtSubCommit_smgr(void)
 {
 	int			nestLevel = GetCurrentTransactionNestLevel();
-	PendingRelDelete *pending;
+	PendingRelOp *pending;
 
 	for (pending = pendingDeletes; pending != NULL; pending = pending->next)
 	{
 		if (pending->nestLevel >= nestLevel)
 			pending->nestLevel = nestLevel - 1;
 	}
+
+	for (pending = pendingSyncs; pending != NULL; pending = pending->next)
+	{
+		if (pending->nestLevel >= nestLevel)
+			pending->nestLevel = nestLevel - 1;
+	}
 }
 
 /*
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index a23128d7a0..fba44de88a 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -1034,12 +1034,36 @@ swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
 
 	if (OidIsValid(relfilenode1) && OidIsValid(relfilenode2))
 	{
+		Relation rel1;
+		Relation rel2;
+
 		/*
 		 * Normal non-mapped relations: swap relfilenodes, reltablespaces,
 		 * relpersistence
 		 */
 		Assert(!target_is_pg_class);
 
+		/* Update creation subid hints of relcache */
+		rel1 = relation_open(r1, ExclusiveLock);
+		rel2 = relation_open(r2, ExclusiveLock);
+
+		/*
+		 * New relation's relfilenode is created in the current transaction
+		 * and used as old relation's new relfilenode. So set its
+		 * newRelfilenodeSubid to the new relation's createSubid. We don't fix
+		 * rel2 since it would be deleted soon.
+		 */
+		Assert(rel2->rd_createSubid != InvalidSubTransactionId);
+		rel1->rd_newRelfilenodeSubid = rel2->rd_createSubid;
+
+		/* record the first relfilenode change in the current transaction */
+		if (rel1->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
+			rel1->rd_firstRelfilenodeSubid = GetCurrentSubTransactionId();
+
+		relation_close(rel1, ExclusiveLock);
+		relation_close(rel2, ExclusiveLock);
+
+		/* swap relfilenodes, reltablespaces, relpersistence */
 		swaptemp = relform1->relfilenode;
 		relform1->relfilenode = relform2->relfilenode;
 		relform2->relfilenode = swaptemp;
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3aeef30b28..3ce04f7efc 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2534,9 +2534,6 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
-
 	pfree(buffer);
 }
 
@@ -2725,28 +2722,9 @@ CopyFrom(CopyState cstate)
 	 * If it does commit, we'll have done the table_finish_bulk_insert() at
 	 * the bottom of this routine first.
 	 *
-	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
-	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
-	 * can be cleared before the end of the transaction. The exact case is
-	 * when a relation sets a new relfilenode twice in same transaction, yet
-	 * the second one fails in an aborted subtransaction, e.g.
-	 *
-	 * BEGIN;
-	 * TRUNCATE t;
-	 * SAVEPOINT save;
-	 * TRUNCATE t;
-	 * ROLLBACK TO save;
-	 * COPY ...
-	 *
-	 * Also, if the target file is new-in-transaction, we assume that checking
-	 * FSM for free space is a waste of time, even if we must use WAL because
-	 * of archiving.  This could possibly be wrong, but it's unlikely.
-	 *
-	 * The comments for table_tuple_insert and RelationGetBufferForTuple
-	 * specify that skipping WAL logging is only safe if we ensure that our
-	 * tuples do not go into pages containing tuples from any other
-	 * transactions --- but this must be the case if we have a new table or
-	 * new relfilenode, so we need no additional work to enforce that.
+	 * If the target file is new-in-transaction, we assume that checking FSM
+	 * for free space is a waste of time, even if we must use WAL because of
+	 * archiving.  This could possibly be wrong, but it's unlikely.
 	 *
 	 * We currently don't support this optimization if the COPY target is a
 	 * partitioned table as we currently only lazily initialize partition
@@ -2762,15 +2740,14 @@ CopyFrom(CopyState cstate)
 	 * are not supported as per the description above.
 	 *----------
 	 */
-	/* createSubid is creation check, newRelfilenodeSubid is truncation check */
+	/*
+	 * createSubid is creation check, firstRelfilenodeSubid is truncation and
+	 * cluster check. Partitioned table doesn't have storage.
+	 */
 	if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
 		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
-		 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
-	{
+		 cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
 		ti_options |= TABLE_INSERT_SKIP_FSM;
-		if (!XLogIsNeeded())
-			ti_options |= TABLE_INSERT_SKIP_WAL;
-	}
 
 	/*
 	 * Optimize if new relfilenode was created in this subxact or one of its
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index b7d220699f..8a91d946e3 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -558,8 +558,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * We can skip WAL-logging the insertions, unless PITR or streaming
 	 * replication is in use. We can skip the FSM in any case.
 	 */
-	myState->ti_options = TABLE_INSERT_SKIP_FSM |
-		(XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL);
+	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -604,8 +603,6 @@ intorel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	table_finish_bulk_insert(myState->rel, myState->ti_options);
-
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
 	myState->rel = NULL;
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 537d0e8cef..1c854dcebf 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -463,8 +463,6 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * replication is in use. We can skip the FSM in any case.
 	 */
 	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	if (!XLogIsNeeded())
-		myState->ti_options |= TABLE_INSERT_SKIP_WAL;
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -509,8 +507,6 @@ transientrel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
-
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
 	myState->transientrel = NULL;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 8d25d14772..54c8b0fb04 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4764,9 +4764,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 	/*
 	 * Prepare a BulkInsertState and options for table_tuple_insert. Because
-	 * we're building a new heap, we can skip WAL-logging and fsync it to disk
-	 * at the end instead (unless WAL-logging is required for archiving or
-	 * streaming replication). The FSM is empty too, so don't bother using it.
+	 * we're building a new heap, the underlying table AM can skip WAL-logging
+	 * and smgr will sync the relation to disk at the end of the current
+	 * transaction instead. The FSM is empty too, so don't bother using it.
 	 */
 	if (newrel)
 	{
@@ -4774,8 +4774,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		bistate = GetBulkInsertState();
 
 		ti_options = TABLE_INSERT_SKIP_FSM;
-		if (!XLogIsNeeded())
-			ti_options |= TABLE_INSERT_SKIP_WAL;
 	}
 	else
 	{
@@ -5070,8 +5068,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	{
 		FreeBulkInsertState(bistate);
 
-		table_finish_bulk_insert(newrel, ti_options);
-
 		table_close(newrel, NoLock);
 	}
 }
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 483f705305..827626b330 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -171,6 +171,7 @@ static HTAB *PrivateRefCountHash = NULL;
 static int32 PrivateRefCountOverflowed = 0;
 static uint32 PrivateRefCountClock = 0;
 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
+static void FlushRelationBuffers_common(SMgrRelation smgr, bool islocal);
 
 static void ReservePrivateRefCountEntry(void);
 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
@@ -675,10 +676,10 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
  *		a relcache entry for the relation.
  *
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay.  If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
+ * NB: At present, this function may only be used on permanent relations,
+ * which is OK, because we only use it during XLOG replay and processing
+ * pending syncs.  If in the future we want to use it on temporary or unlogged
+ * relations, we could pass additional parameters.
  */
 Buffer
 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
@@ -3203,20 +3204,32 @@ PrintPinnedBufs(void)
 void
 FlushRelationBuffers(Relation rel)
 {
-	int			i;
-	BufferDesc *bufHdr;
-
-	/* Open rel at the smgr level if not already done */
 	RelationOpenSmgr(rel);
 
-	if (RelationUsesLocalBuffers(rel))
+	FlushRelationBuffers_common(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+void
+FlushRelationBuffersWithoutRelcache(RelFileNode rnode, bool islocal)
+{
+	FlushRelationBuffers_common(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+static void
+FlushRelationBuffers_common(SMgrRelation smgr, bool islocal)
+{
+	RelFileNode rnode = smgr->smgr_rnode.node;
+	int i;
+	BufferDesc *bufHdr;
+
+	if (islocal)
 	{
 		for (i = 0; i < NLocBuffer; i++)
 		{
 			uint32		buf_state;
 
 			bufHdr = GetLocalBufferDescriptor(i);
-			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 			{
@@ -3233,7 +3246,7 @@ FlushRelationBuffers(Relation rel)
 
 				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-				smgrwrite(rel->rd_smgr,
+				smgrwrite(smgr,
 						  bufHdr->tag.forkNum,
 						  bufHdr->tag.blockNum,
 						  localpage,
@@ -3263,18 +3276,18 @@ FlushRelationBuffers(Relation rel)
 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
 		 * and saves some cycles.
 		 */
-		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
 			continue;
 
 		ReservePrivateRefCountEntry();
 
 		buf_state = LockBufHdr(bufHdr);
-		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, rel->rd_smgr);
+			FlushBuffer(bufHdr, smgr);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 07f3c93d3f..514c6098e6 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -994,6 +994,36 @@ ForgetDatabaseSyncRequests(Oid dbid)
 	RegisterSyncRequest(&tag, SYNC_FILTER_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * SyncRelationFiles -- flush and sync the files of all given relations
+ *
+ * Flushes each relation's shared buffers and immediately syncs all existing
+ * forks.  Meant to be called only when WAL-logging is skipped; emits no WAL.
+ */
+void
+SyncRelationFiles(RelFileNode *syncrels, int nsyncrels)
+{
+	int			relidx;
+
+	for (relidx = 0; relidx < nsyncrels; relidx++)
+	{
+		RelFileNode rnode = syncrels[relidx];
+		SMgrRelation reln;
+		ForkNumber	forknum;
+
+		/* write out buffered pages first, then sync each fork on disk */
+		FlushRelationBuffersWithoutRelcache(rnode, false);
+		reln = smgropen(rnode, InvalidBackendId);
+		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
+		{
+			if (!smgrexists(reln, forknum))
+				continue;
+			smgrimmedsync(reln, forknum);
+		}
+		smgrclose(reln);
+	}
+}
+
 /*
  * DropRelationFiles -- drop files of all given relations
  */
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 585dcee5db..892462873f 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1096,6 +1096,7 @@ RelationBuildDesc(Oid targetRelId, bool insertIt)
 	relation->rd_isnailed = false;
 	relation->rd_createSubid = InvalidSubTransactionId;
 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
 	switch (relation->rd_rel->relpersistence)
 	{
 		case RELPERSISTENCE_UNLOGGED:
@@ -1829,6 +1830,7 @@ formrdesc(const char *relationName, Oid relationReltype,
 	relation->rd_isnailed = true;
 	relation->rd_createSubid = InvalidSubTransactionId;
 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
 	relation->rd_backend = InvalidBackendId;
 	relation->rd_islocaltemp = false;
 
@@ -2094,7 +2096,7 @@ RelationClose(Relation relation)
 #ifdef RELCACHE_FORCE_RELEASE
 	if (RelationHasReferenceCountZero(relation) &&
 		relation->rd_createSubid == InvalidSubTransactionId &&
-		relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
+		relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
 		RelationClearRelation(relation, false);
 #endif
 }
@@ -2510,8 +2512,8 @@ RelationClearRelation(Relation relation, bool rebuild)
 		 * problem.
 		 *
 		 * When rebuilding an open relcache entry, we must preserve ref count,
-		 * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
-		 * attempt to preserve the pg_class entry (rd_rel), tupledesc,
+		 * rd_createSubid/rd_new/firstRelfilenodeSubid, and rd_toastoid state.
+		 * Also attempt to preserve the pg_class entry (rd_rel), tupledesc,
 		 * rewrite-rule, partition key, and partition descriptor substructures
 		 * in place, because various places assume that these structures won't
 		 * move while they are working with an open relcache entry.  (Note:
@@ -2600,6 +2602,7 @@ RelationClearRelation(Relation relation, bool rebuild)
 		/* creation sub-XIDs must be preserved */
 		SWAPFIELD(SubTransactionId, rd_createSubid);
 		SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
+		SWAPFIELD(SubTransactionId, rd_firstRelfilenodeSubid);
 		/* un-swap rd_rel pointers, swap contents instead */
 		SWAPFIELD(Form_pg_class, rd_rel);
 		/* ... but actually, we don't have to update newrel->rd_rel */
@@ -2667,7 +2670,7 @@ static void
 RelationFlushRelation(Relation relation)
 {
 	if (relation->rd_createSubid != InvalidSubTransactionId ||
-		relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+		relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
 	{
 		/*
 		 * New relcache entries are always rebuilt, not flushed; else we'd
@@ -2807,7 +2810,7 @@ RelationCacheInvalidate(void)
 		 * pending invalidations.
 		 */
 		if (relation->rd_createSubid != InvalidSubTransactionId ||
-			relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
+			relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
 			continue;
 
 		relcacheInvalsReceived++;
@@ -3064,6 +3067,7 @@ AtEOXact_cleanup(Relation relation, bool isCommit)
 	 * Likewise, reset the hint about the relfilenode being new.
 	 */
 	relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+	relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
 }
 
 /*
@@ -3155,7 +3159,7 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit,
 	}
 
 	/*
-	 * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
+	 * Likewise, update or drop any new-relfilenode-in-subtransaction hints.
 	 */
 	if (relation->rd_newRelfilenodeSubid == mySubid)
 	{
@@ -3164,6 +3168,14 @@ AtEOSubXact_cleanup(Relation relation, bool isCommit,
 		else
 			relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
 	}
+
+	if (relation->rd_firstRelfilenodeSubid == mySubid)
+	{
+		if (isCommit)
+			relation->rd_firstRelfilenodeSubid = parentSubid;
+		else
+			relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+	}
 }
 
 
@@ -3253,6 +3265,7 @@ RelationBuildLocalRelation(const char *relname,
 	/* it's being created in this transaction */
 	rel->rd_createSubid = GetCurrentSubTransactionId();
 	rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+	rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
 
 	/*
 	 * create a new tuple descriptor from the one passed in.  We do this
@@ -3556,6 +3569,8 @@ RelationSetNewRelfilenode(Relation relation, char persistence)
 	 * operations on the rel in the same transaction.
 	 */
 	relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+	if (relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
+		relation->rd_firstRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
 
 	/* Flag relation as needing eoxact cleanup (to remove the hint) */
 	EOXactListAdd(relation);
@@ -5592,6 +5607,7 @@ load_relcache_init_file(bool shared)
 		rel->rd_fkeylist = NIL;
 		rel->rd_createSubid = InvalidSubTransactionId;
 		rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+		rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
 		rel->rd_amcache = NULL;
 		MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 31a5ef0474..559f96a6dc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -36,6 +36,7 @@
 #include "access/xlog_internal.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
+#include "catalog/storage.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
 #include "commands/user.h"
@@ -2774,6 +2775,18 @@ static struct config_int ConfigureNamesInt[] =
 		check_effective_io_concurrency, assign_effective_io_concurrency, NULL
 	},
 
+	{
+		{"wal_skip_threshold", PGC_USERSET, RESOURCES_DISK,
+			gettext_noop("Size of file that can be fsync'ed in the minimum required duration."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&wal_skip_threshold,
+		64,
+		0, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 858bcb6bc9..80c2e1bafc 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -29,7 +29,6 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL	TABLE_INSERT_SKIP_WAL
 #define HEAP_INSERT_SKIP_FSM	TABLE_INSERT_SKIP_FSM
 #define HEAP_INSERT_FROZEN		TABLE_INSERT_FROZEN
 #define HEAP_INSERT_NO_LOGICAL	TABLE_INSERT_NO_LOGICAL
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 8056253916..7f9736e294 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,7 +23,7 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap,
 									   TransactionId OldestXmin, TransactionId FreezeXid,
-									   MultiXactId MultiXactCutoff, bool use_wal);
+									   MultiXactId MultiXactCutoff);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
 							   HeapTuple newTuple);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 7f81703b78..b652cd6cef 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -407,22 +407,6 @@ typedef struct TableAmRoutine
 							   uint8 flags,
 							   TM_FailureData *tmfd);
 
-	/*
-	 * Perform operations necessary to complete insertions made via
-	 * tuple_insert and multi_insert with a BulkInsertState specified. This
-	 * may for example be used to flush the relation, when the
-	 * TABLE_INSERT_SKIP_WAL option was used.
-	 *
-	 * Typically callers of tuple_insert and multi_insert will just pass all
-	 * the flags that apply to them, and each AM has to decide which of them
-	 * make sense for it, and then only take actions in finish_bulk_insert for
-	 * those flags, and ignore others.
-	 *
-	 * Optional callback.
-	 */
-	void		(*finish_bulk_insert) (Relation rel, int options);
-
-
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1087,10 +1071,6 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  * The options bitmask allows the caller to specify options that may change the
  * behaviour of the AM. The AM will ignore options that it does not support.
  *
- * If the TABLE_INSERT_SKIP_WAL option is specified, the new tuple doesn't
- * need to be logged to WAL, even for a non-temp relation. It is the AMs
- * choice whether this optimization is supported.
- *
  * If the TABLE_INSERT_SKIP_FSM option is specified, AMs are free to not reuse
  * free space in the relation. This can save some cycles when we know the
  * relation is new and doesn't contain useful amounts of free space.
@@ -1112,8 +1092,7 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  * heap's TOAST table, too, if the tuple requires any out-of-line data.
  *
  * The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple. If
- * `bistate` is provided, table_finish_bulk_insert() needs to be called.
+ * behavior) is also just passed through to RelationGetBufferForTuple.
  *
  * On return the slot's tts_tid and tts_tableOid are updated to reflect the
  * insertion. But note that any toasting of fields within the slot is NOT
@@ -1248,6 +1227,8 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
  * update was done.  However, any TOAST changes in the new tuple's
  * data are not reflected into *newtup.
  *
+ * See table_tuple_insert about the WAL-skipping feature.
+ *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax, and, if possible, t_cmax.  See comments for struct TM_FailureData
  * for additional info.
@@ -1308,21 +1289,6 @@ table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot,
 									   flags, tmfd);
 }
 
-/*
- * Perform operations necessary to complete insertions made via
- * tuple_insert and multi_insert with a BulkInsertState specified. This
- * e.g. may e.g. used to flush the relation when inserting with
- * TABLE_INSERT_SKIP_WAL specified.
- */
-static inline void
-table_finish_bulk_insert(Relation rel, int options)
-{
-	/* optional callback */
-	if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
-		rel->rd_tableam->finish_bulk_insert(rel, options);
-}
-
-
 /* ------------------------------------------------------------------------
  * DDL related functionality.
  * ------------------------------------------------------------------------
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 3579d3f3eb..24e71651c3 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -19,6 +19,16 @@
 #include "storage/smgr.h"
 #include "utils/relcache.h"
 
+/* enum for operation type of PendingDelete entries */
+typedef enum PendingOpType
+{
+	PENDING_DELETE,
+	PENDING_SYNC
+} PendingOpType;
+
+/* GUC variables */
+extern int	wal_skip_threshold; /* threshold for WAL-skipping */
+
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
@@ -31,7 +41,9 @@ extern void RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
  * naming
  */
 extern void smgrDoPendingDeletes(bool isCommit);
+extern void smgrDoPendingSyncs(bool isCommit, bool sync_all);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern int	smgrGetPendingSyncs(bool forCommit, RelFileNode **ptr);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 17b97f7e38..f31a36de17 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -189,6 +189,7 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 												   ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelationBuffersWithoutRelcache(RelFileNode rnode, bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index c0f05e23ff..2bb2947bdb 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -42,6 +42,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
+extern void SyncRelationFiles(RelFileNode *syncrels, int nsyncrels);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index a5cf804f9f..b2062efa63 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -75,10 +75,17 @@ typedef struct RelationData
 	 * transaction, with one of them occurring in a subsequently aborted
 	 * subtransaction, e.g. BEGIN; TRUNCATE t; SAVEPOINT save; TRUNCATE t;
 	 * ROLLBACK TO save; -- rd_newRelfilenodeSubid is now forgotten
+	 * rd_firstRelfilenodeSubid is the ID of the first subtransaction in which
+	 * a relfilenode change took place in the current transaction. Unlike
+	 * rd_newRelfilenodeSubid, this won't be accidentally forgotten. A valid
+	 * value means that the currently active relfilenode is transaction-local
+	 * and we sync the relation at commit instead of WAL-logging.
 	 */
 	SubTransactionId rd_createSubid;	/* rel was created in current xact */
 	SubTransactionId rd_newRelfilenodeSubid;	/* new relfilenode assigned in
 												 * current xact */
+	SubTransactionId rd_firstRelfilenodeSubid;	/* new relfilenode assigned
+												 * first in current xact */
 
 	Form_pg_class rd_rel;		/* RELATION tuple */
 	TupleDesc	rd_att;			/* tuple descriptor */
@@ -517,9 +524,15 @@ typedef struct ViewOptions
 /*
  * RelationNeedsWAL
  *		True if relation needs WAL.
- */
-#define RelationNeedsWAL(relation) \
-	((relation)->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)
+ *
+ * Returns false if wal_level = minimal and this relation was created or
+ * given a new relfilenode (e.g. by TRUNCATE) in the current transaction.
+ */
+#define RelationNeedsWAL(relation)										\
+	((relation)->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT &&	\
+	 (XLogIsNeeded() ||													\
+	  ((relation)->rd_createSubid == InvalidSubTransactionId &&			\
+	   (relation)->rd_firstRelfilenodeSubid == InvalidSubTransactionId)))
 
 /*
  * RelationUsesLocalBuffers
-- 
2.23.0

>From 96ad8bd4537e5055509ec9fdbbef502b52f136b5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Fri, 25 Oct 2019 12:07:52 +0900
Subject: [PATCH v21 3/5] Fix MarkBufferDirtyHint

---
 src/backend/catalog/storage.c       | 17 +++++++++++++++++
 src/backend/storage/buffer/bufmgr.c |  7 +++++++
 src/include/catalog/storage.h       |  1 +
 3 files changed, 25 insertions(+)

diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 806f235a24..6d5a3d53e7 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -440,6 +440,23 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		smgrimmedsync(dst, forkNum);
 }
 
+/*
+ * RelFileNodeSkippingWAL - true if rnode has an entry in pendingSyncs
+ */
+bool
+RelFileNodeSkippingWAL(RelFileNode rnode)
+{
+	PendingRelOp *pending;
+
+	for (pending = pendingSyncs ; pending != NULL ; pending = pending->next)
+	{
+		if (RelFileNodeEquals(pending->relnode, rnode))
+			return true;	/* found in the pending-sync list */
+	}
+
+	return false;			/* no pending-sync entry matches rnode */
+}
+
 /*
  *	smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
  *
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 827626b330..06ec7cc186 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3506,6 +3506,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			if (RecoveryInProgress())
 				return;
 
+			/*
+			 * Skip WAL logging if this buffer belongs to a relation that is
+			 * skipping WAL-logging.
+			 */
+			if (RelFileNodeSkippingWAL(bufHdr->tag.rnode))
+				return;
+
 			/*
 			 * If the block is already dirty because we either made a change
 			 * or set a hint already, then we don't need to write a full page
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 24e71651c3..eb2666e001 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -35,6 +35,7 @@ extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationTruncate(Relation rel, BlockNumber nblocks);
 extern void RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 								ForkNumber forkNum, char relpersistence);
+extern bool RelFileNodeSkippingWAL(RelFileNode rnode);
 
 /*
  * These functions used to be in storage/smgr/smgr.c, which explains the
-- 
2.23.0

>From 6ad62905d8a256c3531c9225bdb3212c45f5faff Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 28 Aug 2019 14:05:30 +0900
Subject: [PATCH v21 4/5] Documentation for wal_skip_threshold

---
 doc/src/sgml/config.sgml | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 886632ff43..f928c5aa0b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1833,6 +1833,32 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-wal-skip-threshold" xreflabel="wal_skip_threshold">
+      <term><varname>wal_skip_threshold</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>wal_skip_threshold</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+         When wal_level is minimal and a transaction commits after creating or
+         rewriting a permanent table, materialized view, or index, this
+         setting determines how to persist the new data.  If the data is
+         smaller than this setting, write it to the WAL log; otherwise, use an
+         fsync of the data file.  Depending on the properties of your storage,
+         raising or lowering this value might help if such commits are slowing
+         concurrent transactions.  The default is 64 kilobytes (64kB).
+       </para>
+       <para>
+        When <xref linkend="guc-wal-level"/> is <literal>minimal</literal>,
+        WAL-logging is skipped for tables created in-transaction.  If a table
+        is smaller than this setting at commit, it is WAL-logged instead of
+        issuing <function>fsync</function> on it.
+
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
-- 
2.23.0

>From 67baae223a93bc4c9827e1c8d99a040a058ad6ad Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 28 Aug 2019 14:12:18 +0900
Subject: [PATCH v21 5/5] Additional test for new GUC setting.

This patchset adds a new GUC variable wal_skip_threshold that
controls whether WAL-skipped tables are finally WAL-logged or
fsync'ed. All of the TAP tests perform WAL-logging, so this adds an
item that performs file sync.
---
 src/test/recovery/t/018_wal_optimize.pl | 38 ++++++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/src/test/recovery/t/018_wal_optimize.pl b/src/test/recovery/t/018_wal_optimize.pl
index b041121745..ba9185e2ba 100644
--- a/src/test/recovery/t/018_wal_optimize.pl
+++ b/src/test/recovery/t/018_wal_optimize.pl
@@ -11,7 +11,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 26;
+use Test::More tests => 32;
 
 sub check_orphan_relfilenodes
 {
@@ -43,6 +43,8 @@ sub run_wal_optimize
 	$node->append_conf('postgresql.conf', qq(
 wal_level = $wal_level
 max_prepared_transactions = 1
+wal_log_hints = on
+wal_skip_threshold = 0
 ));
 	$node->start;
 
@@ -102,7 +104,23 @@ max_prepared_transactions = 1
 	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test2a;");
 	is($result, qq(1),
 	   "wal_level = $wal_level, optimized truncation with prepared transaction");
+	# Same for file sync mode
+	# Tuples inserted after the truncation should be seen.
+	$node->safe_psql('postgres', "
+		SET wal_skip_threshold to 0;
+		BEGIN;
+		CREATE TABLE test2b (id serial PRIMARY KEY);
+		INSERT INTO test2b VALUES (DEFAULT);
+		TRUNCATE test2b;
+		INSERT INTO test2b VALUES (DEFAULT);
+		COMMIT;");
+
+	$node->stop('immediate');
+	$node->start;
 
+	$result = $node->safe_psql('postgres', "SELECT count(*) FROM test2b;");
+	is($result, qq(1),
+	   "wal_level = $wal_level, optimized truncation with file-sync");
 
 	# Data file for COPY query in follow-up tests.
 	my $basedir = $node->basedir;
@@ -178,6 +196,24 @@ max_prepared_transactions = 1
 	is($result, qq(3),
 	   "wal_level = $wal_level, SET TABLESPACE in subtransaction");
 
+	$node->safe_psql('postgres', "
+		BEGIN;
+		CREATE TABLE test3a5 (c int PRIMARY KEY);
+		SAVEPOINT q; INSERT INTO test3a5 VALUES (1); ROLLBACK TO q;
+		CHECKPOINT;
+		INSERT INTO test3a5 VALUES (1);  -- set index hint bit
+		INSERT INTO test3a5 VALUES (2);
+		COMMIT;");
+	$node->stop('immediate');
+	$node->start;
+	# After crash recovery, a duplicate key must still be rejected.
+	my($ret, $stdout, $stderr) = $node->psql(
+		'postgres', "INSERT INTO test3a5 VALUES (2);");
+	is($ret, qq(3),
+	   "wal_level = $wal_level, unique index LP_DEAD");
+	like($stderr, qr/violates unique/,
+	   "wal_level = $wal_level, unique index LP_DEAD message");
+
 	# UPDATE touches two buffers; one is BufferNeedsWAL(); the other is not.
 	$node->safe_psql('postgres', "
 		BEGIN;
-- 
2.23.0

Reply via email to