Hmm, I messed up the patch file I sent. Here's the complete patch.
-- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ "Doing what he did amounts to sticking his fingers under the hood of the implementation; if he gets his fingers burnt, it's his problem." (Tom Lane)
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index bb4ef5e5e2..c86055b93c 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -110,6 +110,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table name to explicitly indicate that descendant tables are included. + Optionally, a column list can be specified. See <xref + linkend="sql-createpublication"/> for details. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index d805e8e77a..73a23cbb02 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... 
] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> publication, so they are never explicitly added to the publication. </para> + <para> + When a column list is specified, only the listed columns are replicated; + any other columns are ignored for the purpose of replication through + this publication. If no column list is specified, all columns of the + table are replicated through this publication, including any columns + added later. If a column list is specified, it must include the replica + identity columns. + </para> + <para> Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index fe9c714257..a88d12e8ae 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1472,7 +1472,7 @@ doDeletion(const ObjectAddress *object, int flags) break; case OCLASS_PUBLICATION_REL: - RemovePublicationRelById(object->objectId); + RemovePublicationRelById(object->objectId, object->objectSubId); break; case OCLASS_PUBLICATION: @@ -2754,8 +2754,12 @@ free_object_addresses(ObjectAddresses *addrs) ObjectClass getObjectClass(const ObjectAddress *object) { - /* only pg_class entries can have nonzero objectSubId */ + /* + * only pg_class and pg_publication_rel entries can have nonzero + * objectSubId + */ if (object->classId != RelationRelationId && + object->classId != PublicationRelRelationId && object->objectSubId != 0) elog(ERROR, "invalid non-zero objectSubId for object class %u", object->classId); diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 2bae3fbb17..5eed248dcb 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -4019,6 
+4019,7 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) /* translator: first %s is, e.g., "table %s" */ appendStringInfo(&buffer, _("publication of %s in publication %s"), rel.data, pubname); + /* FIXME add objectSubId support */ pfree(rel.data); ReleaseSysCache(tup); break; @@ -5853,9 +5854,16 @@ getObjectIdentityParts(const ObjectAddress *object, getRelationIdentity(&buffer, prform->prrelid, objname, false); appendStringInfo(&buffer, " in publication %s", pubname); + if (object->objectSubId) /* FIXME maybe get_attname */ + appendStringInfo(&buffer, " column %d", object->objectSubId); if (objargs) + { *objargs = list_make1(pubname); + if (object->objectSubId) + *objargs = lappend(*objargs, + psprintf("%d", object->objectSubId)); + } ReleaseSysCache(tup); break; diff --git a/src/backend/catalog/pg_depend.c b/src/backend/catalog/pg_depend.c index 5f37bf6d10..dfcb450e61 100644 --- a/src/backend/catalog/pg_depend.c +++ b/src/backend/catalog/pg_depend.c @@ -658,6 +658,56 @@ isObjectPinned(const ObjectAddress *object) * Various special-purpose lookups and manipulations of pg_depend. */ +/* + * Find all objects of the given class that reference the specified object, + * and add them to the given ObjectAddresses. 
+ */ +void +findAndAddAddresses(ObjectAddresses *addrs, Oid classId, + Oid refclassId, Oid refobjectId, int32 refobjsubId) +{ + Relation depRel; + ScanKeyData key[3]; + SysScanDesc scan; + HeapTuple tup; + + depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(refclassId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(refobjectId)); + ScanKeyInit(&key[2], + Anum_pg_depend_refobjsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(refobjsubId)); + + scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, 3, key); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup); + ObjectAddress object; + + if (depform->classid != classId) + continue; + + ObjectAddressSubSet(object, depform->classid, depform->objid, + depform->refobjsubid); + + add_exact_object_address(&object, addrs); + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); +} + /* * Find the extension containing the specified object, if any diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 62f10bcbd2..ae58adc8e5 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -46,12 +46,18 @@ #include "utils/syscache.h" /* - * Check if relation can be in given publication and throws appropriate - * error if not. + * Check if relation can be in given publication and that the column + * filter is sensible, and throws appropriate error if not. + * + * targetcols is the bitmapset of column specified as column filter + * (shifted by FirstLowInvalidHeapAttributeNumber), or NULL if no column + * filter was specified. 
*/ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(Relation targetrel, Bitmapset *columns) { + bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -82,6 +88,40 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* + * Enforce that the column filter can only leave out columns that aren't + * forced to be sent. + * + * No column can be excluded if REPLICA IDENTITY is FULL (since all the + * columns need to be sent regardless); and in other cases, the columns in + * the REPLICA IDENTITY cannot be left out. + */ + if (columns != NULL) + { + if (replidentfull) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL.")); + else + { + Bitmapset *idattrs; + + idattrs = RelationGetIndexAttrBitmap(targetrel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + if (!bms_is_subset(idattrs, columns)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("All columns in REPLICA IDENTITY must be present in the column list.")); + + if (idattrs) + pfree(idattrs); + } + } } /* @@ -289,9 +329,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, Oid relid = RelationGetRelid(targetrel->relation); Oid prrelid; Publication *pub = GetPublication(pubid); + Bitmapset *attmap = NULL; + AttrNumber *attarray; + int natts = 0; + int attnum; ObjectAddress myself, referenced; 
List *relids = NIL; + ListCell *lc; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -305,6 +350,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, { table_close(rel, RowExclusiveLock); + /* FIXME need to handle the case of different column list */ + if (if_not_exists) return InvalidObjectAddress; @@ -314,7 +361,34 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel->relation); + attarray = palloc(sizeof(AttrNumber) * list_length(targetrel->columns)); + foreach(lc, targetrel->columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(relid, colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel->relation))); + if (attnum < 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference system column \"%s\" in publication column list", + colname)); + + if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attmap)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("column \"%s\" specified twice in publication column list", + colname)); + + attmap = bms_add_member(attmap, attnum - FirstLowInvalidHeapAttributeNumber); + attarray[natts++] = attnum; + } + + check_publication_add_relation(targetrel->relation, attmap); /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); @@ -327,6 +401,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + if (targetrel->columns) + { + int2vector *prattrs; + + prattrs = buildint2vector(attarray, natts); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs); + } + else + nulls[Anum_pg_publication_rel_prattrs - 1] = true; tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -344,6 +427,21 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* + * If there's an explicit column list, make one dependency entry for each + * column. Note that the referencing side of the dependency is also + * specific to one column, so that it can be dropped separately if the + * column is dropped. + */ + while ((attnum = bms_first_member(attmap)) >= 0) + { + ObjectAddressSubSet(referenced, RelationRelationId, relid, + attnum + FirstLowInvalidHeapAttributeNumber); + myself.objectSubId = attnum + FirstLowInvalidHeapAttributeNumber; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + myself.objectSubId = 0; /* need to undo this bit */ + /* Close the table. */ table_close(rel, RowExclusiveLock); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 404bb5d0c8..a070914bdd 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -561,7 +561,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, pubrel = palloc(sizeof(PublicationRelInfo)); pubrel->relation = oldrel; - + pubrel->columns = NIL; delrels = lappend(delrels, pubrel); } } @@ -757,10 +757,11 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) } /* - * Remove relation from publication by mapping OID. 
+ * Remove relation from publication by mapping OID, or publication status + * of one column of that relation in the publication if an attnum is given. */ void -RemovePublicationRelById(Oid proid) +RemovePublicationRelById(Oid proid, int32 attnum) { Relation rel; HeapTuple tup; @@ -790,7 +791,81 @@ RemovePublicationRelById(Oid proid) InvalidatePublicationRels(relids); - CatalogTupleDelete(rel, &tup->t_self); + /* + * If no column is given, simply delete the relation from the publication. + * + * If a column is given, what we do instead is to remove that column from + * the column list. The relation remains in the publication, with the + * other columns. However, dropping the last column is disallowed. + */ + if (attnum == 0) + { + CatalogTupleDelete(rel, &tup->t_self); + } + else + { + Datum adatum; + ArrayType *arr; + int nelems; + int16 *elems; + int16 *newelems; + int2vector *newvec; + Datum values[Natts_pg_publication_rel]; + bool nulls[Natts_pg_publication_rel]; + bool replace[Natts_pg_publication_rel]; + HeapTuple newtup; + int i, + j; + bool isnull; + + /* Obtain the original column list */ + adatum = SysCacheGetAttr(PUBLICATIONRELMAP, + tup, + Anum_pg_publication_rel_prattrs, + &isnull); + if (isnull) /* shouldn't happen */ + elog(ERROR, "can't drop column from publication without a column list"); + arr = DatumGetArrayTypeP(adatum); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* Construct a list excluding the given column */ + newelems = palloc(sizeof(int16) * nelems - 1); + for (i = 0, j = 0; i < nelems - 1; i++) + { + if (elems[i] == attnum) + continue; + newelems[j++] = elems[i]; + } + + /* + * If this is the last column used in the publication, disallow the + * command. We could alternatively just drop the relation from the + * publication. 
+ */ + if (j == 0) + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop the last column in publication \"%s\"", + get_publication_name(pubrel->prpubid, false)), + errhint("Remove table \"%s\" from the publication first.", + get_rel_name(pubrel->prrelid))); + } + + /* Build the updated tuple */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, false, sizeof(nulls)); + MemSet(replace, false, sizeof(replace)); + newvec = buildint2vector(newelems, j); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(newvec); + replace[Anum_pg_publication_rel_prattrs - 1] = true; + + /* Execute the update */ + newtup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replace); + CatalogTupleUpdate(rel, &tup->t_self, newtup); + } ReleaseSysCache(tup); @@ -932,6 +1007,8 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -965,8 +1042,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); + pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } @@ -1074,6 +1154,12 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list may not be specified for relation \"%s\" in ALTER PUBLICATION ... 
SET/DROP command", + RelationGetRelationName(pubrel->relation))); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 47b29001d5..7207dcf9c0 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -40,8 +40,9 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" -#include "catalog/pg_tablespace.h" +#include "catalog/pg_publication_rel.h" #include "catalog/pg_statistic_ext.h" +#include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" #include "catalog/storage.h" @@ -8420,6 +8421,13 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ReleaseSysCache(tuple); + /* + * If the column is part of a replication column list, arrange to get that + * removed too. + */ + findAndAddAddresses(addrs, PublicationRelRelationId, + RelationRelationId, RelationGetRelid(rel), attnum); + /* * Propagate to children as appropriate. 
Unlike most other ALTER * routines, we have to do this one level of recursion at a time; we can't diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index df0b747883..0ff4c1ceac 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index cb7ddd463c..d786a688ac 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2312,6 +2312,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3d4dd43e47..4dad6fedfb 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9742,12 +9742,13 @@ CreatePublicationStmt: * relation_expr here. 
*/ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->columns = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9762,28 +9763,38 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2 != NULL) + { + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->columns = $2; + $$->name = NULL; + } + else + $$->name = $1; $$->location = @1; } - | ColId indirection + | ColId indirection opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->columns = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->columns = $2; } | CURRENT_SCHEMA { @@ -17435,8 +17446,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) { /* convert it to PublicationTable */ PublicationTable *pubtable = makeNode(PublicationTable); - pubtable->relation = makeRangeVar(NULL, pubobj->name, - pubobj->location); + + pubtable->relation = + makeRangeVar(NULL, pubobj->name, pubobj->location); pubobj->pubtable = pubtable; pubobj->name = NULL; } @@ -17444,6 +17456,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA || 
pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA) { + /* + * This can happen if a column list is specified in a continuation + * for a schema entry; reject it. + */ + if (pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schemas"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..15d8192238 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,9 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, Bitmapset *att_map); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, 
- HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, att_map); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map) { char *relname; @@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, att_map); } /* @@ -749,20 +749,42 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary, + Bitmapset *att_map) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; bool isnull[MaxTupleAttributeNumber]; int i; uint16 nliveatts = 0; + Bitmapset *idattrs = NULL; + bool replidentfull; + Form_pg_attribute att; desc = RelationGetDescr(rel); + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + for (i = 0; i < desc->natts; i++) { + att = TupleDescAttr(desc, i); if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) continue; + + /* + * Do not increment count of attributes if not a part of column + * filters except for replica identity columns or if replica identity + * is full. + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) && + !replidentfull) + continue; nliveatts++; } pq_sendint16(out, nliveatts); @@ -800,6 +822,19 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar continue; } + /* + * Do not send attribute data if it is not a part of column filters, + * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is + * full, send the data. + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) && + !replidentfull) + continue; + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); if (!HeapTupleIsValid(typtup)) elog(ERROR, "cache lookup failed for type %u", att->atttypid); @@ -904,7 +939,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. 
*/ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map) { TupleDesc desc; int i; @@ -914,20 +949,35 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); - /* send number of live attributes */ - for (i = 0; i < desc->natts; i++) - { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) - continue; - nliveatts++; - } - pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ + if (replidentfull || + bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs)) + { + nliveatts++; + continue; + } + /* Skip sending if not a part of column filter */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -937,6 +987,17 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + /* + * Exclude filtered columns, but REPLICA IDENTITY columns can't be + * excluded + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) + && !replidentfull) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. 
*/ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..15902faf56 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -111,6 +111,7 @@ #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -695,19 +696,25 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { WalRcvExecResult *res; + WalRcvExecResult *res_pub; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + TupleTableSlot *slot_pub; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid pubRow[] = {TEXTARRAYOID}; bool isnull; int natt; + List *pub_columns = NIL; + ListCell *lc; + bool am_partition = false; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -737,6 +744,7 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull)); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -774,11 +782,101 @@ fetch_remote_table_info(char *nspname, char *relname, natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + + /* + * Now, fetch the values of publications' column filters. 
+ * + * For a partition, use pg_inherit to find the parent, as the + * pg_publication_rel contains only the topmost parent table entry in case + * the table is partitioned. Run a recursive query to iterate through all + * the parents of the partition and retreive the record for the parent + * that exists in pg_publication_rel. + */ + resetStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT CASE WHEN prattrs IS NOT NULL THEN\n" + " ARRAY(SELECT attname\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(prattrs::int[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = prrelid AND attnum = prattrs[s])\n" + " ELSE NULL END AS columns\n" + "FROM pg_catalog.pg_publication_rel\n"); + if (!am_partition) + appendStringInfo(&cmd, "WHERE prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, + "WHERE prrelid IN (SELECT relid \n" + "FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))", + lrel->remoteid); + + res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(pubRow), pubRow); + + if (res_pub->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s", + nspname, relname, res_pub->err))); + slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple); + + while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub)) + { + Datum adatum; + Datum *elems; + bool *nulls; + int nelems; + + adatum = slot_getattr(slot_pub, 1, &isnull); + if (isnull) /* shouldn't happen */ + elog(ERROR, "unexpected null value in publication column filter"); + deconstruct_array(DatumGetArrayTypeP(adatum), + TEXTOID, -1, false, TYPALIGN_INT, + &elems, &nulls, &nelems); + for (int i = 0; i < nelems; i++) + { + if (nulls[i]) /* shouldn't happen */ + elog(ERROR, "unexpected null value in publication column filter"); + pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i])); + } + 
ExecClearTuple(slot_pub); + } + ExecDropSingleTupleTableSlot(slot_pub); + walrcv_clear_result(res_pub); + + /* + * Store the column names only if they are contained in column filter + * LogicalRepRelation will only contain attributes corresponding to those + * specficied in column filters. + */ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + bool found = false; + + rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); + if (pub_columns != NIL) + { + foreach(lc, pub_columns) + { + char *pub_colname = lfirst(lc); + + if (!strcmp(pub_colname, rel_colname)) + { + found = true; + lrel->attnames[natt] = rel_colname; + break; + } + } + } + else + { + found = true; + lrel->attnames[natt] = rel_colname; + } + if (!found) + continue; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) @@ -829,8 +927,17 @@ copy_table(Relation rel) /* Start copy on the publisher. 
*/ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); + + /* + * Copy only the published columns. Omit the column list entirely when + * there are no columns, as "COPY tab () TO STDOUT" is a syntax error. + */ + if (lrel.natts > 0) + { + appendStringInfoString(&cmd, " ("); + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + appendStringInfoChar(&cmd, ')'); + } + appendStringInfoString(&cmd, " TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6f6a203dea..f9f9ecd0c0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,16 +15,19 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *att_map); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -130,6 +134,7 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + Bitmapset *att_map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent.
*/ @@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->att_map); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *att_map) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -610,13 +616,25 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* + * Do not send type information if attribute is not present in column + * filter. XXX Allow sending type information for REPLICA IDENTITY + * COLUMNS with user created type. even when they are not mentioned in + * column filters. + * + * FIXME -- this code seems not verified by tests. 
+ */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, att_map); OutputPluginWrite(ctx, false); } @@ -693,7 +711,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -722,7 +740,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -1122,6 +1140,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1142,6 +1161,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; + entry->att_map = NULL; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ } @@ -1182,6 +1202,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; if (pub->alltables) { @@ -1192,8 +1213,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; - /* * For a partition, check if any of the ancestors are * published. 
If so, note down the topmost ancestor that is @@ -1219,6 +1238,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1239,15 +1259,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { + Oid relid; + HeapTuple pub_rel_tuple; + + relid = ancestor_published ? ancestor_id : publish_as_relid; + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(pub_rel_tuple)) + { + Datum pub_rel_cols; + bool isnull; + + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, + pub_rel_tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + if (!isnull) + { + ArrayType *arr; + int nelems; + int16 *elems; + + arr = DatumGetArrayTypeP(pub_rel_cols); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* XXX is there a danger of memory leak here? 
beware */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + for (int i = 0; i < nelems; i++) + entry->att_map = bms_add_member(entry->att_map, + elems[i] - FirstLowInvalidHeapAttributeNumber); + MemoryContextSwitchTo(oldctx); + } + ReleaseSysCache(pub_rel_tuple); + } entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; } list_free(pubids); @@ -1343,6 +1395,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->att_map); + entry->att_map = NULL; if (entry->map) { /* diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 10a86f9810..0c438481dc 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4265,6 +4265,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_oid; int i_prpubid; int i_prrelid; + int i_prattrs; int i, j, ntups; @@ -4276,8 +4277,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. 
*/ appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid " - "FROM pg_catalog.pg_publication_rel"); + "SELECT tableoid, oid, prpubid, prrelid"); + /* + * prattrs stores attribute numbers (an int2vector), but the dumped + * ALTER PUBLICATION command must name the columns, so translate the + * numbers into an array of column names here. + */ + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, + ", CASE WHEN prattrs IS NOT NULL THEN" + " ARRAY(SELECT attname" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(prattrs::int[], 1)) s," + " pg_catalog.pg_attribute" + " WHERE attrelid = prrelid AND attnum = prattrs[s]" + " ORDER BY s)" + " ELSE NULL END AS prattrs"); + else + appendPQExpBufferStr(query, ", NULL as prattrs"); + appendPQExpBufferStr(query, + " FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4286,6 +4292,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_oid = PQfnumber(res, "oid"); i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); + i_prattrs = PQfnumber(res, "prattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4327,6 +4334,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].publication = pubinfo; pubrinfo[j].pubtable = tbinfo; + if (!PQgetisnull(res, i, i_prattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer attribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prattrs), + &attnames, &nattnames)) + fatal("could not parse %s array", "prattrs"); + attribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(attribs, ", "); + + appendPQExpBufferStr(attribs, fmtId(attnames[k])); + } + /* index by j (kept entries), not i (result rows) */ + pubrinfo[j].pubrattrs = pg_strdup(attribs->data); + destroyPQExpBuffer(attribs); + free(attnames); + } + else + pubrinfo[j].pubrattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4391,10 +4420,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) query = createPQExpBuffer(); - appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ", fmtId(pubinfo->dobj.name)); - appendPQExpBuffer(query, " %s;\n", - fmtQualifiedDumpable(tbinfo)); + appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo)); + if
(pubrinfo->pubrattrs) + appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + appendPQExpBufferStr(query, ";\n"); /* * There is no point in creating a drop query as the drop is done by table diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 6dccb4be4e..50a5b885f6 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -633,6 +633,7 @@ typedef struct _PublicationRelInfo DumpableObject dobj; PublicationInfo *publication; TableInfo *pubtable; + char *pubrattrs; } PublicationRelInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 72d8547628..46fa616406 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6302,7 +6302,7 @@ listPublications(const char *pattern) */ static bool addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, - bool singlecol, printTableContent *cont) + bool as_schema, printTableContent *cont) { PGresult *res; int count = 0; @@ -6319,10 +6319,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, for (i = 0; i < count; i++) { - if (!singlecol) + if (!as_schema) /* as table */ + { printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1)); - else + if (!PQgetisnull(res, i, 2)) + appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2)); + } + else /* as schema */ printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); printTableAddFooter(cont, buf->data); @@ -6450,8 +6454,20 @@ describePublications(const char *pattern) { /* Get the tables for the specified publication */ printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname\n" - "FROM pg_catalog.pg_class c,\n" + "SELECT n.nspname, c.relname, \n"); + if (pset.sversion >= 150000) + appendPQExpBufferStr(&buf, + " CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string" + "(ARRAY(SELECT attname\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = 
prattrs[s]), ', ')\n" + " ELSE NULL END AS columns"); + else + appendPQExpBufferStr(&buf, "NULL as columns"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 2f412ca3db..84ee807e0b 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION <name> ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* ALTER PUBLICATION <name> DROP */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 3eca295ff4..76d421e09e 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -214,6 +214,9 @@ extern long changeDependenciesOf(Oid classId, Oid oldObjectId, extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId, Oid newRefObjectId); +extern void findAndAddAddresses(ObjectAddresses *addrs, Oid classId, + Oid refclassId, Oid refobjectId, int32 refobjsubId); + extern Oid getExtensionOfObject(Oid classId, Oid objectId); extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 902f2f2f0d..f5ae2065e9 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -86,6 +86,7 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); diff --git 
a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..7ad285faae 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN + int2vector prattrs; /* Variable length field starts here */ +#endif } FormData_pg_publication_rel; /* ---------------- diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 4ba68c70ee..23f037df7f 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -25,7 +25,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationById(Oid pubid); -extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationRelById(Oid proid, int32 attnum); extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4c5a8a39bf..02b547d044 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3642,6 +3642,7 @@ typedef struct PublicationTable { NodeTag type; RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dcf42..709b4be916 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char 
*logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *att_map); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 5ac2d666a2..84afe0ebef 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -165,7 +165,35 @@ Publications: regress_publication_user | t | t | t | f | f | f (1 row) -DROP TABLE testpub_tbl2; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z); -- error +ERROR: column "x" of relation "testpub_tbl5" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ERROR: column "x" of relation "testpub_tbl5" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl5" +DETAIL: All 
columns in REPLICA IDENTITY must be present in the column list. +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; +\dRp+ testpub_fortable + Publication testpub_fortable + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_tbl5" (a) +Tables from schemas: + "pub_test" + +ALTER TABLE testpub_tbl5 DROP COLUMN a; +ERROR: cannot drop the last column in publication "testpub_fortable" +HINT: Remove table "testpub_tbl5" from the publication first. +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl6" +DETAIL: Cannot have column filter on relations with REPLICA IDENTITY FULL. +DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); @@ -669,6 +697,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes Tables from schemas: "pub_test1" +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ERROR: syntax error at or near "(" +LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); + ^ +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); +ERROR: column specification not allowed for schemas +LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)... 
+ ^ -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 56dd358554..200158ba69 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -89,7 +89,20 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables -DROP TABLE testpub_tbl2; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; +\dRp+ testpub_fortable +ALTER TABLE testpub_tbl5 DROP COLUMN a; + +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error + +DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); @@ -362,6 +375,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); + -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA 
pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl new file mode 100644 index 0000000000..354e6ac363 --- /dev/null +++ b/src/test/subscription/t/021_column_filter.pl @@ -0,0 +1,162 @@ +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test column lists (column filtering) in logical replication publications +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 10; + +# setup + +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)"); +# Test with weird column names +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +#Test replication with multi-level partition +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)"); + +#Test create publication with column filtering +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)"); + +my $result = $node_publisher->safe_psql('postgres', + "select relname, prattrs 
from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid ORDER BY relname;"); +is($result, qq(tab1|1 2 +tab3|1 3 +test_part|1 2), 'publication relation updated'); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); +#Initial sync +$node_publisher->wait_for_catchup('sub1'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');") + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1,2,3)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1,2,3)"); +#Test for replication of partition data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')"); +#Test for replication of multi-level partition data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')"); +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'insert on column c is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab3"); +is($result, qq(1|3), 'insert on column b is not replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part"); +is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET c = 5 where a = 1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1"); +is($result, qq(1|2|), 'update on column c is not replicated'); + +#Test alter publication with column filtering +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION" +); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');") + or die "Timed out while waiting for subscriber to synchronize data"; + 
+$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1,'abc',3)"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2"); +is($result, qq(1|abc), 'insert on column c is not replicated'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET c = 5 where a = 1"); +$node_publisher->wait_for_catchup('sub1'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2"); +is($result, qq(1|abc), 'update on column c is not replicated'); + +# Test behavior when a column is dropped +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_part DROP COLUMN b"); +$result = $node_publisher->safe_psql('postgres', + "select prrelid::regclass, prattrs from pg_publication_rel pb ORDER BY prrelid::regclass::text;"); +is($result, + q(tab1|1 2 +tab2|1 2 +tab3|1 3 +test_part|1), 'column test_part.b removed'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_part VALUES (3, '2021-12-13 12:13:14')"); +$node_publisher->wait_for_catchup('sub1'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part WHERE a = 3"); +is($result, "3|", 'only column a is replicated'); + +$node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int, d int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, d int)"); +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab4 (a, b)"); +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab4 (a, d)"); +$node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3"); +$node_publisher->wait_for_catchup('sub2'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');") + or die "Timed out while waiting for subscriber to synchronize data"; +$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (1, 11, 111, 1111)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (2, 22, 222, 2222)"); +$node_publisher->wait_for_catchup('sub2'); +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab4;"), + qq(1|11|1111 +2|22|2222), + 'overlapping publications with overlapping column lists');