From b5b2b5335bacb8c924320813a78aeb94cea66e0a Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Sat, 23 Jan 2021 09:53:21 +1100
Subject: [PATCH v18] Tablesync Solution1.

====

Features:

* The tablesync slot name is no longer tied to the Subscription slot name.

* The tablesync worker is now allowing multiple tx instead of single tx

* A new state (SUBREL_STATE_FINISHEDCOPY) is persisted after a successful copy_table in tablesync's LogicalRepSyncTableStart.

* If a re-launched tablesync finds state SUBREL_STATE_FINISHEDCOPY then it will bypass the initial copy_table phase.

* Now tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar as done for the apply worker). The origin is advanced when first created.

* The tablesync replication origin tracking record is cleaned up by:
- process_syncing_tables_for_apply
- DropSubscription
- AlterSubscription_refresh

* Updates to PG docs.

* New TAP test case

Known Issues:

* None.
---
 doc/src/sgml/catalogs.sgml                  |   1 +
 src/backend/commands/subscriptioncmds.c     |  39 ++++++
 src/backend/replication/logical/tablesync.c | 183 ++++++++++++++++++++++++----
 src/backend/replication/logical/worker.c    |  18 +--
 src/include/catalog/pg_subscription_rel.h   |   2 +
 src/include/replication/slot.h              |   1 +
 src/test/subscription/t/004_sync.pl         |  96 ++++++++++++++-
 7 files changed, 300 insertions(+), 40 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 43d7a1a..82e74e1 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7662,6 +7662,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        State code:
        <literal>i</literal> = initialize,
        <literal>d</literal> = data is being copied,
+       <literal>f</literal> = finished table copy,
        <literal>s</literal> = synchronized,
        <literal>r</literal> = ready (normal replication)
       </para></entry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f785..58f8a86 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -649,10 +649,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, pubrel_local_oids,
 					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 		{
+			char		originname[NAMEDATALEN];
+			RepOriginId originid;
+
 			RemoveSubscriptionRel(sub->oid, relid);
 
 			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
+			/* Remove the tablesync's origin tracking if exists. */
+			snprintf(originname, sizeof(originname), "pg_%u_%u", sub->oid, relid);
+			originid = replorigin_by_name(originname, true);
+			if (originid != InvalidRepOriginId)
+			{
+				elog(DEBUG1, "AlterSubscription_refresh: dropping origin tracking for \"%s\"", originname);
+				replorigin_drop(originid, false /* nowait */ );
+			}
+
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
 							get_namespace_name(get_rel_namespace(relid)),
@@ -930,6 +942,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 	Form_pg_subscription form;
+	List	   *rstates;
 
 	/*
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1042,6 +1055,32 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Tablesync resource cleanup (origins).
+	 *
+	 * Any READY-state relations have already done this.
+	 */
+	rstates = GetSubscriptionNotReadyRelations(subid);
+	foreach(lc, rstates)
+	{
+		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+		Oid			relid = rstate->relid;
+
+		/* Only cleanup the tablesync worker resources */
+		if (!OidIsValid(relid))
+			continue;
+
+		/* Remove the tablesync's origin tracking if exists. */
+		snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+		originid = replorigin_by_name(originname, true);
+		if (originid != InvalidRepOriginId)
+		{
+			elog(DEBUG1, "DropSubscription: dropping origin tracking for \"%s\"", originname);
+			replorigin_drop(originid, false /* nowait */ );
+		}
+	}
+	list_free(rstates);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196..1f828fc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -31,8 +31,10 @@
  *		 table state to INIT.
  *	   - Tablesync worker starts; changes table state from INIT to DATASYNC while
  *		 copying.
- *	   - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- *		 waits for state change.
+ *	   - Tablesync worker does initial table copy; there is a FINISHEDCOPY state to
+ *		 indicate when the copy phase has completed, so if the worker crashes
+ *		 with this (non-memory) state then the copy will not be re-attempted.
+ *	   - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
  *	   - Apply worker periodically checks for tables in SYNCWAIT state.  When
  *		 any appear, it sets the table state to CATCHUP and starts loop-waiting
  *		 until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +50,8 @@
  *		 point it sets state to READY and stops tracking.  Again, there might
  *		 be zero changes in between.
  *
- *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- *	  CATCHUP -> SYNCDONE -> READY.
+ *	  So the state progression is always: INIT -> DATASYNC ->
+ *	  (sync worker FINISHEDCOPY) -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state.  Some transient state during data
@@ -59,6 +61,7 @@
  *	  Example flows look like this:
  *	   - Apply is in front:
  *		  sync:8
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:10
  *			-> set in memory CATCHUP
@@ -74,6 +77,7 @@
  *
  *	   - Sync is in front:
  *		  sync:10
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:8
  *			-> set in memory CATCHUP
@@ -98,11 +102,16 @@
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -270,20 +279,35 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
+	bool		sync_done = false;
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	sync_done = MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
+		current_lsn >= MyLogicalRepWorker->relstate_lsn;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
-		current_lsn >= MyLogicalRepWorker->relstate_lsn)
+	if (sync_done)
 	{
 		TimeLineID	tli;
 
+		/*
+		 * Change state to SYNCDONE.
+		 */
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 		MyLogicalRepWorker->relstate_lsn = current_lsn;
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+		 * UpdateSubscriptionRelState must be called within a transaction.
+		 * That transaction will be ended within the finish_sync_worker().
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -292,8 +316,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
 	}
-	else
-		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 }
 
 /*
@@ -404,6 +426,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 */
 			if (current_lsn >= rstate->lsn)
 			{
+				char		originname[NAMEDATALEN];
+				RepOriginId originid;
+
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
 				if (!started_tx)
@@ -412,6 +437,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					started_tx = true;
 				}
 
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The normal case origin drop must be done here, not in the
+				 * process_syncing_tables_for_sync function, because if the
+				 * tablesync worker process attempted to drop its own origin
+				 * then it would fail (origin is "busy").
+				 */
+				snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+				originid = replorigin_by_name(originname, true);
+				if (OidIsValid(originid))
+				{
+					elog(DEBUG1,
+						 "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".",
+						 originname);
+					replorigin_drop(originid, false /* nowait */ );
+				}
+
+				/*
+				 * Update the state to READY only after the origin cleanup.
+				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
@@ -808,6 +854,22 @@ copy_table(Relation rel)
 }
 
 /*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN -1 because of remote node constraints on
+ * slot name length.
+ *
+ * The returned slot name palloc'ed in current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid)
+{
+	char	   *syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+
+	return syncslotname;
+}
+
+/*
  * Start syncing the table in the sync worker.
  *
  * If nothing needs to be done to sync the table, we exit the worker without
@@ -824,6 +886,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -849,19 +913,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
-						MySubscription->oid,
-						MyLogicalRepWorker->relid);
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
+											   MySubscription->oid,
+											   MyLogicalRepWorker->relid);
 
 	/*
 	 * Here we use the slot name instead of the subscription name as the
@@ -874,7 +929,38 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+	/* Assign the origin tracking record name. */
+	snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed
+		 * before it was able to finish normally.
+		 */
+		StartTransactionCommand();
+
+		/*
+		 * Slot creation passes NULL lsn because the origin startpos is got
+		 * from origin tracking this time, not from the slot.
+		 */
+		walrcv_create_slot(wrconn, slotname, true /* temporary */ ,
+						   CRS_NOEXPORT_SNAPSHOT, NULL /* lsn */ );
+
+		/*
+		 * The origin tracking name must already exist. It was created first
+		 * time this tablesync was launched.
+		 */
+		originid = replorigin_by_name(originname, false /* missing_ok */ );
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+		*origin_startpos = replorigin_session_get_progress(false);
+
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -890,9 +976,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -942,6 +1025,54 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Make the copy visible. */
 	CommandCounterIncrement();
 
+	/* Setup replication origin tracking. */
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+	{
+		/*
+		 * Origin tracking does not exist, so create it now.
+		 *
+		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
+		 * logged for for the purpose of recovery. Locks are to prevent the
+		 * replication origin from vanishing while advancing.
+		 */
+		originid = replorigin_create(originname);
+
+		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+						   true /* go backward */ , true /* WAL log */ );
+		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+	}
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("replication origin \"%s\" already exists",
+						originname)));
+	}
+
+	/*
+	 * Update the persisted state to indicate the COPY phase is done; make it
+	 * visible to others.
+	 */
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_FINISHEDCOPY,
+							   MyLogicalRepWorker->relstate_lsn);
+
+copy_table_done:
+
+	elog(DEBUG1,
+		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+		 originname,
+		 (uint32) (*origin_startpos >> 32),
+		 (uint32) *origin_startpos);
+
+	CommitTransactionCommand();
+
 	/*
 	 * We are done with the initial data synchronization, update the state.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f2b2549..0701a9b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 06663b9..9027c42 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_FINISHEDCOPY 'f'	/* tablesync copy phase is completed
+										 * (sublsn NULL) */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53f636c..c784282 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -211,6 +211,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index e111ab9..ba96d1d 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -3,7 +3,9 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -149,7 +151,99 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(20),
 	'changes for table added after subscription initialized replicated');
 
+##
+## slot integrity
+##
+## Manually create a slot with the same name that tablesync will want.
+## Expect tablesync ERROR when clash is detected.
+## Then remove the slot so tablesync can proceed.
+## Expect tablesync can now finish normally.
+##
+
+# drop the subscription
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+# empty the table tab_rep
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
+
+# empty the table tab_rep_next
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep_next;");
+
+# recreate the subscription again, but leave it disabled so that we can get the OID
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub
+	with (enabled = false)"
+);
+
+# need to create the name of the tablesync slot, for this we need the subscription OID
+# and the table OID.
+my $subid = $node_subscriber->safe_psql('postgres',
+	"SELECT oid FROM pg_subscription WHERE subname = 'tap_sub';");
+is(looks_like_number($subid), qq(1), 'get the subscription OID');
+
+my $relid = $node_subscriber->safe_psql('postgres',
+	"SELECT 'tab_rep_next'::regclass::oid");
+is(looks_like_number($relid), qq(1), 'get the table OID');
+
+# name of the tablesync slot is pg_'suboid'_sync_'tableoid'.
+my $slotname = 'pg_' . $subid . '_' . 'sync_' . $relid;
+
+# temporarily, create a slot having the same name of the tablesync slot.
+$node_publisher->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('$slotname', 'pgoutput', false);");
+
+# enable the subscription
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub ENABLE"
+);
+
+# check for occurrence of the expected error
+poll_output_until("replication slot \"$slotname\" already exists")
+    or die "no error stop for the pre-existing origin";
+
+# now drop the offending slot, the tablesync should recover.
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('$slotname');");
+
+# wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_rep_next");
+is($result, qq(20),
+	'data for table added after subscription initialized are now synced');
+
+# Cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
+
+sub poll_output_until
+{
+    my ($expected) = @_;
+
+    $expected = 'xxxxxx' unless defined($expected); # default junk value
+
+    my $max_attempts = 10 * 10;
+    my $attempts     = 0;
+
+    my $output_file = '';
+    while ($attempts < $max_attempts)
+    {
+        $output_file = slurp_file($node_subscriber->logfile());
+
+        if ($output_file =~ $expected)
+        {
+            return 1;
+        }
+
+        # Wait 0.1 second before retrying.
+        usleep(100_000);
+        $attempts++;
+    }
+
+    # The output result didn't change in 180 seconds. Give up
+    return 0;
+}
-- 
1.8.3.1

