On Sun, Apr 1, 2018 at 2:04 PM, Magnus Hagander <mag...@hagander.net> wrote:

> On Sat, Mar 31, 2018 at 5:38 PM, Tomas Vondra <
> tomas.von...@2ndquadrant.com> wrote:
>
>> On 03/31/2018 05:05 PM, Magnus Hagander wrote:
>> > On Sat, Mar 31, 2018 at 4:21 PM, Tomas Vondra
>> > <tomas.von...@2ndquadrant.com> wrote:
>> >
>> > ...
>> >
>> >     I do think just waiting for all running transactions to complete is
>> >     fine, and it's not the first place where we use it - CREATE
>> >     SUBSCRIPTION does pretty much exactly the same thing (and CREATE INDEX
>> >     CONCURRENTLY too, to some extent). So we have a precedent / working
>> >     code we can copy.
>> >
>> >
>> > Thinking again, I don't think it should be done as part of
>> > BuildRelationList(). We should just do it once in the launcher before
>> > starting, that'll be both easier and cleaner. Anything started after
>> > that will have checksums on it, so we should be fine.
>> >
>> > PFA one that does this.
>> >
>>
>> Seems fine to me. I'd however log waitforxid, not the oldest one. If
>> you're a DBA and you want the checksumming to proceed, knowing
>> the oldest running XID is useless for that. If we log waitforxid, it can
>> be used to query pg_stat_activity and interrupt the sessions somehow.
>>
>
> Yeah, makes sense. Updated.
>
>
>
>> >     >     And if you try this with a temporary table (not hidden in a
>> >     >     transaction, so the bgworker can see it), the worker will fail
>> >     >     with this:
>> >     >
>> >     >       ERROR:  cannot access temporary tables of other sessions
>> >     >
>> >     >     But of course, this is just another way to crash without
>> >     >     updating the result for the launcher, so checksums may end up
>> >     >     being enabled anyway.
>> >     >
>> >     >
>> >     > Yeah, there will be plenty of side-effect issues from that
>> >     > crash-with-wrong-status case. Fixing that will at least make
>> >     > things safer -- in that checksums won't be enabled when not put
>> >     > on all pages.
>> >     >
>> >
>> >     Sure, the outcome with checksums enabled incorrectly is a
>> >     consequence of bogus status, and fixing that will prevent that. But
>> >     that wasn't my main point here - not articulated very clearly, though.
>> >
>> >     The bigger question is how to handle temporary tables gracefully, so
>> >     that it does not terminate the bgworker like this at all. This might
>> >     be an even bigger issue than dropped relations, considering that
>> >     temporary tables are a pretty common part of applications (and it
>> >     also includes CREATE/DROP).
>> >
>> >     For some clusters it might mean the online checksum enabling would
>> >     crash+restart infinitely (well, until reaching MAX_ATTEMPTS).
>> >
>> >     Unfortunately, try_relation_open() won't fix this, as the error comes
>> >     from ReadBufferExtended. And it's not a matter of simply creating a
>> >     ReadBuffer variant without that error check, because temporary tables
>> >     use local buffers.
>> >
>> >     I wonder if we could just go and set the checksums anyway, ignoring
>> >     the local buffers. If the other session does some changes, it'll
>> >     overwrite our changes, this time with the correct checksums. But it
>> >     seems pretty dangerous (I mean, what if they're writing stuff while
>> >     we're updating the checksums? Considering the various short-cuts for
>> >     temporary tables, I suspect that would be a boon for race conditions.)
>> >
>> >     Another option would be to do something similar to running
>> >     transactions, i.e. wait until all temporary tables (that we've seen at
>> >     the beginning) disappear. But we're starting to wait on more and more
>> >     stuff.
>> >
>> >     If we do this, we should clearly log which backends we're waiting
>> >     for, so that the admins can go and interrupt them manually.
>> >
>> >
>> >
>> > Yeah, waiting for all transactions at the beginning is pretty simple.
>> >
>> > Making the worker simply ignore temporary tables would also be easy.
>> >
>> > One of the bigger issues here is temporary tables are *session* scope
>> > and not transaction, so we'd actually need the other session to finish,
>> > not just the transaction.
>> >
>> > I guess what we could do is something like this:
>> >
>> > 1. Don't process temporary tables in the checksumworker, period.
>> > Instead, build a list of any temporary tables that existed when the
>> > worker started in this particular database (basically anything that we
>> > got in our scan). Once we have processed the complete database, keep
>> > re-scanning pg_class until those particular tables are gone (search by
>> > oid).
>> >
>> > That means that any temporary tables that are created *while* we are
>> > processing a database are ignored, but they should already be receiving
>> > checksums.
>> >
>> > It definitely leads to a potential issue with long running temp tables.
>> > But as long as we look at the *actual tables* (by oid), we should be
>> > able to handle long-running sessions once they have dropped their temp
>> > tables.
>> >
>> > Does that sound workable to you?
>> >
>>
>> Yes, that's pretty much what I meant by 'wait until all temporary tables
>> disappear'. Again, we need to make it easy to determine which OIDs we
>> are waiting for, and which sessions may need the DBA's attention.
>>
>> I don't think it makes sense to log OIDs of the temporary tables. There
>> can be many of them, and in most cases the connection/session is managed
>> by the application, so the only thing you can do is kill the connection.
>>
>
> Yeah, agreed. I think it makes sense to show the *number* of temp tables.
> That's also a predictable amount of information -- logging all temp tables
> may, as you say, lead to an insane amount of data.
>
> PFA a patch that does this. I've also added some docs for it.
>
> And I also noticed pg_verify_checksums wasn't installed, so fixed that too.
>
>
PFA a rebase on top of the just committed verify-checksums patch.
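
For anyone who wants the short version of the state handling, here is a
minimal standalone sketch in C. It is not code from the patch: the type and
function names (ChecksumState, checksum_enable() and friends) are made up for
illustration, and the enum values are assumptions. It only models the
transitions between "off", "inprogress" and "on" that the patch drives
through the control file.

```c
#include <stdbool.h>

/* Hypothetical model of the three checksum states; values are assumptions. */
typedef enum
{
	CHECKSUM_OFF = 0,
	CHECKSUM_ON = 1,
	CHECKSUM_INPROGRESS = 2
} ChecksumState;

/* Checksums are written in both "inprogress" and "on". */
static bool
checksum_need_write(ChecksumState s)
{
	return s != CHECKSUM_OFF;
}

/* Checksums are only verified once they are fully "on". */
static bool
checksum_need_verify(ChecksumState s)
{
	return s == CHECKSUM_ON;
}

/*
 * Enable request: allowed from "off" and from "inprogress" (which is how a
 * stopped worker gets restarted), refused when already "on".
 */
static bool
checksum_enable(ChecksumState *s)
{
	if (checksum_need_verify(*s))
		return false;
	*s = CHECKSUM_INPROGRESS;
	return true;
}

/* Worker has processed all databases: only valid from "inprogress". */
static bool
checksum_worker_done(ChecksumState *s)
{
	if (*s != CHECKSUM_INPROGRESS)
		return false;
	*s = CHECKSUM_ON;
	return true;
}

/* Disable request: refused when already "off". */
static bool
checksum_disable(ChecksumState *s)
{
	if (!checksum_need_write(*s))
		return false;
	*s = CHECKSUM_OFF;
	return true;
}

/* Text reported for each state, using the same strings as the patch. */
static const char *
checksum_state_name(ChecksumState s)
{
	if (s == CHECKSUM_ON)
		return "on";
	else if (s == CHECKSUM_INPROGRESS)
		return "inprogress";
	else
		return "off";
}
```

In this model a session goes from "off" to "inprogress" via
checksum_enable(), the worker then calls checksum_worker_done() to reach
"on", and checksum_disable() drops back to "off" from either of the other
two states.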

-- 
 Magnus Hagander
 Me: https://www.hagander.net/
 Work: https://www.redpill-linpro.com/
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5abb1c46fb..dcdd17ec0c 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19507,6 +19507,71 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
 
   </sect2>
 
+  <sect2 id="functions-admin-checksum">
+   <title>Data Checksum Functions</title>
+
+   <para>
+    The functions shown in <xref linkend="functions-checksums-table" /> can
+    be used to enable or disable data checksums in a running cluster.
+    See <xref linkend="checksums" /> for details.
+   </para>
+
+   <table id="functions-checksums-table">
+    <title>Checksum <acronym>SQL</acronym> Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Function</entry>
+       <entry>Return Type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+     <tbody>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_enable_data_checksums</primary>
+        </indexterm>
+        <literal><function>pg_enable_data_checksums(<optional><parameter>cost_delay</parameter> <type>int</type>, <parameter>cost_limit</parameter> <type>int</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        <para>
+         Initiates data checksums for the cluster. This will switch the data checksums mode
+         to <literal>inprogress</literal> and start a background worker that will process
+         all data in the cluster and enable checksums for it. When all data pages have had
+         checksums enabled, the cluster will automatically switch to checksums
+         <literal>on</literal>.
+        </para>
+        <para>
+         If <parameter>cost_delay</parameter> and <parameter>cost_limit</parameter> are
+         specified, the speed of the process is throttled using the same principles as
+         <link linkend="runtime-config-resource-vacuum-cost">Cost-based Vacuum Delay</link>.
+        </para>
+       </entry>
+      </row>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_disable_data_checksums</primary>
+        </indexterm>
+        <literal><function>pg_disable_data_checksums()</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Disables data checksums for the cluster.
+       </entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+  </sect2>
+
   <sect2 id="functions-admin-dbobject">
    <title>Database Object Management Functions</title>
 
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4e01e5641c..7cd6ee85dc 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -211,6 +211,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgResetwal         SYSTEM "pg_resetwal.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
+<!ENTITY pgVerifyChecksums  SYSTEM "pg_verify_checksums.sgml">
 <!ENTITY pgtestfsync        SYSTEM "pgtestfsync.sgml">
 <!ENTITY pgtesttiming       SYSTEM "pgtesttiming.sgml">
 <!ENTITY pgupgrade          SYSTEM "pgupgrade.sgml">
diff --git a/doc/src/sgml/ref/initdb.sgml b/doc/src/sgml/ref/initdb.sgml
index 949b5a220f..826dd91f72 100644
--- a/doc/src/sgml/ref/initdb.sgml
+++ b/doc/src/sgml/ref/initdb.sgml
@@ -195,9 +195,9 @@ PostgreSQL documentation
        <para>
         Use checksums on data pages to help detect corruption by the
         I/O system that would otherwise be silent. Enabling checksums
-        may incur a noticeable performance penalty. This option can only
-        be set during initialization, and cannot be changed later. If
-        set, checksums are calculated for all objects, in all databases.
+        may incur a noticeable performance penalty. If set, checksums
+        are calculated for all objects, in all databases. See
+        <xref linkend="checksums" /> for details.
        </para>
       </listitem>
      </varlistentry>
diff --git a/doc/src/sgml/ref/pg_verify_checksums.sgml b/doc/src/sgml/ref/pg_verify_checksums.sgml
new file mode 100644
index 0000000000..463ecd5e1b
--- /dev/null
+++ b/doc/src/sgml/ref/pg_verify_checksums.sgml
@@ -0,0 +1,112 @@
+<!--
+doc/src/sgml/ref/pg_verify_checksums.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="pgverifychecksums">
+ <indexterm zone="pgverifychecksums">
+  <primary>pg_verify_checksums</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_verify_checksums</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_verify_checksums</refname>
+  <refpurpose>verify data checksums in an offline <productname>PostgreSQL</productname> database cluster</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_verify_checksums</command>
+   <arg choice="opt"><replaceable class="parameter">option</replaceable></arg>
+   <arg choice="opt"><arg choice="opt"><option>-D</option></arg> <replaceable class="parameter">datadir</replaceable></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1 id="r1-app-pg_verify_checksums-1">
+  <title>Description</title>
+  <para>
+   <command>pg_verify_checksums</command> verifies data checksums in a PostgreSQL
+   cluster. It must be run against a cluster that's offline.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    The following command-line options are available:
+
+    <variablelist>
+
+     <varlistentry>
+      <term><option>-r <replaceable>relfilenode</replaceable></option></term>
+      <listitem>
+       <para>
+        Only validate checksums in the relation with the specified relfilenode.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-f</option></term>
+      <listitem>
+       <para>
+        Force check even if checksums are disabled on the cluster.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-d</option></term>
+      <listitem>
+       <para>
+        Enable debug output. Lists all checked blocks and their checksum.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_verify_checksums</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-?</option></term>
+      <term><option>--help</option></term>
+       <listitem>
+        <para>
+         Show help about <application>pg_verify_checksums</application> command line
+         arguments, and exit.
+        </para>
+       </listitem>
+      </varlistentry>
+    </variablelist>
+   </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+  <para>
+    <application>pg_verify_checksums</application> can only be run when the server is offline.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="checksums"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index ef2270c467..78c214f1b0 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -284,6 +284,7 @@
    &pgtestfsync;
    &pgtesttiming;
    &pgupgrade;
+   &pgVerifyChecksums;
    &pgwaldump;
    &postgres;
    &postmaster;
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index f4bc2d4161..123638bc3f 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -230,6 +230,99 @@
   </para>
  </sect1>
 
+ <sect1 id="checksums">
+  <title>Data checksums</title>
+  <indexterm>
+   <primary>checksums</primary>
+  </indexterm>
+
+  <para>
+   Data pages are not checksum protected by default, but this can optionally be enabled for a cluster.
+   When enabled, each data page will be assigned a checksum that is updated when the page is
+   written and verified every time the page is read. Only data pages are protected by checksums;
+   internal data structures and temporary files are not.
+  </para>
+
+  <para>
+   Checksums are normally enabled when the cluster is initialized using
+   <link linkend="app-initdb-data-checksums"><application>initdb</application></link>. They
+   can also be enabled or disabled at runtime. In all cases, checksums are enabled or disabled
+   at the full cluster level, and cannot be specified individually for databases or tables.
+  </para>
+
+  <para>
+   The current state of checksums in the cluster can be verified by viewing the value
+   of the read-only configuration variable <xref linkend="guc-data-checksums" /> by
+   issuing the command <command>SHOW data_checksums</command>.
+  </para>
+
+  <para>
+   When attempting to recover from corrupt data it may be necessary to bypass the checksum
+   protection in order to recover data. To do this, temporarily set the configuration parameter
+   <xref linkend="guc-ignore-checksum-failure" />.
+  </para>
+
+  <sect2 id="checksums-enable-disable">
+   <title>Online enabling of checksums</title>
+
+   <para>
+    Checksums can be enabled or disabled online, by calling the appropriate
+    <link linkend="functions-admin-checksum">functions</link>.
+    Disabling of checksums takes effect immediately when the function is called.
+   </para>
+
+   <para>
+    Enabling checksums will put the cluster in <literal>inprogress</literal> mode.
+    During this time, checksums will be written but not verified. In addition to
+    this, a background worker process is started that enables checksums on all
+    existing data in the cluster. Once this worker has completed processing all
+    databases in the cluster, the checksum mode will automatically switch to
+    <literal>on</literal>.
+   </para>
+
+   <para>
+    The process will initially wait for all open transactions to finish before
+    it starts, so that it can be certain that there are no tables that have been
+    created inside a transaction that has not committed yet and thus would not
+    be visible to the process enabling checksums. It will also, for each database,
+    wait for all pre-existing temporary tables to get removed before it finishes.
+    If long-lived temporary tables are used in the application it may be necessary
+    to terminate these application connections to allow the checksummer process
+    to complete.
+   </para>
+
+   <para>
+    If the cluster is stopped while in <literal>inprogress</literal> mode, for
+    any reason, then this process must be restarted manually. To do this,
+    re-execute the function <function>pg_enable_data_checksums()</function>
+    once the cluster has been restarted.
+   </para>
+
+   <note>
+    <para>
+     Enabling checksums can cause significant I/O to the system, as most of the
+     database pages will need to be rewritten, and will be written both to the
+     data files and the WAL.
+    </para>
+   </note>
+
+   <para>
+    Since checksums are cluster-wide, all databases will get checksums added.
+    It is thus important that the worker is allowed to connect to all databases
+    for the process to succeed. See the <xref linkend="sql-alterdatabase"/>
+    command for how to allow connections.
+   </para>
+
+   <note>
+    <para>
+     <literal>template0</literal> does not accept connections by default. To
+     enable checksums you'll need to temporarily make it accept connections.
+    </para>
+   </note>
+
+  </sect2>
+ </sect1>
+
   <sect1 id="wal-intro">
    <title>Write-Ahead Logging (<acronym>WAL</acronym>)</title>
 
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 00741c7b09..a31f8b806a 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -17,6 +17,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_control.h"
+#include "storage/bufpage.h"
 #include "utils/guc.h"
 #include "utils/timestamp.h"
 
@@ -137,6 +138,18 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_CHECKSUMS)
+	{
+		xl_checksum_state xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_checksum_state));
+		if (xlrec.new_checksumtype == PG_DATA_CHECKSUM_VERSION)
+			appendStringInfo(buf, "on");
+		else if (xlrec.new_checksumtype == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+			appendStringInfo(buf, "inprogress");
+		else
+			appendStringInfo(buf, "off");
+	}
 }
 
 const char *
@@ -182,6 +195,9 @@ xlog_identify(uint8 info)
 		case XLOG_FPI_FOR_HINT:
 			id = "FPI_FOR_HINT";
 			break;
+		case XLOG_CHECKSUMS:
+			id = "CHECKSUMS";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b4fd8395b7..813b2afaac 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -856,6 +856,7 @@ static void SetLatestXTime(TimestampTz xtime);
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
+static void XlogChecksums(ChecksumType new_type);
 static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 					TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
@@ -1033,7 +1034,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		Assert(RedoRecPtr < Insert->RedoRecPtr);
 		RedoRecPtr = Insert->RedoRecPtr;
 	}
-	doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
+	doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites || DataChecksumsInProgress());
 
 	if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr && doPageWrites)
 	{
@@ -4673,10 +4674,6 @@ ReadControlFile(void)
 		(SizeOfXLogLongPHD - SizeOfXLogShortPHD);
 
 	CalculateCheckpointSegments();
-
-	/* Make the initdb settings visible as GUC variables, too */
-	SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
-					PGC_INTERNAL, PGC_S_OVERRIDE);
 }
 
 void
@@ -4748,12 +4745,90 @@ GetMockAuthenticationNonce(void)
  * Are checksums enabled for data pages?
  */
 bool
-DataChecksumsEnabled(void)
+DataChecksumsNeedWrite(void)
 {
 	Assert(ControlFile != NULL);
 	return (ControlFile->data_checksum_version > 0);
 }
 
+bool
+DataChecksumsNeedVerify(void)
+{
+	Assert(ControlFile != NULL);
+
+	/*
+	 * Only verify checksums if they are fully enabled in the cluster. In
+	 * inprogress state they are only updated, not verified.
+	 */
+	return (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_VERSION);
+}
+
+bool
+DataChecksumsInProgress(void)
+{
+	Assert(ControlFile != NULL);
+	return (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION);
+}
+
+void
+SetDataChecksumsInProgress(void)
+{
+	Assert(ControlFile != NULL);
+	if (ControlFile->data_checksum_version > 0)
+		return;
+
+	XlogChecksums(PG_DATA_CHECKSUM_INPROGRESS_VERSION);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+	ControlFile->data_checksum_version = PG_DATA_CHECKSUM_INPROGRESS_VERSION;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+}
+
+void
+SetDataChecksumsOn(void)
+{
+	Assert(ControlFile != NULL);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+	if (ControlFile->data_checksum_version != PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+	{
+		LWLockRelease(ControlFileLock);
+		elog(ERROR, "checksums not in inprogress mode");
+	}
+
+	ControlFile->data_checksum_version = PG_DATA_CHECKSUM_VERSION;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+
+	XlogChecksums(PG_DATA_CHECKSUM_VERSION);
+}
+
+void
+SetDataChecksumsOff(void)
+{
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+	ControlFile->data_checksum_version = 0;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+
+	XlogChecksums(0);
+}
+
+/* guc hook */
+const char *
+show_data_checksums(void)
+{
+	if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_VERSION)
+		return "on";
+	else if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+		return "inprogress";
+	else
+		return "off";
+}
+
 /*
  * Returns a fake LSN for unlogged relations.
  *
@@ -7789,6 +7864,16 @@ StartupXLOG(void)
 	CompleteCommitTsInitialization();
 
 	/*
+	 * If we reach this point with checksums in inprogress state, we notify
+	 * the user that they need to manually restart the process to enable
+	 * checksums.
+	 */
+	if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+		ereport(WARNING,
+				(errmsg("checksum state is \"inprogress\" with no worker"),
+				 errhint("Either disable or enable checksums by calling the pg_disable_data_checksums() or pg_enable_data_checksums() functions.")));
+
+	/*
 	 * All done with end-of-recovery actions.
 	 *
 	 * Now allow backends to write WAL and update the control file status in
@@ -9542,6 +9627,22 @@ XLogReportParameters(void)
 }
 
 /*
+ * Log the new state of checksums
+ */
+static void
+XlogChecksums(ChecksumType new_type)
+{
+	xl_checksum_state xlrec;
+
+	xlrec.new_checksumtype = new_type;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_checksum_state));
+
+	XLogInsert(RM_XLOG_ID, XLOG_CHECKSUMS);
+}
+
+/*
  * Update full_page_writes in shared memory, and write an
  * XLOG_FPW_CHANGE record if necessary.
  *
@@ -9969,6 +10070,17 @@ xlog_redo(XLogReaderState *record)
 		/* Keep track of full_page_writes */
 		lastFullPageWrites = fpw;
 	}
+	else if (info == XLOG_CHECKSUMS)
+	{
+		xl_checksum_state state;
+
+		memcpy(&state, XLogRecGetData(record), sizeof(xl_checksum_state));
+
+		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+		ControlFile->data_checksum_version = state.new_checksumtype;
+		UpdateControlFile();
+		LWLockRelease(ControlFileLock);
+	}
 }
 
 #ifdef WAL_DEBUG
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 316edbe3c5..67b9cd8127 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "postmaster/checksumhelper.h"
 #include "replication/walreceiver.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
@@ -698,3 +699,50 @@ pg_backup_start_time(PG_FUNCTION_ARGS)
 
 	PG_RETURN_DATUM(xtime);
 }
+
+Datum
+disable_data_checksums(PG_FUNCTION_ARGS)
+{
+	/*
+	 * If we don't need to write new checksums, then clearly they are already
+	 * disabled.
+	 */
+	if (!DataChecksumsNeedWrite())
+		ereport(ERROR,
+				(errmsg("data checksums already disabled")));
+
+	ShutdownChecksumHelperIfRunning();
+
+	SetDataChecksumsOff();
+
+	PG_RETURN_VOID();
+}
+
+Datum
+enable_data_checksums(PG_FUNCTION_ARGS)
+{
+	int			cost_delay = PG_GETARG_INT32(0);
+	int			cost_limit = PG_GETARG_INT32(1);
+
+	if (cost_delay < 0)
+		ereport(ERROR,
+				(errmsg("cost delay cannot be less than zero")));
+	if (cost_limit <= 0)
+		ereport(ERROR,
+				(errmsg("cost limit must be a positive value")));
+
+	/*
+	 * Allow state change from "off" or from "inprogress", since this is how
+	 * we restart the worker if necessary.
+	 */
+	if (DataChecksumsNeedVerify())
+		ereport(ERROR,
+				(errmsg("data checksums already enabled")));
+
+	SetDataChecksumsInProgress();
+	if (!StartChecksumHelperLauncher(cost_delay, cost_limit))
+		ereport(ERROR,
+				(errmsg("failed to start checksum helper process")));
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e9e188682f..5d567d0cf9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1027,6 +1027,11 @@ CREATE OR REPLACE FUNCTION pg_stop_backup (
   RETURNS SETOF record STRICT VOLATILE LANGUAGE internal as 'pg_stop_backup_v2'
   PARALLEL RESTRICTED;
 
+CREATE OR REPLACE FUNCTION pg_enable_data_checksums (
+        cost_delay int DEFAULT 0, cost_limit int DEFAULT 100)
+  RETURNS void STRICT VOLATILE LANGUAGE internal AS 'enable_data_checksums'
+  PARALLEL RESTRICTED;
+
 -- legacy definition for compatibility with 9.3
 CREATE OR REPLACE FUNCTION
   json_populate_record(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c23211b2..ee8f8c1cd3 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,8 @@ subdir = src/backend/postmaster
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
-	pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o checksumhelper.o \
+	fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o \
+	walwriter.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f651bb49b1..19529d77ad 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgworker_internals.h"
+#include "postmaster/checksumhelper.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
@@ -129,6 +130,12 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"ChecksumHelperLauncherMain", ChecksumHelperLauncherMain
+	},
+	{
+		"ChecksumHelperWorkerMain", ChecksumHelperWorkerMain
 	}
 };
 
diff --git a/src/backend/postmaster/checksumhelper.c b/src/backend/postmaster/checksumhelper.c
new file mode 100644
index 0000000000..84a8cc865b
--- /dev/null
+++ b/src/backend/postmaster/checksumhelper.c
@@ -0,0 +1,881 @@
+/*-------------------------------------------------------------------------
+ *
+ * checksumhelper.c
+ *	  Background worker to walk the database and write checksums to pages
+ *
+ * When data checksums are enabled for a cluster at initdb time, no extra
+ * process is required, as each page is checksummed when written and verified
+ * when read.  When enabling checksums on an already running cluster that was
+ * not initialized with checksums, this helper worker ensures that all pages
+ * are checksummed before verification of the checksums is turned on.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/checksumhelper.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/pg_database.h"
+#include "commands/vacuum.h"
+#include "common/relpath.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/checksumhelper.h"
+#include "storage/bufmgr.h"
+#include "storage/checksum.h"
+#include "storage/lmgr.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/ps_status.h"
+
+/*
+ * Maximum number of times to try enabling checksums in a specific database
+ * before giving up.
+ */
+#define MAX_ATTEMPTS 4
+
+typedef enum
+{
+	SUCCESSFUL = 0,
+	ABORTED,
+	FAILED
+}			ChecksumHelperResult;
+
+typedef struct ChecksumHelperShmemStruct
+{
+	pg_atomic_flag launcher_started;
+	ChecksumHelperResult success;
+	bool		process_shared_catalogs;
+	bool		abort;
+	/* Parameter values set on start */
+	int			cost_delay;
+	int			cost_limit;
+}			ChecksumHelperShmemStruct;
+
+/* Shared memory segment for checksumhelper */
+static ChecksumHelperShmemStruct * ChecksumHelperShmem;
+
+/* Bookkeeping for work to do */
+typedef struct ChecksumHelperDatabase
+{
+	Oid			dboid;
+	char	   *dbname;
+	int			attempts;
+}			ChecksumHelperDatabase;
+
+typedef struct ChecksumHelperRelation
+{
+	Oid			reloid;
+	char		relkind;
+}			ChecksumHelperRelation;
+
+/* Prototypes */
+static List *BuildDatabaseList(void);
+static List *BuildRelationList(bool include_shared);
+static List *BuildTempTableList(void);
+static ChecksumHelperResult ProcessDatabase(ChecksumHelperDatabase * db);
+
+/*
+ * Main entry point for checksumhelper launcher process.
+ */
+bool
+StartChecksumHelperLauncher(int cost_delay, int cost_limit)
+{
+	BackgroundWorker bgw;
+	BackgroundWorkerHandle *bgw_handle;
+	HeapTuple	tup;
+	Relation	rel;
+	HeapScanDesc scan;
+
+	if (ChecksumHelperShmem->abort)
+	{
+		ereport(ERROR,
+				(errmsg("could not start checksumhelper: has been cancelled")));
+	}
+
+	/*
+	 * Check that all databases allow connections before claiming the launcher
+	 * flag, so that an error raised here cannot leave the flag set.  This is
+	 * re-checked when we build the list of databases to work on; duplicating
+	 * it lets us report the problem while we can still reach the client.
+	 */
+	rel = heap_open(DatabaseRelationId, AccessShareLock);
+	scan = heap_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
+
+		if (!pgdb->datallowconn)
+			ereport(ERROR,
+					(errmsg("database \"%s\" does not allow connections",
+							NameStr(pgdb->datname)),
+					 errhint("Allow connections using ALTER DATABASE and try again.")));
+	}
+
+	heap_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	if (!pg_atomic_test_set_flag(&ChecksumHelperShmem->launcher_started))
+	{
+		/* Failed to set means somebody else started */
+		ereport(ERROR,
+				(errmsg("could not start checksumhelper: already running")));
+	}
+
+	ChecksumHelperShmem->cost_delay = cost_delay;
+	ChecksumHelperShmem->cost_limit = cost_limit;
+
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ChecksumHelperLauncherMain");
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "checksumhelper launcher");
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "checksumhelper launcher");
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_main_arg = (Datum) 0;
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+	{
+		pg_atomic_clear_flag(&ChecksumHelperShmem->launcher_started);
+		return false;
+	}
+
+	return true;
+}
+
+void
+ShutdownChecksumHelperIfRunning(void)
+{
+	/* If the launcher isn't started, there is nothing to shut down */
+	if (pg_atomic_unlocked_test_flag(&ChecksumHelperShmem->launcher_started))
+		return;
+
+	/*
+	 * We don't need an atomic variable for aborting, since setting it
+	 * multiple times will not change the handling.
+	 */
+	ChecksumHelperShmem->abort = true;
+}
+
+/*
+ * Enable checksums in a single relation/fork.
+ * Returns true if successful, and false if *aborted*. On error, an actual error
+ * is raised in the lower levels.
+ */
+static bool
+ProcessSingleRelationFork(Relation reln, ForkNumber forkNum, BufferAccessStrategy strategy)
+{
+	BlockNumber numblocks = RelationGetNumberOfBlocksInFork(reln, forkNum);
+	BlockNumber b;
+	char		activity[NAMEDATALEN * 2 + 128];
+
+	for (b = 0; b < numblocks; b++)
+	{
+		Buffer		buf = ReadBufferExtended(reln, forkNum, b, RBM_NORMAL, strategy);
+
+		/*
+		 * Report to pgstat every 100 blocks (so as not to "spam")
+		 */
+		if ((b % 100) == 0)
+		{
+			snprintf(activity, sizeof(activity) - 1, "processing: %s.%s (%s block %u/%u)",
+					 get_namespace_name(RelationGetNamespace(reln)), RelationGetRelationName(reln),
+					 forkNames[forkNum], b, numblocks);
+			pgstat_report_activity(STATE_RUNNING, activity);
+		}
+
+		/* Need to get an exclusive lock before we can flag as dirty */
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+		/*
+		 * Mark the buffer as dirty and force a full page write.  We have to
+		 * re-write the page to WAL even if the checksum hasn't changed,
+		 * because if there is a replica it might have a slightly different
+		 * version of the page with an invalid checksum, caused by unlogged
+		 * changes (e.g. hintbits) on the master happening while checksums
+		 * were off. This can happen if there was a valid checksum on the page
+		 * at one point in the past, so only when checksums are first on, then
+		 * off, and then turned on again.
+		 */
+		START_CRIT_SECTION();
+		MarkBufferDirty(buf);
+		log_newpage_buffer(buf, false);
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(buf);
+
+		/*
+		 * This is the only place where we check whether we have been asked to
+		 * abort; the abort will propagate up from here.
+		 */
+		if (ChecksumHelperShmem->abort)
+			return false;
+
+		vacuum_delay_point();
+	}
+
+	return true;
+}
+
+/*
+ * Process a single relation based on oid.
+ * Returns true if successful, and false if *aborted*. On error, an actual error
+ * is raised in the lower levels.
+ */
+static bool
+ProcessSingleRelationByOid(Oid relationId, BufferAccessStrategy strategy)
+{
+	Relation	rel;
+	ForkNumber	fnum;
+	bool		aborted = false;
+
+	StartTransactionCommand();
+
+	elog(DEBUG2, "Checksumhelper starting to process relation %u", relationId);
+	rel = try_relation_open(relationId, AccessShareLock);
+	if (rel == NULL)
+	{
+		/*
+		 * Relation no longer exists. We consider this a success, since there are
+		 * no pages in it that need checksums, and thus return true.
+		 */
+		elog(DEBUG1, "Checksumhelper skipping relation %u as it no longer exists", relationId);
+		CommitTransactionCommand();
+		pgstat_report_activity(STATE_IDLE, NULL);
+		return true;
+	}
+	RelationOpenSmgr(rel);
+
+	for (fnum = 0; fnum <= MAX_FORKNUM; fnum++)
+	{
+		if (smgrexists(rel->rd_smgr, fnum))
+		{
+			if (!ProcessSingleRelationFork(rel, fnum, strategy))
+			{
+				aborted = true;
+				break;
+			}
+		}
+	}
+	relation_close(rel, AccessShareLock);
+	elog(DEBUG2, "Checksumhelper done with relation %u: %s",
+		 relationId, (aborted ? "aborted" : "finished"));
+
+	CommitTransactionCommand();
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	return !aborted;
+}
+
+/*
+ * ProcessDatabase
+ *		Enable checksums in a single database.
+ *
+ * We do this by launching a dynamic background worker into this database, and
+ * waiting for it to finish.  We have to do this in a separate worker, since
+ * each process can only be connected to one database during its lifetime.
+ */
+static ChecksumHelperResult
+ProcessDatabase(ChecksumHelperDatabase * db)
+{
+	BackgroundWorker bgw;
+	BackgroundWorkerHandle *bgw_handle;
+	BgwHandleStatus status;
+	pid_t		pid;
+	char		activity[NAMEDATALEN + 64];
+
+	ChecksumHelperShmem->success = FAILED;
+
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ChecksumHelperWorkerMain");
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "checksumhelper worker");
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "checksumhelper worker");
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_main_arg = ObjectIdGetDatum(db->dboid);
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+	{
+		ereport(LOG,
+				(errmsg("failed to start worker for checksumhelper in \"%s\"",
+						db->dbname)));
+		return FAILED;
+	}
+
+	status = WaitForBackgroundWorkerStartup(bgw_handle, &pid);
+	if (status != BGWH_STARTED)
+	{
+		ereport(LOG,
+				(errmsg("failed to wait for worker startup for checksumhelper in \"%s\"",
+						db->dbname)));
+		return FAILED;
+	}
+
+	ereport(DEBUG1,
+			(errmsg("started background worker for checksums in \"%s\"",
+					db->dbname)));
+
+	snprintf(activity, sizeof(activity) - 1,
+			 "Waiting for worker in database %s (pid %d)", db->dbname, pid);
+	pgstat_report_activity(STATE_RUNNING, activity);
+
+
+	status = WaitForBackgroundWorkerShutdown(bgw_handle);
+	if (status != BGWH_STOPPED)
+	{
+		ereport(LOG,
+				(errmsg("failed to wait for worker shutdown for checksumhelper in \"%s\"",
+						db->dbname)));
+		return FAILED;
+	}
+
+	if (ChecksumHelperShmem->success == ABORTED)
+		ereport(LOG,
+				(errmsg("checksumhelper was aborted during processing in \"%s\"",
+						db->dbname)));
+
+	ereport(DEBUG1,
+			(errmsg("background worker for checksums in \"%s\" completed",
+					db->dbname)));
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	return ChecksumHelperShmem->success;
+}
+
+static void
+launcher_exit(int code, Datum arg)
+{
+	ChecksumHelperShmem->abort = false;
+	pg_atomic_clear_flag(&ChecksumHelperShmem->launcher_started);
+}
+
+static void
+WaitForAllTransactionsToFinish(void)
+{
+	TransactionId waitforxid;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	waitforxid = ShmemVariableCache->nextXid;
+	LWLockRelease(XidGenLock);
+
+	while (true)
+	{
+		TransactionId oldestxid = GetOldestActiveTransactionId();
+
+		elog(DEBUG1, "Checking old transactions");
+		if (TransactionIdPrecedes(oldestxid, waitforxid))
+		{
+			char activity[128];
+
+			/* Oldest running xid is older than us, so wait */
+			snprintf(activity, sizeof(activity), "Waiting for current transactions to finish (waiting for %u)", waitforxid);
+			pgstat_report_activity(STATE_RUNNING, activity);
+
+			/* Retry every 5 seconds */
+			ResetLatch(MyLatch);
+			(void) WaitLatch(MyLatch,
+							 WL_LATCH_SET | WL_TIMEOUT,
+							 5000,
+							 WAIT_EVENT_PG_SLEEP);
+		}
+		else
+		{
+			pgstat_report_activity(STATE_IDLE, NULL);
+			return;
+		}
+	}
+}
+
+void
+ChecksumHelperLauncherMain(Datum arg)
+{
+	List	   *DatabaseList;
+
+	on_shmem_exit(launcher_exit, 0);
+
+	ereport(DEBUG1,
+			(errmsg("checksumhelper launcher started")));
+
+	pqsignal(SIGTERM, die);
+
+	BackgroundWorkerUnblockSignals();
+
+	init_ps_display(pgstat_get_backend_desc(B_CHECKSUMHELPER_LAUNCHER), "", "", "");
+
+	/*
+	 * Initialize a connection to shared catalogs only.
+	 */
+	BackgroundWorkerInitializeConnection(NULL, NULL);
+
+	/*
+	 * Set up so that the first run processes the shared catalogs, rather
+	 * than processing them once in every database.
+	 */
+	ChecksumHelperShmem->process_shared_catalogs = true;
+
+	/*
+	 * Wait for all existing transactions to finish. This will make sure that
+	 * we can see all tables in all databases, so we don't miss any.
+	 * Anything created after this point is known to have checksums on
+	 * all pages already, so we don't have to care about those.
+	 */
+	WaitForAllTransactionsToFinish();
+
+	/*
+	 * Create a database list.  We don't need to concern ourselves with
+	 * rebuilding this list during runtime since any database created after
+	 * this process started will be running with checksums turned on from the
+	 * start.
+	 */
+	DatabaseList = BuildDatabaseList();
+
+	/*
+	 * If there are no databases at all to checksum, we can exit immediately
+	 * as there is no work to do.
+	 */
+	if (DatabaseList == NIL || list_length(DatabaseList) == 0)
+		return;
+
+	while (true)
+	{
+		List	   *remaining = NIL;
+		ListCell   *lc,
+				   *lc2;
+		List	   *CurrentDatabases = NIL;
+
+		foreach(lc, DatabaseList)
+		{
+			ChecksumHelperDatabase *db = (ChecksumHelperDatabase *) lfirst(lc);
+			ChecksumHelperResult processing;
+
+			processing = ProcessDatabase(db);
+
+			if (processing == SUCCESSFUL)
+			{
+				pfree(db->dbname);
+				pfree(db);
+
+				if (ChecksumHelperShmem->process_shared_catalogs)
+
+					/*
+					 * Now that one database has completed shared catalogs, we
+					 * don't have to process them again.
+					 */
+					ChecksumHelperShmem->process_shared_catalogs = false;
+			}
+			else if (processing == FAILED)
+			{
+				/*
+				 * Put failed databases on the remaining list.
+				 */
+				remaining = lappend(remaining, db);
+			}
+			else
+				/* aborted */
+				return;
+		}
+		list_free(DatabaseList);
+
+		DatabaseList = remaining;
+		remaining = NIL;
+
+		/*
+		 * DatabaseList now has all databases not yet processed. This can be
+		 * because they failed for some reason, or because the database was
+		 * dropped between us getting the database list and trying to process
+		 * it. Get a fresh list of databases to detect the second case where
+		 * the database was dropped before we had started processing it. Any
+		 * database that still exists but where enabling checksums failed, is
+		 * retried for a limited number of times before giving up. Any
+		 * database that remains in failed state after the retries expire will
+		 * fail the entire operation.
+		 */
+		CurrentDatabases = BuildDatabaseList();
+
+		foreach(lc, DatabaseList)
+		{
+			ChecksumHelperDatabase *db = (ChecksumHelperDatabase *) lfirst(lc);
+			bool		found = false;
+
+			foreach(lc2, CurrentDatabases)
+			{
+				ChecksumHelperDatabase *db2 = (ChecksumHelperDatabase *) lfirst(lc2);
+
+				if (db->dboid == db2->dboid)
+				{
+					/* Database still exists, time to give up? */
+					if (++db->attempts > MAX_ATTEMPTS)
+					{
+						/* Disable checksums on cluster, because we failed */
+						SetDataChecksumsOff();
+
+						ereport(ERROR,
+								(errmsg("failed to enable checksums in \"%s\", giving up",
+										db->dbname)));
+					}
+					else
+						/* Try again with this db */
+						remaining = lappend(remaining, db);
+					found = true;
+					break;
+				}
+			}
+			if (!found)
+			{
+				ereport(LOG,
+						(errmsg("database \"%s\" has been dropped, skipping",
+								db->dbname)));
+				pfree(db->dbname);
+				pfree(db);
+			}
+		}
+
+		/* Free the extra list of databases */
+		foreach(lc, CurrentDatabases)
+		{
+			ChecksumHelperDatabase *db = (ChecksumHelperDatabase *) lfirst(lc);
+
+			pfree(db->dbname);
+			pfree(db);
+		}
+		list_free(CurrentDatabases);
+
+		/* All databases processed yet? */
+		if (remaining == NIL || list_length(remaining) == 0)
+			break;
+
+		DatabaseList = remaining;
+	}
+
+	/*
+	 * Force a checkpoint to get everything out to disk.
+	 */
+	RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT | CHECKPOINT_IMMEDIATE);
+
+	/*
+	 * Everything has been processed, so flag checksums enabled.
+	 */
+	SetDataChecksumsOn();
+
+	ereport(LOG,
+			(errmsg("checksums enabled, checksumhelper launcher shutting down")));
+}
+
+/*
+ * ChecksumHelperShmemSize
+ *		Compute required space for checksumhelper-related shared memory
+ */
+Size
+ChecksumHelperShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(ChecksumHelperShmemStruct);
+	size = MAXALIGN(size);
+
+	return size;
+}
+
+/*
+ * ChecksumHelperShmemInit
+ *		Allocate and initialize checksumhelper-related shared memory
+ */
+void
+ChecksumHelperShmemInit(void)
+{
+	bool		found;
+
+	ChecksumHelperShmem = (ChecksumHelperShmemStruct *)
+		ShmemInitStruct("ChecksumHelper Data",
+						ChecksumHelperShmemSize(),
+						&found);
+
+	pg_atomic_init_flag(&ChecksumHelperShmem->launcher_started);
+}
+
+/*
+ * BuildDatabaseList
+ *		Compile a list of all currently available databases in the cluster
+ *
+ * This creates the list of databases for the checksumhelper workers to add
+ * checksums to.
+ */
+static List *
+BuildDatabaseList(void)
+{
+	List	   *DatabaseList = NIL;
+	Relation	rel;
+	HeapScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = heap_open(DatabaseRelationId, AccessShareLock);
+	scan = heap_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
+		ChecksumHelperDatabase *db;
+
+		if (!pgdb->datallowconn)
+			ereport(ERROR,
+					(errmsg("database \"%s\" does not allow connections",
+							NameStr(pgdb->datname)),
+					 errhint("Allow connections using ALTER DATABASE and try again.")));
+
+		oldctx = MemoryContextSwitchTo(ctx);
+
+		db = (ChecksumHelperDatabase *) palloc0(sizeof(ChecksumHelperDatabase));
+
+		db->dboid = HeapTupleGetOid(tup);
+		db->dbname = pstrdup(NameStr(pgdb->datname));
+
+		DatabaseList = lappend(DatabaseList, db);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	heap_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return DatabaseList;
+}
+
+/*
+ * BuildRelationList
+ *		Compile a list of all relations in the database
+ *
+ * If include_shared is true, both shared and local relations are returned,
+ * else only non-shared relations are returned.
+ * Temp tables are not included.
+ */
+static List *
+BuildRelationList(bool include_shared)
+{
+	List	   *RelationList = NIL;
+	Relation	rel;
+	HeapScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = heap_open(RelationRelationId, AccessShareLock);
+	scan = heap_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_class pgc = (Form_pg_class) GETSTRUCT(tup);
+		ChecksumHelperRelation *relentry;
+
+		if (pgc->relpersistence == 't')
+			continue;
+
+		if (pgc->relisshared && !include_shared)
+			continue;
+
+		/*
+		 * Foreign tables have by definition no local storage that can be
+		 * checksummed, so skip.
+		 */
+		if (pgc->relkind == RELKIND_FOREIGN_TABLE)
+			continue;
+
+		oldctx = MemoryContextSwitchTo(ctx);
+		relentry = (ChecksumHelperRelation *) palloc(sizeof(ChecksumHelperRelation));
+
+		relentry->reloid = HeapTupleGetOid(tup);
+		relentry->relkind = pgc->relkind;
+
+		RelationList = lappend(RelationList, relentry);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	heap_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return RelationList;
+}
+
+/*
+ * BuildTempTableList
+ *		Compile a list of all temporary tables in database
+ *
+ * Returns a List of oids.
+ */
+static List *
+BuildTempTableList(void)
+{
+	List	   *RelationList = NIL;
+	Relation	rel;
+	HeapScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = heap_open(RelationRelationId, AccessShareLock);
+	scan = heap_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_class pgc = (Form_pg_class) GETSTRUCT(tup);
+
+		if (pgc->relpersistence != 't')
+			continue;
+
+		oldctx = MemoryContextSwitchTo(ctx);
+		RelationList = lappend_oid(RelationList, HeapTupleGetOid(tup));
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	heap_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return RelationList;
+}
+
+/*
+ * Main function for enabling checksums in a single database
+ */
+void
+ChecksumHelperWorkerMain(Datum arg)
+{
+	Oid			dboid = DatumGetObjectId(arg);
+	List	   *RelationList = NIL;
+	List	   *InitialTempTableList = NIL;
+	ListCell   *lc;
+	BufferAccessStrategy strategy;
+	bool		aborted = false;
+
+	pqsignal(SIGTERM, die);
+
+	BackgroundWorkerUnblockSignals();
+
+	init_ps_display(pgstat_get_backend_desc(B_CHECKSUMHELPER_WORKER), "", "", "");
+
+	ereport(DEBUG1,
+			(errmsg("checksum worker starting for database oid %u", dboid)));
+
+	BackgroundWorkerInitializeConnectionByOid(dboid, InvalidOid);
+
+	/*
+	 * Get a list of all temp tables present in this database as we start.
+	 * We need to wait until they are all gone before we are done, since
+	 * we cannot access those files and modify them.
+	 */
+	InitialTempTableList = BuildTempTableList();
+
+	/*
+	 * Enable vacuum cost delay, if any.
+	 */
+	VacuumCostDelay = ChecksumHelperShmem->cost_delay;
+	VacuumCostLimit = ChecksumHelperShmem->cost_limit;
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	/*
+	 * Create and set the vacuum strategy as our buffer strategy.
+	 */
+	strategy = GetAccessStrategy(BAS_VACUUM);
+
+	RelationList = BuildRelationList(ChecksumHelperShmem->process_shared_catalogs);
+	foreach(lc, RelationList)
+	{
+		ChecksumHelperRelation *rel = (ChecksumHelperRelation *) lfirst(lc);
+
+		if (!ProcessSingleRelationByOid(rel->reloid, strategy))
+		{
+			aborted = true;
+			break;
+		}
+	}
+	list_free_deep(RelationList);
+
+	if (aborted)
+	{
+		ChecksumHelperShmem->success = ABORTED;
+		ereport(DEBUG1,
+				(errmsg("checksum worker aborted in database oid %u", dboid)));
+		return;
+	}
+
+	/*
+	 * Wait for all temp tables that existed when we started to go away. This
+	 * is necessary since we cannot "reach" them to enable checksums.
+	 * Any temp tables created after we started will already have checksums
+	 * in them (due to the inprogress state), so those are safe.
+	 */
+	while (true)
+	{
+		List *CurrentTempTables;
+		ListCell *lc;
+		int numleft;
+		char activity[64];
+
+		CurrentTempTables = BuildTempTableList();
+		numleft = 0;
+		foreach(lc, InitialTempTableList)
+		{
+			if (list_member_oid(CurrentTempTables, lfirst_oid(lc)))
+				numleft++;
+		}
+		list_free(CurrentTempTables);
+
+		if (numleft == 0)
+			break;
+
+		/* At least one temp table left to wait for */
+		snprintf(activity, sizeof(activity), "Waiting for %d temp tables to be removed", numleft);
+		pgstat_report_activity(STATE_RUNNING, activity);
+
+		/* Retry every 5 seconds */
+		ResetLatch(MyLatch);
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_TIMEOUT,
+						 5000,
+						 WAIT_EVENT_PG_SLEEP);
+	}
+
+	list_free(InitialTempTableList);
+
+	ChecksumHelperShmem->success = SUCCESSFUL;
+	ereport(DEBUG1,
+			(errmsg("checksum worker completed in database oid %u", dboid)));
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 96ba216387..83328a2766 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4125,6 +4125,11 @@ pgstat_get_backend_desc(BackendType backendType)
 		case B_WAL_WRITER:
 			backendDesc = "walwriter";
 			break;
+		case B_CHECKSUMHELPER_LAUNCHER:
+			backendDesc = "checksumhelper launcher";
+			break;
+		case B_CHECKSUMHELPER_WORKER:
+			backendDesc = "checksumhelper worker";
 	}
 
 	return backendDesc;
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c5b83232fd..5f26a03769 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1377,7 +1377,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 
 	_tarWriteHeader(tarfilename, NULL, statbuf, false);
 
-	if (!noverify_checksums && DataChecksumsEnabled())
+	if (!noverify_checksums && DataChecksumsNeedVerify())
 	{
 		char	   *filename;
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6eb0d5527e..84183f8203 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -198,6 +198,7 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
 		case XLOG_FPI:
+		case XLOG_CHECKSUMS:
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a581c0..853e1e472f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -27,6 +27,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/checksumhelper.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
@@ -261,6 +262,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	ChecksumHelperShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/page/README b/src/backend/storage/page/README
index 5127d98da3..f408d56270 100644
--- a/src/backend/storage/page/README
+++ b/src/backend/storage/page/README
@@ -9,7 +9,8 @@ have a very low measured incidence according to research on large server farms,
 http://www.cs.toronto.edu/~bianca/papers/sigmetrics09.pdf, discussed
 2010/12/22 on -hackers list.
 
-Current implementation requires this be enabled system-wide at initdb time.
+Checksums can be enabled at initdb time, but can also be turned off and on
+using pg_enable_data_checksums()/pg_disable_data_checksums() at runtime.
 
 The checksum is not valid at all times on a data page!!
 The checksum is valid when the page leaves the shared pool and is checked
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index dfbda5458f..790e4b860a 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -93,7 +93,7 @@ PageIsVerified(Page page, BlockNumber blkno)
 	 */
 	if (!PageIsNew(page))
 	{
-		if (DataChecksumsEnabled())
+		if (DataChecksumsNeedVerify())
 		{
 			checksum = pg_checksum_page((char *) page, blkno);
 
@@ -1168,7 +1168,7 @@ PageSetChecksumCopy(Page page, BlockNumber blkno)
 	static char *pageCopy = NULL;
 
 	/* If we don't need a checksum, just return the passed-in data */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
+	if (PageIsNew(page) || !DataChecksumsNeedWrite())
 		return (char *) page;
 
 	/*
@@ -1195,7 +1195,7 @@ void
 PageSetChecksumInplace(Page page, BlockNumber blkno)
 {
 	/* If we don't need a checksum, just return */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
+	if (PageIsNew(page) || !DataChecksumsNeedWrite())
 		return;
 
 	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4ffc8451ca..14aa575733 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -32,6 +32,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
@@ -68,6 +69,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
+#include "storage/checksum.h"
 #include "storage/dsm_impl.h"
 #include "storage/standby.h"
 #include "storage/fd.h"
@@ -420,6 +422,17 @@ static const struct config_enum_entry password_encryption_options[] = {
 };
 
 /*
+ * data_checksums used to be a boolean, but was only set by initdb so there is
+ * no need to support variants of boolean input.
+ */
+static const struct config_enum_entry data_checksum_options[] = {
+	{"on", DATA_CHECKSUMS_ON, true},
+	{"off", DATA_CHECKSUMS_OFF, true},
+	{"inprogress", DATA_CHECKSUMS_INPROGRESS, true},
+	{NULL, 0, false}
+};
+
+/*
  * Options for enum values stored in other modules
  */
 extern const struct config_enum_entry wal_level_options[];
@@ -514,7 +527,7 @@ static int	max_identifier_length;
 static int	block_size;
 static int	segment_size;
 static int	wal_block_size;
-static bool data_checksums;
+static int	data_checksums_tmp; /* only accessed locally! */
 static bool integer_datetimes;
 static bool assert_enabled;
 
@@ -1684,17 +1697,6 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
-		{"data_checksums", PGC_INTERNAL, PRESET_OPTIONS,
-			gettext_noop("Shows whether data checksums are turned on for this cluster."),
-			NULL,
-			GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
-		},
-		&data_checksums,
-		false,
-		NULL, NULL, NULL
-	},
-
-	{
 		{"syslog_sequence_numbers", PGC_SIGHUP, LOGGING_WHERE,
 			gettext_noop("Add sequence number to syslog messages to avoid duplicate suppression."),
 			NULL
@@ -4101,6 +4103,17 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"data_checksums", PGC_INTERNAL, PRESET_OPTIONS,
+			gettext_noop("Shows whether data checksums are turned on for this cluster."),
+			NULL,
+			GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
+		},
+		&data_checksums_tmp,
+		DATA_CHECKSUMS_OFF, data_checksum_options,
+		NULL, NULL, show_data_checksums
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 3b35835abe..8c11060a2f 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -26,6 +26,7 @@ SUBDIRS = \
 	pg_test_fsync \
 	pg_test_timing \
 	pg_upgrade \
+	pg_verify_checksums \
 	pg_waldump \
 	pgbench \
 	psql \
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 0fe98a550e..4bb2b7e6ec 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -591,6 +591,15 @@ check_control_data(ControlData *oldctrl,
 	 */
 
 	/*
+	 * If checksums have been turned on in the old cluster, but the
+	 * checksumhelper has yet to finish, then disallow upgrading. The user
+	 * should either let the process finish, or turn off checksums, before
+	 * retrying.
+	 */
+	if (oldctrl->data_checksum_version == 2)
+		pg_fatal("transition to data checksums not completed in old cluster\n");
+
+	/*
 	 * We might eventually allow upgrades from checksum to no-checksum
 	 * clusters.
 	 */
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 7e5e971294..449a703c47 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -226,7 +226,7 @@ typedef struct
 	uint32		large_object;
 	bool		date_is_int;
 	bool		float8_pass_by_value;
-	bool		data_checksum_version;
+	uint32		data_checksum_version;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_verify_checksums/.gitignore b/src/bin/pg_verify_checksums/.gitignore
new file mode 100644
index 0000000000..d1dcdaf0dd
--- /dev/null
+++ b/src/bin/pg_verify_checksums/.gitignore
@@ -0,0 +1 @@
+/pg_verify_checksums
diff --git a/src/bin/pg_verify_checksums/Makefile b/src/bin/pg_verify_checksums/Makefile
new file mode 100644
index 0000000000..d16261571f
--- /dev/null
+++ b/src/bin/pg_verify_checksums/Makefile
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/pg_verify_checksums
+#
+# Copyright (c) 1998-2018, PostgreSQL Global Development Group
+#
+# src/bin/pg_verify_checksums/Makefile
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "pg_verify_checksums - verify data checksums in an offline cluster"
+PGAPPICON=win32
+
+subdir = src/bin/pg_verify_checksums
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS= pg_verify_checksums.o $(WIN32RES)
+
+all: pg_verify_checksums
+
+pg_verify_checksums: $(OBJS) | submake-libpgport
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+	$(INSTALL_PROGRAM) pg_verify_checksums$(X) '$(DESTDIR)$(bindir)/pg_verify_checksums$(X)'
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f '$(DESTDIR)$(bindir)/pg_verify_checksums$(X)'
+
+clean distclean maintainer-clean:
+	rm -f pg_verify_checksums$(X) $(OBJS)
+	rm -rf tmp_check
diff --git a/src/bin/pg_verify_checksums/pg_verify_checksums.c b/src/bin/pg_verify_checksums/pg_verify_checksums.c
new file mode 100644
index 0000000000..e37f39bd2a
--- /dev/null
+++ b/src/bin/pg_verify_checksums/pg_verify_checksums.c
@@ -0,0 +1,315 @@
+/*
+ * pg_verify_checksums
+ *
+ * Verifies page level checksums in an offline cluster
+ *
+ *	Copyright (c) 2010-2018, PostgreSQL Global Development Group
+ *
+ *	src/bin/pg_verify_checksums/pg_verify_checksums.c
+ */
+
+#define FRONTEND 1
+
+#include "postgres.h"
+#include "catalog/pg_control.h"
+#include "common/controldata_utils.h"
+#include "storage/bufpage.h"
+#include "storage/checksum.h"
+#include "storage/checksum_impl.h"
+
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
+
+#include "pg_getopt.h"
+
+
+static int64 files = 0;
+static int64 blocks = 0;
+static int64 badblocks = 0;
+static ControlFileData *ControlFile;
+
+static char *only_relfilenode = NULL;
+static bool debug = false;
+
+static const char *progname;
+
+static void
+usage(void)
+{
+	printf(_("%s verifies page level checksums in an offline PostgreSQL database cluster.\n\n"), progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION] [DATADIR]\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" [-D] DATADIR    data directory\n"));
+	printf(_("  -f             force check even if checksums are disabled\n"));
+	printf(_("  -r relfilenode check only relation with specified relfilenode\n"));
+	printf(_("  -d             debug output, listing all checked blocks\n"));
+	printf(_("  -V, --version  output version information, then exit\n"));
+	printf(_("  -?, --help     show this help, then exit\n"));
+	printf(_("\nIf no data directory (DATADIR) is specified, "
+			 "the environment variable PGDATA\nis used.\n\n"));
+	printf(_("Report bugs to <pgsql-b...@postgresql.org>.\n"));
+}
+
+static const char *skip[] = {
+	"pg_control",
+	"pg_filenode.map",
+	"pg_internal.init",
+	"PG_VERSION",
+	NULL,
+};
+
+static bool
+skipfile(char *fn)
+{
+	const char **f;
+
+	if (strcmp(fn, ".") == 0 ||
+		strcmp(fn, "..") == 0)
+		return true;
+
+	for (f = skip; *f; f++)
+		if (strcmp(*f, fn) == 0)
+			return true;
+	return false;
+}
+
+static void
+scan_file(char *fn, int segmentno)
+{
+	char		buf[BLCKSZ];
+	PageHeader	header = (PageHeader) buf;
+	int			f;
+	int			blockno;
+
+	f = open(fn, 0);
+	if (f < 0)
+	{
+		fprintf(stderr, _("%s: could not open file \"%s\": %m\n"), progname, fn);
+		exit(1);
+	}
+
+	files++;
+
+	for (blockno = 0;; blockno++)
+	{
+		uint16		csum;
+		int			r = read(f, buf, BLCKSZ);
+
+		if (r == 0)
+			break;
+		if (r != BLCKSZ)
+		{
+			fprintf(stderr, _("%s: short read of block %d in file \"%s\", got only %d bytes\n"),
+					progname, blockno, fn, r);
+			exit(1);
+		}
+		blocks++;
+
+		csum = pg_checksum_page(buf, blockno + segmentno * RELSEG_SIZE);
+		if (csum != header->pd_checksum)
+		{
+			if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_VERSION)
+				fprintf(stderr, _("%s: checksum verification failed in file \"%s\", block %d: calculated checksum %X but expected %X\n"),
+						progname, fn, blockno, csum, header->pd_checksum);
+			badblocks++;
+		}
+		else if (debug)
+			fprintf(stderr, _("%s: checksum verified in file \"%s\", block %d: %X\n"),
+					progname, fn, blockno, csum);
+	}
+
+	close(f);
+}
+
+static void
+scan_directory(char *basedir, char *subdir)
+{
+	char		path[MAXPGPATH];
+	DIR		   *dir;
+	struct dirent *de;
+
+	snprintf(path, MAXPGPATH, "%s/%s", basedir, subdir);
+	dir = opendir(path);
+	if (!dir)
+	{
+		fprintf(stderr, _("%s: could not open directory \"%s\": %m\n"),
+				progname, path);
+		exit(1);
+	}
+	while ((de = readdir(dir)) != NULL)
+	{
+		char		fn[MAXPGPATH];
+		struct stat st;
+
+		if (skipfile(de->d_name))
+			continue;
+
+		snprintf(fn, MAXPGPATH, "%s/%s", path, de->d_name);
+		if (lstat(fn, &st) < 0)
+		{
+			fprintf(stderr, _("%s: could not stat file \"%s\": %m\n"),
+					progname, fn);
+			exit(1);
+		}
+		if (S_ISREG(st.st_mode))
+		{
+			char	   *forkpath,
+					   *segmentpath;
+			int			segmentno = 0;
+
+			/*
+			 * Cut off at the segment boundary (".") to get the segment number
+			 * in order to mix it into the checksum. Then also cut off at the
+			 * fork boundary, to get the relfilenode the file belongs to for
+			 * filtering.
+			 */
+			segmentpath = strchr(de->d_name, '.');
+			if (segmentpath != NULL)
+			{
+				*segmentpath++ = '\0';
+				segmentno = atoi(segmentpath);
+				if (segmentno == 0)
+				{
+					fprintf(stderr, _("%s: invalid segment number %d in filename \"%s\"\n"),
+							progname, segmentno, fn);
+					exit(1);
+				}
+			}
+
+			forkpath = strchr(de->d_name, '_');
+			if (forkpath != NULL)
+				*forkpath++ = '\0';
+
+			if (only_relfilenode && strcmp(only_relfilenode, de->d_name) != 0)
+				/* Relfilenode not to be included */
+				continue;
+
+			scan_file(fn, segmentno);
+		}
+		else if (S_ISDIR(st.st_mode) || S_ISLNK(st.st_mode))
+			scan_directory(path, de->d_name);
+	}
+	closedir(dir);
+}
+
+int
+main(int argc, char *argv[])
+{
+	char	   *DataDir = NULL;
+	bool		force = false;
+	int			c;
+	bool		crc_ok;
+
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_verify_checksums"));
+
+	progname = get_progname(argv[0]);
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+		{
+			puts("pg_verify_checksums (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt(argc, argv, "D:fr:d")) != -1)
+	{
+		switch (c)
+		{
+			case 'd':
+				debug = true;
+				break;
+			case 'D':
+				DataDir = optarg;
+				break;
+			case 'f':
+				force = true;
+				break;
+			case 'r':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid relfilenode: %s\n"), progname, optarg);
+					exit(1);
+				}
+				only_relfilenode = pstrdup(optarg);
+				break;
+			default:
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+				exit(1);
+		}
+	}
+
+	if (DataDir == NULL)
+	{
+		if (optind < argc)
+			DataDir = argv[optind++];
+		else
+			DataDir = getenv("PGDATA");
+
+		/* If no DataDir was specified, and none could be found, error out */
+		if (DataDir == NULL)
+		{
+			fprintf(stderr, _("%s: no data directory specified\n"), progname);
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+			exit(1);
+		}
+	}
+
+	/* Complain if any arguments remain */
+	if (optind < argc)
+	{
+		fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/* Read the control file and check that the cluster is shut down */
+	ControlFile = get_controlfile(DataDir, progname, &crc_ok);
+	if (!crc_ok)
+	{
+		fprintf(stderr, _("%s: pg_control CRC value is incorrect.\n"), progname);
+		exit(1);
+	}
+
+	if (ControlFile->state != DB_SHUTDOWNED &&
+		ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
+	{
+		fprintf(stderr, _("%s: cluster must be shut down to verify checksums.\n"), progname);
+		exit(1);
+	}
+
+	if (ControlFile->data_checksum_version == 0 && !force)
+	{
+		fprintf(stderr, _("%s: data checksums are not enabled in cluster.\n"), progname);
+		exit(1);
+	}
+
+	/* Scan all files */
+	scan_directory(DataDir, "global");
+	scan_directory(DataDir, "base");
+	scan_directory(DataDir, "pg_tblspc");
+
+	printf(_("Checksum scan completed\n"));
+	printf(_("Data checksum version: %d\n"), ControlFile->data_checksum_version);
+	printf(_("Files scanned:  %" INT64_MODIFIER "d\n"), files);
+	printf(_("Blocks scanned: %" INT64_MODIFIER "d\n"), blocks);
+	if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+		printf(_("Blocks left in progress: %" INT64_MODIFIER "d\n"), badblocks);
+	else
+		printf(_("Bad checksums:  %" INT64_MODIFIER "d\n"), badblocks);
+
+	if (badblocks > 0)
+		return 1;
+
+	return 0;
+}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..f21870c644 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -154,7 +154,7 @@ extern PGDLLIMPORT int wal_level;
  * of the bits make it to disk, but the checksum wouldn't match.  Also WAL-log
  * them if forced by wal_log_hints=on.
  */
-#define XLogHintBitIsNeeded() (DataChecksumsEnabled() || wal_log_hints)
+#define XLogHintBitIsNeeded() (DataChecksumsNeedWrite() || wal_log_hints)
 
 /* Do we need to WAL-log information required only for Hot Standby and logical replication? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
@@ -257,7 +257,13 @@ extern char *XLogFileNameP(TimeLineID tli, XLogSegNo segno);
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
 extern char *GetMockAuthenticationNonce(void);
-extern bool DataChecksumsEnabled(void);
+extern bool DataChecksumsNeedWrite(void);
+extern bool DataChecksumsNeedVerify(void);
+extern bool DataChecksumsInProgress(void);
+extern void SetDataChecksumsInProgress(void);
+extern void SetDataChecksumsOn(void);
+extern void SetDataChecksumsOff(void);
+extern const char *show_data_checksums(void);
 extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
 extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index a5c074642f..0530fd1a43 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -25,6 +25,7 @@
 #include "lib/stringinfo.h"
 #include "pgtime.h"
 #include "storage/block.h"
+#include "storage/checksum.h"
 #include "storage/relfilenode.h"
 
 
@@ -240,6 +241,12 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Information logged when checksum state is changed */
+typedef struct xl_checksum_state
+{
+	ChecksumType new_checksumtype;
+}			xl_checksum_state;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 773d9e6eba..33c59f9a63 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_CHECKSUMS					0xC0
 
 
 /*
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 90d994c71a..f601af71be 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5574,6 +5574,11 @@ DESCR("pg_controldata recovery state information as a function");
 DATA(insert OID = 3444 ( pg_control_init PGNSP PGUID 12 1 0 0 0 f f f t f v s 0 0 2249 "" "{23,23,23,23,23,23,23,23,23,16,16,23}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{max_data_alignment,database_block_size,blocks_per_segment,wal_block_size,bytes_per_wal_segment,max_identifier_length,max_index_columns,max_toast_chunk_size,large_object_chunk_size,float4_pass_by_value,float8_pass_by_value,data_page_checksum_version}" _null_ _null_ pg_control_init _null_ _null_ _null_ ));
 DESCR("pg_controldata init state information as a function");
 
+DATA(insert OID = 3996 ( pg_disable_data_checksums		PGNSP PGUID 12 1 0 0 0 f f f t f v s 0 0 2278 "" _null_ _null_ _null_ _null_ _null_ disable_data_checksums _null_ _null_ _null_ ));
+DESCR("disable data checksums");
+DATA(insert OID = 3998 ( pg_enable_data_checksums		PGNSP PGUID 12 1 0 0 0 f f f t f v s 2 0 2278 "23 23" _null_ _null_ "{cost_delay,cost_limit}" _null_ _null_ enable_data_checksums _null_ _null_ _null_ ));
+DESCR("enable data checksums");
+
 /* collation management functions */
 DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0 f f f t f v u 1 0 23 "4089" _null_ _null_ _null_ _null_ _null_ pg_import_system_collations _null_ _null_ _null_ ));
 DESCR("import collations from operating system");
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index be2f59239b..4ed9ed76cc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -710,7 +710,9 @@ typedef enum BackendType
 	B_STARTUP,
 	B_WAL_RECEIVER,
 	B_WAL_SENDER,
-	B_WAL_WRITER
+	B_WAL_WRITER,
+	B_CHECKSUMHELPER_LAUNCHER,
+	B_CHECKSUMHELPER_WORKER
 } BackendType;
 
 
diff --git a/src/include/postmaster/checksumhelper.h b/src/include/postmaster/checksumhelper.h
new file mode 100644
index 0000000000..289bf2a935
--- /dev/null
+++ b/src/include/postmaster/checksumhelper.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * checksumhelper.h
+ *	  header file for checksum helper background worker
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/checksumhelper.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CHECKSUMHELPER_H
+#define CHECKSUMHELPER_H
+
+/* Shared memory */
+extern Size ChecksumHelperShmemSize(void);
+extern void ChecksumHelperShmemInit(void);
+
+/* Start the background processes for enabling checksums */
+bool		StartChecksumHelperLauncher(int cost_delay, int cost_limit);
+
+/* Shutdown the background processes, if any */
+void		ShutdownChecksumHelperIfRunning(void);
+
+/* Background worker entrypoints */
+void		ChecksumHelperLauncherMain(Datum arg);
+void		ChecksumHelperWorkerMain(Datum arg);
+
+#endif							/* CHECKSUMHELPER_H */
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 85dd10c45a..bd46bf2ce6 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -194,6 +194,7 @@ typedef PageHeaderData *PageHeader;
  */
 #define PG_PAGE_LAYOUT_VERSION		4
 #define PG_DATA_CHECKSUM_VERSION	1
+#define PG_DATA_CHECKSUM_INPROGRESS_VERSION		2
 
 /* ----------------------------------------------------------------
  *						page support macros
diff --git a/src/include/storage/checksum.h b/src/include/storage/checksum.h
index 433755e279..902ec29e2a 100644
--- a/src/include/storage/checksum.h
+++ b/src/include/storage/checksum.h
@@ -15,6 +15,13 @@
 
 #include "storage/block.h"
 
+typedef enum ChecksumType
+{
+	DATA_CHECKSUMS_OFF = 0,
+	DATA_CHECKSUMS_ON,
+	DATA_CHECKSUMS_INPROGRESS
+}			ChecksumType;
+
 /*
  * Compute the checksum for a Postgres page.  The page must be aligned on a
  * 4-byte boundary.
diff --git a/src/test/Makefile b/src/test/Makefile
index efb206aa75..6469ac94a4 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,8 @@ subdir = src/test
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = perl regress isolation modules authentication recovery subscription
+SUBDIRS = perl regress isolation modules authentication recovery subscription \
+			checksum
 
 # Test suites that are not safe by default but can be run if selected
 # by the user via the whitespace-separated list in variable
diff --git a/src/test/checksum/.gitignore b/src/test/checksum/.gitignore
new file mode 100644
index 0000000000..871e943d50
--- /dev/null
+++ b/src/test/checksum/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/checksum/Makefile b/src/test/checksum/Makefile
new file mode 100644
index 0000000000..f3ad9dfae1
--- /dev/null
+++ b/src/test/checksum/Makefile
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/checksum
+#
+# Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/checksum/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/checksum
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check
+
diff --git a/src/test/checksum/README b/src/test/checksum/README
new file mode 100644
index 0000000000..e3fbd2bdb5
--- /dev/null
+++ b/src/test/checksum/README
@@ -0,0 +1,22 @@
+src/test/checksum/README
+
+Regression tests for data checksums
+===================================
+
+This directory contains a test suite for enabling data checksums
+in a running cluster with streaming replication.
+
+Running the tests
+=================
+
+    make check
+
+or
+
+    make installcheck
+
+NOTE: This creates a temporary installation (in the case of "check"),
+with multiple nodes, be they master or standby(s) for the purpose of
+the tests.
+
+NOTE: This requires the --enable-tap-tests argument to configure.
diff --git a/src/test/checksum/t/001_standby_checksum.pl b/src/test/checksum/t/001_standby_checksum.pl
new file mode 100644
index 0000000000..4f8e0ab8f8
--- /dev/null
+++ b/src/test/checksum/t/001_standby_checksum.pl
@@ -0,0 +1,105 @@
+# Test suite for testing enabling data checksums with streaming replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+
+my $MAX_TRIES = 30;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+my $backup_name = 'my_backup';
+
+# Take backup
+$node_master->backup($backup_name);
+
+# Create streaming standby linking to master
+my $node_standby_1 = get_new_node('standby_1');
+$node_standby_1->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_1->start;
+
+# Create some content on master to have un-checksummed data in the cluster
+$node_master->safe_psql('postgres',
+	"CREATE TABLE t AS SELECT generate_series(1,10000) AS a;");
+
+# Wait for standbys to catch up
+$node_master->wait_for_catchup($node_standby_1, 'replay',
+	$node_master->lsn('insert'));
+
+# Prep cluster for enabling checksums
+$node_master->safe_psql('postgres',
+	"ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true;");
+
+# Check that checksums are turned off
+my $result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on master');
+
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on standby_1');
+
+# Enable checksums for the cluster
+$node_master->safe_psql('postgres', "SELECT pg_enable_data_checksums();");
+
+# Ensure that the master has switched to inprogress immediately
+$result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "inprogress", 'ensure checksums are in progress on master');
+
+# Wait for checksum enable to be replayed
+$node_master->wait_for_catchup($node_standby_1, 'replay');
+
+# Ensure that the standby has switched to inprogress
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "inprogress", 'ensure checksums are in progress on standby_1');
+
+# Insert some more data which should be checksummed on INSERT
+$node_master->safe_psql('postgres',
+	"INSERT INTO t VALUES (generate_series(1,10000));");
+
+# Wait for checksums to be enabled on the master
+for (my $i = 0; $i < $MAX_TRIES; $i++)
+{
+	$result = $node_master->safe_psql('postgres',
+		"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+	last if ($result eq 'on');
+	sleep(1);
+}
+is ($result, "on", 'ensure checksums are enabled on master');
+
+# Wait for checksums to be enabled on the standby
+for (my $i = 0; $i < $MAX_TRIES; $i++)
+{
+	$result = $node_standby_1->safe_psql('postgres',
+		"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+	last if ($result eq 'on');
+	sleep(1);
+}
+is ($result, "on", 'ensure checksums are enabled on standby');
+
+$result = $node_master->safe_psql('postgres', "SELECT count(a) FROM t");
+is ($result, "20000", 'ensure we can safely read all data with checksums');
+
+# Disable checksums and ensure it's propagated to standby and that we can
+# still read all data
+$node_master->safe_psql('postgres', "SELECT pg_disable_data_checksums();");
+$result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on master');
+
+# Wait for checksum disable to be replayed
+$node_master->wait_for_catchup($node_standby_1, 'replay');
+
+# Ensure that the standby has switched to off
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on standby_1');
+
+$result = $node_master->safe_psql('postgres', "SELECT count(a) FROM t");
+is ($result, "20000", 'ensure we can safely read all data without checksums');
diff --git a/src/test/isolation/expected/checksum_cancel.out b/src/test/isolation/expected/checksum_cancel.out
new file mode 100644
index 0000000000..c449e7b6cc
--- /dev/null
+++ b/src/test/isolation/expected/checksum_cancel.out
@@ -0,0 +1,27 @@
+Parsed test spec with 2 sessions
+
+starting permutation: c_verify_checksums_off r_seqread c_enable_checksums c_verify_checksums_inprogress c_disable_checksums c_wait_checksums_off
+step c_verify_checksums_off: SELECT setting = 'off' FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+?column?       
+
+t              
+step r_seqread: SELECT * FROM reader_loop();
+reader_loop    
+
+t              
+step c_enable_checksums: SELECT pg_enable_data_checksums(1000);
+pg_enable_data_checksums
+
+               
+step c_verify_checksums_inprogress: SELECT setting = 'inprogress' FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+?column?       
+
+t              
+step c_disable_checksums: SELECT pg_disable_data_checksums();
+pg_disable_data_checksums
+
+               
+step c_wait_checksums_off: SELECT test_checksums_off();
+test_checksums_off
+
+t              
diff --git a/src/test/isolation/expected/checksum_enable.out b/src/test/isolation/expected/checksum_enable.out
new file mode 100644
index 0000000000..0a68f47023
--- /dev/null
+++ b/src/test/isolation/expected/checksum_enable.out
@@ -0,0 +1,27 @@
+Parsed test spec with 3 sessions
+
+starting permutation: c_verify_checksums_off w_insert100k r_seqread c_enable_checksums c_wait_for_checksums c_verify_checksums_on
+step c_verify_checksums_off: SELECT setting = 'off' FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+?column?       
+
+t              
+step w_insert100k: SELECT insert_1k(100);
+insert_1k      
+
+t              
+step r_seqread: SELECT * FROM reader_loop();
+reader_loop    
+
+t              
+step c_enable_checksums: SELECT pg_enable_data_checksums();
+pg_enable_data_checksums
+
+               
+step c_wait_for_checksums: SELECT test_checksums_on();
+test_checksums_on
+
+t              
+step c_verify_checksums_on: SELECT setting = 'on' FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+?column?       
+
+t              
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 99dd7c6bdb..31900cb920 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -72,3 +72,7 @@ test: timeouts
 test: vacuum-concurrent-drop
 test: predicate-gist
 test: predicate-gin
+# The checksum_enable suite enables checksums for the cluster, so it must not
+# run before any test that expects the cluster to have checksums turned off
+test: checksum_cancel
+test: checksum_enable
diff --git a/src/test/isolation/specs/checksum_cancel.spec b/src/test/isolation/specs/checksum_cancel.spec
new file mode 100644
index 0000000000..a9da0d74c7
--- /dev/null
+++ b/src/test/isolation/specs/checksum_cancel.spec
@@ -0,0 +1,48 @@
+setup
+{
+	ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true;
+	CREATE TABLE t1 (a serial, b integer, c text);
+	INSERT INTO t1 (b, c) VALUES (generate_series(1,10000), 'starting values');
+
+	CREATE OR REPLACE FUNCTION test_checksums_off() RETURNS boolean AS $$
+	DECLARE
+		enabled boolean;
+	BEGIN
+		PERFORM pg_sleep(1);
+		SELECT setting = 'off' INTO enabled FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+		RETURN enabled;
+	END;
+	$$ LANGUAGE plpgsql;
+	
+	CREATE OR REPLACE FUNCTION reader_loop() RETURNS boolean AS $$
+	DECLARE
+		counter integer;
+		enabled boolean;
+	BEGIN
+		FOR counter IN 1..100 LOOP
+			PERFORM count(a) FROM t1;
+		END LOOP;
+		RETURN True;
+	END;
+	$$ LANGUAGE plpgsql;
+}
+
+teardown
+{
+	DROP FUNCTION reader_loop();
+	DROP FUNCTION test_checksums_off();
+
+	DROP TABLE t1;
+}
+
+session "reader"
+step "r_seqread"						{ SELECT * FROM reader_loop(); }
+
+session "checksums"
+step "c_verify_checksums_off"			{ SELECT setting = 'off' FROM pg_catalog.pg_settings WHERE name = 'data_checksums'; }
+step "c_enable_checksums"				{ SELECT pg_enable_data_checksums(1000); }
+step "c_disable_checksums"				{ SELECT pg_disable_data_checksums(); }
+step "c_verify_checksums_inprogress"	{ SELECT setting = 'inprogress' FROM pg_catalog.pg_settings WHERE name = 'data_checksums'; }
+step "c_wait_checksums_off"				{ SELECT test_checksums_off(); }
+
+permutation "c_verify_checksums_off" "r_seqread" "c_enable_checksums" "c_verify_checksums_inprogress" "c_disable_checksums" "c_wait_checksums_off"
diff --git a/src/test/isolation/specs/checksum_enable.spec b/src/test/isolation/specs/checksum_enable.spec
new file mode 100644
index 0000000000..7bfbe8d448
--- /dev/null
+++ b/src/test/isolation/specs/checksum_enable.spec
@@ -0,0 +1,71 @@
+setup
+{
+	ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true;
+	CREATE TABLE t1 (a serial, b integer, c text);
+	INSERT INTO t1 (b, c) VALUES (generate_series(1,10000), 'starting values');
+
+	CREATE OR REPLACE FUNCTION insert_1k(iterations int) RETURNS boolean AS $$
+	DECLARE
+		counter integer;
+	BEGIN
+		FOR counter IN 1..$1 LOOP
+			INSERT INTO t1 (b, c) VALUES (
+				generate_series(1, 1000),
+				array_to_string(array(select chr(97 + (random() * 25)::int) from generate_series(1,250)), '')
+			);
+			PERFORM pg_sleep(0.1);
+		END LOOP;
+		RETURN True;
+	END;
+	$$ LANGUAGE plpgsql;
+	
+	CREATE OR REPLACE FUNCTION test_checksums_on() RETURNS boolean AS $$
+	DECLARE
+		enabled boolean;
+	BEGIN
+		LOOP
+			SELECT setting = 'on' INTO enabled FROM pg_catalog.pg_settings WHERE name = 'data_checksums';
+			IF enabled THEN
+				EXIT;
+			END IF;
+			PERFORM pg_sleep(1);
+		END LOOP;
+		RETURN enabled;
+	END;
+	$$ LANGUAGE plpgsql;
+	
+	CREATE OR REPLACE FUNCTION reader_loop() RETURNS boolean AS $$
+	DECLARE
+		counter integer;
+	BEGIN
+		FOR counter IN 1..30 LOOP
+			PERFORM count(a) FROM t1;
+			PERFORM pg_sleep(0.2);
+		END LOOP;
+		RETURN True;
+	END;
+	$$ LANGUAGE plpgsql;
+}
+
+teardown
+{
+	DROP FUNCTION reader_loop();
+	DROP FUNCTION test_checksums_on();
+	DROP FUNCTION insert_1k(int);
+
+	DROP TABLE t1;
+}
+
+session "writer"
+step "w_insert100k"				{ SELECT insert_1k(100); }
+
+session "reader"
+step "r_seqread"				{ SELECT * FROM reader_loop(); }
+
+session "checksums"
+step "c_verify_checksums_off"	{ SELECT setting = 'off' FROM pg_catalog.pg_settings WHERE name = 'data_checksums'; }
+step "c_enable_checksums"		{ SELECT pg_enable_data_checksums(); }
+step "c_wait_for_checksums"		{ SELECT test_checksums_on(); }
+step "c_verify_checksums_on"	{ SELECT setting = 'on' FROM pg_catalog.pg_settings WHERE name = 'data_checksums'; }
+
+permutation "c_verify_checksums_off" "w_insert100k" "r_seqread" "c_enable_checksums" "c_wait_for_checksums" "c_verify_checksums_on"
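
For readers following the file-name handling in scan_directory() of pg_verify_checksums above, here is a minimal standalone sketch of the same idea: cut the name at '.' to get the segment number, cut at '_' to get the fork, then turn a per-file block number into the relation-wide block number that is mixed into the checksum. It is not part of the patch. The names RelFileParts, split_relfile_name, absolute_blockno and SKETCH_RELSEG_SIZE are made up for illustration, and the value 131072 assumes a default build (1 GB segments of 8 kB blocks).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Assumption: default build, i.e. 1 GB segments of 8 kB blocks. */
#define SKETCH_RELSEG_SIZE 131072

/* Hypothetical holder for the parts of a relation file name. */
typedef struct RelFileParts
{
	char		relfilenode[64];	/* text before any '_' or '.' */
	char		fork[16];			/* e.g. "fsm" or "vm"; "" for the main fork */
	int			segmentno;			/* 0 when there is no ".N" suffix */
} RelFileParts;

/*
 * Split a name such as "16384_fsm.2" in the same order as the patch:
 * first cut at '.', then cut at '_'.  Returns 0 on success and -1 for an
 * overlong name or a ".N" suffix that does not parse to a positive number.
 * (Slightly stricter than the patch, which only rejects a result of 0.)
 */
static int
split_relfile_name(const char *name, RelFileParts *out)
{
	char		tmp[128];
	char	   *segment;
	char	   *fork;

	if (strlen(name) >= sizeof(tmp))
		return -1;
	strcpy(tmp, name);

	out->segmentno = 0;
	out->fork[0] = '\0';

	segment = strchr(tmp, '.');
	if (segment != NULL)
	{
		*segment++ = '\0';
		out->segmentno = atoi(segment);
		if (out->segmentno <= 0)
			return -1;
	}

	fork = strchr(tmp, '_');
	if (fork != NULL)
	{
		*fork++ = '\0';
		if (strlen(fork) >= sizeof(out->fork))
			return -1;
		strcpy(out->fork, fork);
	}

	if (strlen(tmp) >= sizeof(out->relfilenode))
		return -1;
	strcpy(out->relfilenode, tmp);
	return 0;
}

/* Block number within the whole relation fork, as passed to the checksum. */
static unsigned int
absolute_blockno(int segmentno, int blockno)
{
	assert(segmentno >= 0 && blockno >= 0 && blockno < SKETCH_RELSEG_SIZE);
	return (unsigned int) segmentno * SKETCH_RELSEG_SIZE + (unsigned int) blockno;
}
```

With these assumptions, block 5 of the file "16384_fsm.2" belongs to relfilenode 16384, fork "fsm", segment 2, and is checksummed as block 2 * 131072 + 5 = 262149 of that fork. A file without a ".N" suffix is segment 0, so its block numbers are used unchanged.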
