On 29.09.2017 11:27, Craig Ringer wrote:
On 29 September 2017 at 15:57, Konstantin Knizhnik
<k.knizh...@postgrespro.ru> wrote:
So you are saying that the PostgreSQL 2PC mechanism is not complete
and the user needs to maintain some extra information to make it work?
No, it provides what's needed for an implementation of what in XA
terms is a local resource manager (LRM). What it does not provide is
infrastructure to make postgres itself into a global transaction
manager (GTM) for co-ordinating multiple LRMs.
It sounds like you're trying to build a GTM using PostgreSQL's
existing LRM book-keeping as your authoritative data store, right?
Not exactly. I am trying to add 2PC to our pg_shardman: a combination of
pg_pathman + postgres_fdw + logical replication, which should provide HA
and write scalability.
This architecture does not assume the presence of a GTM. Most
transactions are expected to be local (involving only one node), and the
number of participants in a distributed transaction is expected to be much
smaller than the total number of nodes (usually 2). So we need to perform
2PC without a GTM.
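To make the GTM-less flow concrete, here is a minimal sketch in standalone C. It is not pg_shardman code: the Participant struct, the can_prepare flag and run_two_phase_commit() are hypothetical names used only for illustration, and the node that originates the transaction is assumed to act as coordinator for the few participants involved.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-node state of one distributed transaction. */
typedef enum { NODE_IDLE, NODE_PREPARED, NODE_COMMITTED, NODE_ABORTED } NodeState;

typedef struct {
    bool      can_prepare;  /* assumption: whether PREPARE succeeds on this node */
    NodeState state;
} Participant;

/*
 * Run 2PC over the given participants with the calling node acting as
 * coordinator. Returns true if the transaction committed everywhere.
 */
bool run_two_phase_commit(Participant *nodes, size_t n)
{
    bool all_prepared = true;

    assert(nodes != NULL || n == 0);

    /* Phase 1: ask every participant to prepare; stop at the first refusal. */
    for (size_t i = 0; i < n; i++) {
        if (!nodes[i].can_prepare) {
            all_prepared = false;
            break;
        }
        nodes[i].state = NODE_PREPARED;
    }

    /* Phase 2: commit everywhere only if every participant prepared. */
    for (size_t i = 0; i < n; i++)
        nodes[i].state = all_prepared ? NODE_COMMITTED : NODE_ABORTED;

    return all_prepared;
}
```

The sketch deliberately ignores the hard case, a coordinator that disappears between the two phases.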
The problems with 2PC arise when the coordinator node is not
available but is expected to be recovered in the future.
In this case we may not have enough information to decide
whether to abort or commit a prepared transaction.
But that is a different story. We need to use 3PC or some other
protocol to prevent such a situation.
In that case the node sits and waits patiently for the GTM (or in more
complex architectures, *a* valid voting quorum of GTMs) to be
reachable again. Likely using a protocol like Raft, Paxos, 3PC etc to
co-ordinate.
It can't do anything else, since if it unilaterally commits or rolls
back it might later find out that the nodes on the other side of the
network partition or whatever made the opposite decision and, boom!
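The rule described above (a prepared participant must keep waiting until it learns the coordinator's decision) can be sketched as a small state function. This is a standalone C illustration under assumed names (Decision, TxState, resolve_prepared), not PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcome a participant can learn from the coordinator. */
typedef enum { DECISION_UNKNOWN, DECISION_COMMIT, DECISION_ABORT } Decision;

/* Hypothetical local state of the transaction on one participant. */
typedef enum { TX_PREPARED, TX_COMMITTED, TX_ABORTED } TxState;

/*
 * Decide what a participant may do with a transaction.
 * A prepared transaction stays prepared until the coordinator is
 * reachable AND its decision is known; the participant never guesses.
 */
TxState resolve_prepared(TxState current, bool coordinator_reachable,
                         Decision decision)
{
    if (current != TX_PREPARED) {
        /* Already final: a known decision must agree with the local outcome. */
        assert(decision == DECISION_UNKNOWN ||
               (current == TX_COMMITTED) == (decision == DECISION_COMMIT));
        return current;
    }

    if (!coordinator_reachable || decision == DECISION_UNKNOWN)
        return TX_PREPARED;  /* in doubt: keep waiting */

    return decision == DECISION_COMMIT ? TX_COMMITTED : TX_ABORTED;
}
```

The assertion marks the failure mode in question: a node that had already finished the transaction one way and then learns that the opposite decision was taken.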
Ok, I am not sure whether pg_prepared_xact_status can be really useful.
I agree with you that if we are implementing distributed transactions on
top of Postgres, then we need some better mechanism to determine
transaction state.
But a lot of people are using 2PC without a GTM or anything else. For
example, many Java ORMs use 2PC for their transactions.
I think it is better to give the DBA or programmer some way to
determine the status of such a transaction by GID (which is usually unique
and known), as long as this information
is available in the Postgres WAL.
In any case, I have attached a slightly improved version of this function,
which traverses the log not only from the last checkpoint but also iterates
backward, inspecting previous WAL segments.
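The scan strategy can be modelled outside the backend as follows. This is a simplified standalone C sketch, not the patch itself: the in-memory LogRecord array, SEG_SIZE and prepared_xact_status() are assumptions standing in for the WAL, the segment size and the xlogreader calls. Each pass reads forward from the current start to the end of the log, and if no commit or abort was found, the start moves back by one segment.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SEG_SIZE 100UL  /* hypothetical segment size, in LSN units */

typedef enum { REC_PREPARE, REC_COMMIT_PREPARED, REC_ABORT_PREPARED, REC_OTHER } RecType;

/* Hypothetical stand-in for a WAL record; gid is set only for REC_PREPARE. */
typedef struct {
    unsigned long lsn;
    RecType       type;
    unsigned      xid;
    const char   *gid;
} LogRecord;

/*
 * Return "committed", "aborted", "prepared" or "unknown" for the given GID.
 * log[] must be sorted by lsn; start_lsn plays the role of the last checkpoint.
 */
const char *prepared_xact_status(const LogRecord *log, size_t n,
                                 unsigned long start_lsn, const char *gid)
{
    const char   *status = "unknown";
    unsigned      xid = 0;  /* 0 means the PREPARE record was not seen yet */
    unsigned long seg_start = (start_lsn / SEG_SIZE) * SEG_SIZE;

    assert(gid != NULL);

    for (;;) {
        /* Forward pass from seg_start to the end of the log. */
        for (size_t i = 0; i < n; i++) {
            if (log[i].lsn < seg_start)
                continue;
            switch (log[i].type) {
            case REC_PREPARE:
                if (strcmp(log[i].gid, gid) == 0) {
                    xid = log[i].xid;
                    status = "prepared";
                }
                break;
            case REC_COMMIT_PREPARED:
                if (xid != 0 && log[i].xid == xid)
                    return "committed";
                break;
            case REC_ABORT_PREPARED:
                if (xid != 0 && log[i].xid == xid)
                    return "aborted";
                break;
            default:
                break;
            }
        }
        if (seg_start == 0)
            break;              /* reached the beginning of the log */
        seg_start -= SEG_SIZE;  /* step back one segment and rescan */
    }
    return status;
}
```

As in the patch, the xid learned in one pass is kept for later passes, so a commit record located after the checkpoint is matched once the earlier PREPARE record has been found.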
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index ae83291..fbf91f5 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -84,6 +84,7 @@
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "access/xlogreader.h"
@@ -2408,3 +2409,106 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
return;
}
+
+Datum
+pg_prepared_xact_status(PG_FUNCTION_ARGS)
+{
+ char const* gid = PG_GETARG_CSTRING(0);
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+ XLogRecPtr start_lsn;
+ XLogRecPtr lsn;
+ char const* xact_status = "unknown";
+ bool done = false;
+ TimeLineID timeline;
+ TransactionId xid = InvalidTransactionId;
+ XLogRecPtr wal_end = GetFlushRecPtr();
+
+ GetOldestRestartPoint(&start_lsn, &timeline);
+
+ xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+ while (true)
+ {
+ lsn = start_lsn;
+ do
+ {
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ if (record == NULL)
+ break;
+ lsn = InvalidXLogRecPtr; /* continue after the record */
+ if (XLogRecGetRmid(xlogreader) == RM_XACT_ID)
+ {
+ uint32 info = XLogRecGetInfo(xlogreader);
+ switch (info & XLOG_XACT_OPMASK)
+ {
+ case XLOG_XACT_PREPARE:
+ {
+ TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *)XLogRecGetData(xlogreader);
+ char* xact_gid = (char*)hdr + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ if (strcmp(xact_gid, gid) == 0)
+ {
+ xid = hdr->xid;
+ xact_status = "prepared";
+ }
+ break;
+ }
+ case XLOG_XACT_COMMIT_PREPARED:
+ {
+ xl_xact_commit *xlrec;
+ xl_xact_parsed_commit parsed;
+
+ xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader);
+ ParseCommitRecord(info, xlrec, &parsed);
+ if (xid == parsed.twophase_xid)
+ {
+ Assert(TransactionIdIsValid(xid));
+ xact_status = "committed";
+ done = true;
+ }
+ break;
+ }
+ case XLOG_XACT_ABORT_PREPARED:
+ {
+ xl_xact_abort *xlrec;
+ xl_xact_parsed_abort parsed;
+
+ xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader);
+ ParseAbortRecord(info, xlrec, &parsed);
+ if (xid == parsed.twophase_xid)
+ {
+ Assert(TransactionIdIsValid(xid));
+ xact_status = "aborted";
+ done = true;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ } while (!done && xlogreader->EndRecPtr < wal_end);
+
+ if (done)
+ break;
+
+ lsn = start_lsn;
+ XLogSegNoOffsetToRecPtr(lsn/XLogSegSize, 0, start_lsn);
+ start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
+ if (start_lsn == lsn)
+ {
+ if (lsn <= XLogSegSize)
+ break;
+ XLogSegNoOffsetToRecPtr(lsn/XLogSegSize-1, 0, start_lsn);
+ start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
+ }
+ }
+
+ XLogReaderFree(xlogreader);
+ PG_RETURN_CSTRING(xact_status);
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 0781a7b..bdc552c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -849,7 +849,6 @@ ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr,
return true;
}
-#ifdef FRONTEND
/*
* Functions that are currently not needed in the backend, but are better
* implemented inside xlogreader.c because of the internal facilities available
@@ -974,8 +973,6 @@ out:
return found;
}
-#endif /* FRONTEND */
-
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 7671598..743f220 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -202,9 +202,7 @@ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
/* Invalidate read state */
extern void XLogReaderInvalReadState(XLogReaderState *state);
-#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
-#endif /* FRONTEND */
/* Functions for decoding an XLogRecord */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8b33b4e..001f586 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3148,6 +3148,10 @@ DATA(insert OID = 3378 ( pg_isolation_test_session_is_blocked PGNSP PGUID 12 1
DESCR("isolationtester support function");
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26}" "{o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
DESCR("view two-phase transactions");
+
+DATA(insert OID = 6015 ( pg_prepared_xact_status PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 2275 "2275" _null_ _null_ _null_ _null_ _null_ pg_prepared_xact_status _null_ _null_ _null_ ));
+DESCR("status of a two-phase transaction by GID");
+
DATA(insert OID = 3819 ( pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ _null_ pg_get_multixact_members _null_ _null_ _null_ ));
DESCR("view members of a multixactid");