From c566463ced067650b2f03ef260d7b3144f452ae2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Sun, 23 Sep 2018 14:43:01 +1200
Subject: [PATCH 3/6] Track collation versions for indexes.

Record the current version of dependent collations in pg_depend when
creating or rebuilding an index.  Whenever we load an index into the
relcache, check if the collation versions still match those reported
by the collation provider.  Warn that the index may be corrupted if
not.

Author: Thomas Munro
Reviewed-by:
Discussion: https://postgr.es/m/CAEepm%3D0uEQCpfq_%2BLYFBdArCe4Ot98t1aR4eYiYTe%3DyavQygiQ%40mail.gmail.com
---
 src/backend/catalog/dependency.c   | 57 ++++++++++++++++++++++
 src/backend/catalog/index.c        | 78 +++++++++++++++++++++++++++++-
 src/backend/utils/adt/pg_locale.c  | 21 ++++++++
 src/backend/utils/cache/relcache.c |  4 ++
 src/include/catalog/dependency.h   |  8 +++
 src/include/catalog/index.h        |  2 +
 src/include/utils/pg_locale.h      |  1 +
 7 files changed, 170 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 67f58517a70..c674120eaf5 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -422,6 +422,63 @@ performMultipleDeletions(const ObjectAddresses *objects,
 	heap_close(depRel, RowExclusiveLock);
 }
 
+/*
+ * Call a function for all objects that depend on 'object'.  If the function
+ * returns a non-NULL pointer to a new version string, update the version.
+ */
+void
+visitDependentObjects(const ObjectAddress *object,
+					  VisitDependentObjectsFun callback,
+					  void *userdata)
+{
+	Relation	depRel;
+	ScanKeyData key[3];
+	SysScanDesc scan;
+	HeapTuple	tup;
+	ObjectAddress otherObject;
+
+	ScanKeyInit(&key[0],
+				Anum_pg_depend_classid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(object->classId));
+	ScanKeyInit(&key[1],
+				Anum_pg_depend_objid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(object->objectId));
+	ScanKeyInit(&key[2],
+				Anum_pg_depend_objsubid,
+				BTEqualStrategyNumber, F_INT4EQ,
+				Int32GetDatum(object->objectSubId));
+
+	depRel = heap_open(DependRelationId, RowExclusiveLock);
+	scan = systable_beginscan(depRel, DependDependerIndexId, true,
+							  NULL, 3, key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
+		NameData *new_version;
+
+		otherObject.classId = foundDep->refclassid;
+		otherObject.objectId = foundDep->refobjid;
+		otherObject.objectSubId = foundDep->refobjsubid;
+
+		new_version = callback(&otherObject, &foundDep->refobjversion,
+							   userdata);
+		if (new_version)
+		{
+			/* Make a modifyable copy. */
+			tup = heap_copytuple(tup);
+			foundDep = (Form_pg_depend) GETSTRUCT(tup);
+			foundDep->refobjversion = *new_version;
+			CatalogTupleUpdate(depRel, &tup->t_self, tup);
+			heap_freetuple(tup);
+		}
+	}
+	systable_endscan(scan);
+	heap_close(depRel, RowExclusiveLock);
+}
+
 /*
  * findDependentObjects - find all objects that depend on 'object'
  *
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 7eb3e351667..96a8facfb29 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -69,6 +69,7 @@
 #include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_locale.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
 #include "utils/syscache.h"
@@ -1099,11 +1100,15 @@ index_create(Relation heapRelation,
 			if (OidIsValid(collationObjectId[i]) &&
 				collationObjectId[i] != DEFAULT_COLLATION_OID)
 			{
+				NameData version;
+
 				referenced.classId = CollationRelationId;
 				referenced.objectId = collationObjectId[i];
 				referenced.objectSubId = 0;
+				get_collation_version_for_oid(referenced.objectId, &version);
 
-				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+				recordDependencyOnVersion(&myself, &referenced, &version,
+										  DEPENDENCY_NORMAL);
 			}
 		}
 
@@ -1207,6 +1212,74 @@ index_create(Relation heapRelation,
 	return indexRelationId;
 }
 
+static NameData *
+index_check_collation_version(const ObjectAddress *otherObject,
+							  const NameData *version,
+							  void *userdata)
+{
+	Oid			relid = *(Oid *) userdata;
+	NameData	current_version;
+
+	/* We only care about dependencies on collations. */
+	if (otherObject->classId != CollationRelationId)
+		return NULL;
+
+	/* Compare with the current version. */
+	get_collation_version_for_oid(otherObject->objectId, &current_version);
+	if (strncmp(NameStr(*version),
+				NameStr(current_version),
+				sizeof(NameData)) != 0)
+		ereport(WARNING,
+				(errmsg("index \"%s\" depends on collation %u version \"%s\", but the current version is \"%s\"",
+						get_rel_name(relid),
+						otherObject->objectId,
+						NameStr(*version),
+						NameStr(current_version)),
+				 errdetail("The index may be corrupted due to changes in sort order."),
+				 errhint("REINDEX to avoid the risk of corruption.")));
+
+	return NULL;
+}
+
+void
+index_check_collation_versions(Oid relid)
+{
+	ObjectAddress	object;
+
+	object.classId = RelationRelationId;
+	object.objectId = relid;
+	object.objectSubId = 0;
+	visitDependentObjects(&object, &index_check_collation_version, &relid);
+}
+
+static NameData *
+index_update_collation_version(const ObjectAddress *otherObject,
+							   const NameData *version,
+							   void *userdata)
+{
+	NameData   *current_version = (NameData *) userdata;
+
+	/* We only care about dependencies on collations. */
+	if (otherObject->classId != CollationRelationId)
+		return NULL;
+
+	get_collation_version_for_oid(otherObject->objectId, current_version);
+	return current_version;
+}
+
+static void
+index_update_collation_versions(Oid relid)
+{
+	ObjectAddress	object;
+	NameData	current_version;
+
+	object.classId = RelationRelationId;
+	object.objectId = relid;
+	object.objectSubId = 0;
+	visitDependentObjects(&object, &index_update_collation_version,
+						  &current_version);
+}
+
 /*
  * index_constraint_create
  *
@@ -3803,6 +3876,9 @@ reindex_index(Oid indexId, bool skip_constraint_checks, char persistence,
 	/* Close rels, but keep locks */
 	index_close(iRel, NoLock);
 	heap_close(heapRelation, NoLock);
+
+	/* Record the current versions of all depended-on collations. */
+	index_update_collation_versions(indexId);
 }
 
 /*
diff --git a/src/backend/utils/adt/pg_locale.c b/src/backend/utils/adt/pg_locale.c
index ddfe85e5940..da651344e14 100644
--- a/src/backend/utils/adt/pg_locale.c
+++ b/src/backend/utils/adt/pg_locale.c
@@ -1443,6 +1443,27 @@ get_collation_actual_version(char collprovider, const char *collcollate)
 	return collversion;
 }
 
+/*
+ * Get provider-specific collation version string given a collatoin OID.
+ */
+void
+get_collation_version_for_oid(Oid oid, NameData *output)
+{
+	HeapTuple	tp;
+	Form_pg_collation collform;
+	const char *version;
+
+	tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(oid));
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for collation %u", oid);
+	collform = (Form_pg_collation) GETSTRUCT(tp);
+	version = get_collation_actual_version(collform->collprovider,
+										   NameStr(collform->collcollate));
+	memset(output, 0, sizeof(NameData));
+	if (version)
+		strncpy(NameStr(*output), version, sizeof(NameData));
+	ReleaseSysCache(tp);
+}
 
 #ifdef USE_ICU
 /*
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index a4fc0011031..b58d4f2af6d 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1452,6 +1452,10 @@ RelationInitIndexAccessInfo(Relation relation)
 	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
 	memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
 
+	/* Warn if any dependent collations' versions have moved. */
+	if (!IsCatalogRelation(relation))
+		index_check_collation_versions(RelationGetRelid(relation));
+
 	/*
 	 * indclass cannot be referenced directly through the C struct, because it
 	 * comes after the variable-width indkey field.  Must extract the datum
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 6ae4932cfba..66befde7568 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -227,6 +227,14 @@ extern void record_object_address_dependencies(const ObjectAddress *depender,
 
 extern void free_object_addresses(ObjectAddresses *addrs);
 
+typedef NameData *(*VisitDependentObjectsFun)(const ObjectAddress *otherObject,
+											  const NameData *version,
+											  void *userdata);
+
+extern void visitDependentObjects(const ObjectAddress *object,
+								  VisitDependentObjectsFun callback,
+								  void *userdata);
+
 /* in pg_depend.c */
 
 extern void recordDependencyOn(const ObjectAddress *depender,
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index f20c5f789b1..e232dccaef0 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -103,6 +103,8 @@ extern void FormIndexDatum(IndexInfo *indexInfo,
 			   Datum *values,
 			   bool *isnull);
 
+extern void index_check_collation_versions(Oid relid);
+
 extern void index_build(Relation heapRelation,
 			Relation indexRelation,
 			IndexInfo *indexInfo,
diff --git a/src/include/utils/pg_locale.h b/src/include/utils/pg_locale.h
index 88a3134862f..9c05374c32c 100644
--- a/src/include/utils/pg_locale.h
+++ b/src/include/utils/pg_locale.h
@@ -103,6 +103,7 @@ typedef struct pg_locale_struct *pg_locale_t;
 extern pg_locale_t pg_newlocale_from_collation(Oid collid);
 
 extern char *get_collation_actual_version(char collprovider, const char *collcollate);
+extern void get_collation_version_for_oid(Oid collid, NameData *output);
 
 #ifdef USE_ICU
 extern int32_t icu_to_uchar(UChar **buff_uchar, const char *buff, size_t nbytes);
-- 
2.17.0

