On 2021-Dec-28, Alvaro Herrera wrote: > There are still some XXX comments. The one that bothers me most is the > lack of an implementation that allows changing the column list in a > publication without having to remove the table from the publication > first.
OK, I made some progress on this front; I added new forms of ALTER PUBLICATION to support it: ALTER PUBLICATION pub1 ALTER TABLE tbl SET COLUMNS (a, b, c); ALTER PUBLICATION pub1 ALTER TABLE tbl SET COLUMNS ALL; (not wedded to this syntax; other suggestions welcome) In order to implement it I changed the haphazardly chosen use of DEFELEM actions to a new enum. I also noticed that the division of labor between pg_publication.c and publicationcmds.c is quite broken (code to translate column names to numbers is in the former, should be in the latter; some code that deals with pg_publication tuples is in the latter, should be in the former, such as CreatePublication, AlterPublicationOptions). This new stuff is not yet finished. For example I didn't refactor handling of REPLICA IDENTITY, so the new command does not correctly check everything, such as the REPLICA IDENTITY FULL stuff. Also, no tests have been added yet. In manual tests it seems to behave as expected. I noticed that prattrs is inserted in user-specified order instead of catalog order, which is innocuous but quite weird. -- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ "No renuncies a nada. No te aferres a nada."
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 34a7034282..5bc2e7a591 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6877,7 +6877,9 @@ Relation </listitem> </varlistentry> </variablelist> - Next, the following message part appears for each column (except generated columns): + Next, the following message part appears for each column (except + generated columns and other columns that don't appear in the column + filter list, for tables that have one): <variablelist> <varlistentry> <term> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index bb4ef5e5e2..4951343f6f 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -25,12 +25,13 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD <replace ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET <replaceable class="parameter">publication_object</replaceable> [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP <replaceable class="parameter">publication_object</replaceable> [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ALTER TABLE <replaceable class="parameter">publication_object</replaceable> SET COLUMNS { ( <replaceable class="parameter">name</replaceable> [, ...] ) | ALL ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -62,6 +63,11 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r command retain their previous settings. </para> + <para> + The <literal>ALTER TABLE ... SET COLUMNS</literal> variant allows to change + the set of columns that are included in the publication. + </para> + <para> The remaining variants change the owner and the name of the publication. </para> @@ -110,6 +116,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table name to explicitly indicate that descendant tables are included. + Optionally, a column list can be specified. See <xref + linkend="sql-createpublication"/> for details. </para> </listitem> </varlistentry> @@ -164,9 +172,15 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); </para> <para> - Add some tables to the publication: + Add tables to the publication: <programlisting> -ALTER PUBLICATION mypublication ADD TABLE users, departments; +ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments; +</programlisting></para> + + <para> + Change the set of columns published for a table: +<programlisting> +ALTER PUBLICATION mypublication ALTER TABLE users SET COLUMNS (user_id, firstname, lastname); </programlisting></para> <para> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index d805e8e77a..73a23cbb02 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> publication, so they are never explicitly added to the publication. </para> + <para> + When a column list is specified, only the listed columns are replicated; + any other columns are ignored for the purpose of replication through + this publication. If no column list is specified, all columns of the + table are replicated through this publication, including any columns + added later. If a column list is specified, it must include the replica + identity columns. + </para> + <para> Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 62f10bcbd2..322bfc2a82 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,13 +45,23 @@ #include "utils/rel.h" #include "utils/syscache.h" + +static void check_publication_columns(Relation targetrel, Bitmapset *columns); +static AttrNumber *publication_translate_columns(Relation targetrel, List *columns, + int *natts, Bitmapset **attset); + /* - * Check if relation can be in given publication and throws appropriate - * error if not. + * Check if relation can be in given publication and that the column + * filter is sensible, and throws appropriate error if not. + * + * targetcols is the bitmapset of column specified as column filter, or NULL if + * no column filter was specified. */ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(Relation targetrel, Bitmapset *columns) { + bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -82,6 +92,62 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* Make sure the column list checks out */ + if (columns != NULL) + { + /* + * Even if the user listed all columns in the column list, we cannot + * allow a column list to be specified when REPLICA IDENTITY is FULL; + * that would cause problems if a new column is added later, because + * that could would have to be included (because of being part of the + * replica identity) but it's technically not allowed (because of not + * being in the publication's column list yet). So reject this case + * altogether. + */ + if (replidentfull) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL.")); + + check_publication_columns(targetrel, columns); + } +} + +/* + * Enforce that the column filter can only leave out columns that aren't + * forced to be sent. + * + * No column can be excluded if REPLICA IDENTITY is FULL (since all the + * columns need to be sent regardless); and in other cases, the columns in + * the REPLICA IDENTITY cannot be left out. + */ +static void +check_publication_columns(Relation targetrel, Bitmapset *columns) +{ + Bitmapset *idattrs; + int x; + + idattrs = RelationGetIndexAttrBitmap(targetrel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + /* + * We have to test membership the hard way, because the values returned + * by RelationGetIndexAttrBitmap are offset. + */ + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) + { + if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("All columns in REPLICA IDENTITY must be present in the column list.")); + } + + bms_free(idattrs); } /* @@ -287,8 +353,11 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; Oid relid = RelationGetRelid(targetrel->relation); - Oid prrelid; + Oid pubreloid; Publication *pub = GetPublication(pubid); + Bitmapset *attset = NULL; + AttrNumber *attarray; + int natts = 0; ObjectAddress myself, referenced; List *relids = NIL; @@ -314,19 +383,35 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel->relation); + /* Translate column names to numbers and verify suitability */ + attarray = publication_translate_columns(targetrel->relation, + targetrel->columns, + &natts, &attset); + + check_publication_add_relation(targetrel->relation, attset); + + bms_free(attset); /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); - prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, - Anum_pg_publication_rel_oid); - values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid); + pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, + Anum_pg_publication_rel_oid); + values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid); values[Anum_pg_publication_rel_prpubid - 1] = ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + if (targetrel->columns) + { + int2vector *prattrs; + + prattrs = buildint2vector(attarray, natts); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs); + } + else + nulls[Anum_pg_publication_rel_prattrs - 1] = true; tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -334,8 +419,16 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, CatalogTupleInsert(rel, tup); heap_freetuple(tup); - ObjectAddressSet(myself, PublicationRelRelationId, prrelid); + /* Register dependencies as needed */ + ObjectAddressSet(myself, PublicationRelRelationId, pubreloid); + /* Add dependency on the columns, if any are listed */ + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } + pfree(attarray); /* Add dependency on the publication */ ObjectAddressSet(referenced, PublicationRelationId, pubid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); @@ -363,6 +456,132 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +void +publication_set_table_columns(Relation pubrel, HeapTuple pubreltup, + Relation targetrel, List *columns) +{ + Bitmapset *attset; + AttrNumber *attarray; + HeapTuple copytup; + int natts; + bool nulls[Natts_pg_publication_rel]; + bool replaces[Natts_pg_publication_rel]; + Datum values[Natts_pg_publication_rel]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_publication_rel_prattrs - 1] = true; + + deleteDependencyRecordsForClass(PublicationRelationId, + ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid, + RelationRelationId, + DEPENDENCY_AUTO); + + if (columns == NULL) + { + nulls[Anum_pg_publication_rel_prattrs - 1] = true; + } + else + { + ObjectAddress myself, + referenced; + int2vector *prattrs; + + attarray = publication_translate_columns(targetrel, columns, + &natts, &attset); + + /* make sure the column list checks out */ + /* XXX missing to check for the REPLICA IDENTITY FULL case */ + /* XXX this should occur at caller in publicationcmds.c, not here */ + check_publication_columns(targetrel, attset); + + prattrs = buildint2vector(attarray, natts); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs); + + /* Add dependencies on the new list of columns */ + ObjectAddressSet(myself, PublicationRelRelationId, + ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid); + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, + RelationGetRelid(targetrel), attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } + } + + copytup = heap_modify_tuple(pubreltup, RelationGetDescr(pubrel), + values, nulls, replaces); + + CatalogTupleUpdate(pubrel, &pubreltup->t_self, copytup); + + heap_freetuple(copytup); +} + +/* + * Translate a list of column names to an array of attribute numbers + * and a Bitmapset with them; verify that each attribute is appropriate + * to have in a publication column list. Other checks are done later; + * see check_publication_columns. + * + * Note that the attribute numbers are *not* offset by + * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this + * is okay. + */ +static AttrNumber * +publication_translate_columns(Relation targetrel, List *columns, int *natts, + Bitmapset **attset) +{ + AttrNumber *attarray; + Bitmapset *set = NULL; + ListCell *lc; + int n = 0; + + /* + * Translate list of columns to attnums. We prohibit system attributes and + * make sure there are no duplicate columns. + * + */ + attarray = palloc(sizeof(AttrNumber) * list_length(columns)); + foreach(lc, columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference system column \"%s\" in publication column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication column list", + colname)); + + set = bms_add_member(set, attnum); + attarray[n++] = attnum; + } + + /* + * XXX qsort the array here, or maybe build just the bitmapset above and + * then scan that in order to produce the array? Do we care about the + * array being unsorted? + */ + + *natts = n; + *attset = set; + return attarray; +} + /* * Insert new publication / schema mapping. */ @@ -470,6 +689,74 @@ GetRelationPublications(Oid relid) return result; } +/* + * Gets a list of OIDs of all column-partial publications of the given + * relation, that is, those that specify a column list. + */ +List * +GetRelationColumnPartialPublications(Oid relid) +{ + CatCList *pubrellist; + List *pubs = NIL; + + pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid)); + for (int i = 0; i < pubrellist->n_members; i++) + { + HeapTuple tup = &pubrellist->members[i]->tuple; + bool isnull; + + (void) SysCacheGetAttr(PUBLICATIONRELMAP, tup, + Anum_pg_publication_rel_prattrs, + &isnull); + if (isnull) + continue; + + pubs = lappend_oid(pubs, + ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid); + } + + ReleaseSysCacheList(pubrellist); + + return pubs; +} + +/* + * For a relation in a publication that is known to have a non-null column + * list, return the list of attribute numbers that are in it. + */ +List * +GetRelationColumnListInPublication(Oid relid, Oid pubid) +{ + HeapTuple tup; + Datum adatum; + bool isnull; + ArrayType *arr; + int nelems; + int16 *elems; + List *attnos = NIL; + + tup = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid); + adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup, + Anum_pg_publication_rel_prattrs, &isnull); + if (isnull) + elog(ERROR, "found unexpected null in pg_publication_rel.prattrs"); + arr = DatumGetArrayTypeP(adatum); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (int i = 0; i < nelems; i++) + attnos = lappend_oid(attnos, elems[i]); + + ReleaseSysCache(tup); + + return attnos; +} + /* * Gets list of relation oids for a publication. * diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 404bb5d0c8..aefae8b3c4 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -48,7 +48,7 @@ #include "utils/syscache.h" #include "utils/varlena.h" -static List *OpenReliIdList(List *relids); +static List *OpenRelIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void LockSchemaList(List *schemalist); @@ -376,6 +376,46 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) return myself; } +/* + * Change the column list of a relation in a publication + */ +static void +PublicationSetColumns(AlterPublicationStmt *stmt, + Form_pg_publication pubform, PublicationTable *table) +{ + Relation rel, + urel; + HeapTuple tup; + ObjectAddress obj, + secondary; + + rel = table_open(PublicationRelRelationId, RowExclusiveLock); + urel = table_openrv(table->relation, ShareUpdateExclusiveLock); + + tup = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(RelationGetRelid(urel)), + ObjectIdGetDatum(pubform->oid)); + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + errmsg("relation \"%s\" is not already in publication \"%s\"", + table->relation->relname, + NameStr(pubform->pubname))); + + publication_set_table_columns(rel, tup, urel, table->columns); + + ObjectAddressSet(obj, PublicationRelationId, + ((Form_pg_publication_rel) GETSTRUCT(tup))->oid); + ObjectAddressSet(secondary, RelationRelationId, RelationGetRelid(urel)); + EventTriggerCollectSimpleCommand(obj, secondary, (Node *) stmt); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); + table_close(urel, NoLock); + + InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0); +} + /* * Change options of a publication. */ @@ -499,15 +539,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, Oid pubid = pubform->oid; /* - * It is quite possible that for the SET case user has not specified any - * tables in which case we need to remove all the existing tables. + * Nothing to do if no objects, except in SET: for that it is quite + * possible that user has not specified any schemas in which case we need + * to remove all the existing schemas. */ - if (!tables && stmt->action != DEFELEM_SET) + if (!tables && stmt->action != AP_SetObjects) return; rels = OpenTableList(tables); - if (stmt->action == DEFELEM_ADD) + if (stmt->action == AP_AddObjects) { List *schemas = NIL; @@ -520,9 +561,17 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); } - else if (stmt->action == DEFELEM_DROP) + else if (stmt->action == AP_DropObjects) PublicationDropTables(pubid, rels, false); - else /* DEFELEM_SET */ + else if (stmt->action == AP_SetColumns) + { + Assert(schemaidlist == NIL); + Assert(list_length(tables) == 1); + + PublicationSetColumns(stmt, pubform, + linitial_node(PublicationTable, tables)); + } + else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT); @@ -561,7 +610,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, pubrel = palloc(sizeof(PublicationRelInfo)); pubrel->relation = oldrel; - + /* This is not needed to delete a table */ + pubrel->columns = NIL; delrels = lappend(delrels, pubrel); } } @@ -593,10 +643,11 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); /* - * It is quite possible that for the SET case user has not specified any - * schemas in which case we need to remove all the existing schemas. + * Nothing to do if no objects, except in SET: for that it is quite + * possible that user has not specified any schemas in which case we need + * to remove all the existing schemas. */ - if (!schemaidlist && stmt->action != DEFELEM_SET) + if (!schemaidlist && stmt->action != AP_SetObjects) return; /* @@ -604,13 +655,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, * concurrent schema deletion. */ LockSchemaList(schemaidlist); - if (stmt->action == DEFELEM_ADD) + if (stmt->action == AP_AddObjects) { List *rels; List *reloids; reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); - rels = OpenReliIdList(reloids); + rels = OpenRelIdList(reloids); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE_IN_SCHEMA); @@ -618,9 +669,9 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, CloseTableList(rels); PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); } - else if (stmt->action == DEFELEM_DROP) + else if (stmt->action == AP_DropObjects) PublicationDropSchemas(pubform->oid, schemaidlist, false); - else /* DEFELEM_SET */ + else if (stmt->action == AP_SetObjects) { List *oldschemaids = GetPublicationSchemas(pubform->oid); List *delschemas = NIL; @@ -643,6 +694,10 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); } + else + { + /* Nothing to do for AP_SetColumns */ + } } /* @@ -655,7 +710,7 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, { Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); - if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && schemaidlist && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -868,7 +923,7 @@ RemovePublicationSchemaById(Oid psoid) * add them to a publication. */ static List * -OpenReliIdList(List *relids) +OpenRelIdList(List *relids) { ListCell *lc; List *rels = NIL; @@ -932,6 +987,8 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -965,8 +1022,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); + pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } @@ -1074,6 +1134,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 45e59e3d5c..a9051eb5e7 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -40,8 +40,8 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" -#include "catalog/pg_tablespace.h" #include "catalog/pg_statistic_ext.h" +#include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" #include "catalog/storage.h" @@ -8347,6 +8347,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, bool missing_ok, LOCKMODE lockmode, ObjectAddresses *addrs) { + Oid relid = RelationGetRelid(rel); HeapTuple tuple; Form_pg_attribute targetatt; AttrNumber attnum; @@ -8366,7 +8367,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, /* * get the number of the attribute */ - tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + tuple = SearchSysCacheAttName(relid, colName); if (!HeapTupleIsValid(tuple)) { if (!missing_ok) @@ -8420,13 +8421,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ReleaseSysCache(tuple); + /* + * Also, if the column is used in the column list of a publication, + * disallow the drop if the DROP is RESTRICT. We don't do anything if the + * DROP is CASCADE, which means that the dependency mechanism will remove + * the relation from the publication. + */ + if (behavior == DROP_RESTRICT) + { + List *pubs; + ListCell *lc; + + pubs = GetRelationColumnPartialPublications(relid); + foreach(lc, pubs) + { + Oid pubid = lfirst_oid(lc); + List *published_cols; + + published_cols = + GetRelationColumnListInPublication(relid, pubid); + + if (list_member_oid(published_cols, attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"", + colName, get_publication_name(pubid, false)), + errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.")); + } + } + /* * Propagate to children as appropriate. Unlike most other ALTER * routines, we have to do this one level of recursion at a time; we can't * use find_all_inheritors to do it in one pass. */ children = - find_inheritance_children(RelationGetRelid(rel), lockmode); + find_inheritance_children(relid, lockmode); if (children) { @@ -8514,7 +8544,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, /* Add object to delete */ object.classId = RelationRelationId; - object.objectId = RelationGetRelid(rel); + object.objectId = relid; object.objectSubId = attnum; add_exact_object_address(&object, addrs); @@ -15603,6 +15633,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode Oid indexOid; Relation indexRel; int key; + List *pubs; + Bitmapset *indexed_cols = NULL; + ListCell *lc; + + pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel)); if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT) { @@ -15611,11 +15646,16 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode } else if (stmt->identity_type == REPLICA_IDENTITY_FULL) { + if (pubs != NIL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set REPLICA IDENTITY FULL when column-partial publications exist")); relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); return; } else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING) { + /* XXX not sure what's the right check for publications here */ relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); return; } @@ -15626,7 +15666,6 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode else elog(ERROR, "unexpected identity type %u", stmt->identity_type); - /* Check that the index exists */ indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace); if (!OidIsValid(indexOid)) @@ -15701,6 +15740,38 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable", RelationGetRelationName(indexRel), NameStr(attr->attname)))); + + /* + * Collect columns used, in case we have any publications that we need + * to vet. System attributes are disallowed so no need to subtract + * FirstLowInvalidHeapAttributeNumber. + */ + indexed_cols = bms_add_member(indexed_cols, attno); + } + + /* + * Check column-partial publications. All publications have to include all + * key columns of the new index. + */ + foreach(lc, pubs) + { + Oid pubid = lfirst_oid(lc); + List *published_cols; + + published_cols = + GetRelationColumnListInPublication(RelationGetRelid(rel), pubid); + + for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++) + { + int16 attno = indexRel->rd_index->indkey.values[key]; + + if (!list_member_oid(published_cols, attno)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns", + RelationGetRelationName(indexRel), + get_publication_name(pubid, false))); + } } /* This index is suitable for use as a replica identity. Mark it. */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index df0b747883..0ff4c1ceac 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index cb7ddd463c..d786a688ac 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2312,6 +2312,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3d4dd43e47..68b1136788 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9742,12 +9742,13 @@ CreatePublicationStmt: * relation_expr here. */ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->columns = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9762,28 +9763,38 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2 != NULL) + { + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->columns = $2; + $$->name = NULL; + } + else + $$->name = $1; $$->location = @1; } - | ColId indirection + | ColId indirection opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->columns = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->columns = $2; } | CURRENT_SCHEMA { @@ -9809,6 +9820,9 @@ pub_obj_list: PublicationObjSpec * * ALTER PUBLICATION name SET pub_obj [, ...] * + * ALTER PUBLICATION name SET COLUMNS table_name (column[, ...]) + * ALTER PUBLICATION name SET COLUMNS table_name ALL + * * pub_obj is one of: * * TABLE table_name [, ...] @@ -9830,7 +9844,7 @@ AlterPublicationStmt: n->pubname = $3; n->pubobjects = $5; preprocess_pubobj_list(n->pubobjects, yyscanner); - n->action = DEFELEM_ADD; + n->action = AP_AddObjects; $$ = (Node *)n; } | ALTER PUBLICATION name SET pub_obj_list @@ -9839,16 +9853,42 @@ AlterPublicationStmt: n->pubname = $3; n->pubobjects = $5; preprocess_pubobj_list(n->pubobjects, yyscanner); - n->action = DEFELEM_SET; + n->action = AP_SetObjects; $$ = (Node *)n; } + | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')' + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + PublicationObjSpec *obj = makeNode(PublicationObjSpec); + obj->pubobjtype = PUBLICATIONOBJ_TABLE; + obj->pubtable = makeNode(PublicationTable); + obj->pubtable->relation = $6; + obj->pubtable->columns = $10; + n->pubname = $3; + n->pubobjects = list_make1(obj); + n->action = AP_SetColumns; + $$ = (Node *) n; + } + | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS ALL + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + PublicationObjSpec *obj = makeNode(PublicationObjSpec); + obj->pubobjtype = PUBLICATIONOBJ_TABLE; + obj->pubtable = makeNode(PublicationTable); + obj->pubtable->relation = $6; + obj->pubtable->columns = NIL; + n->pubname = $3; + n->pubobjects = list_make1(obj); + n->action = AP_SetColumns; + $$ = (Node *) n; + } | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->pubobjects = $5; preprocess_pubobj_list(n->pubobjects, yyscanner); - n->action = DEFELEM_DROP; + n->action = AP_DropObjects; $$ = (Node *)n; } ; @@ -17435,8 +17475,9 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) { /* convert it to PublicationTable */ PublicationTable *pubtable = makeNode(PublicationTable); - pubtable->relation = makeRangeVar(NULL, pubobj->name, - pubobj->location); + + pubtable->relation = + makeRangeVar(NULL, pubobj->name, pubobj->location); pubobj->pubtable = pubtable; pubobj->name = NULL; } @@ -17444,6 +17485,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA || pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA) { + /* + * This can happen if a column list is specified in a continuation + * for a schema entry; reject it. + */ + if (pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schemas"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..3428984130 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,11 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, + Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, columns); } /* @@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, + Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, columns); } /* @@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, + Bitmapset *columns) { char *relname; @@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, columns); } /* @@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, + bool binary, Bitmapset *columns) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + + /* Don't count attributes that are not to be sent. */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) continue; nliveatts++; } @@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar if (att->attisdropped || att->attgenerated) continue; + /* Ignore attributes that are not to be sent. */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + if (isnull[i]) { pq_sendbyte(out, LOGICALREP_COLUMN_NULL); @@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); - /* send number of live attributes */ - for (i = 0; i < desc->natts; i++) - { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) - continue; - nliveatts++; - } - pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; - + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..1303e85851 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -111,6 +111,7 @@ #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -697,17 +698,20 @@ fetch_remote_table_info(char *nspname, char *relname, WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; bool isnull; int natt; + ListCell *lc; + bool am_partition = false; + Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -737,14 +741,18 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull)); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - /* Now fetch columns. */ + /* + * Now fetch column names and types. + */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT a.attname," + "SELECT a.attnum," + " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" @@ -772,16 +780,92 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + /* + * In server versions 15 and higher, obtain the applicable column filter, + * if any. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *slot; + Oid attrsRow[] = {INT2OID}; + StringInfoData publications; + bool first = true; + + initStringInfo(&publications); + foreach(lc, MySubscription->publications) + { + if (!first) + appendStringInfo(&publications, ", "); + appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc)))); + first = false; + } + + resetStringInfo(&cmd); + appendStringInfo(&cmd, + " SELECT pg_catalog.unnest(prattrs)\n" + " FROM pg_catalog.pg_publication p JOIN\n" + " pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n" + " WHERE p.pubname IN (%s) AND\n", + publications.data); + if (!am_partition) + appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, + "prrelid IN (SELECT relid\n" + " FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))", + lrel->remoteid); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + { + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + if (isnull) + continue; + included_cols = bms_add_member(included_cols, attnum); + } + ExecDropSingleTupleTableSlot(slot); + pfree(publications.data); + walrcv_clear_result(pubres); + } + + /* + * Store the column names only if they are contained in column filter + * LogicalRepRelation will only contain attributes corresponding to those + * specficied in column filters. + */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + continue; + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 3, &isnull))) + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ @@ -791,12 +875,13 @@ fetch_remote_table_info(char *nspname, char *relname, ExecClearTuple(slot); } + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + pfree(cmd.data); lrel->natts = natt; - walrcv_clear_result(res); - pfree(cmd.data); } /* @@ -829,8 +914,17 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6f6a203dea..34df5d4956 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,16 +15,19 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -130,6 +134,13 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + + /* + * Set of columns included in the publication, or NULL if all columns are + * included implicitly. Note that the attnums in this list are not + * shifted by FirstLowInvalidHeapAttributeNumber. + */ + Bitmapset *columns; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* Skip if attribute is not present in column filter. */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; } @@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; } @@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; + entry->columns = NULL; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ } @@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; if (pub->alltables) { @@ -1192,8 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; - /* * For a partition, check if any of the ancestors are * published. If so, note down the topmost ancestor that is @@ -1219,6 +1236,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1239,15 +1257,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { + Oid relid; + HeapTuple pub_rel_tuple; + + relid = ancestor_published ? ancestor_id : publish_as_relid; + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(pub_rel_tuple)) + { + Datum pub_rel_cols; + bool isnull; + + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, + pub_rel_tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + if (!isnull) + { + ArrayType *arr; + int nelems; + int16 *elems; + + arr = DatumGetArrayTypeP(pub_rel_cols); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* XXX is there a danger of memory leak here? beware */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + for (int i = 0; i < nelems; i++) + entry->columns = bms_add_member(entry->columns, + elems[i]); + MemoryContextSwitchTo(oldctx); + } + ReleaseSysCache(pub_rel_tuple); + } entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; } list_free(pubids); @@ -1343,6 +1393,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; if (entry->map) { /* diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index b52f3ccda2..d98b1b50c4 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4034,6 +4034,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_oid; int i_prpubid; int i_prrelid; + int i_prattrs; int i, j, ntups; @@ -4045,8 +4046,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. */ appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid " - "FROM pg_catalog.pg_publication_rel"); + "SELECT tableoid, oid, prpubid, prrelid"); + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, ", prattrs"); + else + appendPQExpBufferStr(query, ", NULL as prattrs"); + appendPQExpBufferStr(query, + " FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4055,6 +4061,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_oid = PQfnumber(res, "oid"); i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); + i_prattrs = PQfnumber(res, "prattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4096,6 +4103,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].publication = pubinfo; pubrinfo[j].pubtable = tbinfo; + if (!PQgetisnull(res, i, i_prattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer attribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prattrs), + &attnames, &nattnames)) + fatal("could not parse %s array", "prattrs"); + attribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(attribs, ", "); + + appendPQExpBufferStr(attribs, fmtId(attnames[k])); + } + pubrinfo[i].pubrattrs = attribs->data; + } + else + pubrinfo[j].pubrattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4160,10 +4189,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) query = createPQExpBuffer(); - appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ", fmtId(pubinfo->dobj.name)); - appendPQExpBuffer(query, " %s;\n", - fmtQualifiedDumpable(tbinfo)); + appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrattrs) + appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + appendPQExpBufferStr(query, ";\n"); /* * There is no point in creating a drop query as the drop is done by table diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index f011ace8a8..3f7500accc 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo DumpableObject dobj; PublicationInfo *publication; TableInfo *pubtable; + char *pubrattrs; } PublicationRelInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index c28788e84f..b9d0ebf762 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5815,7 +5815,7 @@ listPublications(const char *pattern) */ static bool addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, - bool singlecol, printTableContent *cont) + bool as_schema, printTableContent *cont) { PGresult *res; int count = 0; @@ -5832,10 +5832,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, for (i = 0; i < count; i++) { - if (!singlecol) + if (!as_schema) /* as table */ + { printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1)); - else + if (!PQgetisnull(res, i, 2)) + appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2)); + } + else /* as schema */ printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); printTableAddFooter(cont, buf->data); @@ -5963,8 +5967,20 @@ describePublications(const char *pattern) { /* Get the tables for the specified publication */ printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname\n" - "FROM pg_catalog.pg_class c,\n" + "SELECT n.nspname, c.relname, \n"); + if (pset.sversion >= 150000) + appendPQExpBufferStr(&buf, + " CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string" + "(ARRAY(SELECT attname\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n" + " ELSE NULL END AS columns"); + else + appendPQExpBufferStr(&buf, "NULL as columns"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index cf30239f6d..25c7c08040 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION <name> ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* ALTER PUBLICATION <name> DROP */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 902f2f2f0d..edd4f0c63c 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -86,6 +86,7 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -109,6 +110,8 @@ typedef enum PublicationPartOpt } PublicationPartOpt; extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetRelationColumnPartialPublications(Oid relid); +extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern List *GetPublicationSchemas(Oid pubid); @@ -127,6 +130,8 @@ extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *tar bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists); +extern void publication_set_table_columns(Relation pubrel, HeapTuple pubreltup, + Relation targetrel, List *columns); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..7ad285faae 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN + int2vector prattrs; /* Variable length field starts here */ +#endif } FormData_pg_publication_rel; /* ---------------- diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4c5a8a39bf..91ea815e14 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3642,6 +3642,7 @@ typedef struct PublicationTable { NodeTag type; RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* @@ -3674,6 +3675,14 @@ typedef struct CreatePublicationStmt bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; +typedef enum AlterPublicationAction +{ + AP_AddObjects, /* add objects to publication */ + AP_DropObjects, /* remove objects from publication */ + AP_SetObjects, /* set list of objects */ + AP_SetColumns /* change list of columns for a table */ +} AlterPublicationAction; + typedef struct AlterPublicationStmt { NodeTag type; @@ -3688,7 +3697,7 @@ typedef struct AlterPublicationStmt */ List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction action; /* What action to perform with the + AlterPublicationAction action; /* What action to perform with the * tables/schemas */ } AlterPublicationStmt; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dcf42..7a5cb9871d 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 5ac2d666a2..9f540e1144 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -165,7 +165,24 @@ Publications: regress_publication_user | t | t | t | f | f | f (1 row) -DROP TABLE testpub_tbl2; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ERROR: column "x" of relation "testpub_tbl5" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl5" +DETAIL: All columns in REPLICA IDENTITY must be present in the column list. +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; +ERROR: cannot drop column "c" because it is part of publication "testpub_fortable" +HINT: Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a); +ERROR: column list must not be specified in ALTER PUBLICATION ... DROP +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl6" +DETAIL: Cannot have column filter on relations with REPLICA IDENTITY FULL. +DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); @@ -669,6 +686,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes Tables from schemas: "pub_test1" +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ERROR: syntax error at or near "(" +LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); + ^ +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); +ERROR: column specification not allowed for schemas +LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)... + ^ -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 56dd358554..d82b034efd 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -89,7 +89,18 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables -DROP TABLE testpub_tbl2; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a); + +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error + +DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; CREATE TABLE testpub_tbl3 (a int); @@ -362,6 +373,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); + -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;