From: Andres Freund <and...@anarazel.de> We decided to use low-level functions to do the apply instead of producing SQL statements containing the data (or using prepared statements) because both the text conversion and the full executor introduce significant overhead, which is unnecessary if you're using the same version of PostgreSQL on the same architecture.
There are loads of use cases, though, that require different methods of applying - so the part doing the applying from an ApplyCache is just a bunch of well-abstracted callbacks that get passed all the knowledge required to change the data representation into other formats. Missing: - TOAST handling. For physical apply not much needs to be done because the TOAST inserts will have been made beforehand. There needs to be an option in ApplyCache that helps reassemble TOAST datums to make it easier to write apply modules which convert to text. --- src/backend/replication/logical/Makefile | 2 +- src/backend/replication/logical/apply.c | 313 ++++++++++++++++++++++++++++++ src/include/replication/apply.h | 24 +++ 3 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 src/backend/replication/logical/apply.c create mode 100644 src/include/replication/apply.h diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c2d6d82..d0e0b13 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) -OBJS = applycache.o decode.o logical.o +OBJS = apply.o applycache.o decode.o logical.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c new file mode 100644 index 0000000..646bd54 --- /dev/null +++ b/src/backend/replication/logical/apply.c @@ -0,0 +1,313 @@ +/*------------------------------------------------------------------------- + * + * logical.c + * + * Support functions for logical/multimaster replication + * + * + * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/logical.c + * + */ +#include "postgres.h" + +#include "access/xact.h" +#include "access/heapam.h" +#include "access/genam.h" + +#include 
"catalog/pg_control.h" +#include "catalog/index.h" + +#include "executor/executor.h" + +#include "replication/applycache.h" +#include "replication/apply.h" + +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/lsyscache.h" + + + +static void +UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple); + + +void apply_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn) +{ + ApplyApplyCacheState *state = cache->private_data; + + state->original_resource_owner = CurrentResourceOwner; + + PreventTransactionChain(true, "Apply Process cannot be started inside a txn"); + + StartTransactionCommand(); + + PushActiveSnapshot(GetTransactionSnapshot()); +} + +void apply_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn) +{ + ApplyApplyCacheState *state = cache->private_data; + + current_replication_origin_lsn = txn->lsn; + + PopActiveSnapshot(); + CommitTransactionCommand(); + + + /* + * for some reason after (Start|Commit)TransactionCommand we loose our + * resource owner, restore it. + * XXX: is that correct? 
+ */ + CurrentResourceOwner = state->original_resource_owner; + + current_replication_origin_lsn.xlogid = 0; + current_replication_origin_lsn.xrecoff = 0; +} + + +void apply_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change) +{ + /* for inserting */ + Relation tuple_rel; + + tuple_rel = heap_open(HeapTupleGetOid(change->table), RowExclusiveLock); + + switch (change->action) + { + case APPLY_CACHE_CHANGE_INSERT: + { +#ifdef VERBOSE_DEBUG + elog(LOG, "INSERT"); +#endif + simple_heap_insert(tuple_rel, &change->newtuple->tuple); + + UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple); + break; + } + case APPLY_CACHE_CHANGE_UPDATE: + { + Oid indexoid = InvalidOid; + int16 pknratts; + int16 pkattnum[INDEX_MAX_KEYS]; + Oid pktypoid[INDEX_MAX_KEYS]; + Oid pkopclass[INDEX_MAX_KEYS]; + + ScanKeyData cur_skey[INDEX_MAX_KEYS]; + int i; + bool isnull; + TupleDesc desc = RelationGetDescr(tuple_rel); + + Relation index_rel; + + HeapTuple old_tuple; + bool found = false; + + IndexScanDesc scan; + +#ifdef VERBOSE_DEBUG + elog(LOG, "UPDATE"); +#endif + MemSet(pkattnum, 0, sizeof(pkattnum)); + MemSet(pktypoid, 0, sizeof(pktypoid)); + MemSet(pkopclass, 0, sizeof(pkopclass)); + + relationFindPrimaryKey(tuple_rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass); + + if (!OidIsValid(indexoid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("there is no primary key for table \"%s\"", + RelationGetRelationName(tuple_rel)))); + + index_rel = index_open(indexoid, AccessShareLock); + + for (i = 0; i < pknratts; i++) + { + Oid operator; + Oid opfamily; + RegProcedure regop; + + opfamily = get_opclass_family(pkopclass[i]); + + operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i], BTEqualStrategyNumber); + + regop = get_opcode(operator); + + ScanKeyInit(&cur_skey[i], + pkattnum[i], + BTEqualStrategyNumber, + regop, + fastgetattr(&change->newtuple->tuple, pkattnum[i], desc, &isnull)); + + Assert(!isnull); + } + 
+ scan = index_beginscan(tuple_rel, index_rel, GetTransactionSnapshot(), + pknratts, 0); + index_rescan(scan, cur_skey, pknratts, NULL, 0); + + while ((old_tuple = index_getnext(scan, ForwardScanDirection)) != NULL) + { + if (found) + { + elog(ERROR, "WTF, more than one tuple found via pk???"); + } + found = true; + + simple_heap_update(tuple_rel, &old_tuple->t_self, &change->newtuple->tuple); + } + + if (!found) + elog(ERROR, "could not find tuple to update"); + + index_endscan(scan); + + if (!HeapTupleIsHeapOnly(&change->newtuple->tuple)) + UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple); + + heap_close(index_rel, NoLock); + + break; + } + case APPLY_CACHE_CHANGE_DELETE: + { + Oid indexoid = InvalidOid; + int16 pknratts; + int16 pkattnum[INDEX_MAX_KEYS]; + Oid pktypoid[INDEX_MAX_KEYS]; + Oid pkopclass[INDEX_MAX_KEYS]; + + ScanKeyData cur_skey[INDEX_MAX_KEYS]; + int i; + bool isnull; + + Relation index_rel; + + HeapTuple old_tuple; + bool found = false; + + TupleDesc index_desc; + + IndexScanDesc scan; + +#ifdef VERBOSE_DEBUG + elog(LOG, "DELETE comming"); +#endif + MemSet(pkattnum, 0, sizeof(pkattnum)); + MemSet(pktypoid, 0, sizeof(pktypoid)); + MemSet(pkopclass, 0, sizeof(pkopclass)); + + relationFindPrimaryKey(tuple_rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass); + + if (!OidIsValid(indexoid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("there is no primary key for table \"%s\"", + RelationGetRelationName(tuple_rel)))); + + index_rel = index_open(indexoid, AccessShareLock); + index_desc = RelationGetDescr(index_rel); + + for (i = 0; i < pknratts; i++) + { + Oid operator; + Oid opfamily; + RegProcedure regop; + + opfamily = get_opclass_family(pkopclass[i]); + + operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i], BTEqualStrategyNumber); + + regop = get_opcode(operator); + + ScanKeyInit(&cur_skey[i], + pkattnum[i], + BTEqualStrategyNumber, + regop, + fastgetattr(&change->oldtuple->tuple, i + 1, index_desc, 
&isnull)); + + Assert(!isnull); + } + + scan = index_beginscan(tuple_rel, index_rel, GetTransactionSnapshot(), + pknratts, 0); + index_rescan(scan, cur_skey, pknratts, NULL, 0); + + + while ((old_tuple = index_getnext(scan, ForwardScanDirection)) != NULL) + { + if (found) + { + elog(ERROR, "WTF, more than one tuple found via pk???"); + } + found = true; + simple_heap_delete(tuple_rel, &old_tuple->t_self); + } + + if (!found) + elog(ERROR, "could not find tuple to update"); + + index_endscan(scan); + + heap_close(index_rel, NoLock); + + break; + } + } + /* FIXME: locking */ + + heap_close(tuple_rel, NoLock); + CommandCounterIncrement(); +} + +/* + * The state object used by CatalogOpenIndexes and friends is actually the + * same as the executor's ResultRelInfo, but we give it another type name + * to decouple callers from that fact. + */ +typedef struct ResultRelInfo *UserTableIndexState; + +static void +UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple) +{ + /* this is largely copied together from copy.c's CopyFrom */ + EState *estate = CreateExecutorState(); + ResultRelInfo *resultRelInfo; + List *recheckIndexes = NIL; + TupleDesc tupleDesc = RelationGetDescr(heapRel); + + resultRelInfo = makeNode(ResultRelInfo); + resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ + resultRelInfo->ri_RelationDesc = heapRel; + resultRelInfo->ri_TrigInstrument = NULL; + + ExecOpenIndices(resultRelInfo); + + estate->es_result_relations = resultRelInfo; + estate->es_num_result_relations = 1; + estate->es_result_relation_info = resultRelInfo; + + if (resultRelInfo->ri_NumIndices > 0) + { + TupleTableSlot *slot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(slot, tupleDesc); + ExecStoreTuple(heapTuple, slot, InvalidBuffer, false); + + recheckIndexes = ExecInsertIndexTuples(slot, &heapTuple->t_self, + estate); + } + + ExecResetTupleTable(estate->es_tupleTable, false); + + ExecCloseIndices(resultRelInfo); + + FreeExecutorState(estate); + /* FIXME: recheck the 
indexes */ + list_free(recheckIndexes); +} diff --git a/src/include/replication/apply.h b/src/include/replication/apply.h new file mode 100644 index 0000000..3b818c0 --- /dev/null +++ b/src/include/replication/apply.h @@ -0,0 +1,24 @@ +/* + * apply.h + * + * PostgreSQL logical replay + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * src/include/replication/logical/replay.h + */ +#ifndef APPLY_H +#define APPLY_H + +#include "utils/resowner.h" + +typedef struct ApplyApplyCacheState +{ + ResourceOwner original_resource_owner; +} ApplyApplyCacheState; + +void apply_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn); +void apply_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn); +void apply_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change); + +#endif -- 1.7.10.rc3.3.g19a6c.dirty -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers