On Wed, Jun 2, 2021 at 4:34 AM Peter Smith <smithpb2...@gmail.com> wrote:
> Please find attached the latest patch set v82*

Few comments on 0001:
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+ PrepareTransactionBlock(gid);
+ CommitTransactionCommand();

Here, the call to CommitTransactionCommand() twice looks a bit odd.
Before the first call, can we write a comment like "This is to
complete the Begin command started by the previous call"?

@@ -85,11 +85,16 @@ typedef struct LogicalDecodingContext
  bool streaming;

- * Does the output plugin support two-phase decoding, and is it enabled?
+ * Does the output plugin support two-phase decoding.
  bool twophase;

+ * Is two-phase option given by output plugin?
+ */
+ bool twophase_opt_given;
+ /*
  * State for writing output.

I think we can write few comments as to why we need a separate
twophase parameter here? The description of twophase_opt_given can be
changed to: "Is two-phase option given by output plugin? This is to
allow output plugins to enable two_phase at the start of streaming. We
can't rely on twophase parameter that tells whether the plugin
provides all the necessary two_phase APIs for this purpose." Feel free
to add more to it.

@@ -432,10 +432,19 @@ CreateInitDecodingContext(const char *plugin,

- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start.
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= (ctx->twophase_opt_given || slot->data.two_phase);
+ /* Mark slot to allow two_phase decoding if not already marked */
+ if (ctx->twophase && !slot->data.two_phase)
+ {
+ slot->data.two_phase = true;
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }

Why do we need to change this during CreateInitDecodingContext which
is called at create_slot time? At that time, we don't need to consider
any options and there is no need to toggle slot's two_phase value.

- /* Binary mode and streaming are only supported in v14 and higher */
+ /*
+ * Binary, streaming, and two_phase are only supported in v14 and
+ * higher
+ */

We can say v15 for two_phase.


Isn't it better to define LOGICALREP_PROTO_MAX_VERSION_NUM as
LOGICALREP_PROTO_TWOPHASE_VERSION_NUM instead of specifying directly
the number?

+/* Commit (and abort) information */
 typedef struct LogicalRepCommitData
  XLogRecPtr commit_lsn;
@@ -122,6 +132,48 @@ typedef struct LogicalRepCommitData
  TimestampTz committime;
 } LogicalRepCommitData;

Is there a reason for the above comment addition? If so, how is it
related to this patch?

+++ b/src/test/subscription/t/021_twophase.pl
@@ -0,0 +1,299 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;

In the nearby test files, we have Copyright notice like "# Copyright
(c) 2021, PostgreSQL Global Development Group". We should add one to
the new test files in this patch as well.

+# Also wait for two-phase to be enabled
+my $twophase_query =
+ "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT
IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";

Isn't it better to write this query as: "SELECT count(1) = 1 FROM
pg_subscription WHERE subtwophasestate ='e';"? It looks a bit odd to
use the NOT IN operator here. Similarly, change the same query used at
another place in the patch.

+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+# Wait for the statistics to be updated
+ 'postgres', qq[
+ SELECT count(slot_name) >= 1 FROM pg_stat_replication_slots
+ WHERE slot_name = 'tap_sub'
+ AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";

I don't see the need to check for stats in this test. If we really
want to test stats then we can add a separate test in
contrib\test_decoding\sql\stats but I suggest leaving it. Please do
the same for other stats tests in the patch.

10. I think you missed to update LogicalRepRollbackPreparedTxnData in

With Regards,
Amit Kapila.

Reply via email to