Dear All, Dear Tom

On 5/11/25 16:20, Tom Lane wrote:

> Achilleas Mantzios <a.mantz...@cloud.gatewaynet.com> writes:
>> We use our own version of DBmirror, and we run our replication in a highly
>> fine-grained manner. So with every upgrade I have to make the code compile
>> and test it. Up to PostgreSQL 17, I only got minor compilation problems
>> that I managed to resolve fairly easily. However, this didn't prove to be
>> the case with PostgreSQL 18beta1: it proved harder to compile and, as I
>> feared, it has serious problems.
>> My question: is 18's SPI stabilized? Can I start work on our version
>> of DBmirror? Or should I wait for 18beta2 or -RC?
>
> If you think there are changes we need to make, you'd better get
> specific sooner not later.  I'm not aware of any large fixes that
> are pending, cf
>
> https://wiki.postgresql.org/wiki/PostgreSQL_18_Open_Items

I attach

a) our old source (pending.c.orig), as of PostgreSQL 17 (tested for some 7 months, so pretty well tested),

b) the compilation errors when compiled against 18beta1, and

c) the patch that I came up with, which seems (in my minimal testing) to yield correct results on 18beta1.

The majority of the serious warnings have to do with de-toasting arrays and the PK's int2vector, while the error has to do with getting column details such as attisdropped and attname.

Please have a look, and share your thoughts. I haven't done any serious C coding since I first wrote the above sometime in 2004, apart from a bunch of additions some years ago.
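
To make the int2vector part concrete for anyone skimming the attachment: the sketch below mirrors the sizing-and-copy pattern that getPrimaryKey() uses, reduced to plain C so it compiles without the PostgreSQL headers. MockInt2Vector, make_vector, copy_vector and is_key_column are hypothetical stand-ins, not PostgreSQL's types or functions, and malloc stands in for palloc.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a variable-length int16 vector.
 * This is NOT PostgreSQL's int2vector; it only has the same general shape. */
typedef struct MockInt2Vector
{
    int     dim1;       /* number of elements */
    int16_t values[];   /* flexible array member */
} MockInt2Vector;

/* Same idea as the Int2VectorSize(n) macro in pending.c:
 * fixed header size plus n elements. */
#define MOCK_VECTOR_SIZE(n) \
    (offsetof(MockInt2Vector, values) + (size_t) (n) * sizeof(int16_t))

/* Build a vector from n values; returns NULL if allocation fails. */
static MockInt2Vector *
make_vector(const int16_t *vals, int n)
{
    MockInt2Vector *v;

    assert(n >= 0);
    v = malloc(MOCK_VECTOR_SIZE(n));
    if (v == NULL)
        return NULL;
    v->dim1 = n;
    if (n > 0)
        memcpy(v->values, vals, (size_t) n * sizeof(int16_t));
    return v;
}

/* Copy a vector into freshly allocated memory, so the copy stays valid
 * after the source buffer is released. */
static MockInt2Vector *
copy_vector(const MockInt2Vector *src)
{
    return make_vector(src->values, src->dim1);
}

/* Return 1 if the 1-based column number appears in the key vector. */
static int
is_key_column(const MockInt2Vector *key, int col)
{
    for (int i = 0; i < key->dim1; i++)
    {
        if (key->values[i] == col)
            return 1;
    }
    return 0;
}
```

The membership test is the same loop packageData() runs for each column when it decides whether a field belongs to the primary key.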



>                         regards, tom lane
/****************************************************************************
 * pending.c
 * $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $
 *
 * This file contains a trigger for Postgresql-7.x to record changes to tables
 * to a pending table for mirroring.
 * All tables that should be mirrored should have this trigger hooked up to it.
 *
 *       Written by Steven Singer (ssin...@navtechinc.com)
 *       (c) 2001-2002 Navtech Systems Support Inc.
 *       ALL RIGHTS RESERVED
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 *
 ***************************************************************************/
#include <string.h>

#include "postgres.h"

#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"
#define Int2VectorSize(n)       (offsetof(int2vector, values) + (n) * sizeof(int16))

#define TRUE 1
#define FALSE 0

PG_MODULE_MAGIC;

enum FieldUsage
{
        PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};


int storePending(char *cpTableName, HeapTuple tBeforeTuple,
                         HeapTuple tAfterTuple,
                         TupleDesc tTupdesc,
                         Oid tableOid,
                         char cOp,int slaveid, char origOp);

int storexid(void);

int handler(char *cpTableName, HeapTuple tBeforeTuple,
                         HeapTuple tAfterTuple,
                         TupleDesc tTupdesc,
                         Oid tableOid,
                         char cOp, int slaveid, char *pkxpress);

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,
                         Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);


int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/

int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
                         TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple,
                         TupleDesc tupleDesc,Oid tableOid,int slaveid);

int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
                         Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
                         Oid tableOid,int iIncludeKeyData);

int2vector *getPrimaryKey(Oid tblOid);

char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc,
                        Oid tableOid,
                        enum FieldUsage eKeyUsage);

bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter);

char *get_namespace_name(Oid nspid);


#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
//#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);

PG_FUNCTION_INFO_V1(recordchange);


/*****************************************************************************
 * The entry point for the trigger function.
 * The Trigger takes a single SQL 'text' argument indicating the name of the
 * table the trigger was applied to.  If this name is incorrect so will the
 * mirroring.
 ****************************************************************************/
Datum
recordchange(PG_FUNCTION_ARGS)
{
        TriggerData *trigdata;
        TupleDesc       tupdesc;
        HeapTuple       beforeTuple = NULL;
        HeapTuple       afterTuple = NULL;
        HeapTuple       retTuple = NULL;
        char       *tblname;
        char            op = 0;
        char       *schemaname;
        char       *fullyqualtblname;
        char *pkxpress;
        if (fcinfo->context != NULL)
        {

                if (SPI_connect() < 0)
                {
                        elog(NOTICE, "recordchange could not connect to SPI");
                        return -1;
                }
                trigdata = (TriggerData *) fcinfo->context;
                /* Extract the table name */
                tblname = SPI_getrelname(trigdata->tg_relation);
#ifndef NOSCHEMAS
                schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation));
                fullyqualtblname = palloc(strlen(tblname) + 
                                              strlen(schemaname) + 6);
                sprintf(fullyqualtblname,"\"%s\".\"%s\"",
                        schemaname,tblname);
#else
                fullyqualtblname = palloc(strlen(tblname) + 3);
                sprintf(fullyqualtblname,"\"%s\"",tblname);
#endif
                tupdesc = trigdata->tg_relation->rd_att;
                if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
                {
                        retTuple = trigdata->tg_newtuple;
                        beforeTuple = trigdata->tg_trigtuple;
                        afterTuple = trigdata->tg_newtuple;
                        op = 'u';

                }
                else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
                {
                        retTuple = trigdata->tg_trigtuple;
                        afterTuple = trigdata->tg_trigtuple;
                        op = 'i';
                }
                else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
                {
                        retTuple = trigdata->tg_trigtuple;
                        beforeTuple = trigdata->tg_trigtuple;
                        op = 'd';
                }

                if (storexid()) {
                        elog(ERROR, "Operation could not be mirrored. storexid problem");
                        return PointerGetDatum(NULL);
                }

                
                pkxpress = getPKxpress(fullyqualtblname, retTuple, tupdesc, retTuple->t_tableOid);
                if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,
                            retTuple->t_tableOid, op,
                            getSlaveId(fullyqualtblname, retTuple, tupdesc), pkxpress))
                {
                        /* An error occurred. Skip the operation. */
                        elog(ERROR, "Operation could not be mirrored");
                        return PointerGetDatum(NULL);

                }
#if defined DEBUG_OUTPUT
                elog(NOTICE, "Returning on success");
#endif
                pfree(fullyqualtblname);
                pfree(pkxpress);
                SPI_finish();
                return PointerGetDatum(retTuple);
        }
        else
        {
                /*
                 * Not being called as a trigger.
                 */
                return PointerGetDatum(NULL);
        }
}

/*****************************************************************************
 * stores the current xid in dbmirror_xactions
 *****************************************************************************/

int
storexid(void) {
        //char     *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)";
        char       *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)";

        int             iResult = 0;

        Datum           saPlanData[1];
        Oid             taPlanArgTypes[1] = {INT4OID};
        void       *vpPlan;

        vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes);
        if (vpPlan == NULL)
        {
                /* Do not go on to execute a NULL plan. */
                elog(NOTICE, " storexid Error creating plan");
                return -1;
        }

        saPlanData[0] = Int32GetDatum(GetCurrentTransactionId());

        iResult = SPI_execp(vpPlan, saPlanData, NULL , 1);
        if (iResult < 0) {
                elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult);
                return -1;
        }


#if defined DEBUG_OUTPUT
        elog(NOTICE, "row successfully stored in dbmirror_xactions");
#endif

        return 0;

}

/*****************************************************************************
 * Constructs and executes an SQL query to write a record of this tuple change
 * to the pending table.
 *****************************************************************************/
int
storePending(char *cpTableName, HeapTuple tBeforeTuple,
                         HeapTuple tAfterTuple,
                         TupleDesc tTupDesc,
                         Oid tableOid,
                         char cOp, int slaveid, char origOp)
{
        char       *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)";

        int                     iResult = 0;
        HeapTuple       tCurTuple;
        char nulls[5]="    ";

        /* tCurTuple points to the current tuple (before or after) */
        Datum           saPlanData[5];
        Oid                     taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID};
        void       *vpPlan;

        tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;



        vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes);
        if (vpPlan == NULL)
        {
                /* Do not go on to execute a NULL plan. */
                elog(NOTICE, "Error creating plan");
                return -1;
        }
        /* SPI_saveplan(vpPlan); */

        saPlanData[0] = PointerGetDatum(cpTableName);
        saPlanData[1] = CharGetDatum(cOp);
        saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
        saPlanData[3] = Int32GetDatum(slaveid);
        if (slaveid <=0) nulls[3]='n';
        saPlanData[4] = CharGetDatum(origOp);

        iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
        if (iResult < 0)
                elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);


#if defined DEBUG_OUTPUT
        elog(NOTICE, "row successfully stored in pending table");
#endif

        if (cOp == 'd')
        {
                /**
                 * This is a record of a delete operation.
                 * Just store the key data.
                 */
                iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
        }
        else if (cOp == 'i')
        {
                /**
                 * An Insert operation.
                 * Store all data
                 */
                iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE);

        }
        else
        {
                /* op must be an update. */
                iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid);
                iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE);
        }

#if defined DEBUG_OUTPUT
        elog(NOTICE, "Done storing keyinfo/data");
#endif

        return iResult;

}

int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
                         TupleDesc tTupleDesc, Oid tableOid)
{

        Oid                     saPlanArgTypes[1] = {VARCHAROID};
        char       *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
        void       *pplan;
        Datum           saPlanData[1];
        char       *cpKeyData;
        char       *cpKeyData_tmp;
        int                     iRetCode;

        pplan = SPI_prepare(insQuery, 1, saPlanArgTypes);
        if (pplan == NULL)
        {
                elog(NOTICE, "Could not prepare INSERT plan");
                return -1;
        }

        /* pplan = SPI_saveplan(pplan); */
        cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY);
        if (cpKeyData == NULL)
        {
                elog(ERROR,"Could not determine primary key data");
                return -1;
        }
#if defined DEBUG_OUTPUT
        elog(NOTICE, "KeyData: %s", cpKeyData);
#endif
        cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
        memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
        SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
        saPlanData[0] = PointerGetDatum(cpKeyData_tmp);

        iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);

        if (cpKeyData != NULL)
                pfree(cpKeyData);
        if (cpKeyData_tmp != 0)
                pfree(cpKeyData_tmp);

        if (iRetCode != SPI_OK_INSERT)
        {
                elog(NOTICE, "Error inserting row in storeKeyInfo");
                return -1;
        }
#if defined DEBUG_OUTPUT
        elog(NOTICE, "Insert successful");
#endif

        return 0;

}




int2vector *
getPrimaryKey(Oid tblOid)
{
        char       *queryBase;
        char       *query;
        bool            isNull;
        int2vector *resultKey;
        int2vector *tpResultKey;
        HeapTuple       resTuple;
        Datum           resDatum;
        int                     ret;

        queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
        query = palloc(strlen(queryBase) + MAX_OID_LEN + 1);
        /* Oid is unsigned, so print it with %u */
        sprintf(query, "%s%u", queryBase, tblOid);
        ret = SPI_exec(query, 1);
        if (ret != SPI_OK_SELECT || SPI_processed != 1)
        {
                elog(NOTICE, "Could not select primary index key");
                return NULL;
        }

        resTuple = SPI_tuptable->vals[0];
        resDatum = SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull);

        if (isNull) {
                elog(NOTICE, "PKey is NULL");
                return NULL;
        }

        tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
        int n=tpResultKey->dim1;
        resultKey = palloc(Int2VectorSize(n));
        if (n > 0)
                memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16));

        SET_VARSIZE(resultKey, Int2VectorSize(n));
        resultKey->ndim = 1;
        resultKey->dataoffset = 0;
        resultKey->elemtype = INT2OID;
        resultKey->dim1 = n;
        resultKey->lbound1 = 0;

        pfree(query);
        return resultKey;
}

/******************************************************************************
 * Stores a copy of the non-key data for the row.
 *****************************************************************************/
int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
                  Oid tableOid, int iIncludeKeyData)
{

        Oid                     planArgTypes[1] = {VARCHAROID};
        char       *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
        SPIPlanPtr pplan;
        Datum           planData[1];
        char       *cpKeyData;
        char       *cpKeyData_tmp;
        int                     iRetValue;

        pplan = SPI_prepare(insQuery, 1, planArgTypes);
        if (pplan == NULL)
        {
                elog(NOTICE, "Could not prepare INSERT plan");
                return -1;
        }

        /* pplan = SPI_saveplan(pplan); */
        if (iIncludeKeyData == 0)
                cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY);
        else
                cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL);

        cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData));
        memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData));
        SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData));
        planData[0] = PointerGetDatum(cpKeyData_tmp);

        iRetValue = SPI_execp(pplan, planData, NULL, 1);

        if (cpKeyData != 0)
                pfree(cpKeyData);
        if (cpKeyData_tmp != 0)
                pfree(cpKeyData_tmp);

        if (iRetValue != SPI_OK_INSERT)
        {
                elog(NOTICE, "Error inserting row in storeData");
                return -1;
        }
#if defined DEBUG_OUTPUT
        elog(NOTICE, "Insert successful");
#endif

        return 0;

}

/**
 * Packages the data in tTupleData into a string of the format
 * FieldName='value text'  where any quotes inside of value text
 * are escaped with a backslash and any backslashes in value text
 * are escaped by a second backslash.
 *
 * tTupleDesc should be a description of the tuple stored in
 * tTupleData.
 *
 * eFieldUsage specifies which fields to use.
 *      PRIMARY implies include only primary key fields.
 *      NONPRIMARY implies include only non-primary key fields.
 *      ALL implies include all fields.
 */
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc,
                        Oid tableOid,
                        enum FieldUsage eKeyUsage)
{
        int                     iNumCols;
        int2vector *tpPKeys = NULL;
        int                     iColumnCounter;
        char       *cpDataBlock;
        int                     iDataBlockSize;
        int                     iUsedDataBlock;

        iNumCols = tTupleDesc->natts;

        if (eKeyUsage != ALL)
        {
                tpPKeys = getPrimaryKey(tableOid);
                if (tpPKeys == NULL)
                        return NULL;
        }
#if defined DEBUG_OUTPUT
        if (tpPKeys != NULL)
                elog(NOTICE, "Have primary keys");
#endif
        cpDataBlock = palloc(BUFFER_SIZE);
        iDataBlockSize = BUFFER_SIZE;
        iUsedDataBlock = 0;                     /* To account for the null */

        for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
        {
                int                     iIsPrimaryKey;
                int                     iPrimaryKeyIndex;
                char       *cpUnFormatedPtr;
                char       *cpFormatedPtr;

                char       *cpFieldName;
                char       *cpFieldData;

                if (eKeyUsage != ALL)
                {
                        /* Determine if this is a primary key or not. */
                        iIsPrimaryKey = 0;
                        int16 *tpPKeysV = tpPKeys->values;
                        int tpPKeysSize = tpPKeys->dim1;
                        for (iPrimaryKeyIndex = 0; iPrimaryKeyIndex<tpPKeysSize;
                                 iPrimaryKeyIndex++)
                        {
                                if (tpPKeysV[iPrimaryKeyIndex] == iColumnCounter)
                                {
                                        iIsPrimaryKey = 1;
                                        break;
                                }
                        }
                        if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
                        {
                                /**
                                 * Don't use.
                                 */
#if defined DEBUG_OUTPUT
                                elog(NOTICE, "Skipping column");
#endif
                                continue;
                        }
                }                                               /* KeyUsage!=ALL */
#ifndef  NODROPCOLUMN
               // PostgreSQL 11 and later: attrs[] holds structs, so use '.'
               if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
               // Before PostgreSQL 11 (attrs[] held pointers):
               //if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
                 {
                   /**
                    * This column has been dropped.
                    * Do not mirror it.
                    */
                   continue;
                 }
#endif
/***************************************
This is where the check against
dbmirror_exclude_attributes goes, for
columns that are to be excluded
****************************************/
                if (isExcluded(cpTableName,tTupleDesc,iColumnCounter)) {
                        continue;
                }

                // PostgreSQL 11 and later: attrs[] holds structs, so use '.'
                cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
                // Before PostgreSQL 11 (attrs[] held pointers):
                //cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
                elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
                while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
                {
                        cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
                        iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                }
                sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
                iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
                cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);

                cpUnFormatedPtr = cpFieldData;
                cpFormatedPtr = cpDataBlock + iUsedDataBlock;
                if (cpFieldData != NULL)
                {
                        *cpFormatedPtr = '\'';
                        iUsedDataBlock++;
                        cpFormatedPtr++;
                }
                else
                {
                        sprintf(cpFormatedPtr," ");
                        iUsedDataBlock++;
                        cpFormatedPtr++;
                        continue;

                }
#if defined DEBUG_OUTPUT
                elog(NOTICE, "FieldData: %s", cpFieldData);
                elog(NOTICE, "Starting format loop");
#endif
                while (*cpUnFormatedPtr != 0)
                {
                        while (iDataBlockSize - iUsedDataBlock < 2)
                        {
                                cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
                                iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                                cpFormatedPtr = cpDataBlock + iUsedDataBlock;
                        }
                        if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
                        {
                                *cpFormatedPtr = '\\';
                                cpFormatedPtr++;
                                iUsedDataBlock++;
                        }
                        *cpFormatedPtr = *cpUnFormatedPtr;
                        cpFormatedPtr++;
                        cpUnFormatedPtr++;
                        iUsedDataBlock++;
                }

                pfree(cpFieldData);

                while (iDataBlockSize - iUsedDataBlock < 3)
                {
                        cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
                        iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                        cpFormatedPtr = cpDataBlock + iUsedDataBlock;
                }
                sprintf(cpFormatedPtr, "' ");
                iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
                elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif

        }                                                       /* for iColumnCounter  */
        if (tpPKeys != NULL)
                pfree(tpPKeys);
#if defined DEBUG_OUTPUT
        elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize,
                        iUsedDataBlock);
#endif
        memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);

        return cpDataBlock;

}





bool isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber) {
        char *qb1;
        char *qb2;
        char *q;
        char *fieldName;
        int ret;
        HeapTuple resTuple;
        SPITupleTable *SIDKEY_tupTable;
        int     SIDKEY_processed;
        char *value;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
        fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
        elog(NOTICE, "fieldName=%s",fieldName);
#endif
        qb1 = "SELECT ";
        qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
        elog(NOTICE, "%s q size=%d",fieldName,(int)(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2));
#endif
/*
the +1 below is for the terminating 0 (null)
*/
        q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
        sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
        ret = SPI_exec(q,1);
        pfree(q);
        pfree(fieldName);

        if (ret != SPI_OK_SELECT || SPI_processed != 1) return FALSE;
        SIDKEY_tupTable = SPI_tuptable;
        SIDKEY_processed = SPI_processed;
        resTuple = SIDKEY_tupTable->vals[0];
        value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
        if (value == NULL) return FALSE;

        if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0) {
                pfree(value);
                return TRUE;
        } 
        else {
                pfree(value);
                return FALSE;
        }
}




#define MAXKCOLS 32
#define MAXCOL_LEN 128
int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,
                         TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,
                         char *pkxpress) {

        HeapTuple       tuple;
        tuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
#if defined DEBUG_OUTPUT
        elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress);
#endif

        if (slaveid == 0 && op == 'm') {
                elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen");
                return -1;
        }
        /*
        ISEXPUNCOND
        */
        else if (slaveid == 0)
                return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
        /*
        ISEXPWITHSLAVEID
        */
        else if (slaveid > 0 && op =='d') {
                int retval;
                retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
                return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid);
        }
        else if (slaveid > 0 && op == 'i') {
                int retval;
                retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
                retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
                return retval;
        }
        else if (slaveid > 0 && op == 'u') {
                int retval;
                int old_slaveid;
                old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
                retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
                if (old_slaveid != -3 && old_slaveid != slaveid) {
                        retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
                }
                if (old_slaveid != slaveid) {
                        retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
                }
                else {
                        retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op);
                }
                if (old_slaveid != -3) {
                        retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
                }
                return retval;
        }
        else if (slaveid == -3 && (op =='d' || op =='i')) {
                return 0;
        }
        else if (slaveid == -3 && op =='u') {
                int retval=0;
                int old_slaveid;
                old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc);
                if (old_slaveid > 0) {
                        
                        retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
                        retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
                }
                return retval;
        }
        /*
        ISIMPLBACKWARD, called from handleParents
        */
        else if (slaveid >0 && op == 'm') {
                if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) {
                return 0;
/*
Commented out because a parent ISEXPUNCOND/ISEXPWITHSLAVEID can now have an
ISIMPLBACKWARD child
*/
/*
                        elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen");
*/
                }
                if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) {
                return 0;
/*
*/
                }
                if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) {
                return 0;
                }
                if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0;
                else {
                        int retval;
                        retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid);
                        retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op);
                        return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress);
                }
        }
        /*
        ISIMPL* but called from trigger ('i','d','u')
        */

        /*
        ISIMPLBACKWARD
        */
        else if (slaveid == -1) {
                if (op == 'i') return 0;
                /*
                Deletes of this kind may happen for "orphan" rows.
                The deletions will be "triggered" by children deletions/updates.
                */
                else if (op == 'd') {
                        int retval=0;
                        int *slaves;
                        int *run;
                        slaves = getSlaves(cpTableName,pkxpress);
                        if (slaves == NULL) return 0;
                        for (run=slaves;*run;run++) {
                                retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
                                retval = retval || 
updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run);
                        }

                        pfree(slaves);
                        return retval || deleteAccnt(cpTableName,pkxpress); 

                }
                else if (op == 'u') {
                        int retval=0;
                        int *slaves;
                        int *run;

                        slaves = getSlaves(cpTableName,pkxpress);
#if defined DEBUG_OUTPUT
                                elog(NOTICE,"in slaveid=-1, op='u', slaves=%p",(void *) slaves);
#endif
                        if (slaves == NULL) return 0;
                        for (run=slaves;*run;run++) {
#if defined DEBUG_OUTPUT
                                elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run);
#endif
                                retval = retval || 
handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
                                retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op);
                                retval = retval || 
updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run);
                        }
                        pfree(slaves);
                        return retval;
                }
        }
        /*
        ISIMPLFORWARD
        */
        else if (slaveid == -2) {
                if (op == 'i') {
                        int retval=0;
                        int forw_slaveid = 
getComputedSlaveId(cpTableName,tuple,tupleDesc);
                        if (forw_slaveid>0) {
                                retval = retval || 
handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
                                retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
                        }
                        return retval;
                }
                else if (op == 'd') {
                        int retval=0;
                        int forw_slaveid = 
getComputedSlaveId(cpTableName,tuple,tupleDesc);

                        if (forw_slaveid>0) {
                                retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
                                retval = retval || 
updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid);
                        }
                        return retval ; 

                }
                else if (op == 'u') {
/*
The value of the path column (i.e. the column named PATHCOL) must NEVER change
*/
                        int retval=0;

                        int forw_slaveid = 
getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc);
                        int old_slaveid = 
getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc);
                        //char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc);
                        //if (origop == 'i') {
                        //      old_slaveid = forw_slaveid;
                        //}
                        //elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid);
                        // LEGACY CODE as of 2016-03-18
                        //retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
                        //retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid);
                        //retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
                        if (forw_slaveid>0) {
                                retval = 
handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid);
                                if (old_slaveid != -3 && old_slaveid != 
forw_slaveid) {
                                        retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
                                }
                                if (old_slaveid != forw_slaveid) {
                                        retval = retval || 
storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op);
                                }
                                else {
                                        retval= 
(retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op);
                                }
                                if (old_slaveid != -3) {
                                        retval = 
(retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid);
                                }
                        }
                        else { // ISIMPLFORWARD with vslid IS NULL --> -3
                                if (old_slaveid > 0) {
                                        retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op);
                                        retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid);
                                }
                        }
                        return retval;
                }
        }
        return 0;
}
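
/*
 * Illustration only, not part of the trigger. handler() above combines step
 * results in two ways: `retval = retval || f()` and
 * `retval = (retval)?retval:f()`. Both skip the remaining steps after the
 * first failure, but `||` collapses any nonzero code to 1. The sketch below
 * shows the difference. step_ok() and step_fail() are hypothetical stand-ins
 * for storePending()/updateAccntParents(), and it is assumed that a nonzero
 * return means failure.
 */
#if 0
```c
#include <assert.h>

/* Counts how many steps actually ran. */
static int calls = 0;

/* Hypothetical steps: one succeeds, one fails with code -2. */
static int step_ok(void)   { calls++; return 0; }
static int step_fail(void) { calls++; return -2; }

/* Logical OR short-circuits, but its result is always 0 or 1. */
static int chain_logical_or(void)
{
    int retval = 0;

    retval = retval || step_ok();
    retval = retval || step_fail();
    retval = retval || step_ok();    /* not called: retval is already 1 */
    return retval;
}

/* The ternary form short-circuits too and keeps the first failure code. */
static int chain_keep_code(void)
{
    int retval = 0;

    retval = retval ? retval : step_ok();
    retval = retval ? retval : step_fail();
    retval = retval ? retval : step_ok();    /* not called: retval is -2 */
    return retval;
}
```
#endif
/*
 * If a caller ever needs to tell failure codes apart, the ternary form is the
 * one that preserves them; when only "zero or not" matters, both behave alike.
 */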

int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
        char *slave_siteidkeyname;
        char *qb;
        char *q;
        char *foo;
        int ret;
        HeapTuple resTuple;
        int attnum;
        int slaveid;
        SPITupleTable *SIDKEY_tupTable;
        int     SIDKEY_processed;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getSlaveId of %s",cpTableName);
#endif
        qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
        q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
        sprintf(q,"%s'%s'",qb,cpTableName);
        ret = SPI_exec(q,1);
        pfree(q);
                /*
                ISIMPLBACKWARD
                */
        if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1;
        SIDKEY_tupTable = SPI_tuptable;
        SIDKEY_processed = SPI_processed;
        resTuple = SIDKEY_tupTable->vals[0];
        slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
                /*
                ISEXPUNCOND
                */
        if (slave_siteidkeyname == NULL) return 0;
                /*
                ISIMPLFORWARD
                */
        if (strncmp(slave_siteidkeyname,"___",3) == 0) {
                pfree(slave_siteidkeyname);
                return -2;
        }
        attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname);
        if (attnum == SPI_ERROR_NOATTRIBUTE) {
                /* elog(ERROR) does not return, so no cleanup or return is needed after it */
                elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName);
        }
        foo=SPI_getvalue(tuple,tupleDesc,attnum);
                /*
                ISEXPWITHSLAVEID BUT siteidkeyname is null
                */
        if (foo == NULL) {
                pfree(slave_siteidkeyname);
                return -3;
        }
        pfree(slave_siteidkeyname);
        slaveid = atoi(foo);
        pfree(foo);
                /*
                ISEXPWITHSLAVEID
                */
        return slaveid;
}
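
/*
 * Illustration only, not part of the trigger. Reading getSlaveId() above, its
 * return value encodes the replication kind of the table: -1 when the table is
 * not listed in dbmirror_explicitreptables (ISIMPLBACKWARD), 0 when
 * siteidkeyname is NULL (ISEXPUNCOND), -2 when siteidkeyname starts with "___"
 * (ISIMPLFORWARD), -3 when the key column of the row is NULL, and a positive
 * slave id otherwise (ISEXPWITHSLAVEID). The sketch below names these cases.
 * The enum and function names are hypothetical, chosen for this example.
 */
#if 0
```c
#include <assert.h>

/* Hypothetical names for the cases encoded in getSlaveId()'s return value. */
typedef enum {
    REP_ISIMPLBACKWARD,     /* -1: table not in dbmirror_explicitreptables */
    REP_ISEXPUNCOND,        /*  0: siteidkeyname is NULL */
    REP_ISIMPLFORWARD,      /* -2: siteidkeyname starts with "___" */
    REP_NULL_SLAVEID,       /* -3: key column of this row is NULL */
    REP_ISEXPWITHSLAVEID,   /* >0: the value is the slave id itself */
    REP_UNKNOWN             /* anything else, e.g. a negative column value */
} RepKind;

/* Maps a raw getSlaveId()-style code to a named case. */
static RepKind classify_slave_id(int slaveid)
{
    if (slaveid > 0)
        return REP_ISEXPWITHSLAVEID;
    switch (slaveid) {
        case 0:
            return REP_ISEXPUNCOND;
        case -1:
            return REP_ISIMPLBACKWARD;
        case -2:
            return REP_ISIMPLFORWARD;
        case -3:
            return REP_NULL_SLAVEID;
        default:
            return REP_UNKNOWN;
    }
}
```
#endif
/*
 * Note that the slave id comes from atoi() on a column value, so a stored value
 * of 0, -1, -2 or -3 would be indistinguishable from the special codes.
 */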

int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getComputedSlaveId of %s",cpTableName);
#endif
        qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
        q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
        sprintf(q,"%s'%s'",qb,cpTableName);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        SIDKEY_tupTable = SPI_tuptable;
        SIDKEY_processed = SPI_processed;
        resTuple = SIDKEY_tupTable->vals[0];
        slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
        if (slave_siteidkeyname == NULL)  {
                /* nothing to free here: pfree() must not be called with a NULL pointer */
                elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        if (strncmp(slave_siteidkeyname,"___",3) != 0) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
//      elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
        path = slave_siteidkeyname + 3;
//      elog(NOTICE,"path=-%s-",path);
        run = strchr(path,'_');
        if (run == NULL) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        pathcol = palloc(MAXCOL_LEN);
//      elog(NOTICE,"path=-%d-",(unsigned)path);
//      elog(NOTICE,"run=-%d-",(unsigned)run);
//      elog(NOTICE,"run-path=-%d-",run-path);
        *run = 0;
        /* path now ends at the '_'; strlcpy never writes more than MAXCOL_LEN bytes */
        strlcpy(pathcol,path,MAXCOL_LEN);

//      elog(NOTICE,"pathcol=-%s-",pathcol);

        forw_table = run+1;

        attnum = SPI_fnumber(tupleDesc,pathcol);
        if (attnum == SPI_ERROR_NOATTRIBUTE) {
                /* report pathcol itself, and do not read slave_siteidkeyname after freeing it */
                elog(ERROR,"WRONG pathcol=%s given for table %s",pathcol,cpTableName);
        }
        pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
        
        /*
        The forw_table PRIMARY KEY MUST ALWAYS BE "id", unless the key is given as idcol@forwtable
        */

        run = strchr(forw_table,'@');
        forw_table_id = palloc(MAXCOL_LEN);
        if (run == NULL) {
                strlcpy(forw_table_id,"id",MAXCOL_LEN);
        }
        else {
                *run=0;
                /* bounded copy of the id column name that precedes the '@' */
                strlcpy(forw_table_id,forw_table,MAXCOL_LEN);
                forw_table=run+1;
        }

        /* psprintf allocates a buffer of the right size, so a long value cannot overflow it */
        q = psprintf("SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                /* forw_table points into slave_siteidkeyname, so that buffer must still be alive here */
                elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval);
        }
        RealPar_tupTable = SPI_tuptable;
        RealPar_tuple = RealPar_tupTable->vals[0];
        ParTableName = palloc(256);
        sprintf(ParTableName,"\"public\".\"%s\"",forw_table);
        ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
        if (ret == -2)  {
                pfree(slave_siteidkeyname);
                ret = 
getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
                pfree(ParTableName);
                return ret;
        }
        if (ret <= 0) {
                pfree(slave_siteidkeyname);
                pfree(ParTableName);
//              elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table);
/***
                Attempt at handling a NULL slaveid, i.e. equivalent to ISEXPWITHSLAVEID
                with slaveid IS NULL
***/
                return -3;
        }
        pfree(ParTableName);
        pfree(slave_siteidkeyname);
        return ret;
/*
CAUTION:
The right approach is simply to keep ONLY the parent table in slave_siteidkeyname,
and then find the parent table's tuple with a query.
*/
}
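
/*
 * Illustration only, not part of the trigger. getComputedSlaveId() above parses
 * an ISIMPLFORWARD siteidkeyname of the form "___localcol_forwtable" or
 * "___localcol_idcol@forwtable": it skips the "___" prefix, splits at the first
 * '_' to get the local column, and then looks for '@' to separate an explicit
 * id column from the parent table (defaulting to "id"). Because the split is at
 * the first '_', the local column name cannot itself contain an underscore.
 * The sketch below does the same parsing without modifying the input and with
 * bounded copies. ForwardKey, KEY_PART_LEN, copy_part and parse_forward_key are
 * hypothetical names, and the keys in the usage are made up.
 */
#if 0
```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define KEY_PART_LEN 64   /* hypothetical limit, playing the role of MAXCOL_LEN */

typedef struct {
    char pathcol[KEY_PART_LEN];   /* local column holding the parent key */
    char id_col[KEY_PART_LEN];    /* key column in the parent table */
    char table[KEY_PART_LEN];     /* parent ("forward") table */
} ForwardKey;

/* Copies len bytes of src into dst and NUL-terminates; -1 if it does not fit. */
static int copy_part(char *dst, size_t dstsize, const char *src, size_t len)
{
    if (len >= dstsize)
        return -1;
    memcpy(dst, src, len);
    dst[len] = '\0';
    return 0;
}

/* Parses "___col_table" or "___col_idcol@table"; returns 0 on success, -1 otherwise. */
static int parse_forward_key(const char *key, ForwardKey *out)
{
    const char *path, *rest, *sep, *at;

    assert(key != NULL && out != NULL);
    if (strncmp(key, "___", 3) != 0)
        return -1;
    path = key + 3;
    sep = strchr(path, '_');          /* first '_' ends the local column */
    if (sep == NULL)
        return -1;
    if (copy_part(out->pathcol, sizeof out->pathcol, path, (size_t) (sep - path)) != 0)
        return -1;
    rest = sep + 1;
    at = strchr(rest, '@');
    if (at == NULL) {
        /* no explicit id column: the parent key defaults to "id" */
        if (copy_part(out->id_col, sizeof out->id_col, "id", 2) != 0)
            return -1;
        return copy_part(out->table, sizeof out->table, rest, strlen(rest));
    }
    if (copy_part(out->id_col, sizeof out->id_col, rest, (size_t) (at - rest)) != 0)
        return -1;
    return copy_part(out->table, sizeof out->table, at + 1, strlen(at + 1));
}
```
#endif
/*
 * For example, a key such as "___vslid_vessels" would yield pathcol "vslid",
 * id column "id" and table "vessels", while "___ownerid_code@companies" would
 * yield id column "code" and table "companies".
 */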

int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc 
tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName);
#endif
        qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname=";
        q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
        sprintf(q,"%s'%s'",qb,cpTableName);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        SIDKEY_tupTable = SPI_tuptable;
        SIDKEY_processed = SPI_processed;
        resTuple = SIDKEY_tupTable->vals[0];
        slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
        if (slave_siteidkeyname == NULL)  {
                /* nothing to free here: pfree() must not be called with a NULL pointer */
                elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        if (strncmp(slave_siteidkeyname,"___",3) != 0) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
//      elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
        path = slave_siteidkeyname + 3;
//      elog(NOTICE,"path=-%s-",path);
        run = strchr(path,'_');
        if (run == NULL) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        pathcol = palloc(MAXCOL_LEN);
//      elog(NOTICE,"path=-%d-",(unsigned)path);
//      elog(NOTICE,"run=-%d-",(unsigned)run);
//      elog(NOTICE,"run-path=-%d-",run-path);
        *run = 0;
        /* path now ends at the '_'; strlcpy never writes more than MAXCOL_LEN bytes */
        strlcpy(pathcol,path,MAXCOL_LEN);

//      elog(NOTICE,"pathcol=-%s-",pathcol);

        forw_table = run+1;

        attnum = SPI_fnumber(tupleDesc,pathcol);
        if (attnum == SPI_ERROR_NOATTRIBUTE) {
                /* report pathcol itself, and do not read slave_siteidkeyname after freeing it */
                elog(ERROR,"WRONG pathcol=%s given for table %s",pathcol,cpTableName);
        }
        pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
        
        /*
        The forw_table PRIMARY KEY MUST ALWAYS BE "id", unless the key is given as idcol@forwtable
        */

        run = strchr(forw_table,'@');
        forw_table_id = palloc(MAXCOL_LEN);
        if (run == NULL) {
                strlcpy(forw_table_id,"id",MAXCOL_LEN);
        }
        else {
                *run=0;
                /* bounded copy of the id column name that precedes the '@' */
                strlcpy(forw_table_id,forw_table,MAXCOL_LEN);
                forw_table=run+1;
        }
/*
consult : 
https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing
if the parent's op=='d', then the parent's origop is certainly 'u'
*/
        q = palloc(512);
        sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
        ret = SPI_exec(q,1);
        if (ret == SPI_OK_SELECT && SPI_processed == 1) {
                RealPar_tupTable = SPI_tuptable;
                RealPar_tuple = RealPar_tupTable->vals[0];
                foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
                if (foo == NULL) {
                        slaveid_d = -3;
                }
                else {
                        slaveid_d = atoi(foo);
                        pfree(foo);
                }
                pfree(q);
                pfree(pathcolval);
                return slaveid_d;
        }
/* Dirty patch code to handle copy supplycase. It was supposed to solve the
problem of not turning 'u' to 'i' for e.g. supplycasesquotes, */
/* but it does not work for fb_, as it turns 'i' to 'u' as per: dbmirror issues
when changing vslid (null->int, int->null, intX->intY). */
/* The following code must be here (not commented out) as of 2018-12-20. */
/* The aim is to make everything work correctly: supplies + fb_ + all
ISIMPLFORWARD (-2) type tables. */
/*

        q = palloc(512);
        sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE 
tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 
",cpTableName,GetCurrentTransactionId());
        ret = SPI_exec(q,1);
        if (ret == SPI_OK_SELECT && SPI_processed == 1) {
                RealPar_tupTable = SPI_tuptable;
                RealPar_tuple = RealPar_tupTable->vals[0];
                foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
                if (foo == NULL) {
                        slaveid_d = -3;
                }
                else {
                        slaveid_d = atoi(foo);
                        pfree(foo);
                }
                pfree(q);
                pfree(pathcolval);
                return slaveid_d;
        }
*/

/**
pls consult : 
https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing

if (parent op=='i' and origop <> 'i' exist), which means that origop='u' and
(either slaveid NULL -> NOT NULL or X -> Y, X!=Y), we consider this as an
insert and return -3 in order to communicate this to handler().
NOTE: this means that it is ILLEGAL for the app to do an UPDATE on the parent,
then an UPDATE on the kid, and then a second UPDATE on the kid; this will
result in TWO inserts and produce a duplicate ERROR.
else, i.e. origop == 'i' or no parent op=='i' exists, which means: (
either parent origop == 'i' (which implies that parent op == 'i')
or parent op != 'i' (which implies:
        either parent op == 'd', in which case the code never gets here, because
        of the 1st statement above,
        or parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) with X==Y)
)
)
then we let the code fall back to return
getComputedSlaveId(cpTableName,tuple,tupleDesc), which will return forw_slaveid.
**/
        q = palloc(512);
        sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId());
        ret = SPI_exec(q,1);
        if (ret == SPI_OK_SELECT && SPI_processed == 1) {
                pfree(slave_siteidkeyname);
                pfree(pathcolval);
                return -3;
        }

        pfree(slave_siteidkeyname);
        pfree(pathcolval);
        return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */

/*
* WE ASSUME THAT THE IMMEDIATE PARENT has an entry in dbmirror_pending with the
* same xid and op='d'
*/
}

/*
char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc 
tupleDesc) {
char *slave_siteidkeyname;
char *path;
char *forw_table;
char *forw_table_id;
char *pathcol;
char *pathcolval;
char *qb;
char *q;
HeapTuple resTuple;
int ret;
char *run;
int attnum;
SPITupleTable *SIDKEY_tupTable;
int SIDKEY_processed;
HeapTuple RealPar_tuple;
SPITupleTable *RealPar_tupTable;
char *ParTableName;
char *foo;
int slaveid_d=-3;
char origop;
#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName);
#endif
        qb = "SELECT siteidkeyname from dbmirror_explicitreptables where 
tblname=";
        q = palloc(strlen(qb)+strlen(cpTableName)+2+1);
        sprintf(q,"%s'%s'",qb,cpTableName);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                elog(ERROR,"ISIMPLFORWARD Table %s NOT IN 
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT 
IMMEDIATELY.",cpTableName);
        }
        SIDKEY_tupTable = SPI_tuptable;
        SIDKEY_processed = SPI_processed;
        resTuple = SIDKEY_tupTable->vals[0];
        slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
        if (slave_siteidkeyname == NULL)  {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in 
dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT 
IMMEDIATELY.",cpTableName);
        }
        if (strncmp(slave_siteidkeyname,"___",3) != 0) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not 
starting with \"___\" in dbmirror_explicitreptables. How did we get here? 
CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
//      elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname);
        path = slave_siteidkeyname + 3;
//      elog(NOTICE,"path=-%s-",path);
        run = index(path,'_');
        if (run == NULL) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"ISIMPLFORWARD Table %s has not the right 
\"___localcol_forwtable\" format siteidkeyname  in dbmirror_explicitreptables. 
How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName);
        }
        pathcol = palloc(MAXCOL_LEN);
//      elog(NOTICE,"path=-%d-",(unsigned)path);
//      elog(NOTICE,"run=-%d-",(unsigned)run);
//      elog(NOTICE,"run-path=-%d-",run-path);
        *run = 0;
        strncpy(pathcol,path,run-path+1);

//      elog(NOTICE,"pathcol=-%s-",pathcol);

        forw_table = run+1;

        attnum = SPI_fnumber(tupleDesc,pathcol);
        if (attnum == SPI_ERROR_NOATTRIBUTE) {
                pfree(slave_siteidkeyname);
                elog(ERROR,"WRONG pathcol=%s given for table 
%s",slave_siteidkeyname,cpTableName);
                return 0;
        }
        pathcolval=SPI_getvalue(tuple,tupleDesc,attnum);
*/      
        /*
        The forw_table PRIMARY KEY MUST ALWAYS BE "id"
        */
/*

        run = index(forw_table,'@');
        if (run == NULL) {
                forw_table_id = palloc(MAXCOL_LEN);
                strncpy(forw_table_id,"id",3);
        }
        else {
                forw_table_id = palloc(MAXCOL_LEN);
                *run=0;
                strncpy(forw_table_id,forw_table,run-forw_table+1);
                forw_table=run+1;
        }

        q = palloc(512);
        sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE 
tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 
",forw_table,GetCurrentTransactionId());
        ret = SPI_exec(q,1);
        if (ret == SPI_OK_SELECT && SPI_processed == 1) {
                RealPar_tupTable = SPI_tuptable;
                RealPar_tuple = RealPar_tupTable->vals[0];
                foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1);
                if (foo == NULL) {
                        pfree(q);
                        pfree(pathcolval);
                        elog(ERROR,"WRONG forward parent table %s has null 
origop in this transaction",forw_table);
                }
                else {
                        origop = foo[0];
                        pfree(foo);
                }
                pfree(q);
                pfree(pathcolval);
                return origop;
        }
        return ' ';
}
*/
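
/*
 * Illustration only, not part of the trigger. handleParents() below builds a
 * WHERE clause for a possibly compound foreign key by appending one
 * "col"=value condition per column into a fixed-size buffer. The sketch below
 * shows one way to append with a bounds check, writing at the current end of
 * the buffer instead of passing the buffer as both source and destination of
 * sprintf. append_condition is a hypothetical helper written for this example.
 * It does not escape quotes inside values; SPI_execute_with_args or
 * quote_literal_cstr would be the usual PostgreSQL tools for that.
 */
#if 0
```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Appends "col"=val (integers) or "col"='val' (everything else) to buf,
 * prefixed by " AND " when buf already holds a condition.
 * Returns 0 on success; on overflow restores buf and returns -1.
 */
static int append_condition(char *buf, size_t bufsize,
                            const char *col, const char *val, int is_int)
{
    size_t used = strlen(buf);
    const char *glue = (used > 0) ? " AND " : "";
    int n;

    if (used >= bufsize)
        return -1;
    if (is_int)
        n = snprintf(buf + used, bufsize - used, "%s\"%s\"=%s", glue, col, val);
    else
        n = snprintf(buf + used, bufsize - used, "%s\"%s\"='%s'", glue, col, val);
    if (n < 0 || (size_t) n >= bufsize - used) {
        buf[used] = '\0';    /* drop the partial condition */
        return -1;
    }
    return 0;
}
```
#endif
/*
 * Starting from an empty buffer, appending ("id", "42", integer) and then
 * ("code", "AB", text) gives: "id"=42 AND "code"='AB'
 */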

#define MAX_WHERE_CLAUSE        512
#define MAX_QUERY_LEN   512
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, 
TupleDesc tupleDesc,Oid tableOid,int slaveid) {
        char *qb;
        char *q;
        bool isNull;
        int ret;
        int i;
        HeapTuple resTuple;
        Datum resDatum;
        Oid confrelid;
        char *confrelname;
        int16   *thisCols;
        int16   *fkCols;
        int16   *run;
        ArrayType *arr;
        char    AthisColsVals[MAXKCOLS][MAXCOL_LEN];
        char    fkColsNames[MAXKCOLS][MAXCOL_LEN];
        char    thisColsTypes[MAXKCOLS][100];
        short   numOfCols;
        SPITupleTable *FK_tupTable;
        int     FK_processed;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in handleParents of Table=%u",tableOid);
#endif

        qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
        q = palloc(strlen(qb)+MAX_OID_LEN+1);
        /* Oid is unsigned, so print it with %u */
        sprintf(q,"%s%u",qb,tableOid);
        ret = SPI_exec(q,0);
        pfree(q);
        /* SPI_processed is unsigned and can never be negative; zero rows simply skips the loop below */
        if (ret != SPI_OK_SELECT) {
                elog(NOTICE, "no FKs");
                return 0;
        }
        /*
        For every FK dependency we track and handle the parent table
        */
        FK_tupTable = SPI_tuptable;
        FK_processed = SPI_processed;

        for (i=0;i<FK_processed;i++) {
                char *WHERE;
                char *ParTableName;
                int j;
                char FkHasNullValueORXcluded=0;
                char handleit=0;

                SPITupleTable *RealPar_tupTable;
                int     RealPar_processed;
                HeapTuple RealPar_tuple;
                int colrun;
                SPITupleTable *thisatt_tupTable;
                int     thisatt_processed;
                HeapTuple thisatt_tuple;

                resTuple = FK_tupTable->vals[i];


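                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
                confrelid = DatumGetObjectId(resDatum);

                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
                /* DatumGetArrayTypeP detoasts the Datum and yields an ArrayType pointer */
                arr = DatumGetArrayTypeP(resDatum);
                thisCols = (int16 *)ARR_DATA_PTR(arr);
                numOfCols=ARR_DIMS(arr)[0];
                /* the per-column buffers declared above hold at most MAXKCOLS entries */
                if (numOfCols > MAXKCOLS)
                        elog(ERROR, "handleParents: FK with %d columns exceeds MAXKCOLS",(int) numOfCols);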
#if defined DEBUG_OUTPUT
                int DIM_0 = ARR_DIMS(arr)[0];
                int LBOUND_0 = ARR_LBOUND(arr)[0];
//              resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
//              confrelname = (char *) DatumGetName(resDatum);
//              elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0);
#endif
                for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
                        int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
                        char *fooname;

                        q = palloc(512);
                        sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%u and attnum=%d",tableOid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal error: handleParents: this table oid=%u should have attribute with attnum=%d",tableOid,*run);
                                return -2;
                        }
                        pfree(q);
                        thisatt_processed = SPI_processed;
                        thisatt_tupTable = SPI_tuptable;
                        thisatt_tuple = thisatt_tupTable->vals[0];
                        fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
                        /* look the column number up once and reuse it */
                        thiscolrun = SPI_fnumber(tupleDesc,fooname);
                        char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
                        char *coltype = SPI_gettype(tupleDesc,thiscolrun);

                        

#if defined DEBUG_OUTPUT
                        elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value);
#endif
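                        if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
                                FkHasNullValueORXcluded=1;
                                break;
                        }
                        /* refuse values that do not fit, rather than overrunning the fixed-size row */
                        if (strlen(value) >= MAXCOL_LEN)
                                elog(ERROR, "handleParents: value of column %s is too long for MAXCOL_LEN",fooname);
                        strlcpy(AthisColsVals[colrun],value,MAXCOL_LEN);
                        strlcpy(thisColsTypes[colrun],coltype,sizeof(thisColsTypes[colrun]));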
                        
                        if (Btuple != NULL) {
                                char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
                                elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue);
#endif
                                if (Bvalue == NULL) handleit=1;
                                else {
                                        if (strcmp(Bvalue,value)) handleit=1;
                                        /* free Bvalue whether or not the values differ */
                                        pfree(Bvalue);
                                }

                        }
                        else handleit=1;
                        pfree(value);
                }
#if defined DEBUG_OUTPUT
                elog(NOTICE,"in handleParents--> END OF ATTR LOOP");
                elog(NOTICE,"handle it = %d, ",handleit);
                elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif
                if (FkHasNullValueORXcluded || !handleit) continue;
                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
                /* DatumGetArrayTypeP detoasts the Datum and yields an ArrayType pointer */
                arr = DatumGetArrayTypeP(resDatum);
                fkCols = (int16 *)ARR_DATA_PTR(arr);
                for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
                        SPITupleTable *PAR_tupTable;
                        int     PAR_processed;
                        HeapTuple PAR_resTuple;
                        char *value;
                        q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
                        sprintf(q,"SELECT attname from pg_attribute where attrelid=%u and attnum=%d",confrelid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal Error:no PK for FK relation");
                                return -2;
                        }
                        pfree(q);
                        PAR_tupTable = SPI_tuptable;
                        PAR_processed = SPI_processed;
                        PAR_resTuple = PAR_tupTable->vals[0];
                        value = 
SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
                        memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
                        pfree(value);

                }
                
                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
                confrelname = (char *) DatumGetName(resDatum);
        /*
        There must be at least 1 column in compound FK!!
        */
                WHERE = palloc(MAX_WHERE_CLAUSE);
                if (!strncmp(thisColsTypes[0],"int",3))
                        sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]);
                else
                        sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]);
                for (j=1;j<numOfCols;j++) {
                        if (!strncmp(thisColsTypes[j],"int",3))
                                sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],AthisColsVals[j]);
                        else
                                sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],AthisColsVals[j]);
                }

                q = palloc(512);
                sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
                //elog(NOTICE,"handleParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
                elog(NOTICE,"Found FK, SQL=%s",q);
#endif
                ret = SPI_exec(q,1);
                if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                        elog(ERROR, "Fatal error: handleParents: par table %s should have row with %s",confrelname,WHERE);
                        return -2;
                }
                pfree(q);
                RealPar_processed = SPI_processed;
                RealPar_tupTable = SPI_tuptable;
                RealPar_tuple = RealPar_tupTable->vals[0];
                ParTableName = palloc(256);
                sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
                ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE);
                pfree(ParTableName);
                pfree(WHERE);

        }
        return 0;
}

int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) {
        char *q;
        int ret;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName);
#endif

        q = palloc(MAX_QUERY_LEN);
        /*
        sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0;
        else return 1;
        */
        sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0;
        else return 1;
}

int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) {
        char *q;
        int ret;
        bool isNull;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName);
#endif

        q = palloc(MAX_QUERY_LEN);
        sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
        q = palloc(MAX_QUERY_LEN);
        sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress);
        return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull));
}

int createAccnt(char *cpTableName, int slaveid, char *pkxpress) {
        char *q;
        int ret;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in createAccnt of Table=%s",cpTableName);
#endif

        q = palloc(MAX_QUERY_LEN);
        /*
        public.dbmirror_accounting.cnt DEFAULT = 1
        */
        sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,1);
        pfree(q);
        if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2;
        else return 0;
}


char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) {
        int2vector *pk;
        int i;
        char *WHERE = palloc(MAX_WHERE_CLAUSE);

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getPKxpress of Table=%s",cpTableName);
#endif

        pk = getPrimaryKey(tableOid);
        if (pk == NULL) return NULL;
        int16 *pkV = pk->values;
        int pkSize = pk->dim1;
        if (pkSize>0) {
                char *keyname;
                char *keyval;
                keyname = SPI_fname(tupleDesc,pkV[0]);
                if (keyname == NULL) {
                        elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName);
                        return NULL;
                }
                keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]);
                if (keyval == NULL) {
                        elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName);
                        return NULL;
                }

                sprintf(WHERE,"\"%s\"=%s",keyname,keyval);
                pfree(keyname);
                pfree(keyval);
        }
        for (i=1;i<pkSize;i++) {
                char *keyname;
                char *keyval;
                keyname = SPI_fname(tupleDesc,pkV[i]);
                if (keyname == NULL) {
                        elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[i],cpTableName);
                        return NULL;
                }
                keyval = SPI_getvalue(tuple,tupleDesc,pkV[i]);
                if (keyval == NULL) {
                        elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[i],cpTableName);
                        return NULL;
                }

                sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,keyname,keyval);
                pfree(keyname);
                pfree(keyval);
                
        }
        pfree(pk);
        return WHERE;
}
int *getSlaves(char *cpTableName,char *pkxpress) {
        char *q;
        int ret;
        int SLAVE_processed;
        SPITupleTable *SLAVE_tupTable;
        int *slaves;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in getSlaves of Table=%s, pkxpress=%s",cpTableName,pkxpress);
#endif

        if (pkxpress == NULL) return NULL;
        
        q = palloc(MAX_QUERY_LEN);
        sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
        ret = SPI_exec(q,0);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL;
        SLAVE_tupTable = SPI_tuptable;
        SLAVE_processed = SPI_processed;
        slaves = palloc((SLAVE_processed+1) * sizeof(int));
#if defined DEBUG_OUTPUT
        elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed);
#endif
        for (ret=0;ret<SLAVE_processed;ret++) {
                char *slaveidStr = SPI_getvalue(SLAVE_tupTable->vals[ret],SLAVE_tupTable->tupdesc,1);
                *(slaves+ret) = atoi(slaveidStr);
                pfree(slaveidStr);
                
        }
        *(slaves+SLAVE_processed) = 0;
        return slaves;


}

int deleteAccnt (char *cpTableName, char *pkxpress) {
        char *q;
        int ret;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName);
#endif

        q = palloc(MAX_QUERY_LEN);
        sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress);
        ret = SPI_exec(q,0);
        pfree(q);
        if (ret == SPI_OK_DELETE) return 0;
        else return -2;
}

int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) {
        char *q;
        int ret;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName);
#endif

        q = palloc(MAX_QUERY_LEN);
        sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid);
        ret = SPI_exec(q,0);
        pfree(q);
        if (ret == SPI_OK_DELETE) return 0;
        else return -2;
}

int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) {
        char *qb;
        char *q;
        bool isNull;
        int ret;
        int i;
        HeapTuple resTuple;
        Datum resDatum;
        Oid confrelid;
        char *confrelname;
        int16   *thisCols;
        int16   *fkCols;
        int16   *run;
        ArrayType *arr;
        char    BthisColsVals[MAXKCOLS][MAXCOL_LEN];
        char    fkColsNames[MAXKCOLS][MAXCOL_LEN];
        char    thisColsTypes[MAXKCOLS][100];
        short   numOfCols;
        SPITupleTable *FK_tupTable;
        int     FK_processed;
        int ParSlaveId;

#if defined DEBUG_OUTPUT
        elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid);
#endif

        qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = ";
        q = palloc(strlen(qb)+MAX_OID_LEN+1);
        sprintf(q,"%s%d",qb,tableOid);
        ret = SPI_exec(q,0);
        pfree(q);
        if (ret != SPI_OK_SELECT || SPI_processed < 0) { 
                elog(NOTICE, "no FKs");
                return 0;
        }
        /*
        For every FK dependency we track and handle the parent table
        */
        FK_tupTable = SPI_tuptable;
        FK_processed = SPI_processed;

        for (i=0;i<FK_processed;i++) {
                char *WHERE;
                char *ParTableName;
                int j;
                char FkHasNullValueORXcluded=0;
                char decreaseit=0;

                SPITupleTable *RealPar_tupTable;
                int     RealPar_processed;
                HeapTuple RealPar_tuple;
                int colrun;
                SPITupleTable *thisatt_tupTable;
                int     thisatt_processed;
                HeapTuple thisatt_tuple;


                resTuple = FK_tupTable->vals[i];


                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull);
                confrelid = (Oid) DatumGetObjectId(resDatum);

                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
                arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
                thisCols = (int16 *)ARR_DATA_PTR(arr);
                numOfCols=ARR_DIMS(arr)[0];
                for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
                        int thiscolrun;
#if defined DEBUG_OUTPUT
#endif
                        char *fooname;

                        q = palloc(512);
                        sprintf(q,"SELECT attname FROM pg_attribute WHERE attrelid=%d and attnum=%d",tableOid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal error: updateAccntParents: this table oid=%d should have attribute with attnum=%d",tableOid,*run);
                                return -2;
                        }
                        pfree(q);
                        thisatt_processed = SPI_processed;
                        thisatt_tupTable = SPI_tuptable;
                        thisatt_tuple = thisatt_tupTable->vals[0];
                        fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1);
                        char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));

                        char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));



#if defined DEBUG_OUTPUT
                        elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value);
#endif
                        if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun))  {
                                FkHasNullValueORXcluded=1;
                                break;
                                /*
                                Nothing is done regarding "Before" status of parent since there is not one
                                */
                        }
                        memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1);
                        memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1);

                        if (Atuple != NULL) {
                                char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun);
#if defined DEBUG_OUTPUT
                                elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue);
#endif
                                if (Avalue == NULL) decreaseit=1;
                                else if (strcmp(Avalue,value)) {
                                        decreaseit=1;
                                        pfree(Avalue);
                                }
                        }
                        else decreaseit=1; /* is delete operation*/
                        pfree(value);
                }
#if defined DEBUG_OUTPUT
                elog(NOTICE,"in updateAccntParents--> END OF ATTR LOOP");
                elog(NOTICE,"decrease it = %d, ",decreaseit);
                elog(NOTICE,"FkHasNullValueORXcluded  = %d, ",FkHasNullValueORXcluded);
#endif

                if (FkHasNullValueORXcluded || !decreaseit) continue;


                
                
                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
                arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
                fkCols = (int16 *)ARR_DATA_PTR(arr);
                for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
                        SPITupleTable *PAR_tupTable;
                        int     PAR_processed;
                        HeapTuple PAR_resTuple;
                        char *value;
                        q = palloc(strlen("SELECT attname from pg_attribute where attrelid= and attnum=")+2*(MAX_OID_LEN+1));
                        sprintf(q,"SELECT attname from pg_attribute where attrelid=%d and attnum=%d",confrelid,*run);
                        ret = SPI_exec(q,1);
                        if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                                elog(ERROR, "Fatal Error:no PK for FK relation");
                                return -2;
                        }
                        pfree(q);
                        PAR_tupTable = SPI_tuptable;
                        PAR_processed = SPI_processed;
                        PAR_resTuple = PAR_tupTable->vals[0];
                        value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1);
                        memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1);
                        pfree(value);

                }
                
                resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull);
                confrelname = (char *) DatumGetName(resDatum);
        /*
        there must be at least 1 column in compound FK!!
        */
                WHERE = palloc(MAX_WHERE_CLAUSE);
                if (!strncmp(thisColsTypes[0],"int",3))
                        sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]);
                else
                        sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]);
                for (j=1;j<numOfCols;j++) {
                        if (!strncmp(thisColsTypes[j],"int",3))
                                sprintf(WHERE,"%s AND \"%s\"=%s",WHERE,fkColsNames[j],BthisColsVals[j]);
                        else
                                sprintf(WHERE,"%s AND \"%s\"='%s'",WHERE,fkColsNames[j],BthisColsVals[j]);
                }

                q = palloc(512);
                sprintf(q,"SELECT * FROM public.%s WHERE %s",confrelname,WHERE);
                //elog(NOTICE,"updateAccntParents : Found FK, SQL=%s",q);
#if defined DEBUG_OUTPUT
                elog(NOTICE,"Found FK, SQL=%s",q);
#endif
                ret = SPI_exec(q,1);
                if (ret != SPI_OK_SELECT || SPI_processed != 1) {
                        elog(ERROR, "Fatal error: updateAccntParents: par table %s should have row with %s",confrelname,WHERE);
                        return -2;
                }
                pfree(q);
                RealPar_processed = SPI_processed;
                RealPar_tupTable = SPI_tuptable;
                RealPar_tuple = RealPar_tupTable->vals[0];
                ParTableName = palloc(256);
                sprintf(ParTableName,"\"public\".\"%s\"",confrelname);
                ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc);
                if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) {
                        pfree(ParTableName);
                        continue;
                }
                if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) {
                        ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f');
                        ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE);
                        ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid);

                }
                pfree(ParTableName);
                pfree(WHERE);

        }
        return 0;
}
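
A side note on the WHERE-clause construction in the source above: calls of the form sprintf(WHERE,"%s AND ...",WHERE,...) pass the destination buffer as one of the source arguments, and the C standard leaves the behaviour of sprintf undefined when the copied objects overlap. Below is a minimal standalone sketch of appending at an offset instead. The helper name append_cond and its parameters are hypothetical and not part of pending.c; like the original code, it does no escaping of the values.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * append_cond: hypothetical helper, not part of pending.c.
 * Appends  "col"=val  (or  "col"='val'  when quote is non-zero) to buf,
 * writing at offset *len so the buffer is never read as a source.
 * " AND " is prepended when buf already holds a condition.
 * Returns 0 on success, -1 if the text would not fit (buf is left unchanged).
 * Note: values are NOT escaped, same as in the original sprintf calls.
 */
int
append_cond(char *buf, size_t cap, size_t *len,
            const char *col, const char *val, int quote)
{
    int n;

    if (*len >= cap)
        return -1;

    n = snprintf(buf + *len, cap - *len,
                 quote ? "%s\"%s\"='%s'" : "%s\"%s\"=%s",
                 *len ? " AND " : "", col, val);

    if (n < 0 || (size_t) n >= cap - *len) {
        buf[*len] = '\0';       /* drop the truncated tail */
        return -1;
    }
    *len += (size_t) n;
    return 0;
}
```

Inside the backend, the StringInfo routines (initStringInfo, appendStringInfo) would probably be the more natural way to get the same effect, since they also grow the buffer as needed; the sketch uses plain snprintf only so that it compiles without the server headers.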
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wcast-function-type -Wshadow=compatible-local -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -fPIC -DPIC -fvisibility=hidden -I. -I./ -I/usr/local/pgsql/include/server -I/usr/local/pgsql/include/internal -I/usr/local/include     -c -o pending.o pending.c
pending.c: In function 'storePending':
pending.c:249:25: warning: variable 'tCurTuple' set but not used [-Wunused-but-set-variable]
  249 |         HeapTuple       tCurTuple;
      |                         ^~~~~~~~~
In file included from /usr/local/pgsql/include/server/nodes/execnodes.h:34,
                 from /usr/local/pgsql/include/server/commands/trigger.h:18,
                 from /usr/local/pgsql/include/server/executor/spi.h:16,
                 from pending.c:36:
pending.c: In function 'getPrimaryKey':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:402:54: note: in expansion of macro 'PG_DETOAST_DATUM'
  402 |         tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                      ^~~~~~~~~~~~~~~~
In file included from pending.c:34:
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:403:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
  403 |         int n=tpResultKey->dim1;
      |         ^~~
pending.c: In function 'packageData':
pending.c:527:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
  527 |                         int16 *tpPKeysV = tpPKeys->values;
      |                         ^~~~~
pending.c:551:29: error: 'struct TupleDescData' has no member named 'attrs'
  551 |                if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
      |                             ^~
pending.c:571:71: error: 'struct TupleDescData' has no member named 'attrs'
  571 |                 cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
      |                                                                       ^~
pending.c: In function 'isExcluded':
pending.c:664:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  664 |         int     SIDKEY_processed;
      |                 ^~~~~~~~~~~~~~~~
pending.c: In function 'handler':
pending.c:729:21: warning: variable 'retval' set but not used [-Wunused-but-set-variable]
  729 |                 int retval;
      |                     ^~~~~~
pending.c: In function 'getSlaveId':
pending.c:928:17: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  928 |         int     SIDKEY_processed;
      |                 ^~~~~~~~~~~~~~~~
pending.c: In function 'getComputedSlaveId':
pending.c:993:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
  993 | int SIDKEY_processed;
      |     ^~~~~~~~~~~~~~~~
pending.c: In function 'getOldComputedSlaveId':
pending.c:1119:7: warning: unused variable 'ParTableName' [-Wunused-variable]
 1119 | char *ParTableName;
      |       ^~~~~~~~~~~~
pending.c:1116:5: warning: variable 'SIDKEY_processed' set but not used [-Wunused-but-set-variable]
 1116 | int SIDKEY_processed;
      |     ^~~~~~~~~~~~~~~~
pending.c: In function 'handleParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1444:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1444 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1472:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1472 |                         char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
      |                         ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1509:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1509 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1513:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
 1513 |                         int     PAR_processed;
      |                                 ^~~~~~~~~~~~~
pending.c:1434:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
 1434 |                 int     thisatt_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c:1430:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
 1430 |                 int     RealPar_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c: In function 'getPKxpress':
pending.c:1651:9: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1651 |         int16 *pkV = pk->values;
      |         ^~~~~
pending.c: In function 'updateAccntParents':
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1825:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1825 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1846:25: warning: ISO C90 forbids mixed declarations and code [-Wdeclaration-after-statement]
 1846 |                         char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname));
      |                         ^~~~
/usr/local/pgsql/include/server/fmgr.h:241:9: warning: passing argument 1 of 'DatumGetPointer' makes integer from pointer without a cast [-Wint-conversion]
  241 |         pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
      |         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      |         |
      |         struct varlena *
pending.c:1891:53: note: in expansion of macro 'PG_DETOAST_DATUM'
 1891 |                 arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
      |                                                     ^~~~~~~~~~~~~~~~
/usr/local/pgsql/include/server/postgres.h:317:23: note: expected 'Datum' {aka 'long unsigned int'} but argument is of type 'struct varlena *'
  317 | DatumGetPointer(Datum X)
      |                 ~~~~~~^
pending.c:1895:33: warning: variable 'PAR_processed' set but not used [-Wunused-but-set-variable]
 1895 |                         int     PAR_processed;
      |                                 ^~~~~~~~~~~~~
pending.c:1814:25: warning: variable 'thisatt_processed' set but not used [-Wunused-but-set-variable]
 1814 |                 int     thisatt_processed;
      |                         ^~~~~~~~~~~~~~~~~
pending.c:1810:25: warning: variable 'RealPar_processed' set but not used [-Wunused-but-set-variable]
 1810 |                 int     RealPar_processed;
      |                         ^~~~~~~~~~~~~~~~~
gmake: *** [<builtin>: pending.o] Error 1
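
Reading the -Wint-conversion warnings in the log above: the macro expansion quoted from fmgr.h already yields a struct varlena *, while DatumGetPointer() is declared to take a Datum, so wrapping one in the other hands a pointer to a parameter of integer type. The standalone sketch below reproduces only that shape. All Mock*/mock_* names are simplified stand-ins invented for illustration and are not the real PostgreSQL definitions.

```c
#include <stdint.h>

/* Simplified stand-ins, NOT the real PostgreSQL definitions. */
typedef uintptr_t MockDatum;
struct mock_varlena { int32_t len; char data[8]; };

void *MockDatumGetPointer(MockDatum d) { return (void *) d; }
MockDatum MockPointerGetDatum(const void *p) { return (MockDatum) p; }

/* Stand-in for the detoast function; the real one may return a fresh copy. */
struct mock_varlena *mock_detoast(struct mock_varlena *v) { return v; }

/* Same shape as the macro quoted in the log: takes a Datum, yields a pointer. */
#define MOCK_DETOAST_DATUM(datum) \
    mock_detoast((struct mock_varlena *) MockDatumGetPointer(datum))

struct mock_varlena *
fetch_detoasted(MockDatum d)
{
    /*
     * The macro result is already a pointer, so it is cast directly.
     * Writing MockDatumGetPointer(MOCK_DETOAST_DATUM(d)) here would pass a
     * pointer where an integer-typed Datum is expected, which is what the
     * warnings in the log complain about.
     */
    return (struct mock_varlena *) MOCK_DETOAST_DATUM(d);
}
```

For the array columns, utils/array.h also provides DatumGetArrayTypeP(), which wraps the detoast-and-cast step in one macro and may be worth considering in place of the hand-written cast.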
diff --git a/code/dbmirror/pending.c b/code/dbmirror/pending.c
index fca76318..b1c61e02 100644
--- a/code/dbmirror/pending.c
+++ b/code/dbmirror/pending.c
@@ -98,7 +98,7 @@ char *get_namespace_name(Oid nspid);
 
 #define BUFFER_SIZE 256
 #define MAX_OID_LEN 10
-//#define DEBUG_OUTPUT 1
+#define DEBUG_OUTPUT 1
 extern Datum recordchange(PG_FUNCTION_ARGS);
 
 PG_FUNCTION_INFO_V1(recordchange);
@@ -399,7 +399,8 @@ getPrimaryKey(Oid tblOid)
 		return NULL;
 	}
 
-	tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+	//tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+	tpResultKey = (int2vector *) PG_DETOAST_DATUM(resDatum);
 	int n=tpResultKey->dim1;
 	resultKey = palloc(Int2VectorSize(n));
 	if (n > 0)
@@ -548,7 +549,8 @@ packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid ta
 		}						/* KeyUsage!=ALL */
 #ifndef  NODROPCOLUMN
                // this comment is for 11
-               if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+               ////if(tTupleDesc->attrs[iColumnCounter-1].attisdropped)
+		if (TupleDescAttr(tTupleDesc,iColumnCounter-1)->attisdropped)
                //if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
                  {
                    /**
@@ -568,7 +570,8 @@ eksairetea columns
 		}
 
 		// this comment is for 11
-		cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+		////cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
+		cpFieldName = NameStr(TupleDescAttr(tTupleDesc,iColumnCounter - 1)->attname );
 		//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
 #if defined DEBUG_OUTPUT
 		elog(NOTICE, "FieldName: %s", cpFieldName);
@@ -1441,7 +1444,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
 		confrelid = (Oid) DatumGetObjectId(resDatum);
 
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		thisCols = (int16 *)ARR_DATA_PTR(arr);
 		numOfCols=ARR_DIMS(arr)[0];
 #if defined DEBUG_OUTPUT
@@ -1506,7 +1510,8 @@ int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc
 #endif
 		if (FkHasNullValueORXcluded || !handleit) continue;
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		fkCols = (int16 *)ARR_DATA_PTR(arr);
 		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
 			SPITupleTable *PAR_tupTable;
@@ -1822,7 +1827,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
 		confrelid = (Oid) DatumGetObjectId(resDatum);
 
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		thisCols = (int16 *)ARR_DATA_PTR(arr);
 		numOfCols=ARR_DIMS(arr)[0];
 		for (run=thisCols,colrun=0;colrun<numOfCols;run++,colrun++)  {
@@ -1888,7 +1894,8 @@ int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, Tupl
 		
 		
 		resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull);
-		arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		//arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum));
+		arr = (ArrayType *) PG_DETOAST_DATUM(resDatum);
 		fkCols = (int16 *)ARR_DATA_PTR(arr);
 		for (run=fkCols,colrun=0;colrun<numOfCols;run++,colrun++) {
 			SPITupleTable *PAR_tupTable;
