diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index 22c5f7a..e8ebfe9 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -87,7 +87,7 @@ statapprox_heap(Relation rel, output_type *stat)
 		 * If the page has only visible tuples, then we can find out the free
 		 * space from the FSM and move on.
 		 */
-		if (visibilitymap_test(rel, blkno, &vmbuffer))
+		if (VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
 		{
 			freespace = GetRecordedFreeSpace(rel, blkno);
 			stat->tuple_len += BLCKSZ - freespace;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 392eb70..c43443a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5916,7 +5916,7 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
       </term>
       <listitem>
        <para>
-        <command>VACUUM</> performs a whole-table scan if the table's
+        <command>VACUUM</> performs an eager freezing if the table's
         <structname>pg_class</>.<structfield>relfrozenxid</> field has reached
         the age specified by this setting.  The default is 150 million
         transactions.  Although users can set this value anywhere from zero to
@@ -5960,7 +5960,7 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
       </term>
       <listitem>
        <para>
-        <command>VACUUM</> performs a whole-table scan if the table's
+        <command>VACUUM</> performs an eager freezing if the table's
         <structname>pg_class</>.<structfield>relminmxid</> field has reached
         the age specified by this setting.  The default is 150 million multixacts.
         Although users can set this value anywhere from zero to two billions,
diff --git a/doc/src/sgml/maintenance.sgml b/doc/src/sgml/maintenance.sgml
index 5204b34..7cc975d 100644
--- a/doc/src/sgml/maintenance.sgml
+++ b/doc/src/sgml/maintenance.sgml
@@ -352,9 +352,9 @@
     Vacuum maintains a <link linkend="storage-vm">visibility map</> for each
     table to keep track of which pages contain only tuples that are known to be
     visible to all active transactions (and all future transactions, until the
-    page is again modified).  This has two purposes.  First, vacuum
-    itself can skip such pages on the next run, since there is nothing to
-    clean up.
+    page is again modified), and pages contain only frozen tuples.
+    This has two purposes.  First, vacuum itself can skip such pages
+    on the next run, since there is nothing to clean up.
    </para>
 
    <para>
@@ -438,28 +438,25 @@
    </para>
 
    <para>
-    <command>VACUUM</> normally skips pages that don't have any dead row
-    versions, but those pages might still have row versions with old XID
-    values.  To ensure all old row versions have been frozen, a
-    scan of the whole table is needed.
+    <command>VACUUM</> skips pages that don't have any dead row
+    versions, and pages that have only frozen rows. To ensure all old
+    row versions have been frozen, a scan of all unfrozen pages is needed.
     <xref linkend="guc-vacuum-freeze-table-age"> controls when
-    <command>VACUUM</> does that: a whole table sweep is forced if
-    the table hasn't been fully scanned for <varname>vacuum_freeze_table_age</>
-    minus <varname>vacuum_freeze_min_age</> transactions. Setting it to 0
-    forces <command>VACUUM</> to always scan all pages, effectively ignoring
-    the visibility map.
+    <command>VACUUM</> does that: a whole-table freezing is forced if
+    the table hasn't been ensured all row versions are frozen for
+    <varname>vacuum_freeze_table_age</> minus <varname>vacuum_freeze_min_age</>
+    transcation.
    </para>
 
    <para>
     The maximum time that a table can go unvacuumed is two billion
     transactions minus the <varname>vacuum_freeze_min_age</> value at
-    the time <command>VACUUM</> last scanned the whole table.  If it were to go
-    unvacuumed for longer than
-    that, data loss could result.  To ensure that this does not happen,
-    autovacuum is invoked on any table that might contain unfrozen rows with
-    XIDs older than the age specified by the configuration parameter <xref
-    linkend="guc-autovacuum-freeze-max-age">.  (This will happen even if
-    autovacuum is disabled.)
+    the time <command>VACUUM</> last scanned unfrozen pages. If it were to go
+    unvacuumed for longer than that, data loss could result.  To ensure
+    that this does not happen, autovacuum is invoked on any table that might
+    contain unfrozen rows with XIDs older than the age specified by the
+    configuration parameter <xref linkend="guc-autovacuum-freeze-max-age">.
+    (This will happen even if autovacuum is disabled.)
    </para>
 
    <para>
@@ -490,8 +487,7 @@
     a regularly scheduled <command>VACUUM</> or an autovacuum triggered by
     normal delete and update activity is run in that window.  Setting it too
     close could lead to anti-wraparound autovacuums, even though the table
-    was recently vacuumed to reclaim space, whereas lower values lead to more
-    frequent whole-table scans.
+    was recently vacuumed to reclaim space.
    </para>
 
    <para>
@@ -527,7 +523,7 @@
     <structname>pg_database</>.  In particular,
     the <structfield>relfrozenxid</> column of a table's
     <structname>pg_class</> row contains the freeze cutoff XID that was used
-    by the last whole-table <command>VACUUM</> for that table.  All rows
+    by the last whole-table freezing for that table.  All rows
     inserted by transactions with XIDs older than this cutoff XID are
     guaranteed to have been frozen.  Similarly,
     the <structfield>datfrozenxid</> column of a database's
@@ -555,17 +551,18 @@ SELECT datname, age(datfrozenxid) FROM pg_database;
     <command>VACUUM</> normally
     only scans pages that have been modified since the last vacuum, but
     <structfield>relfrozenxid</> can only be advanced when the whole table is
-    scanned. The whole table is scanned when <structfield>relfrozenxid</> is
-    more than <varname>vacuum_freeze_table_age</> transactions old, when
-    <command>VACUUM</>'s <literal>FREEZE</> option is used, or when all pages
-    happen to
-    require vacuuming to remove dead row versions. When <command>VACUUM</>
-    scans the whole table, after it's finished <literal>age(relfrozenxid)</>
-    should be a little more than the <varname>vacuum_freeze_min_age</> setting
-    that was used (more by the number of transactions started since the
-    <command>VACUUM</> started).  If no whole-table-scanning <command>VACUUM</>
-    is issued on the table until <varname>autovacuum_freeze_max_age</> is
-    reached, an autovacuum will soon be forced for the table.
+    frozen. Freezing occurs on the whole table once all pages of this relation
+    require it. In other cases such as where <structfield>relfrozenxid</> is more
+    than <varname>vacuum_freeze_table_age</> transactions old, where
+    <command>VACUUM</>'s <literal>FREEZE</> option is used, <command>VACUUM</>
+    can skip the pages that all tuples on the page itself are marked as frozen.
+    When all pages of table are eventually marked as frozen by <command>VACUUM</>,
+    after it's finished <literal>age(relfrozenxid)</> should be a little more
+    than the <varname>vacuum_freeze_min_age</> setting that was used (more by
+    the number of transactions started since the <command>VACUUM</> started).
+    If the advancing of <structfield>relfrozenxid</> is not happend until
+    <varname>autovacuum_freeze_max_age</> is reached, an autovacuum will soon
+    be forced for the table.
    </para>
 
    <para>
@@ -642,28 +639,28 @@ HINT:  Stop the postmaster and vacuum that database in single-user mode.
      <structname>pg_class</>.<structfield>relminmxid</> stores the oldest
      possible multixact ID still appearing in any tuple of that table.
      If this value is older than
-     <xref linkend="guc-vacuum-multixact-freeze-table-age">, a whole-table
+     <xref linkend="guc-vacuum-multixact-freeze-table-age">, a table
      scan is forced.  <function>mxid_age()</> can be used on
      <structname>pg_class</>.<structfield>relminmxid</> to find its age.
     </para>
 
     <para>
-     Whole-table <command>VACUUM</> scans, regardless of
-     what causes them, enable advancing the value for that table.
+     When <command>VACUUM</> scans all unfrozen pages, regardless of what causes
+     them, enable advancing the value for that table.
      Eventually, as all tables in all databases are scanned and their
      oldest multixact values are advanced, on-disk storage for older
      multixacts can be removed.
     </para>
 
     <para>
-     As a safety device, a whole-table vacuum scan will occur for any table
-     whose multixact-age is greater than
-     <xref linkend="guc-autovacuum-multixact-freeze-max-age">.  Whole-table
+     As a safety device, a vacuum scan will occur for any table whose
+     multixact-age is greater than
+     <xref linkend="guc-autovacuum-multixact-freeze-max-age">.  A
      vacuum scans will also occur progressively for all tables, starting with
      those that have the oldest multixact-age, if the amount of used member
      storage space exceeds the amount 50% of the addressable storage space.
-     Both of these kinds of whole-table scans will occur even if autovacuum is
-     nominally disabled.
+     Both of these kinds of scans will occur even if autovacuum is nominally
+     disabled.
     </para>
    </sect3>
   </sect2>
@@ -743,8 +740,8 @@ vacuum threshold = vacuum base threshold + vacuum scale factor * number of tuple
     <command>UPDATE</command> and <command>DELETE</command> operation.  (It
     is only semi-accurate because some information might be lost under heavy
     load.)  If the <structfield>relfrozenxid</> value of the table is more
-    than <varname>vacuum_freeze_table_age</> transactions old, the whole
-    table is scanned to freeze old tuples and advance
+    than <varname>vacuum_freeze_table_age</> transactions old, the table is
+    scanned to freeze old tuples and advance
     <structfield>relfrozenxid</>, otherwise only pages that have been modified
     since the last vacuum are scanned.
    </para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index c503636..d3ecc38 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1332,6 +1332,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
      <entry>Estimated number of rows modified since this table was last analyzed</entry>
     </row>
     <row>
+     <entry><structfield>n_frozen_pages</></entry>
+     <entry><type>integer</></entry>
+     <entry>Number of frozen pages</entry>
+    </row>
+    <row>
      <entry><structfield>last_vacuum</></entry>
      <entry><type>timestamp with time zone</></entry>
      <entry>Last time at which this table was manually vacuumed
diff --git a/doc/src/sgml/storage.sgml b/doc/src/sgml/storage.sgml
index 164d08c..5dc8b04 100644
--- a/doc/src/sgml/storage.sgml
+++ b/doc/src/sgml/storage.sgml
@@ -623,18 +623,20 @@ can be used to examine the information stored in free space maps.
 <para>
 Each heap relation has a Visibility Map
 (VM) to keep track of which pages contain only tuples that are known to be
-visible to all active transactions. It's stored
-alongside the main relation data in a separate relation fork, named after the
-filenode number of the relation, plus a <literal>_vm</> suffix. For example,
-if the filenode of a relation is 12345, the VM is stored in a file called
-<filename>12345_vm</>, in the same directory as the main relation file.
+visible to all active transactions, and pages contain only unfrozen tuples.
+It's stored alongside the main relation data in a separate relation fork,
+named after the filenode number of the relation, plus a <literal>_vm</> suffix.
+For example, if the filenode of a relation is 12345, the VM is stored in a file
+called <filename>12345_vm</>, in the same directory as the main relation file.
 Note that indexes do not have VMs.
 </para>
 
 <para>
-The visibility map simply stores one bit per heap page. A set bit means
-that all tuples on the page are known to be visible to all transactions.
-This means that the page does not contain any tuples that need to be vacuumed.
+The visibility map stores two bits per heap page: all-visible, all-frozen.
+A set all-visible bit means that all tuples on the page are known to be visible
+to all transactions. A set all-frozen bit means that all tuples on the page are
+completely marked as frozen. This means that the page does not contain any tuples
+that need to be vacuumed, even if a scan of whole table is required.
 This information can also be used by <firstterm>index-only scans</> to answer
 queries using only the index tuple.
 </para>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9ff7a41..651dd0e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3034,9 +3034,9 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	/*
 	 * If we didn't pin the visibility map page and the page has become all
-	 * visible while we were busy locking the buffer, we'll have to unlock and
-	 * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
-	 * unfortunate, but hopefully shouldn't happen often.
+	 * visible or all frozen while we were busy locking the buffer, we'll
+	 * have to unlock and re-lock, to avoid holding the buffer lock across an
+	 * I/O.  That's a bit unfortunate, but hopefully shouldn't happen often.
 	 */
 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
 	{
@@ -7205,7 +7205,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  */
 XLogRecPtr
 log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
-				 TransactionId cutoff_xid)
+				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
 	XLogRecPtr	recptr;
@@ -7215,6 +7215,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(vm_buffer));
 
 	xlrec.cutoff_xid = cutoff_xid;
+	xlrec.flags = vmflags;
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
 
@@ -7804,7 +7805,12 @@ heap_xlog_visible(XLogReaderState *record)
 		 * the subsequent update won't be replayed to clear the flag.
 		 */
 		page = BufferGetPage(buffer);
-		PageSetAllVisible(page);
+
+		if (xlrec->flags & VISIBILITYMAP_ALL_VISIBLE)
+			PageSetAllVisible(page);
+		if (xlrec->flags & VISIBILITYMAP_ALL_FROZEN)
+			PageSetAllFrozen(page);
+
 		MarkBufferDirty(buffer);
 	}
 	else if (action == BLK_RESTORED)
@@ -7856,7 +7862,7 @@ heap_xlog_visible(XLogReaderState *record)
 		 */
 		if (lsn > PageGetLSN(vmpage))
 			visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-							  xlrec->cutoff_xid);
+							  xlrec->cutoff_xid, xlrec->flags);
 
 		ReleaseBuffer(vmbuffer);
 		FreeFakeRelcacheEntry(reln);
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 7c38772..6186caf 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -15,39 +15,45 @@
  *		visibilitymap_pin	 - pin a map page for setting a bit
  *		visibilitymap_pin_ok - check whether correct map page is already pinned
  *		visibilitymap_set	 - set a bit in a previously pinned page
- *		visibilitymap_test	 - test if a bit is set
+ *		visibilitymap_get_status - get status of bits
  *		visibilitymap_count  - count number of bits set in visibility map
  *		visibilitymap_truncate	- truncate the visibility map
  *
  * NOTES
  *
- * The visibility map is a bitmap with one bit per heap page. A set bit means
- * that all tuples on the page are known visible to all transactions, and
- * therefore the page doesn't need to be vacuumed. The map is conservative in
- * the sense that we make sure that whenever a bit is set, we know the
- * condition is true, but if a bit is not set, it might or might not be true.
+ * The visibility map is a bitmap with two bits (all-visible and all-frozen)
+ * per heap page. A set all-visible bit means that all tuples on the page are
+ * known visible to all transactions, and therefore the page doesn't need to
+ * be vacuumed. A set all-frozen bit means that all tuples on the page are
+ * completely frozen, and therefore the page doesn't need to be vacuumed even
+ * if whole table scanning vacuum is required (e.g. anti-wraparound vacuum).
+ * The all-frozen bit must be set only when the page is already all-visible.
  *
- * Clearing a visibility map bit is not separately WAL-logged.  The callers
+ * The map is conservative in the sense that we make sure that whenever a bit
+ * is set, we know the condition is true, but if a bit is not set, it might or
+ * might not be true.
+ *
+ * Clearing both visibility map bits is not separately WAL-logged.  The callers
  * must make sure that whenever a bit is cleared, the bit is cleared on WAL
  * replay of the updating operation as well.
  *
  * When we *set* a visibility map during VACUUM, we must write WAL.  This may
  * seem counterintuitive, since the bit is basically a hint: if it is clear,
- * it may still be the case that every tuple on the page is visible to all
- * transactions; we just don't know that for certain.  The difficulty is that
- * there are two bits which are typically set together: the PD_ALL_VISIBLE bit
- * on the page itself, and the visibility map bit.  If a crash occurs after the
- * visibility map page makes it to disk and before the updated heap page makes
- * it to disk, redo must set the bit on the heap page.  Otherwise, the next
- * insert, update, or delete on the heap page will fail to realize that the
- * visibility map bit must be cleared, possibly causing index-only scans to
- * return wrong answers.
+ * it may still be the case that every tuple on the page is all-visible or
+ * all-frozen we just don't know that for certain.  The difficulty is that
+ * there are two bits which are typically set together: the PD_ALL_VISIBLE
+ * or PD_ALL_FROZEN bit on the page itself, and the corresponding visibility map
+ * bit.  If a crash occurs after the visibility map page makes it to disk and before
+ * the updated heap page makes it to disk, redo must set the bit on the heap page.
+ * Otherwise, the next insert, update, or delete on the heap page will fail to
+ * realize that the visibility map bit must be cleared, possibly causing index-only
+ * scans to return wrong answers.
  *
  * VACUUM will normally skip pages for which the visibility map bit is set;
  * such pages can't contain any dead tuples and therefore don't need vacuuming.
- * The visibility map is not used for anti-wraparound vacuums, because
- * an anti-wraparound vacuum needs to freeze tuples and observe the latest xid
- * present in the table, even on pages that don't have any dead tuples.
+ * The visibility map has the all-frozen bit which indicates all tuples on
+ * corresponding page have been completely frozen, so the visibility map is also
+ * used for anti-wraparound vacuum, even if freezing of tuples is required.
  *
  * LOCKING
  *
@@ -58,14 +64,14 @@
  * section that logs the page modification. However, we don't want to hold
  * the buffer lock over any I/O that may be required to read in the visibility
  * map page.  To avoid this, we examine the heap page before locking it;
- * if the page-level PD_ALL_VISIBLE bit is set, we pin the visibility map
- * bit.  Then, we lock the buffer.  But this creates a race condition: there
- * is a possibility that in the time it takes to lock the buffer, the
- * PD_ALL_VISIBLE bit gets set.  If that happens, we have to unlock the
- * buffer, pin the visibility map page, and relock the buffer.  This shouldn't
- * happen often, because only VACUUM currently sets visibility map bits,
- * and the race will only occur if VACUUM processes a given page at almost
- * exactly the same time that someone tries to further modify it.
+ * if the page-level PD_ALL_VISIBLE or PD_ALL_FROZEN bit is set, we pin the
+ * visibility map bit.  Then, we lock the buffer.  But this creates a race
+ * condition: there is a possibility that in the time it takes to lock the
+ * buffer, the PD_ALL_VISIBLE or PD_ALL_FROZEN bit gets set.  If that happens,
+ * we have to unlock the buffer, pin the visibility map page, and relock the
+ * buffer.  This shouldn't happen often, because only VACUUM currently sets
+ * visibility map bits, and the race will only occur if VACUUM processes a given
+ * page at almost exactly the same time that someone tries to further modify it.
  *
  * To set a bit, you need to hold a lock on the heap page. That prevents
  * the race condition where VACUUM sees that all tuples on the page are
@@ -101,38 +107,50 @@
  */
 #define MAPSIZE (BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
 
-/* Number of bits allocated for each heap block. */
-#define BITS_PER_HEAPBLOCK 1
-
-/* Number of heap blocks we can represent in one byte. */
-#define HEAPBLOCKS_PER_BYTE 8
-
 /* Number of heap blocks we can represent in one visibility map page. */
 #define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
 
 /* Mapping from heap block number to the right bit in the visibility map */
 #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
 #define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
-#define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
-
-/* table for fast counting of set bits */
-static const uint8 number_of_ones[256] = {
-	0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
-	1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
-	1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
-	1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
-	2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
-	3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
-	3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
-	4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8
+#define HEAPBLK_TO_MAPBIT(x) (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)
+
+/* tables for fast counting of set bits for visible and freeze */
+static const uint8 number_of_ones_for_visible[256] = {
+	0, 1, 0, 1, 1, 2, 1, 2, 0, 1, 0, 1, 1, 2, 1, 2,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	0, 1, 0, 1, 1, 2, 1, 2, 0, 1, 0, 1, 1, 2, 1, 2,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	2, 3, 2, 3, 3, 4, 3, 4, 2, 3, 2, 3, 3, 4, 3, 4,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	2, 3, 2, 3, 3, 4, 3, 4, 2, 3, 2, 3, 3, 4, 3, 4,
+	0, 1, 0, 1, 1, 2, 1, 2, 0, 1, 0, 1, 1, 2, 1, 2,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	0, 1, 0, 1, 1, 2, 1, 2, 0, 1, 0, 1, 1, 2, 1, 2,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	2, 3, 2, 3, 3, 4, 3, 4, 2, 3, 2, 3, 3, 4, 3, 4,
+	1, 2, 1, 2, 2, 3, 2, 3, 1, 2, 1, 2, 2, 3, 2, 3,
+	2, 3, 2, 3, 3, 4, 3, 4, 2, 3, 2, 3, 3, 4, 3, 4
+};
+static const uint8 number_of_ones_for_frozen[256] = {
+	0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 2, 2, 1, 1, 2, 2,
+	0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 2, 2, 1, 1, 2, 2,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 2, 2, 1, 1, 2, 2,
+	0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 2, 2, 1, 1, 2, 2,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	2, 2, 3, 3, 2, 2, 3, 3, 3, 3, 4, 4, 3, 3, 4, 4,
+	2, 2, 3, 3, 2, 2, 3, 3, 3, 3, 4, 4, 3, 3, 4, 4,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	1, 1, 2, 2, 1, 1, 2, 2, 2, 2, 3, 3, 2, 2, 3, 3,
+	2, 2, 3, 3, 2, 2, 3, 3, 3, 3, 4, 4, 3, 3, 4, 4,
+	2, 2, 3, 3, 2, 2, 3, 3, 3, 3, 4, 4, 3, 3, 4, 4
 };
 
 /* prototypes for internal routines */
@@ -141,7 +159,7 @@ static void vm_extend(Relation rel, BlockNumber nvmblocks);
 
 
 /*
- *	visibilitymap_clear - clear a bit in visibility map
+ *	visibilitymap_clear - clear all bits in visibility map
  *
  * You must pass a buffer containing the correct map page to this function.
  * Call visibilitymap_pin first to pin the right one. This function doesn't do
@@ -153,7 +171,7 @@ visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf)
 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
 	int			mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
 	int			mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
-	uint8		mask = 1 << mapBit;
+	uint8		mask = VISIBILITYMAP_VALID_BITS << mapBit;
 	char	   *map;
 
 #ifdef TRACE_VISIBILITYMAP
@@ -186,7 +204,7 @@ visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf)
  * visibilitymap_set to actually set the bit.
  *
  * On entry, *buf should be InvalidBuffer or a valid buffer returned by
- * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * an earlier call to visibilitymap_pin or visibilitymap_get_status on the same
  * relation. On return, *buf is a valid buffer with the map page containing
  * the bit for heapBlk.
  *
@@ -212,7 +230,7 @@ visibilitymap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
  *	visibilitymap_pin_ok - do we already have the correct page pinned?
  *
  * On entry, buf should be InvalidBuffer or a valid buffer returned by
- * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * an earlier call to visibilitymap_pin or visibilitymap_get_status on the same
  * relation.  The return value indicates whether the buffer covers the
  * given heapBlk.
  */
@@ -225,7 +243,7 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
 }
 
 /*
- *	visibilitymap_set - set a bit on a previously pinned page
+ *	visibilitymap_set - set bit(s) on a previously pinned page
  *
  * recptr is the LSN of the XLOG record we're replaying, if we're in recovery,
  * or InvalidXLogRecPtr in normal running.  The page LSN is advanced to the
@@ -234,10 +252,11 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
  * marked all-visible; it is needed for Hot Standby, and can be
  * InvalidTransactionId if the page contains no tuples.
  *
- * Caller is expected to set the heap page's PD_ALL_VISIBLE bit before calling
- * this function. Except in recovery, caller should also pass the heap
- * buffer. When checksums are enabled and we're not in recovery, we must add
- * the heap buffer to the WAL chain to protect it from being torn.
+ * Caller is expected to set the heap page's PD_ALL_VISIBLE or PD_ALL_FROZEN
+ * bit before calling this function. Except in recovery, caller should also
+ * pass the heap buffer and flags which indicates what flag we want to set.
+ * When checksums are enabled and we're not in recovery, we must add the heap
+ * buffer to the WAL chain to protect it from being torn.
  *
  * You must pass a buffer containing the correct map page to this function.
  * Call visibilitymap_pin first to pin the right one. This function doesn't do
@@ -245,13 +264,14 @@ visibilitymap_pin_ok(BlockNumber heapBlk, Buffer buf)
  */
 void
 visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
-				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid)
+				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid,
+				  uint8 flags)
 {
 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
 	Page		page;
-	char	   *map;
+	uint8		*map;
 
 #ifdef TRACE_VISIBILITYMAP
 	elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
@@ -259,6 +279,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 
 	Assert(InRecovery || XLogRecPtrIsInvalid(recptr));
 	Assert(InRecovery || BufferIsValid(heapBuf));
+	Assert(flags & VISIBILITYMAP_VALID_BITS);
 
 	/* Check that we have the right heap page pinned, if present */
 	if (BufferIsValid(heapBuf) && BufferGetBlockNumber(heapBuf) != heapBlk)
@@ -269,14 +290,14 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 		elog(ERROR, "wrong VM buffer passed to visibilitymap_set");
 
 	page = BufferGetPage(vmBuf);
-	map = PageGetContents(page);
+	map = (uint8 *)PageGetContents(page);
 	LockBuffer(vmBuf, BUFFER_LOCK_EXCLUSIVE);
 
-	if (!(map[mapByte] & (1 << mapBit)))
+	if (flags != (map[mapByte] >> mapBit & VISIBILITYMAP_VALID_BITS))
 	{
 		START_CRIT_SECTION();
 
-		map[mapByte] |= (1 << mapBit);
+		map[mapByte] |= (flags << mapBit);
 		MarkBufferDirty(vmBuf);
 
 		if (RelationNeedsWAL(rel))
@@ -285,7 +306,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			{
 				Assert(!InRecovery);
 				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
-										  cutoff_xid);
+										  cutoff_xid, flags);
 
 				/*
 				 * If data checksums are enabled (or wal_log_hints=on), we
@@ -295,11 +316,16 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 				{
 					Page		heapPage = BufferGetPage(heapBuf);
 
-					/* caller is expected to set PD_ALL_VISIBLE first */
-					Assert(PageIsAllVisible(heapPage));
+					/*
+					 * Caller is expected to set PD_ALL_VISIBLE or
+					 * PD_ALL_FROZEN first.
+					 */
+					Assert(((flags | VISIBILITYMAP_ALL_VISIBLE) && PageIsAllVisible(heapPage)) ||
+						   ((flags | VISIBILITYMAP_ALL_FROZEN) && PageIsAllFrozen(heapPage)));
 					PageSetLSN(heapPage, recptr);
 				}
 			}
+
 			PageSetLSN(page, recptr);
 		}
 
@@ -310,15 +336,17 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 }
 
 /*
- *	visibilitymap_test - test if a bit is set
+ *	visibilitymap_get_status - get status of bits
  *
- * Are all tuples on heapBlk visible to all, according to the visibility map?
+ * Are all tuples on heapBlk visible to all or are marked frozen, according
+ * to the visibility map?
  *
  * On entry, *buf should be InvalidBuffer or a valid buffer returned by an
- * earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * earlier call to visibilitymap_pin or visibilitymap_get_status on the same
  * relation. On return, *buf is a valid buffer with the map page containing
  * the bit for heapBlk, or InvalidBuffer. The caller is responsible for
- * releasing *buf after it's done testing and setting bits.
+ * releasing *buf after it's done testing and setting bits, and must pass flags
+ * for which it needs to check the value in visibility map.
  *
  * NOTE: This function is typically called without a lock on the heap page,
  * so somebody else could change the bit just after we look at it.  In fact,
@@ -327,17 +355,16 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
  * we might see the old value.  It is the caller's responsibility to deal with
  * all concurrency issues!
  */
-bool
-visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+uint8
+visibilitymap_get_status(Relation rel, BlockNumber heapBlk, Buffer *buf)
 {
 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
-	bool		result;
 	char	   *map;
 
 #ifdef TRACE_VISIBILITYMAP
-	elog(DEBUG1, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+	elog(DEBUG1, "vm_get_status %s %d", RelationGetRelationName(rel), heapBlk);
 #endif
 
 	/* Reuse the old pinned buffer if possible */
@@ -360,13 +387,11 @@ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
 	map = PageGetContents(BufferGetPage(*buf));
 
 	/*
-	 * A single-bit read is atomic.  There could be memory-ordering effects
+	 * The double bits read is atomic.  There could be memory-ordering effects
 	 * here, but for performance reasons we make it the caller's job to worry
 	 * about that.
 	 */
-	result = (map[mapByte] & (1 << mapBit)) ? true : false;
-
-	return result;
+	return ((map[mapByte] >> mapBit) & VISIBILITYMAP_VALID_BITS);
 }
 
 /*
@@ -374,14 +399,21 @@ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
  *
  * Note: we ignore the possibility of race conditions when the table is being
  * extended concurrently with the call.  New pages added to the table aren't
- * going to be marked all-visible, so they won't affect the result.
+ * going to be marked all-visible or all-frozen, so they won't affect the result.
+ * The caller must set the flags which indicates what flag we want to count.
  */
-BlockNumber
-visibilitymap_count(Relation rel)
+void
+visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_frozen)
 {
-	BlockNumber result = 0;
 	BlockNumber mapBlock;
 
+	/* all_visible must be specified */
+	Assert(all_visible);
+
+	*all_visible = 0;
+	if (all_frozen)
+		*all_frozen = 0;
+
 	for (mapBlock = 0;; mapBlock++)
 	{
 		Buffer		mapBuffer;
@@ -406,13 +438,13 @@ visibilitymap_count(Relation rel)
 
 		for (i = 0; i < MAPSIZE; i++)
 		{
-			result += number_of_ones[map[i]];
+			*all_visible += number_of_ones_for_visible[map[i]];
+			if (all_frozen)
+				*all_frozen += number_of_ones_for_frozen[map[i]];
 		}
 
 		ReleaseBuffer(mapBuffer);
 	}
-
-	return result;
 }
 
 /*
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index c10be3d..071992d 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1923,7 +1923,7 @@ index_update_stats(Relation rel,
 		BlockNumber relallvisible;
 
 		if (rd_rel->relkind != RELKIND_INDEX)
-			relallvisible = visibilitymap_count(rel);
+			visibilitymap_count(rel, &relallvisible, NULL);
 		else	/* don't bother for indexes */
 			relallvisible = 0;
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ccc030f..727d2a4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -444,6 +444,7 @@ CREATE VIEW pg_stat_all_tables AS
             pg_stat_get_live_tuples(C.oid) AS n_live_tup,
             pg_stat_get_dead_tuples(C.oid) AS n_dead_tup,
             pg_stat_get_mod_since_analyze(C.oid) AS n_mod_since_analyze,
+            pg_stat_get_frozen_pages(C.oid) AS n_frozen_pages,
             pg_stat_get_last_vacuum_time(C.oid) as last_vacuum,
             pg_stat_get_last_autovacuum_time(C.oid) as last_autovacuum,
             pg_stat_get_last_analyze_time(C.oid) as last_analyze,
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index ddb68ab..2b2fd0d 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -314,6 +314,8 @@ do_analyze_rel(Relation onerel, int options, VacuumParams *params,
 	Oid			save_userid;
 	int			save_sec_context;
 	int			save_nestlevel;
+	BlockNumber	relallvisible,
+				relallfrozen;
 
 	if (inh)
 		ereport(elevel,
@@ -564,51 +566,56 @@ do_analyze_rel(Relation onerel, int options, VacuumParams *params,
 		}
 	}
 
-	/*
-	 * Update pages/tuples stats in pg_class ... but not if we're doing
-	 * inherited stats.
-	 */
 	if (!inh)
+	{
+		/* Calculate the number of all-visible and all-frozen bit */
+		visibilitymap_count(onerel, &relallvisible, &relallfrozen);
+
+		/*
+		 * Update pages/tuples stats in pg_class ... but not if we're doing
+		 * inherited stats.
+		 */
 		vac_update_relstats(onerel,
 							relpages,
 							totalrows,
-							visibilitymap_count(onerel),
+							relallvisible,
 							hasindex,
 							InvalidTransactionId,
 							InvalidMultiXactId,
 							in_outer_xact);
 
-	/*
-	 * Same for indexes. Vacuum always scans all indexes, so if we're part of
-	 * VACUUM ANALYZE, don't overwrite the accurate count already inserted by
-	 * VACUUM.
-	 */
-	if (!inh && !(options & VACOPT_VACUUM))
-	{
-		for (ind = 0; ind < nindexes; ind++)
+		/*
+		 * Same for indexes. Vacuum always scans all indexes, so if we're part of
+		 * VACUUM ANALYZE, don't overwrite the accurate count already inserted by
+		 * VACUUM.
+		 */
+		if (!(options & VACOPT_VACUUM))
 		{
-			AnlIndexData *thisdata = &indexdata[ind];
-			double		totalindexrows;
-
-			totalindexrows = ceil(thisdata->tupleFract * totalrows);
-			vac_update_relstats(Irel[ind],
-								RelationGetNumberOfBlocks(Irel[ind]),
-								totalindexrows,
-								0,
-								false,
-								InvalidTransactionId,
-								InvalidMultiXactId,
-								in_outer_xact);
+			for (ind = 0; ind < nindexes; ind++)
+			{
+				AnlIndexData *thisdata = &indexdata[ind];
+				double		totalindexrows;
+
+				totalindexrows = ceil(thisdata->tupleFract * totalrows);
+				vac_update_relstats(Irel[ind],
+									RelationGetNumberOfBlocks(Irel[ind]),
+									totalindexrows,
+									0,
+									false,
+									InvalidTransactionId,
+									InvalidMultiXactId,
+									in_outer_xact);
+			}
 		}
-	}
 
-	/*
-	 * Report ANALYZE to the stats collector, too.  However, if doing
-	 * inherited stats we shouldn't report, because the stats collector only
-	 * tracks per-table stats.
-	 */
-	if (!inh)
-		pgstat_report_analyze(onerel, totalrows, totaldeadrows);
+		/*
+		 * Report ANALYZE to the stats collector, too.  However, if doing
+		 * inherited stats we shouldn't report, because the stats collector only
+		 * tracks per-table stats.
+		 */
+		pgstat_report_analyze(onerel, totalrows, totaldeadrows, relallfrozen);
+
+	}
 
 	/* If this isn't part of VACUUM ANALYZE, let index AMs do cleanup */
 	if (!(options & VACOPT_VACUUM))
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 2429889..8dafd1b 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -106,6 +106,8 @@ typedef struct LVRelStats
 	BlockNumber rel_pages;		/* total number of pages */
 	BlockNumber scanned_pages;	/* number of pages we examined */
 	BlockNumber pinskipped_pages;		/* # of pages we skipped due to a pin */
+	BlockNumber vmskipped_frozen_pages; /* # of pages we skipped by all-frozen bit
+									of visibility map */
 	double		scanned_tuples; /* counts only tuples on scanned pages */
 	double		old_rel_tuples; /* previous value of pg_class.reltuples */
 	double		new_rel_tuples; /* new estimated total # of tuples */
@@ -155,8 +157,9 @@ static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
 					   ItemPointer itemptr);
 static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
-static bool heap_page_is_all_visible(Relation rel, Buffer buf,
-						 TransactionId *visibility_cutoff_xid);
+static void heap_page_visible_status(Relation rel, Buffer buf,
+						 TransactionId *visibility_cutoff_xid,
+						 bool *all_visible, bool *all_frozen);
 
 
 /*
@@ -188,7 +191,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	MultiXactId mxactFullScanLimit;
 	BlockNumber new_rel_pages;
 	double		new_rel_tuples;
-	BlockNumber new_rel_allvisible;
+	BlockNumber new_rel_allvisible,
+				new_rel_allfrozen;
 	double		new_live_tuples;
 	TransactionId new_frozen_xid;
 	MultiXactId new_min_multi;
@@ -221,7 +225,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	 * We request a full scan if either the table's frozen Xid is now older
 	 * than or equal to the requested Xid full-table scan limit; or if the
 	 * table's minimum MultiXactId is older than or equal to the requested
-	 * mxid full-table scan limit.
+	 * mxid full-table scan limit. During full scan, we could skip some pages
+	 * according to all-frozen bit of visibility map.
 	 */
 	scan_all = TransactionIdPrecedesOrEquals(onerel->rd_rel->relfrozenxid,
 											 xidFullScanLimit);
@@ -253,7 +258,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	 * NB: We need to check this before truncating the relation, because that
 	 * will change ->rel_pages.
 	 */
-	if (vacrelstats->scanned_pages < vacrelstats->rel_pages)
+	if ((vacrelstats->scanned_pages + vacrelstats->vmskipped_frozen_pages)
+		< vacrelstats->rel_pages)
 	{
 		Assert(!scan_all);
 		scanned_all = false;
@@ -301,10 +307,13 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 		new_rel_tuples = vacrelstats->old_rel_tuples;
 	}
 
-	new_rel_allvisible = visibilitymap_count(onerel);
+	visibilitymap_count(onerel, &new_rel_allvisible, &new_rel_allfrozen);
 	if (new_rel_allvisible > new_rel_pages)
 		new_rel_allvisible = new_rel_pages;
 
+	if (new_rel_allfrozen > new_rel_pages)
+		new_rel_allfrozen = new_rel_pages;
+
 	new_frozen_xid = scanned_all ? FreezeLimit : InvalidTransactionId;
 	new_min_multi = scanned_all ? MultiXactCutoff : InvalidMultiXactId;
 
@@ -325,7 +334,8 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	pgstat_report_vacuum(RelationGetRelid(onerel),
 						 onerel->rd_rel->relisshared,
 						 new_live_tuples,
-						 vacrelstats->new_dead_tuples);
+						 vacrelstats->new_dead_tuples,
+						 new_rel_allfrozen);
 
 	/* and log the action if appropriate */
 	if (IsAutoVacuumWorkerProcess() && params->log_min_duration >= 0)
@@ -360,10 +370,11 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 							 get_namespace_name(RelationGetNamespace(onerel)),
 							 RelationGetRelationName(onerel),
 							 vacrelstats->num_index_scans);
-			appendStringInfo(&buf, _("pages: %u removed, %u remain, %u skipped due to pins\n"),
+			appendStringInfo(&buf, _("pages: %u removed, %u remain, %u skipped due to pins, %u skipped according to vm\n"),
 							 vacrelstats->pages_removed,
 							 vacrelstats->rel_pages,
-							 vacrelstats->pinskipped_pages);
+							 vacrelstats->pinskipped_pages,
+							 vacrelstats->vmskipped_frozen_pages);
 			appendStringInfo(&buf,
 							 _("tuples: %.0f removed, %.0f remain, %.0f are dead but not yet removable\n"),
 							 vacrelstats->tuples_deleted,
@@ -486,9 +497,12 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	 * consecutive pages.  Since we're reading sequentially, the OS should be
 	 * doing readahead for us, so there's no gain in skipping a page now and
 	 * then; that's likely to disable readahead and so be counterproductive.
-	 * Also, skipping even a single page means that we can't update
-	 * relfrozenxid, so we only want to do it if we can skip a goodly number
-	 * of pages.
+	 * Also, skipping even a single page according to all-visible bit of
+	 * visibility map means that we can't update relfrozenxid, so we only want
+	 * to do it if we can skip a goodly number. On the other hand, we count
+	 * both how many pages we skipped according to all-frozen bit of visibility
+	 * map and how many pages we freeze, so we can update relfrozenxid if
+	 * the sum of two is as many as pages of table.
 	 *
 	 * Before entering the main loop, establish the invariant that
 	 * next_not_all_visible_block is the next block number >= blkno that's not
@@ -498,24 +512,24 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	 * started skipping blocks, we may as well skip everything up to the next
 	 * not-all-visible block.
 	 *
-	 * Note: if scan_all is true, we won't actually skip any pages; but we
+	 * Note: if scan_all is true, we might not actually skip any pages; but we
 	 * maintain next_not_all_visible_block anyway, so as to set up the
 	 * all_visible_according_to_vm flag correctly for each page.
 	 *
-	 * Note: The value returned by visibilitymap_test could be slightly
+	 * Note: The value returned by visibilitymap_get_status could be slightly
 	 * out-of-date, since we make this test before reading the corresponding
 	 * heap page or locking the buffer.  This is OK.  If we mistakenly think
-	 * that the page is all-visible when in fact the flag's just been cleared,
-	 * we might fail to vacuum the page.  But it's OK to skip pages when
-	 * scan_all is not set, so no great harm done; the next vacuum will find
-	 * them.  If we make the reverse mistake and vacuum a page unnecessarily,
-	 * it'll just be a no-op.
+	 * that the page is all-visible/all-frozen when in fact the flag's just
+	 * been cleared, we might fail to vacuum the page.  But it's OK to skip
+	 * pages when scan_all is not set, so no great harm done; the next vacuum
+	 * will find them.  If we make the reverse mistake and vacuum a page
+	 * unnecessarily, it'll just be a no-op.
 	 */
 	for (next_not_all_visible_block = 0;
 		 next_not_all_visible_block < nblocks;
 		 next_not_all_visible_block++)
 	{
-		if (!visibilitymap_test(onerel, next_not_all_visible_block, &vmbuffer))
+		if (!VM_ALL_VISIBLE(onerel, next_not_all_visible_block, &vmbuffer))
 			break;
 		vacuum_delay_point();
 	}
@@ -533,9 +547,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		bool		tupgone,
 					hastup;
 		int			prev_dead_count;
-		int			nfrozen;
+		int			nfrozen; /* # of tuples is frozen */
+		int			nalready_frozen; /* # of tuples is already frozen */
+		int			ntotal_frozen; /* # of frozen tuples in single page */
+		int			ntup_per_page;
 		Size		freespace;
 		bool		all_visible_according_to_vm;
+		bool		all_frozen_according_to_vm;
 		bool		all_visible;
 		bool		has_dead_tuples;
 		TransactionId visibility_cutoff_xid = InvalidTransactionId;
@@ -547,8 +565,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				 next_not_all_visible_block < nblocks;
 				 next_not_all_visible_block++)
 			{
-				if (!visibilitymap_test(onerel, next_not_all_visible_block,
-										&vmbuffer))
+				if (!VM_ALL_VISIBLE(onerel, next_not_all_visible_block, &vmbuffer))
 					break;
 				vacuum_delay_point();
 			}
@@ -562,14 +579,29 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				skipping_all_visible_blocks = true;
 			else
 				skipping_all_visible_blocks = false;
+
 			all_visible_according_to_vm = false;
+			all_frozen_according_to_vm = false;
 		}
 		else
 		{
-			/* Current block is all-visible */
-			if (skipping_all_visible_blocks && !scan_all)
-				continue;
+			/*
+			 * This block is at least all-visible according to the visibility map.
+			 * We check whether this block is all-frozen or not, to skip to
+			 * vacuum this page even if scan_all is true.
+			 */
+			bool	all_frozen = VM_ALL_FROZEN(onerel, blkno, &vmbuffer);
+
+			if (scan_all && all_frozen)
+			{
+					vacrelstats->vmskipped_frozen_pages++;
+					continue;
+			}
+			else if (!scan_all && skipping_all_visible_blocks)
+					continue;
+
 			all_visible_according_to_vm = true;
+			all_frozen_according_to_vm = all_frozen;
 		}
 
 		vacuum_delay_point();
@@ -716,7 +748,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			empty_pages++;
 			freespace = PageGetHeapFreeSpace(page);
 
-			/* empty pages are always all-visible */
+			/* empty pages are always all-visible and all-frozen */
 			if (!PageIsAllVisible(page))
 			{
 				START_CRIT_SECTION();
@@ -739,8 +771,10 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 					log_newpage_buffer(buf, true);
 
 				PageSetAllVisible(page);
+				PageSetAllFrozen(page);
 				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
-								  vmbuffer, InvalidTransactionId);
+								  vmbuffer, InvalidTransactionId,
+								  VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
 				END_CRIT_SECTION();
 			}
 
@@ -764,13 +798,15 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		all_visible = true;
 		has_dead_tuples = false;
 		nfrozen = 0;
+		nalready_frozen = 0;
+		ntup_per_page = 0;
 		hastup = false;
 		prev_dead_count = vacrelstats->num_dead_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
 		 * Note: If you change anything in the loop below, also look at
-		 * heap_page_is_all_visible to see if that needs to be changed.
+		 * heap_page_visible_status to see if that needs to be changed.
 		 */
 		for (offnum = FirstOffsetNumber;
 			 offnum <= maxoff;
@@ -918,8 +954,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			else
 			{
 				num_tuples += 1;
+				ntup_per_page += 1;;
 				hastup = true;
 
+				/* Check whether this tuple is already frozen or not */
+				if (HeapTupleHeaderXminFrozen(tuple.t_data))
+					nalready_frozen += 1;
+
 				/*
 				 * Each non-removable tuple must be checked to see if it needs
 				 * freezing.  Note we already have exclusive buffer lock.
@@ -966,6 +1007,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			END_CRIT_SECTION();
 		}
 
+		/* Compute the number of frozen tuples in a page */
+		ntotal_frozen = nfrozen + nalready_frozen;
+
 		/*
 		 * If there are no indexes then we can vacuum the page right now
 		 * instead of doing a second scan.
@@ -988,26 +1032,46 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 
 		freespace = PageGetHeapFreeSpace(page);
 
-		/* mark page all-visible, if appropriate */
-		if (all_visible && !all_visible_according_to_vm)
+		/* This page is all visible */
+		if (all_visible)
 		{
-			/*
-			 * It should never be the case that the visibility map page is set
-			 * while the page-level bit is clear, but the reverse is allowed
-			 * (if checksums are not enabled).  Regardless, set the both bits
-			 * so that we get back in sync.
-			 *
-			 * NB: If the heap page is all-visible but the VM bit is not set,
-			 * we don't need to dirty the heap page.  However, if checksums
-			 * are enabled, we do need to make sure that the heap page is
-			 * dirtied before passing it to visibilitymap_set(), because it
-			 * may be logged.  Given that this situation should only happen in
-			 * rare cases after a crash, it is not worth optimizing.
-			 */
-			PageSetAllVisible(page);
-			MarkBufferDirty(buf);
-			visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
-							  vmbuffer, visibility_cutoff_xid);
+			uint8 flags = 0;
+
+			/* mark page all-visible, if appropriate */
+			if (!all_visible_according_to_vm)
+			{
+				/*
+				 * It should never be the case that the visibility map page is set
+				 * while the page-level bit is clear, but the reverse is allowed
+				 * (if checksums are not enabled).  Regardless, set the both bits
+				 * so that we get back in sync.
+				 *
+				 * NB: If the heap page is all-visible but the VM bit is not set,
+				 * we don't need to dirty the heap page.  However, if checksums
+				 * are enabled, we do need to make sure that the heap page is
+				 * dirtied before passing it to visibilitymap_set(), because it
+				 * may be logged.  Given that this situation should only happen in
+				 * rare cases after a crash, it is not worth optimizing.
+				 */
+				PageSetAllVisible(page);
+				flags |= VISIBILITYMAP_ALL_VISIBLE;
+			}
+
+			/* mark page all-frozen, if all tuples are frozen and not marked yet */
+			if ((ntotal_frozen == ntup_per_page) &&	!all_frozen_according_to_vm)
+			{
+				Assert(PageIsAllVisible(page));
+
+				PageSetAllFrozen(page);
+				flags |= VISIBILITYMAP_ALL_FROZEN;
+			}
+
+			if (flags)
+			{
+				MarkBufferDirty(buf);
+				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
+								  vmbuffer, visibility_cutoff_xid, flags);
+			}
 		}
 
 		/*
@@ -1018,9 +1082,14 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 * that something bad has happened.
 		 */
 		else if (all_visible_according_to_vm && !PageIsAllVisible(page)
-				 && visibilitymap_test(onerel, blkno, &vmbuffer))
+				 && VM_ALL_VISIBLE(onerel, blkno, &vmbuffer))
 		{
-			elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
+			/* If the all-frozen is set then all-visible must be set */
+			if (all_frozen_according_to_vm)
+				Assert(VM_ALL_FROZEN(onerel, blkno, &vmbuffer) &&
+					   VM_ALL_VISIBLE(onerel, blkno, &vmbuffer));
+
+			elog(WARNING, "page is not marked all-visible (and all-frozen) but visibility map bit(s) is set in relation \"%s\" page %u",
 				 relname, blkno);
 			visibilitymap_clear(onerel, blkno, vmbuffer);
 		}
@@ -1028,19 +1097,25 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		/*
 		 * It's possible for the value returned by GetOldestXmin() to move
 		 * backwards, so it's not wrong for us to see tuples that appear to
-		 * not be visible to everyone yet, while PD_ALL_VISIBLE is already
-		 * set. The real safe xmin value never moves backwards, but
-		 * GetOldestXmin() is conservative and sometimes returns a value
+		 * not be visible to everyone yet, while PD_ALL_VISIBLE (and PD_ALL_FROZEN)
+		 * are already set. The real safe xmin value never moves backwards,
+		 * but GetOldestXmin() is conservative and sometimes returns a value
 		 * that's unnecessarily small, so if we see that contradiction it just
 		 * means that the tuples that we think are not visible to everyone yet
-		 * actually are, and the PD_ALL_VISIBLE flag is correct.
+		 * actually are, and the PD_ALL_VISIBLE (and PD_ALL_FROZEN) flags are
+		 * correct.
 		 *
 		 * There should never be dead tuples on a page with PD_ALL_VISIBLE
 		 * set, however.
 		 */
 		else if (PageIsAllVisible(page) && has_dead_tuples)
 		{
-			elog(WARNING, "page containing dead tuples is marked as all-visible in relation \"%s\" page %u",
+			/* If the all-frozen is set then all-visible must be set */
+			if (PageIsAllFrozen(page))
+				Assert(VM_ALL_FROZEN(onerel, blkno, &vmbuffer) &&
+					   VM_ALL_VISIBLE(onerel, blkno, &vmbuffer));
+
+			elog(WARNING, "page containing dead tuples is marked as all-visible (and all-frozen) in relation \"%s\" page %u",
 				 relname, blkno);
 			PageClearAllVisible(page);
 			MarkBufferDirty(buf);
@@ -1114,6 +1189,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 						RelationGetRelationName(onerel),
 						tups_vacuumed, vacuumed_pages)));
 
+	/* Report how many frozen pages vacuum skipped according to visibility map */
+	ereport(elevel,
+			(errmsg_plural("skipped %d frozen page according to visibility map",
+						   "skipped %d frozen pages according to visibility map",
+						   vacrelstats->vmskipped_frozen_pages,
+						   vacrelstats->vmskipped_frozen_pages)));
+
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
 	 * individual parts of the message when not applicable.
@@ -1230,6 +1312,8 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	OffsetNumber unused[MaxOffsetNumber];
 	int			uncnt = 0;
 	TransactionId visibility_cutoff_xid;
+	bool		all_visible;
+	bool		all_frozen;
 
 	START_CRIT_SECTION();
 
@@ -1281,19 +1365,36 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	 * dirty, exclusively locked, and, if needed, a full page image has been
 	 * emitted in the log_heap_clean() above.
 	 */
-	if (heap_page_is_all_visible(onerel, buffer, &visibility_cutoff_xid))
+	heap_page_visible_status(onerel, buffer, &visibility_cutoff_xid,
+							 &all_visible, &all_frozen);
+	if (all_visible)
 		PageSetAllVisible(page);
 
 	/*
 	 * All the changes to the heap page have been done. If the all-visible
-	 * flag is now set, also set the VM bit.
+	 * flag is now set, also set the VM all-visible bit.
+	 * Also, if this page is all-frozen, set the VM all-frozen bit and flag.
 	 */
-	if (PageIsAllVisible(page) &&
-		!visibilitymap_test(onerel, blkno, vmbuffer))
+	if (PageIsAllVisible(page))
 	{
+		uint8 vm_status = visibilitymap_get_status(onerel, blkno, vmbuffer);
+		uint8 flags = 0;
+
+		if (!(vm_status & VISIBILITYMAP_ALL_VISIBLE))
+			flags |= VISIBILITYMAP_ALL_VISIBLE;
+
+		/* Set the VM all-frozen bit to flag, if needed */
+		if (all_frozen && !(vm_status & VISIBILITYMAP_ALL_FROZEN))
+		{
+			PageSetAllFrozen(page);
+			flags |= VISIBILITYMAP_ALL_FROZEN;
+		}
+
 		Assert(BufferIsValid(*vmbuffer));
-		visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
-						  visibility_cutoff_xid);
+
+		if (vm_status != flags)
+			visibilitymap_set(onerel, blkno, buffer, InvalidXLogRecPtr, *vmbuffer,
+							  visibility_cutoff_xid, flags);
 	}
 
 	return tupindex;
@@ -1783,18 +1884,21 @@ vac_cmp_itemptr(const void *left, const void *right)
 /*
  * Check if every tuple in the given page is visible to all current and future
  * transactions. Also return the visibility_cutoff_xid which is the highest
- * xmin amongst the visible tuples.
+ * xmin amongst the visible tuples, and all_frozen which implies that all tuples
+ * of this page are frozen.
  */
-static bool
-heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cutoff_xid)
+static void
+heap_page_visible_status(Relation rel, Buffer buf, TransactionId *visibility_cutoff_xid,
+					     bool *all_visible, bool *all_frozen)
 {
 	Page		page = BufferGetPage(buf);
 	BlockNumber blockno = BufferGetBlockNumber(buf);
 	OffsetNumber offnum,
 				maxoff;
-	bool		all_visible = true;
 
 	*visibility_cutoff_xid = InvalidTransactionId;
+	*all_visible = true;
+	*all_frozen = true;
 
 	/*
 	 * This is a stripped down version of the line pointer scan in
@@ -1802,7 +1906,7 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 	 */
 	maxoff = PageGetMaxOffsetNumber(page);
 	for (offnum = FirstOffsetNumber;
-		 offnum <= maxoff && all_visible;
+		 offnum <= maxoff && *all_visible;
 		 offnum = OffsetNumberNext(offnum))
 	{
 		ItemId		itemid;
@@ -1818,11 +1922,12 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 
 		/*
 		 * Dead line pointers can have index pointers pointing to them. So
-		 * they can't be treated as visible
+		 * they can't be treated as visible and frozen.
 		 */
 		if (ItemIdIsDead(itemid))
 		{
-			all_visible = false;
+			*all_visible = false;
+			*all_frozen = false;
 			break;
 		}
 
@@ -1841,7 +1946,7 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 					/* Check comments in lazy_scan_heap. */
 					if (!HeapTupleHeaderXminCommitted(tuple.t_data))
 					{
-						all_visible = false;
+						*all_visible = false;
 						break;
 					}
 
@@ -1852,13 +1957,17 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 					xmin = HeapTupleHeaderGetXmin(tuple.t_data);
 					if (!TransactionIdPrecedes(xmin, OldestXmin))
 					{
-						all_visible = false;
+						*all_visible = false;
 						break;
 					}
 
 					/* Track newest xmin on page. */
 					if (TransactionIdFollows(xmin, *visibility_cutoff_xid))
 						*visibility_cutoff_xid = xmin;
+
+					/* Check whether this tuple is already frozen or not */
+					if (!HeapTupleHeaderXminFrozen(tuple.t_data))
+						*all_frozen = false;
 				}
 				break;
 
@@ -1866,7 +1975,8 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 			case HEAPTUPLE_RECENTLY_DEAD:
 			case HEAPTUPLE_INSERT_IN_PROGRESS:
 			case HEAPTUPLE_DELETE_IN_PROGRESS:
-				all_visible = false;
+				*all_visible = false;
+				*all_frozen = false;
 				break;
 
 			default:
@@ -1875,5 +1985,6 @@ heap_page_is_all_visible(Relation rel, Buffer buf, TransactionId *visibility_cut
 		}
 	}							/* scan along page */
 
-	return all_visible;
+	if (!(*all_visible))
+		*all_frozen = false;
 }
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 9f54c46..e9cf4c8 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -85,9 +85,9 @@ IndexOnlyNext(IndexOnlyScanState *node)
 		 * which all tuples are known visible to everybody.  In any case,
 		 * we'll use the index tuple not the heap tuple as the data source.
 		 *
-		 * Note on Memory Ordering Effects: visibilitymap_test does not lock
-		 * the visibility map buffer, and therefore the result we read here
-		 * could be slightly stale.  However, it can't be stale enough to
+		 * Note on Memory Ordering Effects: visibilitymap_get_status does not
+		 * lock. The visibility map buffer, and therefore the result we read
+		 * here could be slightly stale.  However, it can't be stale enough to
 		 * matter.
 		 *
 		 * We need to detect clearing a VM bit due to an insert right away,
@@ -114,9 +114,9 @@ IndexOnlyNext(IndexOnlyScanState *node)
 		 * It's worth going through this complexity to avoid needing to lock
 		 * the VM buffer, which could cause significant contention.
 		 */
-		if (!visibilitymap_test(scandesc->heapRelation,
-								ItemPointerGetBlockNumber(tid),
-								&node->ioss_VMBuffer))
+		if (!VM_ALL_VISIBLE(scandesc->heapRelation,
+							ItemPointerGetBlockNumber(tid),
+							&node->ioss_VMBuffer))
 		{
 			/*
 			 * Rats, we have to visit the heap to check visibility.
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index ab018c4..ca7257a 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1329,7 +1329,8 @@ pgstat_report_autovac(Oid dboid)
  */
 void
 pgstat_report_vacuum(Oid tableoid, bool shared,
-					 PgStat_Counter livetuples, PgStat_Counter deadtuples)
+					 PgStat_Counter livetuples, PgStat_Counter deadtuples,
+					 int32 frozenpages)
 {
 	PgStat_MsgVacuum msg;
 
@@ -1343,6 +1344,7 @@ pgstat_report_vacuum(Oid tableoid, bool shared,
 	msg.m_vacuumtime = GetCurrentTimestamp();
 	msg.m_live_tuples = livetuples;
 	msg.m_dead_tuples = deadtuples;
+	msg.m_frozen_pages = frozenpages;
 	pgstat_send(&msg, sizeof(msg));
 }
 
@@ -1354,7 +1356,8 @@ pgstat_report_vacuum(Oid tableoid, bool shared,
  */
 void
 pgstat_report_analyze(Relation rel,
-					  PgStat_Counter livetuples, PgStat_Counter deadtuples)
+					  PgStat_Counter livetuples, PgStat_Counter deadtuples,
+					  int32 frozenpages)
 {
 	PgStat_MsgAnalyze msg;
 
@@ -1394,6 +1397,7 @@ pgstat_report_analyze(Relation rel,
 	msg.m_analyzetime = GetCurrentTimestamp();
 	msg.m_live_tuples = livetuples;
 	msg.m_dead_tuples = deadtuples;
+	msg.m_frozen_pages = frozenpages;
 	pgstat_send(&msg, sizeof(msg));
 }
 
@@ -3702,6 +3706,7 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
 		result->n_live_tuples = 0;
 		result->n_dead_tuples = 0;
 		result->changes_since_analyze = 0;
+		result->n_frozen_pages = 0;
 		result->blocks_fetched = 0;
 		result->blocks_hit = 0;
 		result->vacuum_timestamp = 0;
@@ -5069,6 +5074,7 @@ pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
 
 	tabentry->n_live_tuples = msg->m_live_tuples;
 	tabentry->n_dead_tuples = msg->m_dead_tuples;
+	tabentry->n_frozen_pages = msg->m_frozen_pages;
 
 	if (msg->m_autovacuum)
 	{
@@ -5103,6 +5109,7 @@ pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
 
 	tabentry->n_live_tuples = msg->m_live_tuples;
 	tabentry->n_dead_tuples = msg->m_dead_tuples;
+	tabentry->n_frozen_pages = msg->m_frozen_pages;
 
 	/*
 	 * We reset changes_since_analyze to zero, forgetting any changes that
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index f7c9bf6..98c14f9 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -46,6 +46,7 @@ extern Datum pg_stat_get_vacuum_count(PG_FUNCTION_ARGS);
 extern Datum pg_stat_get_autovacuum_count(PG_FUNCTION_ARGS);
 extern Datum pg_stat_get_analyze_count(PG_FUNCTION_ARGS);
 extern Datum pg_stat_get_autoanalyze_count(PG_FUNCTION_ARGS);
+extern Datum pg_stat_get_frozen_pages(PG_FUNCTION_ARGS);
 
 extern Datum pg_stat_get_function_calls(PG_FUNCTION_ARGS);
 extern Datum pg_stat_get_function_total_time(PG_FUNCTION_ARGS);
@@ -450,6 +451,21 @@ pg_stat_get_autoanalyze_count(PG_FUNCTION_ARGS)
 }
 
 Datum
+pg_stat_get_frozen_pages(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		result;
+	PgStat_StatTabEntry *tabentry;
+
+	if ((tabentry = pgstat_fetch_stat_tabentry(relid)) == NULL)
+		result = 0;
+	else
+		result = (int32) (tabentry->n_frozen_pages);
+
+	PG_RETURN_INT32(result);
+}
+
+Datum
 pg_stat_get_function_calls(PG_FUNCTION_ARGS)
 {
 	Oid			funcid = PG_GETARG_OID(0);
diff --git a/src/bin/pg_upgrade/file.c b/src/bin/pg_upgrade/file.c
index c84783c..312dca6 100644
--- a/src/bin/pg_upgrade/file.c
+++ b/src/bin/pg_upgrade/file.c
@@ -9,7 +9,11 @@
 
 #include "postgres_fe.h"
 
+#include "access/visibilitymap.h"
 #include "pg_upgrade.h"
+#include "storage/bufpage.h"
+#include "storage/checksum.h"
+#include "storage/checksum_impl.h"
 
 #include <fcntl.h>
 
@@ -21,6 +25,43 @@ static int	copy_file(const char *fromfile, const char *tofile, bool force);
 static int	win32_pghardlink(const char *src, const char *dst);
 #endif
 
+static const char *rewriteVisibilitymap(const char *fromfile, const char *tofile,
+										bool force);
+
+/* table for fast rewriting vm file in order to add all-frozen information */
+static const uint16 rewrite_vm_table[256] = {
+	0,     1,     4,     5,     16,    17,    20,    21,    64,    65,    68,    69,    80,    81,    84,    85,
+	256,   257,   260,   261,   272,   273,   276,   277,   320,   321,   324,   325,   336,   337,   340,   341,
+	1024,  1025,  1028,  1029,  1040,  1041,  1044,  1045,  1088,  1089,  1092,  1093,  1104,  1105,  1108,  1109,
+	1280,  1281,  1284,  1285,  1296,  1297,  1300,  1301,  1344,  1345,  1348,  1349,  1360,  1361,  1364,  1365,
+	4096,  4097,  4100,  4101,  4112,  4113,  4116,  4117,  4160,  4161,  4164,  4165,  4176,  4177,  4180,  4181,
+	4352,  4353,  4356,  4357,  4368,  4369,  4372,  4373,  4416,  4417,  4420,  4421,  4432,  4433,  4436,  4437,
+	5120,  5121,  5124,  5125,  5136,  5137,  5140,  5141,  5184,  5185,  5188,  5189,  5200,  5201,  5204,  5205,
+	5376,  5377,  5380,  5381,  5392,  5393,  5396,  5397,  5440,  5441,  5444,  5445,  5456,  5457,  5460,  5461,
+	16384, 16385, 16388, 16389, 16400, 16401, 16404, 16405, 16448, 16449, 16452, 16453, 16464, 16465, 16468, 16469,
+	16640, 16641, 16644, 16645, 16656, 16657, 16660, 16661, 16704, 16705, 16708, 16709, 16720, 16721, 16724, 16725,
+	17408, 17409, 17412, 17413, 17424, 17425, 17428, 17429, 17472, 17473, 17476, 17477, 17488, 17489, 17492, 17493,
+	17664, 17665, 17668, 17669, 17680, 17681, 17684, 17685, 17728, 17729, 17732, 17733, 17744, 17745, 17748, 17749,
+	20480, 20481, 20484, 20485, 20496, 20497, 20500, 20501, 20544, 20545, 20548, 20549, 20560, 20561, 20564, 20565,
+	20736, 20737, 20740, 20741, 20752, 20753, 20756, 20757, 20800, 20801, 20804, 20805, 20816, 20817, 20820, 20821,
+	21504, 21505, 21508, 21509, 21520, 21521, 21524, 21525, 21568, 21569, 21572, 21573, 21584, 21585, 21588, 21589,
+	21760, 21761, 21764, 21765, 21776, 21777, 21780, 21781, 21824, 21825, 21828, 21829, 21840, 21841, 21844, 21845
+};
+
+/*
+ * copyOrRewriteFile()
+ * This function copies file or rewrite visibility map file.
+ * If rewrite_vm is true, we have to rewrite visibility map regardless value of pageConverter.
+ */
+const char *
+copyOrRewriteFile(pageCnvCtx *pageConverter,
+				  const char *src, const char *dst, bool force, bool rewrite_vm)
+{
+	if (rewrite_vm)
+		return rewriteVisibilitymap(src, dst, force);
+	else
+		return copyAndUpdateFile(pageConverter, src, dst, force);
+}
 
 /*
  * copyAndUpdateFile()
@@ -115,12 +156,14 @@ copyAndUpdateFile(pageCnvCtx *pageConverter,
  */
 const char *
 linkAndUpdateFile(pageCnvCtx *pageConverter,
-				  const char *src, const char *dst)
+				  const char *src, const char *dst, bool rewrite_vm)
 {
 	if (pageConverter != NULL)
 		return "Cannot in-place update this cluster, page-by-page conversion is required";
 
-	if (pg_link_file(src, dst) == -1)
+	if (rewrite_vm)
+		return rewriteVisibilitymap(src, dst, true);
+	else if (pg_link_file(src, dst) == -1)
 		return getErrorText();
 	else
 		return NULL;
@@ -205,6 +248,96 @@ copy_file(const char *srcfile, const char *dstfile, bool force)
 #endif
 
 
+/*
+ * rewriteVisibilitymap()
+ *
+ * Copies a visibility map file while adding all-frozen bit(0) into each bit.
+ */
+static const char *
+
+rewriteVisibilitymap(const char *fromfile, const char *tofile, bool force)
+{
+	int			src_fd = 0;
+	int			dst_fd = 0;
+	char		buffer[BLCKSZ];
+	ssize_t 	bytesRead;
+	int			rewriteVmBytesPerPage = (BLCKSZ - SizeOfPageHeaderData) / 2;
+	BlockNumber	blkno = 0;
+
+	/* Reset errno */
+	errno = 0;
+
+	if ((fromfile == NULL) || (tofile == NULL))
+		return getErrorText();
+
+	if ((src_fd = open(fromfile, O_RDONLY, 0)) < 0)
+		goto err;
+
+	if ((dst_fd = open(tofile, O_RDWR | O_CREAT | (force ? 0 : O_EXCL), S_IRUSR | S_IWUSR)) < 0)
+		goto err;
+
+	/* Perform data rewriting per page */
+	while ((bytesRead = read(src_fd, buffer, BLCKSZ)) == BLCKSZ)
+	{
+		char	*cur, *end, *blkend;
+		PageHeaderData	pageheader;
+		uint16	vm_bits;
+
+		/* Save the page header data */
+		memcpy(&pageheader, buffer, SizeOfPageHeaderData);
+
+		cur = buffer + SizeOfPageHeaderData;
+		end = buffer + SizeOfPageHeaderData + rewriteVmBytesPerPage;
+		blkend = buffer + bytesRead;
+
+		while (blkend >= end)
+		{
+			char	vmbuf[BLCKSZ];
+			char	*vmtmp = vmbuf;
+
+			/* Copy page header in advance */
+			memcpy(vmbuf, &pageheader, SizeOfPageHeaderData);
+
+			vmtmp += SizeOfPageHeaderData;
+
+			/* Rewrite visibility map bit one by one */
+			while (end > cur)
+			{
+				/* Write rewritten bit from table and its string representation */
+				vm_bits = rewrite_vm_table[(uint8) *cur];
+				memcpy(vmtmp, &vm_bits, BITS_PER_HEAPBLOCK);
+
+				cur++;
+				vmtmp += BITS_PER_HEAPBLOCK;
+			}
+
+			/* Set new checksum for a visibility map page, If enabled */
+			if (old_cluster.controldata.data_checksum_version != 0 &&
+				new_cluster.controldata.data_checksum_version != 0)
+				((PageHeader) vmbuf)->pd_checksum = pg_checksum_page(vmbuf, blkno);
+
+			if (write(dst_fd, vmbuf, BLCKSZ) != BLCKSZ)
+			{
+					if (errno == 0)
+						errno = ENOSPC;
+					goto err;
+			}
+
+			end += rewriteVmBytesPerPage;
+			blkno++;
+		}
+	}
+
+err:
+	if (src_fd != 0)
+		close(src_fd);
+
+	if (dst_fd != 0)
+		close(dst_fd);
+
+	return (errno == 0) ? NULL : getErrorText();
+}
+
 void
 check_hard_link(void)
 {
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index a43dff5..5a43c18 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -110,6 +110,10 @@ extern char *output_files[];
 #define VISIBILITY_MAP_CRASHSAFE_CAT_VER 201107031
 
 /*
+ * The format of visibility map is changed with this 9.6 commit,
+ */
+#define VISIBILITY_MAP_FROZEN_BIT_CAT_VER 201512171
+/*
  * pg_multixact format changed in 9.3 commit 0ac5ad5134f2769ccbaefec73844f85,
  * ("Improve concurrency of foreign key locking") which also updated catalog
  * version to this value.  pg_upgrade behavior depends on whether old and new
@@ -394,10 +398,12 @@ const pageCnvCtx *setupPageConverter(void);
 typedef void *pageCnvCtx;
 #endif
 
+const char *copyOrRewriteFile(pageCnvCtx *pageConverter, const char *src,
+				  const char *dst, bool force, bool rewrite_vm);
 const char *copyAndUpdateFile(pageCnvCtx *pageConverter, const char *src,
 				  const char *dst, bool force);
 const char *linkAndUpdateFile(pageCnvCtx *pageConverter, const char *src,
-				  const char *dst);
+				  const char *dst, bool rewrite_vm);
 
 void		check_hard_link(void);
 FILE	   *fopen_priv(const char *path, const char *mode);
diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
index bfde1b1..5992bda 100644
--- a/src/bin/pg_upgrade/relfilenode.c
+++ b/src/bin/pg_upgrade/relfilenode.c
@@ -18,7 +18,7 @@
 static void transfer_single_new_db(pageCnvCtx *pageConverter,
 					   FileNameMap *maps, int size, char *old_tablespace);
 static void transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
-				 const char *suffix);
+				 const char *suffix, bool vm_need_rewrite);
 
 
 /*
@@ -171,6 +171,7 @@ transfer_single_new_db(pageCnvCtx *pageConverter,
 {
 	int			mapnum;
 	bool		vm_crashsafe_match = true;
+	bool		vm_need_rewrite = false;
 
 	/*
 	 * Do the old and new cluster disagree on the crash-safetiness of the vm
@@ -180,13 +181,20 @@ transfer_single_new_db(pageCnvCtx *pageConverter,
 		new_cluster.controldata.cat_ver >= VISIBILITY_MAP_CRASHSAFE_CAT_VER)
 		vm_crashsafe_match = false;
 
+	/*
+	 * Do we need to rewrite visibilitymap?
+	 */
+	if (old_cluster.controldata.cat_ver < VISIBILITY_MAP_FROZEN_BIT_CAT_VER &&
+		new_cluster.controldata.cat_ver >= VISIBILITY_MAP_FROZEN_BIT_CAT_VER)
+		vm_need_rewrite = true;
+
 	for (mapnum = 0; mapnum < size; mapnum++)
 	{
 		if (old_tablespace == NULL ||
 			strcmp(maps[mapnum].old_tablespace, old_tablespace) == 0)
 		{
 			/* transfer primary file */
-			transfer_relfile(pageConverter, &maps[mapnum], "");
+			transfer_relfile(pageConverter, &maps[mapnum], "", false);
 
 			/* fsm/vm files added in PG 8.4 */
 			if (GET_MAJOR_VERSION(old_cluster.major_version) >= 804)
@@ -194,9 +202,14 @@ transfer_single_new_db(pageCnvCtx *pageConverter,
 				/*
 				 * Copy/link any fsm and vm files, if they exist
 				 */
-				transfer_relfile(pageConverter, &maps[mapnum], "_fsm");
+				transfer_relfile(pageConverter, &maps[mapnum], "_fsm", false);
 				if (vm_crashsafe_match)
-					transfer_relfile(pageConverter, &maps[mapnum], "_vm");
+				{
+					if (vm_need_rewrite)
+						transfer_relfile(pageConverter, &maps[mapnum], "_vm", true);
+					else
+						transfer_relfile(pageConverter, &maps[mapnum], "_vm", false);
+				}
 			}
 		}
 	}
@@ -210,7 +223,7 @@ transfer_single_new_db(pageCnvCtx *pageConverter,
  */
 static void
 transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
-				 const char *type_suffix)
+				 const char *type_suffix, bool vm_need_rewrite)
 {
 	const char *msg;
 	char		old_file[MAXPGPATH];
@@ -276,7 +289,7 @@ transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
 		{
 			pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", old_file, new_file);
 
-			if ((msg = copyAndUpdateFile(pageConverter, old_file, new_file, true)) != NULL)
+			if ((msg = copyOrRewriteFile(pageConverter, old_file, new_file, true, vm_need_rewrite)) != NULL)
 				pg_fatal("error while copying relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
 						 map->nspname, map->relname, old_file, new_file, msg);
 		}
@@ -284,7 +297,7 @@ transfer_relfile(pageCnvCtx *pageConverter, FileNameMap *map,
 		{
 			pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", old_file, new_file);
 
-			if ((msg = linkAndUpdateFile(pageConverter, old_file, new_file)) != NULL)
+			if ((msg = linkAndUpdateFile(pageConverter, old_file, new_file, vm_need_rewrite)) != NULL)
 				pg_fatal("error while creating link for relation \"%s.%s\" (\"%s\" to \"%s\"): %s\n",
 						 map->nspname, map->relname, old_file, new_file, msg);
 		}
diff --git a/src/bin/pg_upgrade/test.sh b/src/bin/pg_upgrade/test.sh
index aa7f399..6b058d4 100644
--- a/src/bin/pg_upgrade/test.sh
+++ b/src/bin/pg_upgrade/test.sh
@@ -174,6 +174,11 @@ if "$MAKE" -C "$oldsrc" installcheck; then
 		mv "$temp_root"/dump1.sql "$temp_root"/dump1.sql.orig
 		sed "s;$oldsrc;$newsrc;g" "$temp_root"/dump1.sql.orig >"$temp_root"/dump1.sql
 	fi
+
+	vm_sql="SELECT c.relname, c.relallvisible FROM pg_class as c, pg_namespace as n WHERE c.relnamespace = n.oid AND n.nspname NOT IN ('information_schema', 'pg_toast', 'pg_catalog') ORDER BY c.relname;"
+	# Test for rewriting visibility map
+	vacuumdb -d regression || visibilitymap_vacuum1_status=$?
+	psql -d regression -c "$vm_sql" > "$temp_root"/vm_test1.txt || visibilitymap_test1_status=$?
 else
 	make_installcheck_status=$?
 fi
@@ -188,6 +193,14 @@ if [ -n "$pg_dumpall1_status" ]; then
 	echo "pg_dumpall of pre-upgrade database cluster failed"
 	exit 1
 fi
+if [ -n "$visibilitymap_vacuum1_status" ];then
+	echo "VACUUM of pre-upgrade database cluster for visibility map test failed"
+	exit 1
+fi
+if [ -n "$visibilitymap_test1_status" ];then
+	echo "SELECT of pre-upgrade database cluster for visibility map test failed"
+	exit 1
+fi
 
 PGDATA=$BASE_PGDATA
 
@@ -203,6 +216,8 @@ case $testhost in
 esac
 
 pg_dumpall -f "$temp_root"/dump2.sql || pg_dumpall2_status=$?
+vacuumdb -d regression || visibilitymap_vacuum2_status=$?
+psql -d regression -c "$vm_sql" > "$temp_root"/vm_test2.txt || visibilitymap_test2_status=$?
 pg_ctl -m fast stop
 
 # no need to echo commands anymore
@@ -214,11 +229,26 @@ if [ -n "$pg_dumpall2_status" ]; then
 	exit 1
 fi
 
+if [ -n "$visibilitymap_vacuum2_status" ];then
+	echo "VACUUM of post-upgrade database cluster for visibility map test failed"
+	exit 1
+fi
+
+if [ -n "$visibilitymap_test2_status" ];then
+	echo "SELECT of post-upgrade database cluster for visibility map test failed"
+	exit 1
+fi
+
 case $testhost in
 	MINGW*)	cmd /c delete_old_cluster.bat ;;
 	*)	    sh ./delete_old_cluster.sh ;;
 esac
 
+if ! diff "$temp_root"/vm_test1.txt "$temp_root"/vm_test2.txt >/dev/null; then
+	echo "Visibility map rewriting test failed"
+	exit 1
+fi
+
 if diff "$temp_root"/dump1.sql "$temp_root"/dump2.sql >/dev/null; then
 	echo PASSED
 	exit 0
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index caa0f14..93afb10 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -320,9 +320,10 @@ typedef struct xl_heap_freeze_page
 typedef struct xl_heap_visible
 {
 	TransactionId cutoff_xid;
+	uint8		  flags;
 } xl_heap_visible;
 
-#define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId))
+#define SizeOfHeapVisible (offsetof(xl_heap_visible, flags) + sizeof(uint8))
 
 typedef struct xl_heap_new_cid
 {
@@ -389,6 +390,6 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 						  xl_heap_freeze_tuple *xlrec_tp);
 extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
-				 Buffer vm_buffer, TransactionId cutoff_xid);
+			     Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif   /* HEAPAM_XLOG_H */
diff --git a/src/include/access/visibilitymap.h b/src/include/access/visibilitymap.h
index 0c0e0ef..bb4e184 100644
--- a/src/include/access/visibilitymap.h
+++ b/src/include/access/visibilitymap.h
@@ -19,15 +19,36 @@
 #include "storage/buf.h"
 #include "utils/relcache.h"
 
+/*
+ * Number of bits allocated for each heap block.
+ * One for all-visible, other for all-frozen.
+*/
+#define BITS_PER_HEAPBLOCK 2
+
+/* Number of heap blocks we can represent in one byte. */
+#define HEAPBLOCKS_PER_BYTE 4
+
+/* Flags for bit map */
+#define VISIBILITYMAP_ALL_VISIBLE	0x01
+#define VISIBILITYMAP_ALL_FROZEN	0x02
+#define VISIBILITYMAP_VALID_BITS	0x03 /* OR of all valid visiblitymap flags bits */
+
+/* Macros for visibilitymap test */
+#define VM_ALL_VISIBLE(r, b, v) \
+	((visibilitymap_get_status((r), (b), (v)) & VISIBILITYMAP_ALL_VISIBLE) != 0)
+#define VM_ALL_FROZEN(r, b, v) \
+	((visibilitymap_get_status((r), (b), (v)) & VISIBILITYMAP_ALL_FROZEN) != 0)
+
 extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk,
 					Buffer vmbuf);
 extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
 				  Buffer *vmbuf);
 extern bool visibilitymap_pin_ok(BlockNumber heapBlk, Buffer vmbuf);
 extern void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
-				  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid);
-extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
-extern BlockNumber visibilitymap_count(Relation rel);
+							  XLogRecPtr recptr, Buffer vmBuf, TransactionId cutoff_xid,
+							  uint8 flags);
+extern uint8 visibilitymap_get_status(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
+extern void visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_frozen);
 extern void visibilitymap_truncate(Relation rel, BlockNumber nheapblocks);
 
 #endif   /* VISIBILITYMAP_H */
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index eba4150..30f3f79 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201511071
+#define CATALOG_VERSION_NO	201512171
 
 #endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index d8640db..9a77d7d 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2779,6 +2779,8 @@ DATA(insert OID = 3056 ( pg_stat_get_analyze_count PGNSP PGUID 12 1 0 0 0 f f f
 DESCR("statistics: number of manual analyzes for a table");
 DATA(insert OID = 3057 ( pg_stat_get_autoanalyze_count PGNSP PGUID 12 1 0 0 0 f f f f t f s r 1 0 20 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_get_autoanalyze_count _null_ _null_ _null_ ));
 DESCR("statistics: number of auto analyzes for a table");
+DATA(insert OID = 6015 ( pg_stat_get_frozen_pages PGNSP PGUID 12 1 0 0 0 f f f f t f s r 1 0 23 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_get_frozen_pages _null_ _null_ _null_ ));
+DESCR("statistics: number of frozen pages of table");
 DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 0 f f f f t t s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_stat_get_backend_idset _null_ _null_ _null_ ));
 DESCR("statistics: currently active backend IDs");
 DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f f f t s r 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23,28,28,16,25,25,23,16,25}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,ssl,sslversion,sslcipher,sslbits,sslcompression,sslclientdn}" _null_ _null_ pg_stat_get_activity _null_ _null_ _null_ ));
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9ecc163..ed784bc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -17,6 +17,7 @@
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"
 #include "storage/barrier.h"
+#include "storage/block.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
 
@@ -355,6 +356,7 @@ typedef struct PgStat_MsgVacuum
 	TimestampTz m_vacuumtime;
 	PgStat_Counter m_live_tuples;
 	PgStat_Counter m_dead_tuples;
+	BlockNumber	m_frozen_pages;
 } PgStat_MsgVacuum;
 
 
@@ -372,6 +374,7 @@ typedef struct PgStat_MsgAnalyze
 	TimestampTz m_analyzetime;
 	PgStat_Counter m_live_tuples;
 	PgStat_Counter m_dead_tuples;
+	int32		m_frozen_pages;
 } PgStat_MsgAnalyze;
 
 
@@ -551,7 +554,7 @@ typedef union PgStat_Msg
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID	0x01A5BC9D
+#define PGSTAT_FILE_FORMAT_ID	0x01A5BC9E
 
 /* ----------
  * PgStat_StatDBEntry			The collector's data per database
@@ -615,6 +618,8 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter n_dead_tuples;
 	PgStat_Counter changes_since_analyze;
 
+	int32 n_frozen_pages;
+
 	PgStat_Counter blocks_fetched;
 	PgStat_Counter blocks_hit;
 
@@ -917,9 +922,11 @@ extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type t
 
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
-					 PgStat_Counter livetuples, PgStat_Counter deadtuples);
+					 PgStat_Counter livetuples, PgStat_Counter deadtuples,
+					 int32 frozenpages);
 extern void pgstat_report_analyze(Relation rel,
-					  PgStat_Counter livetuples, PgStat_Counter deadtuples);
+					  PgStat_Counter livetuples, PgStat_Counter deadtuples,
+					  int32 frozenpages);
 
 extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index a2f78ee..102aa81 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -178,8 +178,10 @@ typedef PageHeaderData *PageHeader;
 										 * tuple? */
 #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
 										 * everyone */
+#define PD_ALL_FROZEN		0x0008		/* all tuples on page are completely
+										   frozen */
 
-#define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
+#define PD_VALID_FLAG_BITS	0x000F		/* OR of all valid pd_flags bits */
 
 /*
  * Page layout version number 0 is for pre-7.3 Postgres releases.
@@ -367,7 +369,12 @@ typedef PageHeaderData *PageHeader;
 #define PageSetAllVisible(page) \
 	(((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
 #define PageClearAllVisible(page) \
-	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
+	(((PageHeader) (page))->pd_flags &= ~(PD_ALL_VISIBLE | PD_ALL_FROZEN))
+
+#define PageIsAllFrozen(page) \
+	(((PageHeader) (page))->pd_flags & PD_ALL_FROZEN)
+#define PageSetAllFrozen(page) \
+	(((PageHeader) (page))->pd_flags |= PD_ALL_FROZEN)
 
 #define PageIsPrunable(page, oldestxmin) \
 ( \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 80374e4..c6514ad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1683,6 +1683,7 @@ pg_stat_all_tables| SELECT c.oid AS relid,
     pg_stat_get_live_tuples(c.oid) AS n_live_tup,
     pg_stat_get_dead_tuples(c.oid) AS n_dead_tup,
     pg_stat_get_mod_since_analyze(c.oid) AS n_mod_since_analyze,
+    pg_stat_get_frozen_pages(c.oid) AS n_frozen_pages,
     pg_stat_get_last_vacuum_time(c.oid) AS last_vacuum,
     pg_stat_get_last_autovacuum_time(c.oid) AS last_autovacuum,
     pg_stat_get_last_analyze_time(c.oid) AS last_analyze,
@@ -1795,6 +1796,7 @@ pg_stat_sys_tables| SELECT pg_stat_all_tables.relid,
     pg_stat_all_tables.n_live_tup,
     pg_stat_all_tables.n_dead_tup,
     pg_stat_all_tables.n_mod_since_analyze,
+    pg_stat_all_tables.n_frozen_pages,
     pg_stat_all_tables.last_vacuum,
     pg_stat_all_tables.last_autovacuum,
     pg_stat_all_tables.last_analyze,
@@ -1838,6 +1840,7 @@ pg_stat_user_tables| SELECT pg_stat_all_tables.relid,
     pg_stat_all_tables.n_live_tup,
     pg_stat_all_tables.n_dead_tup,
     pg_stat_all_tables.n_mod_since_analyze,
+    pg_stat_all_tables.n_frozen_pages,
     pg_stat_all_tables.last_vacuum,
     pg_stat_all_tables.last_autovacuum,
     pg_stat_all_tables.last_analyze,
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index f5be70f..95ababf 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -127,6 +127,8 @@ SELECT count(*) FROM tenk2 WHERE unique1 = 1;
      1
 (1 row)
 
+-- do VACUUM FREEZE
+VACUUM FREEZE tenk2;
 -- force the rate-limiting logic in pgstat_report_tabstat() to time out
 -- and send a message
 SELECT pg_sleep(1.0);
@@ -175,6 +177,14 @@ SELECT st.heap_blks_read + st.heap_blks_hit >= pr.heap_blks + cl.relpages,
  t        | t
 (1 row)
 
+SELECT n_frozen_pages = (pg_relation_size('tenk2') / current_setting('block_size')::int)
+  FROM pg_stat_user_tables
+  WHERE relname ='tenk2';
+ ?column? 
+----------
+ t
+(1 row)
+
 SELECT pr.snap_ts < pg_stat_get_snapshot_timestamp() as snapshot_newer
 FROM prevstats AS pr;
  snapshot_newer 
diff --git a/src/test/regress/expected/visibilitymap.out b/src/test/regress/expected/visibilitymap.out
new file mode 100644
index 0000000..3be0354
--- /dev/null
+++ b/src/test/regress/expected/visibilitymap.out
@@ -0,0 +1,22 @@
+--
+-- Visibility Map
+--
+CREATE TABLE vmtest (i INT primary key);
+INSERT INTO vmtest SELECT generate_series(1,10000);
+-- All pages are become all-visible
+VACUUM FREEZE vmtest;
+SELECT relallvisible  = (pg_relation_size('vmtest') / current_setting('block_size')::int) FROM pg_class WHERE relname = 'vmtest';
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Check whether vacuum skips all-frozen pages
+\set VERBOSITY terse
+VACUUM FREEZE VERBOSE vmtest;
+INFO:  vacuuming "public.vmtest"
+INFO:  index "vmtest_pkey" now contains 10000 row versions in 30 pages
+INFO:  skipped 45 frozen pages according to visibility map
+INFO:  "vmtest": found 0 removable, 0 nonremovable row versions in 0 out of 45 pages
+\set VERBOSITY default
+DROP TABLE vmtest;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index b1bc7c7..e31fa76 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -110,3 +110,6 @@ test: event_trigger
 
 # run stats by itself because its delay may be insufficient under heavy load
 test: stats
+
+# page info map and vacuum test cannot run concurrently with any test that runs SQL
+test: visibilitymap
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index ade9ef1..666e40c 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -161,3 +161,4 @@ test: with
 test: xml
 test: event_trigger
 test: stats
+test: visibilitymap
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index cd2d592..dea5553 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -120,6 +120,8 @@ ROLLBACK;
 SELECT count(*) FROM tenk2;
 -- do an indexscan
 SELECT count(*) FROM tenk2 WHERE unique1 = 1;
+-- do VACUUM FREEZE
+VACUUM FREEZE tenk2;
 
 -- force the rate-limiting logic in pgstat_report_tabstat() to time out
 -- and send a message
@@ -145,6 +147,10 @@ SELECT st.heap_blks_read + st.heap_blks_hit >= pr.heap_blks + cl.relpages,
   FROM pg_statio_user_tables AS st, pg_class AS cl, prevstats AS pr
  WHERE st.relname='tenk2' AND cl.relname='tenk2';
 
+SELECT n_frozen_pages = (pg_relation_size('tenk2') / current_setting('block_size')::int)
+  FROM pg_stat_user_tables
+  WHERE relname ='tenk2';
+
 SELECT pr.snap_ts < pg_stat_get_snapshot_timestamp() as snapshot_newer
 FROM prevstats AS pr;
 
diff --git a/src/test/regress/sql/visibilitymap.sql b/src/test/regress/sql/visibilitymap.sql
new file mode 100644
index 0000000..365570b
--- /dev/null
+++ b/src/test/regress/sql/visibilitymap.sql
@@ -0,0 +1,16 @@
+--
+-- Visibility Map
+--
+CREATE TABLE vmtest (i INT primary key);
+INSERT INTO vmtest SELECT generate_series(1,10000);
+
+-- All pages are become all-visible
+VACUUM FREEZE vmtest;
+SELECT relallvisible  = (pg_relation_size('vmtest') / current_setting('block_size')::int) FROM pg_class WHERE relname = 'vmtest';
+
+-- Check whether vacuum skips all-frozen pages
+\set VERBOSITY terse
+VACUUM FREEZE VERBOSE vmtest;
+\set VERBOSITY default
+
+DROP TABLE vmtest;
