Dear All, Dear Tom
On 5/11/25 16:20, Tom Lane wrote:
Achilleas Mantzios<a.mantz...@cloud.gatewaynet.com> writes:
We use are own version of DBmirror, we run our replication in a highly
fine grained manner. So every upgrade I have to make the code compile
and test. Up to PostgreSQL 17, I only got minor compilation problems
that I managed to resolve fairly easily. However this didn't prove to be
the case with PostgreSQL 18beta1, it proved harder to compile and as my
fears were verified, it has serious problems.
My question : is 18's SPI stabilized ? Can I start work on our version
of DBmirror ? Or wait for 18beta2 or -RC ?
If you think there are changes we need to make, you'd better get
specific sooner not later. I'm not aware of any large fixes that
are pending, cf
https://wiki.postgresql.org/wiki/PostgreSQL_18_Open_Items
I attach
a) our old source (pending.c.orig), as of PostgreSQL 17 (tested for some
7 months, so pretty well tested),
b) the compilation errors when compiled against 18beta1, and
c) the patch that I came up with, which seems (in my minimal testing) to
yield correct results on 18beta1.
The majority of serious warnings have to do with de-toasting arrays and
the PK's int2vector , while the error has to do with getting column
details such as attisdropped and attname.
Please have a look, and share your thoughts. I haven't touched serious C
coding till I first wrote the above sometime in 2004 with a bunch of
additions some years ago.
regards, tom lane
/****************************************************************************
* pending.c
* $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
*
* This file contains a trigger for Postgresql-7.x to record changes to tables
* to a pending table for mirroring.
* All tables that should be mirrored should have this trigger hooked up to it.
*
* Written by Steven Singer (ssin...@navtechinc.com)
* (c) 2001-2002 Navtech Systems Support Inc.
* ALL RIGHTS RESERVED
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*
***************************************************************************/
#include <string.h>
#include "postgres.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define Int2VectorSize(n) (offsetof(int2vector, values) + (n) *
sizeof(int16))
#define TRUE 1
#define FALSE 0
PG_MODULE_MAGIC;
enum FieldUsage
{
PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};
int storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp,int slaveid, char origOp);
int storexid(void);
int handler(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupdesc,
Oid tableOid,
char cOp, int slaveid, char *pkxpress);
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid
tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc
tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc
tupleDesc);*/
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
TupleDesc tupleDesc,Oid tableOid,int slaveid);
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid
tbaleOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid);
char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs,
Oid tableOid,
enum FieldUsage eKeyUsage);
bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);
char *get_namespace_name(Oid nspid);
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
/*****************************************************************************
* The entry point for the trigger function.
* The Trigger takes a single SQL 'text' argument indicating the name of the
* table the trigger was applied to. If this name is incorrect so will the
* mirroring.
****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
TriggerData *trigdata;
TupleDesc tupdesc;
HeapTuple beforeTuple = NULL;
HeapTuple afterTuple = NULL;
HeapTuple retTuple = NULL;
char *tblname;
char op = 0;
char *schemaname;
char *fullyqualtblname;
char *pkxpress;
if (fcinfo->context != NULL)
{
if (SPI_connect() < 0)
{
elog(NOTICE, "recordchange could not connect to SPI");
return -1;
}
trigdata = (TriggerData *) fcinfo->context;
/* Extract the table name */
tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
schemaname =
get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
fullyqualtblname = palloc(strlen(tblname) +
strlen(schemaname) + 6);
sprintf(fullyqualtblname,"\"%s\".\"%s\"",
schemaname,tblname);
#else
fullyqualtblname = palloc(strlen(tblname) + 3);
sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
tupdesc = trigdata->tg_relation->rd_att;
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
retTuple = trigdata->tg_newtuple;
beforeTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_newtuple;
op = 'u';
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
afterTuple = trigdata->tg_trigtuple;
op = 'i';
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
retTuple = trigdata->tg_trigtuple;
beforeTuple = trigdata->tg_trigtuple;
op = 'd';
}
if (storexid()) {
elog(ERROR, "Operation could not be mirrored. storexid
problem");
return PointerGetDatum(NULL);
}
pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid);
if (handler(fullyqualtblname, beforeTuple, afterTuple,
tupdesc,retTuple->t_tableOid,
op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress))
{
/* An error occoured. Skip the operation. */
elog(ERROR, "Operation could not be mirrored");
return PointerGetDatum(NULL);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning on success");
#endif
pfree(fullyqualtblname);
pfree(pkxpress);
SPI_finish();
return PointerGetDatum(retTuple);
}
else
{
/*
* Not being called as a trigger.
*/
return PointerGetDatum(NULL);
}
}
/*****************************************************************************
* stores the current xid in dbmirror_xactions
*****************************************************************************/
int
storexid(void) {
//char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES
($1)";
char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT
$1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";
int iResult = 0;
Datum saPlanData[1];
Oid taPlanArgTypes[1] = {INT4OID};
void *vpPlan;
vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, " storexid Error creating plan");
saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());
iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
if (iResult < 0) {
elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase,
iResult);
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif
return 0;
}
/*****************************************************************************
* Constructs and executes an SQL query to write a record of this tuple change
* to the pending table.
*****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupDesc,
Oid tableOid,
char cOp, int slaveid, char origOp)
{
char *cpQueryBase = "INSERT INTO dbmirror_pending
(TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";
int iResult = 0;
HeapTuple tCurTuple;
char nulls[5]=" ";
//Points the current tuple(before or after)
Datum saPlanData[5];
Oid taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID,
INT4OID, CHAROID};
void *vpPlan;
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
if (vpPlan == NULL)
elog(NOTICE, "Error creating plan");
/* SPI_saveplan(vpPlan); */
saPlanData[0] = PointerGetDatum(cpTableName);
saPlanData[1] = CharGetDatum(cOp);
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
saPlanData[3] = Int32GetDatum(slaveid);
if (slaveid <=0) nulls[3]='n';
saPlanData[4] = CharGetDatum(origOp);
iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
if (iResult < 0)
elog(NOTICE, "storedPending fired (%s) returned %d",
cpQueryBase, iResult);
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in pending table");
#endif
if (cOp == 'd')
{
/**
* This is a record of a delete operation.
* Just store the key data.
*/
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc,
tableOid);
}
else if (cOp == 'i')
{
/**
* An Insert operation.
* Store all data
*/
iResult = storeData(cpTableName, tAfterTuple, tTupDesc,
tableOid,TRUE);
}
else
{
/* op must be an update. */
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc,
tableOid);
iResult = iResult ? iResult : storeData(cpTableName,
tAfterTuple, tTupDesc,tableOid,TRUE);
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Done storing keyinfo/data");
#endif
return iResult;
}
int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
TupleDesc tTupleDesc, Oid tableOid)
{
Oid saPlanArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata
(SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
void *pplan;
Datum saPlanData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetCode;
pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid,
PRIMARY);
if (cpKeyData == NULL)
{
elog(ERROR,"Could not determine primary key data");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
saPlanData[0] = PointerGetDatum(cpKeyData_tmp);
iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
if (cpKeyData != NULL)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetCode != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeKeyInfo");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
int2vector *
getPrimaryKey(Oid tblOid)
{
char *queryBase;
char *query;
bool isNull;
int2vector *resultKey;
int2vector *tpResultKey;
HeapTuple resTuple;
Datum resDatum;
int ret;
queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND
indrelid=";
query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
sprintf(query, "%s%d", queryBase, tblOid);
ret = SPI_exec(query, 1);
if (ret != SPI_OK_SELECT || SPI_processed != 1)
{
elog(NOTICE, "Could not select primary index key");
return NULL;
}
resTuple = SPI_tuptable->vals[0];
resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);
if (isNull) {
elog(NOTICE, "PKey is NULL");
return NULL;
}
tpResultKey = (int2vector *)
DatumGetPointer(PG_DETOAST_DATUM(resDatum));
int n=tpResultKey->dim1;
resultKey = palloc(Int2VectorSize(n));
if (n > 0)
memcpy(resultKey->values, tpResultKey->values, n *
sizeof(int16));
SET_VARSIZE(resultKey, Int2VectorSize(n));
resultKey->ndim = 1;
resultKey->dataoffset = 0;
resultKey->elemtype = INT2OID;
resultKey->dim1 = n;
resultKey->lbound1 = 0;
pfree(query);
return resultKey;
}
/******************************************************************************
* Stores a copy of the non-key data for the row.
*****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid
tableOid, int iIncludeKeyData)
{
Oid planArgTypes[1] = {VARCHAROID};
char *insQuery = "INSERT INTO dbmirror_pendingdata
(SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
SPIPlanPtr pplan;
Datum planData[1];
char *cpKeyData;
char *cpKeyData_tmp;
int iRetValue;
pplan = SPI_prepare(insQuery, 1, planArgTypes);
if (pplan == NULL)
{
elog(NOTICE, "Could not prepare INSERT plan");
return -1;
}
/* pplan = SPI_saveplan(pplan); */
if (iIncludeKeyData == 0)
cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,
tableOid, NONPRIMARY);
else
cpKeyData = packageData(cpTableName,tTupleData,
tTupleDesc,tableOid, ALL);
cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
planData[0] = PointerGetDatum(cpKeyData_tmp);
iRetValue = SPI_execp(pplan, planData, NULL, 1);
if (cpKeyData != 0)
pfree(cpKeyData);
if (cpKeyData_tmp != 0)
pfree(cpKeyData_tmp);
if (iRetValue != SPI_OK_INSERT)
{
elog(NOTICE, "Error inserting row in storeData");
return -1;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "Insert successful");
#endif
return 0;
}
/**
* Packages the data in tTupleData into a string of the format
* FieldName='value text' where any quotes inside of value text
* are escaped with a backslash and any backslashes in value text
* are esacped by a second back slash.
*
* tTupleDesc should be a description of the tuple stored in
* tTupleData.
*
* eFieldUsage specifies which fields to use.
* PRIMARY implies include only primary key fields.
* NONPRIMARY implies include only non-primary key fields.
* ALL implies include all fields.
*/
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid
tableOid,
enum FieldUsage eKeyUsage)
{
int iNumCols;
int2vector *tpPKeys = NULL;
int iColumnCounter;
char *cpDataBlock;
int iDataBlockSize;
int iUsedDataBlock;
iNumCols = tTupleDesc->natts;
if (eKeyUsage != ALL)
{
tpPKeys = getPrimaryKey(tableOid);
if (tpPKeys == NULL)
return NULL;
}
#if defined DEBUG_OUTPUT
if (tpPKeys != NULL)
elog(NOTICE, "Have primary keys");
#endif
cpDataBlock = palloc(BUFFER_SIZE);
iDataBlockSize = BUFFER_SIZE;
iUsedDataBlock = 0; /* To account for the null */
for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
{
int iIsPrimaryKey;
int iPrimaryKeyIndex;
char *cpUnFormatedPtr;
char *cpFormatedPtr;
char *cpFieldName;
char *cpFieldData;
if (eKeyUsage != ALL)
{
/* Determine if this is a primary key or not. */
iIsPrimaryKey = 0;
int16 *tpPKeysV = tpPKeys->values;
int tpPKeysSize = tpPKeys->dim1;
for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
iPrimaryKeyIndex++)
{
if (tpPKeysV[iPrimaryKeyIndex] ==
iColumnCounter)
{
iIsPrimaryKey = 1;
break;
}
}
if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage
!= NONPRIMARY))
{
/**
* Don't use.
*/
#if defined DEBUG_OUTPUT
elog(NOTICE, "Skipping column");
#endif
continue;
}
} /*
KeyUsage!=ALL */
#ifndef NODROPCOLUMN
// this comment is for 11
if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{
/**
* This column has been dropped.
* Do not mirror it.
*/
continue;
}
#endif
/***************************************
Edw mpainei o kwdikas gia elegxo enanti,
tou dbmirror_exclude_attributes, gia
eksairetea columns
****************************************/
if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
continue;
}
// this comment is for 11
cpFieldName =
DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
//cpFieldName =
DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +
6)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize +
BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
}
sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
cpFieldData = SPI_getvalue(tTupleData, tTupleDesc,
iColumnCounter);
cpUnFormatedPtr = cpFieldData;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
if (cpFieldData != NULL)
{
*cpFormatedPtr = '\'';
iUsedDataBlock++;
cpFormatedPtr++;
}
else
{
sprintf(cpFormatedPtr," ");
iUsedDataBlock++;
cpFormatedPtr++;
continue;
}
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldData: %s", cpFieldData);
elog(NOTICE, "Starting format loop");
#endif
while (*cpUnFormatedPtr != 0)
{
while (iDataBlockSize - iUsedDataBlock < 2)
{
cpDataBlock = repalloc(cpDataBlock,
iDataBlockSize + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr ==
'\'')
{
*cpFormatedPtr = '\\';
cpFormatedPtr++;
iUsedDataBlock++;
}
*cpFormatedPtr = *cpUnFormatedPtr;
cpFormatedPtr++;
cpUnFormatedPtr++;
iUsedDataBlock++;
}
pfree(cpFieldData);
while (iDataBlockSize - iUsedDataBlock < 3)
{
cpDataBlock = repalloc(cpDataBlock, iDataBlockSize +
BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
sprintf(cpFormatedPtr, "' ");
iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif
} /* for
iColumnCounter */
if (tpPKeys != NULL)
pfree(tpPKeys);
#if defined DEBUG_OUTPUT
elog(NOTICE, "Returning: DataBlockSize:%d
iUsedDataBlock:%d",iDataBlockSize,
iUsedDataBlock);
#endif
memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize -
iUsedDataBlock);
return cpDataBlock;
}
bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
char *qb1;
char *qb2;
char *q;
char *fieldName;
int ret;
HeapTuple resTuple;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
char *value;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
elog(NOTICE, "fieldName=%s",fieldName);
#endif
qb1 = "SELECT ";
qb2 = " = any(attnames) from dbmirror_exclude_attributes where
tblname=";
#if defined DEBUG_OUTPUT
elog(NOTICE, "%s q
size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
/*
to +1 parakatw einai gia to 0 (null)
*/
q =
palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
pfree(fieldName);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (value == NULL) return FALSE;
if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
pfree(value);
return TRUE;
}
else {
pfree(value);
return FALSE;
}
}
#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple
tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char
*pkxpress) {
HeapTuple tuple;
tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d,
op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif
if (slaveid == 0 && op == 'm') {
elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm'
(implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
return -1;
}
/*
ISEXPUNCOND
*/
else if (slaveid == 0) return
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
/*
ISEXPWITHSLAVEID
*/
else if (slaveid > 0 && op =='d') {
int retval;
retval =
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return
updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
}
else if (slaveid > 0 && op == 'i') {
int retval;
retval =
handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval=
(retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
return retval;
}
else if (slaveid > 0 && op == 'u') {
int retval;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
retval =
handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
if (old_slaveid != -3 && old_slaveid != slaveid) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != slaveid) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
}
else {
retval=
(retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
}
if (old_slaveid != -3) {
retval =
(retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
else if (slaveid == -3 && (op =='d' || op =='i')) {
return 0;
}
else if (slaveid == -3 && op =='u') {
int retval=0;
int old_slaveid;
old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval =
(retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
return retval;
}
/*
ISIMPLBACKWARD KALESMENO apo handleParents
*/
else if (slaveid >0 && op == 'm') {
if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
return 0;
/*
SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei
paidi ISIMPLBACKWARD
*/
/*
elog(ERROR,"Found an explicit table ISEXPUNCOND or
ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND
must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always
ISIMPL.That shouldnt happen");
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
return 0;
/*
*/
}
if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
return 0;
}
if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
else {
int retval;
retval =
handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
retval =
(retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
return
(retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
}
}
/*
ISIMPL* but called from trigger ('i','d','u')
*/
/*
ISIMPLBACKWARD
*/
else if (slaveid == -1) {
if (op == 'i') return 0;
/*
Delete in this fashion may happen for "orphan" rows.
The deletions will be "triggered" by childern deletions/updates.
*/
else if (op == 'd') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval ||
updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval || deleteAccnt(cpTableName,pkxpress);
}
else if (op == 'u') {
int retval=0;
int *slaves;
int *run;
slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u',
slaves=%d",(int)slaves);
#endif
if (slaves == NULL) return 0;
for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
elog(NOTICE,"in slaveid=-1, op='u',
runslave=%d",*run);
#endif
retval = retval ||
handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
retval = retval ||
updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
}
pfree(slaves);
return retval;
}
}
/*
ISIMPLFORWARD
*/
else if (slaveid == -2) {
if (op == 'i') {
int retval=0;
int forw_slaveid =
getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval ||
handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
return retval;
}
else if (op == 'd') {
int retval=0;
int forw_slaveid =
getComputedSlaveId(cpTableName,tuple,tupleDesc);
if (forw_slaveid>0) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
retval = retval ||
updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
}
return retval ;
}
else if (op == 'u') {
/*
POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL
*/
int retval=0;
int forw_slaveid =
getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
int old_slaveid =
getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
//char origop =
getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
//if (origop == 'i') {
// old_slaveid = forw_slaveid;
//}
//elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' ,
forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
// LEGACY CODE as of 2016-03-18
//retval = retval ||
handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
//retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
//retval = retval ||
updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
if (forw_slaveid>0) {
retval =
handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
if (old_slaveid != -3 && old_slaveid !=
forw_slaveid) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
}
if (old_slaveid != forw_slaveid) {
retval = retval ||
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
}
else {
retval=
(retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
}
if (old_slaveid != -3) {
retval =
(retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
}
}
else { // ISIMPLFORWARD me vslid IS NULL --> -3
if (old_slaveid > 0) {
retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
retval =
(retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
}
}
return retval;
}
}
return 0;
}
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *qb;
char *q;
char *foo;
int ret;
HeapTuple resTuple;
int attnum;
int slaveid;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where
tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
/*
ISIMPLBACKWARD
*/
if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
/*
ISEXPUNCOND
*/
if (slave_siteidkeyname == NULL) return 0;
/*
ISIMPLFORWARD
*/
if (strncmp(slave_siteidkeyname,"___",3) == 0) {
pfree(slave_siteidkeyname);
return -2;
}
attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
elog(ERROR,"WRONG slaveidkeyname=%s given for table
%s",slave_siteidkeyname,cpTableName);
pfree(slave_siteidkeyname);
return 0;
}
foo=SPI_getvalue(tuple,tupleDesc,attnum);
/*
ISEXPWITHSLAVEID BUT siteidkeyname is null
*/
if (foo == NULL) {
pfree(slave_siteidkeyname);
return -3;
}
pfree(slave_siteidkeyname);
slaveid = atoi(foo);
pfree(foo);
/*
ISEXPWITHSLAVEID
*/
return slaveid;
}
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where
tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not
starting with \"___\" in dbmirror_explicitreptables. How did we get here?
CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right
\"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables.
How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table
%s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s where
%s='%s'",forw_table,forw_table_id,pathcolval);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
pfree(slave_siteidkeyname);
elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should
have row with %s=%s",forw_table,forw_table_id,pathcolval);
return -2;
}
pfree(q);
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ret == -2) {
pfree(slave_siteidkeyname);
ret =
getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
pfree(ParTableName);
return ret;
}
if (ret <= 0) {
pfree(slave_siteidkeyname);
pfree(ParTableName);
// elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should
have slaveid>0",forw_table);
/***
apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID
kai slaveid IS NULL
***/
return -3;
}
pfree(ParTableName);
pfree(slave_siteidkeyname);
return ret;
/*
PROSOXH:
To swsto einai apla sto slave_siteidkeyname na fylame MONO to parent table, kai
meta
me query na briskoume to tuple tou parent table.
*/
}
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc
tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where
tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not
starting with \"___\" in dbmirror_explicitreptables. How did we get here?
CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right
\"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables.
How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table
%s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
/*
consult :
https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
an parent's op=='d' tote sigoura parent's origop=='u'
*/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE
tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1
",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
/* dirty patch code to handle copy supplycase. it was supposed to solve the
problem of not turning 'u' to 'i' for e.g supplycasesquotes */
/* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues
when changing vslid (null->int, int->null, intX->intY) */
/* following code must be here (not commended) as of 2018-12-20 */
/* the aim is make everything work correctly, supplies + fb_ + all type
ISIMPLFORWARD (-2) tables */
/*
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE
tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1
",cpTableName,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
slaveid_d = -3;
}
else {
slaveid_d = atoi(foo);
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return slaveid_d;
}
*/
/**
pls consult :
https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai
(eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an
insert and return -3 in order to communicate this to handler().
NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent,
then update on the kid, and then perform a second UPDATE on the kid, this will
result in TWO inserts and produce an duplicate ERROR.
else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : (
eite parent origop == 'i' (synepagetai oti parent op == 'i')
eite parent op != 'i' : (synepagetai :
eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou
statement pio pano
eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y )
)
)
then we let the code fallback to return
getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid .
**/
q = palloc(512);
sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE
tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY
seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
pfree(slave_siteidkeyname);
pfree(pathcolval);
return -3;
}
pfree(slave_siteidkeyname);
pfree(pathcolval);
return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */
/*
* PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio
xid kai op='d'
*/
}
/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc
tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
qb = "SELECT siteidkeyname from dbmirror_explicitreptables where
tblname=";
q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
sprintf(q,"%s'%s'",qb,cpTableName);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR,"ISIMPLFORWARD Table %s NOT IN
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
SIDKEY_tupTable = SPI_tuptable;
SIDKEY_processed = SPI_processed;
resTuple = SIDKEY_tupTable->vals[0];
slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
if (slave_siteidkeyname == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT
IMMEDIATELY.",cpTableName);
}
if (strncmp(slave_siteidkeyname,"___",3) != 0) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not
starting with \"___\" in dbmirror_explicitreptables. How did we get here?
CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
// elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
path = slave_siteidkeyname + 3;
// elog(NOTICE,"path=-%s-",path);
run = index(path,'_');
if (run == NULL) {
pfree(slave_siteidkeyname);
elog(ERROR,"ISIMPLFORWARD Table %s has not the right
\"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables.
How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
}
pathcol = palloc(MAXCOL_LEN);
// elog(NOTICE,"path=-%d-",(unsigned)path);
// elog(NOTICE,"run=-%d-",(unsigned)run);
// elog(NOTICE,"run-path=-%d-",run-path);
*run = 0;
strncpy(pathcol,path,run-path+1);
// elog(NOTICE,"pathcol=-%s-",pathcol);
forw_table = run+1;
attnum = SPI_fnumber(tupleDesc,pathcol);
if (attnum == SPI_ERROR_NOATTRIBUTE) {
pfree(slave_siteidkeyname);
elog(ERROR,"WRONG pathcol=%s given for table
%s",slave_siteidkeyname,cpTableName);
return 0;
}
pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/
/*
forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id"
*/
/*
run = index(forw_table,'@');
if (run == NULL) {
forw_table_id = palloc(MAXCOL_LEN);
strncpy(forw_table_id,"id",3);
}
else {
forw_table_id = palloc(MAXCOL_LEN);
*run=0;
strncpy(forw_table_id,forw_table,run-forw_table+1);
forw_table=run+1;
}
q = palloc(512);
sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE
tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1
",forw_table,GetCurrentTransactionId());
ret = SPI_exec(q,1);
if (ret == SPI_OK_SELECT && SPI_processed == 1) {
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
if (foo == NULL) {
pfree(q);
pfree(pathcolval);
elog(ERROR,"WRONG forward parent table %s has null
origop in this transaction",forw_table);
}
else {
origop = foo[0];
pfree(foo);
}
pfree(q);
pfree(pathcolval);
return origop;
}
return ' ';
}
*/
#define MAX_WHERE_CLAUSE 512
#define MAX_QUERY_LEN 512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char AthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in handleParents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM
pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND
c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char handleit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
int DIM_0 = ARR_DIMS(arr)[0];
int LBOUND_0 = ARR_LBOUND(arr)[0];
// resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
// confrelname = (char *) DatumGetName(resDatum);
// elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for
table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE
attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this
table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname =
SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value =
SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype =
SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with
After value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL ||
isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
}
memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Btuple != NULL) {
char *Bvalue =
SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d
thiscol=%d with After value=%s,Before
value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
if (Bvalue == NULL) handleit=1;
else if (strcmp(Bvalue,value)) {
handleit=1;
pfree(Bvalue);
}
}
else handleit=1;
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
elog(NOTICE,"handle it = %d, ",handleit);
elog(NOTICE,"FkHasNullValueORXcluded = %d,
",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !handleit) continue;
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute
where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where
attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK
relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value =
SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
There must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND
\"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
else
sprintf(WHERE,"%s AND
\"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: par table %s
should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ret =
handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND
pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
else return 1;
*/
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s'
AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
else return 1;
}
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
bool isNull;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s'
AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in
decreaseAccnt for table %s, slaveid=%d,
pkxpress=%s",cpTableName,slaveid,pkxpress);
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND
pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in
decreaseAccnt for table %s, slaveid=%d,
pkxpress=%s",cpTableName,slaveid,pkxpress);
return
DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}
int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
/*
public.dbmirror_accounting.cnt DEFAULT = 1
*/
sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid)
VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,1);
pfree(q);
if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
else return 0;
}
char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid
tableOid) {
int2vector *pk;
int i;
char *WHERE = palloc(MAX_WHERE_CLAUSE);
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif
pk = getPrimaryKey(tableOid);
if (pk == NULL) return NULL;
int16 *pkV = pk->values;
int pkSize = pk->dim1;
if (pkSize>0) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[0]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for
table %s",pkV[0],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for
table %s",pkV[0],cpTableName);
return NULL;
}
sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
pfree(keyname);
pfree(keyval);
}
for (i=1;i<pkSize;i++) {
char *keyname;
char *keyval;
keyname = SPI_fname(tupleDesc,pkV[i]);
if (keyname == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for
table %s",pkV[i],cpTableName);
return NULL;
}
keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
if (keyval == NULL) {
elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for
table %s",pkV[i],cpTableName);
return NULL;
}
sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
pfree(keyname);
pfree(keyval);
}
pfree(pk);
return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
char *q;
int ret;
int SLAVE_processed;
SPITupleTable *SLAVE_tupTable;
int *slaves;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in getSlaves of Table=%s,
pkxpress=%s",cpTableName,pkxpress);
#endif
if (pkxpress == NULL) return NULL;
q = palloc(MAX_QUERY_LEN);
sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s'
AND pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
SLAVE_tupTable = SPI_tuptable;
SLAVE_processed = SPI_processed;
slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
for (ret=0;ret<SLAVE_processed;ret++) {
char *slaveidStr =
SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
*(slaves+ret) = atoi(slaveidStr);
pfree(slaveidStr);
}
*(slaves+SLAVE_processed) = 0;
return slaves;
}
int deleteAccnt (char *cpTableName, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND
pkxpress='%s'",cpTableName,pkxpress);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
char *q;
int ret;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif
q = palloc(MAX_QUERY_LEN);
sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND
pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
ret = SPI_exec(q,0);
pfree(q);
if (ret == SPI_OK_DELETE) return 0;
else return -2;
}
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
TupleDesc tupleDesc,Oid tableOid,int slaveid) {
char *qb;
char *q;
bool isNull;
int ret;
int i;
HeapTuple resTuple;
Datum resDatum;
Oid confrelid;
char *confrelname;
int16 *thisCols;
int16 *fkCols;
int16 *run;
ArrayType *arr;
char BthisColsVals[MAXKCOLS][MAXCOL_LEN];
char fkColsNames[MAXKCOLS][MAXCOL_LEN];
char thisColsTypes[MAXKCOLS][100];
short numOfCols;
SPITupleTable *FK_tupTable;
int FK_processed;
int ParSlaveId;
#if defined DEBUG_OUTPUT
elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif
qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM
pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND
c.confrelid = f.oid AND c.conrelid = ";
q = palloc(strlen(qb)+MAX_OID_LEN+1);
sprintf(q,"%s%d",qb,tableOid);
ret = SPI_exec(q,0);
pfree(q);
if (ret != SPI_OK_SELECT || SPI_processed < 0) {
elog(NOTICE, "no FKs");
return 0;
}
/*
For every FK dependency we track and handle the parent table
*/
FK_tupTable = SPI_tuptable;
FK_processed = SPI_processed;
for (i=0;i<FK_processed;i++) {
char *WHERE;
char *ParTableName;
int j;
char FkHasNullValueORXcluded=0;
char decreaseit=0;
SPITupleTable *RealPar_tupTable;
int RealPar_processed;
HeapTuple RealPar_tuple;
int colrun;
SPITupleTable *thisatt_tupTable;
int thisatt_processed;
HeapTuple thisatt_tuple;
resTuple = FK_tupTable->vals[i];
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
char *fooname;
q = palloc(512);
sprintf(q,"SELECT attname FROM pg_attribute WHERE
attrelid=%d and attnum=%d",tableOid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: handleParents: this
table oid=%d should have attribute with attnum=%d",tableOid,*run);
return -2;
}
pfree(q);
thisatt_processed = SPI_processed;
thisatt_tupTable = SPI_tuptable;
thisatt_tuple = thisatt_tupTable->vals[0];
fooname =
SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
char *value =
SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
char *coltype =
SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with
Before value=%s",fooname,*run,thiscolrun,value);
#endif
if (value == NULL ||
isExcluded(cpTableName,tupleDesc,thiscolrun)) {
FkHasNullValueORXcluded=1;
break;
/*
Nothing is done regarding "Before" status of
parent since there is not one
*/
}
memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);
if (Atuple != NULL) {
char *Avalue =
SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found colname=%s syscol=%d
thiscol=%d with Before value=%s,After
value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
if (Avalue == NULL) decreaseit=1;
else if (strcmp(Avalue,value)) {
decreaseit=1;
pfree(Avalue);
}
}
else decreaseit=1; /* is delete operation*/
pfree(value);
}
#if defined DEBUG_OUTPUT
elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP");
elog(NOTICE,"decrease it = %d, ",decreaseit);
elog(NOTICE,"FkHasNullValueORXcluded = %d,
",FkHasNullValueORXcluded);
#endif
if (FkHasNullValueORXcluded || !decreaseit) continue;
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
int PAR_processed;
HeapTuple PAR_resTuple;
char *value;
q = palloc(strlen("SELECT attname from pg_attribute
where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
sprintf(q,"SELECT attname from pg_attribute where
attrelid=%d and attnum=%d",confrelid,*run);
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal Error:no PK for FK
relation");
return -2;
}
pfree(q);
PAR_tupTable = SPI_tuptable;
PAR_processed = SPI_processed;
PAR_resTuple = PAR_tupTable->vals[0];
value =
SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
pfree(value);
}
resDatum =
SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
confrelname = (char *) DatumGetName(resDatum);
/*
there must be at least 1 column in compound FK!!
*/
WHERE = palloc(MAX_WHERE_CLAUSE);
if (!strncmp(thisColsTypes[0],"int",3))
sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
else
sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
for (j=1;j<numOfCols;j++) {
if (!strncmp(thisColsTypes[j],"int",3))
sprintf(WHERE,"%s AND
\"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
else
sprintf(WHERE,"%s AND
\"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
}
q = palloc(512);
sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
//elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
elog(NOTICE,"Found FK, SQL=%s",q);
#endif
ret = SPI_exec(q,1);
if (ret != SPI_OK_SELECT || SPI_processed != 1) {
elog(ERROR, "Fatal error: updateAcctParents: par table
%s should have row with %s",confrelname,WHERE);
return -2;
}
pfree(q);
RealPar_processed = SPI_processed;
RealPar_tupTable = SPI_tuptable;
RealPar_tuple = RealPar_tupTable->vals[0];
ParTableName = palloc(256);
sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
ParSlaveId =
getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
pfree(ParTableName);
continue;
}
if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
ret =
storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
ret =
updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);
}
pfree(ParTableName);
pfree(WHERE);
}
return 0;
}
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wcast-function-type -Wshadow=compatible-local -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -fPIC -DPIC -fvisibility=hidden -I. -I./ -I/usr/local/pgsql/include/server -I/usr/local/pgsql/include/internal -I/usr/local/include -c -o pending.o pending.c
pending.c: In function 'storePending':
pending.c:249:25: warning: variable 'tCurTuple' set but not used [-Wunused-but-set-variable]
249 | HeapTuple tCurTuple;
| ^~~~~~~~~
In file included from /usr/local/pgsql/include/server/nodes/execnodes.h:34,
from /usr/local/pgsql/include/server/commands/trigger.h:18,
from /usr/local/pgsql/include/server/executor/spi.h:16,
from pending.c:36:
pending.c: In function 'getPrimaryKey':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:402:54: note: in expansion of macro 'PG_DETOAST_DATUM'
402 | tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
In file included from pending.c:34:
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:403:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
403 | int n=tpResultKey->dim1;
| ^~~
pending.c: In function 'packageData':
pending.c:527:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
527 | int16 *tpPKeysV = tpPKeys->values;
| ^~~~~
pending.c:551:29: error: 'struct TupleDescData' has no member named 'attrs'
551 | if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
| ^~
pending.c:571:71: error: 'struct TupleDescData' has no member named 'attrs'
571 | cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
| ^~
pending.c: In function 'isExcluded':
pending.c:664:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
664 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'handler':
pending.c:729:21: warning: variable 'retval' set but not used [-Wunused-but-set-variable]
729 | int retval;
| ^~~~~~
pending.c: In function 'getSlaveId':
pending.c:928:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
928 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'getComputedSlaveId':
pending.c:993:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
993 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'getOldComputedSlaveId':
pending.c:1119:7: warning: unused variable 'ParTableName' [-Wunused-variable]
1119 | char *ParTableName;
| ^~~~~~~~~~~~
pending.c:1116:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
1116 | int SIDKEY_processed;
| ^~~~~~~~~~~~~~~~
pending.c: In function 'handleParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1444:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1444 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1472:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1472 | char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
| ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1509:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1509 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1513:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
1513 | int PAR_processed;
| ^~~~~~~~~~~~~
pending.c:1434:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
1434 | int thisatt_processed;
| ^~~~~~~~~~~~~~~~~
pending.c:1430:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
1430 | int RealPar_processed;
| ^~~~~~~~~~~~~~~~~
pending.c: In function 'getPKxpress':
pending.c:1651:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1651 | int16 *pkV = pk->values;
| ^~~~~
pending.c: In function 'updateAccntParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1825:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1825 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1846:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
1846 | char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
| ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
241 | pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| |
| struct varlena *
pending.c:1891:53: note: in expansion of macro 'PG_DETOAST_DATUM'
1891 | arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
| ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
317 | DatumGetPointer(Datum X)
| ~~~~~~^
pending.c:1895:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
1895 | int PAR_processed;
| ^~~~~~~~~~~~~
pending.c:1814:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
1814 | int thisatt_processed;
| ^~~~~~~~~~~~~~~~~
pending.c:1810:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
1810 | int RealPar_processed;
| ^~~~~~~~~~~~~~~~~
gmake: *** [<builtin>: pending.o] Error 1
diff --git a/code/dbmirror/pending.c b/code/dbmirror/pending.c
index fca76318..b1c61e02 100644
--- a/code/dbmirror/pending.c
+++ b/code/dbmirror/pending.c
@@ -98,7 +98,7 @@ char *get_namespace_name(Oid nspid);
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
-//#define DEBUG_OUTPUT 1
+#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
@@ -399,7 +399,8 @@ getPrimaryKey(Oid tblOid)
return NULL;
}
- tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ tpResultKey = (int2vector *) PG_DETOAST_DATUM(resDatum);
int n=tpResultKey->dim1;
resultKey = palloc(Int2VectorSize(n));
if (n > 0)
@@ -548,7 +549,8 @@ packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid ta
} /* KeyUsage!=ALL */
#ifndef NODROPCOLUMN
// this comment is for 11
- if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+ ////if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+ if (TupleDescAttr(tTupleDesc,iColumnCounter-1)->attisdropped)
//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{
/**
@@ -568,7 +570,8 @@ eksairetea columns
}
// this comment is for 11
- cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+ ////cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+ cpFieldName = NameStr(TupleDescAttr(tTupleDesc,iColumnCounter - 1)->attname );
//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
elog(NOTICE, "FieldName: %s", cpFieldName);
@@ -1441,7 +1444,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
#if defined DEBUG_OUTPUT
@@ -1506,7 +1510,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
#endif
if (FkHasNullValueORXcluded || !handleit) continue;
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;
@@ -1822,7 +1827,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
confrelid = (Oid) DatumGetObjectId(resDatum);
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
thisCols = (int16 *)ARR_DATA_PTR(arr);
numOfCols=ARR_DIMS(arr)[0];
for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++) {
@@ -1888,7 +1894,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
- arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ //arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+ arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
fkCols = (int16 *)ARR_DATA_PTR(arr);
for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
SPITupleTable *PAR_tupTable;