From: Andres Freund <and...@anarazel.de>

We decided to use low-level functions to do the apply instead of producing SQL
statements containing the data (or using prepared statements) because both the
text conversion and the full executor introduce significant overhead, which is
unnecessary if you're using the same version of PostgreSQL on the same
architecture.

There are plenty of use cases, though, that require different methods of
applying changes, so the part doing the applying from an ApplyCache is just a
set of well-abstracted callbacks that are passed all the knowledge required to
convert the data into other representations.

Missing:

- TOAST handling. For physical apply not much needs to be done because the
  toast inserts will have been made beforehand. There needs to be an option in
  ApplyCache that helps reassemble TOAST datums, to make it easier to write
  apply modules that convert to text.
---
 src/backend/replication/logical/Makefile |    2 +-
 src/backend/replication/logical/apply.c  |  313 ++++++++++++++++++++++++++++++
 src/include/replication/apply.h          |   24 +++
 3 files changed, 338 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/replication/logical/apply.c
 create mode 100644 src/include/replication/apply.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c2d6d82..d0e0b13 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o decode.o logical.o
+OBJS = apply.o applycache.o decode.o logical.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c
new file mode 100644
index 0000000..646bd54
--- /dev/null
+++ b/src/backend/replication/logical/apply.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * apply.c
+ *
+ * Apply changes decoded into an ApplyCache directly to local relations
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/apply.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/heapam.h"
+#include "access/genam.h"
+
+#include "catalog/pg_control.h"
+#include "catalog/index.h"
+
+#include "executor/executor.h"
+
+#include "replication/applycache.h"
+#include "replication/apply.h"
+
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
+
+
+
+/* Forward declaration; the definition is at the end of this file. */
+static void UserTableUpdateIndexes(Relation rel, HeapTuple tuple);
+
+
+/*
+ * apply_begin_txn
+ *             ApplyCache callback invoked when a decoded transaction begins.
+ *
+ * Saves the caller's CurrentResourceOwner in the cache's private
+ * ApplyApplyCacheState so that apply_commit_txn can restore it, refuses to
+ * run inside an already open transaction block, and then starts a local
+ * transaction with an active snapshot for apply_change to work under.
+ *
+ * txn is currently unused.
+ */
+void
+apply_begin_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+       ApplyApplyCacheState *state = cache->private_data;
+
+       state->original_resource_owner = CurrentResourceOwner;
+
+       /*
+        * The second argument is used as the subject of the error message
+        * ("%s cannot run inside a transaction block"), so pass only the name
+        * of the operation, not a whole sentence (see PreventTransactionChain
+        * in access/transam/xact.c).
+        */
+       PreventTransactionChain(true, "apply process");
+
+       StartTransactionCommand();
+
+       PushActiveSnapshot(GetTransactionSnapshot());
+}
+
+/*
+ * apply_commit_txn
+ *             ApplyCache callback invoked when a decoded transaction commits.
+ *
+ * Commits the local transaction opened by apply_begin_txn.  For the duration
+ * of the commit, current_replication_origin_lsn is set to the originating
+ * transaction's LSN (txn->lsn); afterwards it is zeroed again.
+ */
+void
+apply_commit_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+       ApplyApplyCacheState *apply_state = cache->private_data;
+
+       /*
+        * NOTE(review): presumably consumed while the local commit is
+        * written; confirm against the code reading this variable.
+        */
+       current_replication_origin_lsn = txn->lsn;
+
+       PopActiveSnapshot();
+       CommitTransactionCommand();
+
+       /*
+        * CurrentResourceOwner does not survive the Start/CommitTransactionCommand
+        * cycle, so put back the owner saved by apply_begin_txn.
+        * XXX: is that correct?
+        */
+       CurrentResourceOwner = apply_state->original_resource_owner;
+
+       /* no origin LSN outside of the commit */
+       current_replication_origin_lsn.xrecoff = 0;
+       current_replication_origin_lsn.xlogid = 0;
+}
+
+
+/*
+ * apply_open_pk_index
+ *             Open the primary key index of heap_rel and build one equality scan
+ *             key per key column, taking the key values from key_tuple.
+ *
+ * If key_is_index_tuple is true, key_tuple is laid out like the index: the
+ * i-th key column is attribute i + 1 of the index's tuple descriptor.
+ * Otherwise key_tuple is a full heap tuple of heap_rel and every key column
+ * is fetched by its heap attribute number.
+ *
+ * skeys must have room for INDEX_MAX_KEYS entries; *nkeys is set to the
+ * number of keys built.  The index is returned opened with AccessShareLock
+ * and must be closed by the caller.  Errors out if heap_rel has no primary
+ * key.
+ */
+static Relation
+apply_open_pk_index(Relation heap_rel, HeapTuple key_tuple,
+                    bool key_is_index_tuple, ScanKey skeys, int *nkeys)
+{
+       Oid indexoid = InvalidOid;
+       int16 pknratts;
+       int16 pkattnum[INDEX_MAX_KEYS];
+       Oid pktypoid[INDEX_MAX_KEYS];
+       Oid pkopclass[INDEX_MAX_KEYS];
+       Relation index_rel;
+       TupleDesc key_desc;
+       int i;
+
+       MemSet(pkattnum, 0, sizeof(pkattnum));
+       MemSet(pktypoid, 0, sizeof(pktypoid));
+       MemSet(pkopclass, 0, sizeof(pkopclass));
+
+       relationFindPrimaryKey(heap_rel, &indexoid, &pknratts,
+                              pkattnum, pktypoid, pkopclass);
+
+       if (!OidIsValid(indexoid))
+               ereport(ERROR,
+                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                        errmsg("there is no primary key for table \"%s\"",
+                               RelationGetRelationName(heap_rel))));
+
+       index_rel = index_open(indexoid, AccessShareLock);
+       key_desc = key_is_index_tuple ?
+               RelationGetDescr(index_rel) : RelationGetDescr(heap_rel);
+
+       for (i = 0; i < pknratts; i++)
+       {
+               Oid opfamily;
+               Oid eq_operator;
+               RegProcedure eq_proc;
+               AttrNumber key_attno;
+               Datum key_value;
+               bool isnull;
+
+               opfamily = get_opclass_family(pkopclass[i]);
+               eq_operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i],
+                                                 BTEqualStrategyNumber);
+               if (!OidIsValid(eq_operator))
+                       elog(ERROR, "missing equality operator for type %u in operator family %u",
+                            pktypoid[i], opfamily);
+               eq_proc = get_opcode(eq_operator);
+
+               key_attno = key_is_index_tuple ? i + 1 : pkattnum[i];
+               key_value = fastgetattr(key_tuple, key_attno, key_desc, &isnull);
+               Assert(!isnull);
+
+               /*
+                * Keys handed directly to index_rescan() are numbered by index
+                * column, not by heap attribute, so use i + 1 here.  ScanKeyInit()
+                * assumes the C collation (it is meant for catalog scans), so take
+                * the collation from the index column instead.
+                */
+               ScanKeyInit(&skeys[i], i + 1, BTEqualStrategyNumber, eq_proc,
+                           key_value);
+               skeys[i].sk_collation = index_rel->rd_indcollation[i];
+       }
+
+       *nkeys = pknratts;
+       return index_rel;
+}
+
+/*
+ * apply_find_tuple_by_pk
+ *             Scan index_rel with the given keys for the matching tuple of heap_rel.
+ *
+ * Returns true and stores the tuple's TID in *tid if exactly one tuple
+ * matches, false if none does; errors out if more than one matches.  The
+ * scan uses GetTransactionSnapshot() and is ended before returning.
+ */
+static bool
+apply_find_tuple_by_pk(Relation heap_rel, Relation index_rel,
+                       ScanKey skeys, int nkeys, ItemPointer tid)
+{
+       IndexScanDesc scan;
+       HeapTuple tuple;
+       bool found = false;
+
+       scan = index_beginscan(heap_rel, index_rel, GetTransactionSnapshot(),
+                              nkeys, 0);
+       index_rescan(scan, skeys, nkeys, NULL, 0);
+
+       while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               if (found)
+                       elog(ERROR, "WTF, more than one tuple found via pk???");
+               found = true;
+               /* keep only the TID; the caller modifies the row after the scan */
+               *tid = tuple->t_self;
+       }
+
+       index_endscan(scan);
+
+       return found;
+}
+
+/*
+ * apply_change
+ *             ApplyCache callback applying one decoded change to a local table.
+ *
+ * The target relation's OID is taken from change->table via HeapTupleGetOid.
+ * The relation is opened with RowExclusiveLock, and the lock is kept after
+ * closing (NoLock).  INSERTs go through simple_heap_insert.  UPDATEs and
+ * DELETEs locate the existing row through the table's primary key index,
+ * which therefore must exist.  Index entries for new tuple versions are
+ * added by UserTableUpdateIndexes.  CommandCounterIncrement() at the end
+ * makes the change visible to subsequent ones.  Actions other than
+ * INSERT/UPDATE/DELETE are ignored.
+ *
+ * txn and subtxn are currently unused.
+ */
+void
+apply_change(ApplyCache *cache, ApplyCacheTXN *txn, ApplyCacheTXN *subtxn,
+             ApplyCacheChange *change)
+{
+       Relation tuple_rel;
+
+       tuple_rel = heap_open(HeapTupleGetOid(change->table), RowExclusiveLock);
+
+       switch (change->action)
+       {
+               case APPLY_CACHE_CHANGE_INSERT:
+               {
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "INSERT");
+#endif
+                       simple_heap_insert(tuple_rel, &change->newtuple->tuple);
+
+                       UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+                       break;
+               }
+               case APPLY_CACHE_CHANGE_UPDATE:
+               {
+                       HeapTuple newtuple = &change->newtuple->tuple;
+                       ScanKeyData skeys[INDEX_MAX_KEYS];
+                       int nkeys;
+                       Relation index_rel;
+                       ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "UPDATE");
+#endif
+                       /*
+                        * The old row is looked up by the key values of the new tuple.
+                        * FIXME: a row whose primary key was changed by the update
+                        * cannot be found this way.
+                        */
+                       index_rel = apply_open_pk_index(tuple_rel, newtuple, false,
+                                                       skeys, &nkeys);
+
+                       if (!apply_find_tuple_by_pk(tuple_rel, index_rel, skeys, nkeys,
+                                                   &old_tid))
+                               elog(ERROR, "could not find tuple to update");
+
+                       simple_heap_update(tuple_rel, &old_tid, newtuple);
+
+                       /* heap-only (HOT) tuples get no new index entries */
+                       if (!HeapTupleIsHeapOnly(newtuple))
+                               UserTableUpdateIndexes(tuple_rel, newtuple);
+
+                       index_close(index_rel, NoLock);
+                       break;
+               }
+               case APPLY_CACHE_CHANGE_DELETE:
+               {
+                       ScanKeyData skeys[INDEX_MAX_KEYS];
+                       int nkeys;
+                       Relation index_rel;
+                       ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "DELETE comming");
+#endif
+                       /*
+                        * The old tuple is read with the index's tuple descriptor, i.e.
+                        * it is expected to hold only the key columns in index order.
+                        * NOTE(review): confirm this layout with the decoding side.
+                        */
+                       index_rel = apply_open_pk_index(tuple_rel,
+                                                       &change->oldtuple->tuple,
+                                                       true, skeys, &nkeys);
+
+                       if (!apply_find_tuple_by_pk(tuple_rel, index_rel, skeys, nkeys,
+                                                   &old_tid))
+                               elog(ERROR, "could not find tuple to delete");
+
+                       simple_heap_delete(tuple_rel, &old_tid);
+
+                       index_close(index_rel, NoLock);
+                       break;
+               }
+       }
+       /* FIXME: locking */
+
+       heap_close(tuple_rel, NoLock);
+       CommandCounterIncrement();
+}
+
+/*
+ * Alternative name for a pointer to the executor's ResultRelInfo, meant to
+ * decouple callers from that fact (the same pattern as the state object used
+ * by CatalogOpenIndexes and friends).
+ *
+ * NOTE(review): nothing in this file uses UserTableIndexState;
+ * UserTableUpdateIndexes works on a ResultRelInfo directly.  Either use it
+ * there or drop the typedef.
+ */
+typedef struct ResultRelInfo *UserTableIndexState;
+
+/*
+ * UserTableUpdateIndexes
+ *             Insert index entries for heapTuple into the indexes of heapRel.
+ *
+ * A throwaway executor state is built around a single ResultRelInfo so that
+ * the executor's ExecInsertIndexTuples() can be reused, much like copy.c's
+ * CopyFrom() does.  heapTuple must already be stored in the heap: its t_self
+ * is used as the TID of the new index entries.
+ *
+ * FIXME: the list returned by ExecInsertIndexTuples() (indexes that would
+ * need a recheck) is only freed, never acted upon.
+ */
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple)
+{
+       EState *exec_state = CreateExecutorState();
+       ResultRelInfo *rri = makeNode(ResultRelInfo);
+       List *indexes_to_recheck = NIL;
+
+       rri->ri_RangeTableIndex = 1;    /* dummy */
+       rri->ri_RelationDesc = heapRel;
+       rri->ri_TrigInstrument = NULL;
+
+       exec_state->es_result_relations = rri;
+       exec_state->es_num_result_relations = 1;
+       exec_state->es_result_relation_info = rri;
+
+       ExecOpenIndices(rri);
+
+       if (rri->ri_NumIndices > 0)
+       {
+               TupleTableSlot *slot = ExecInitExtraTupleSlot(exec_state);
+
+               ExecSetSlotDescriptor(slot, RelationGetDescr(heapRel));
+               ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);
+               indexes_to_recheck = ExecInsertIndexTuples(slot, &heapTuple->t_self,
+                                                          exec_state);
+       }
+
+       ExecResetTupleTable(exec_state->es_tupleTable, false);
+       ExecCloseIndices(rri);
+       FreeExecutorState(exec_state);
+
+       /* FIXME: recheck the indexes */
+       list_free(indexes_to_recheck);
+}
diff --git a/src/include/replication/apply.h b/src/include/replication/apply.h
new file mode 100644
index 0000000..3b818c0
--- /dev/null
+++ b/src/include/replication/apply.h
@@ -0,0 +1,24 @@
+/*
+ * apply.h
+ *
+ * PostgreSQL logical replication: applying decoded changes
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/apply.h
+ */
+#ifndef APPLY_H
+#define APPLY_H
+
+/* for ApplyCache, ApplyCacheTXN and ApplyCacheChange used below */
+#include "replication/applycache.h"
+#include "utils/resowner.h"
+
+/*
+ * Private state of the apply callbacks, kept in ApplyCache->private_data.
+ */
+typedef struct ApplyApplyCacheState
+{
+       /* CurrentResourceOwner saved by apply_begin_txn, restored on commit */
+       ResourceOwner original_resource_owner;
+} ApplyApplyCacheState;
+
+/* ApplyCache callbacks applying decoded changes directly to local tables */
+extern void apply_begin_txn(ApplyCache *cache, ApplyCacheTXN *txn);
+extern void apply_commit_txn(ApplyCache *cache, ApplyCacheTXN *txn);
+extern void apply_change(ApplyCache *cache, ApplyCacheTXN *txn,
+                         ApplyCacheTXN *subtxn, ApplyCacheChange *change);
+
+#endif   /* APPLY_H */
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to