From de11b29d6dc29acc4685c2dc5d90cfea876f4577 Mon Sep 17 00:00:00 2001
From: Nikhil Kumar Veldanda <veldanda.nikhilkumar17@gmail.com>
Date: Sat, 19 Jul 2025 23:31:23 +0000
Subject: [PATCH v26 15/15] Implement Zstd compression (no dictionary support)

---
 contrib/amcheck/verify_heapam.c               |   1 +
 doc/src/sgml/catalogs.sgml                    |   1 +
 doc/src/sgml/config.sgml                      |  12 +-
 doc/src/sgml/ref/alter_table.sgml             |   8 +-
 doc/src/sgml/ref/create_table.sgml            |   7 +-
 src/backend/access/common/detoast.c           |  12 +-
 src/backend/access/common/toast_compression.c | 139 ++++++++++
 src/backend/access/common/toast_internals.c   |   4 +
 src/backend/utils/adt/varlena.c               |   3 +
 src/backend/utils/misc/guc_tables.c           |   3 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +-
 src/bin/pg_dump/pg_dump.c                     |   3 +
 src/bin/psql/describe.c                       |   5 +-
 src/bin/psql/tab-complete.in.c                |   2 +-
 src/include/access/toast_compression.h        |  33 ++-
 src/include/access/toast_internals.h          |  22 +-
 .../regress/expected/compression_zstd.out     | 249 ++++++++++++++++++
 .../regress/expected/compression_zstd_1.out   |   7 +
 src/test/regress/parallel_schedule            |   2 +-
 src/test/regress/sql/compression_zstd.sql     | 129 +++++++++
 20 files changed, 605 insertions(+), 39 deletions(-)
 create mode 100644 src/test/regress/expected/compression_zstd.out
 create mode 100644 src/test/regress/expected/compression_zstd_1.out
 create mode 100644 src/test/regress/sql/compression_zstd.sql

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 3a23dddcff4..e482ddc106a 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -1806,6 +1806,7 @@ check_tuple_attribute(HeapCheckContext *ctx)
 				/* List of all valid compression method IDs */
 			case TOAST_PGLZ_COMPRESSION_ID:
 			case TOAST_LZ4_COMPRESSION_ID:
+			case TOAST_ZSTD_COMPRESSION_ID:
 				valid = true;
 				break;
 
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 0d23bc1b122..ee0c1a1f185 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -1249,6 +1249,7 @@
        (see <xref linkend="guc-default-toast-compression"/>).  Otherwise,
        <literal>'p'</literal> selects pglz compression, while
        <literal>'l'</literal> selects <productname>LZ4</productname>
+       compression, and <literal>'z'</literal> selects <productname>ZSTD</productname>
        compression.  However, this field is ignored
        whenever <structfield>attstorage</structfield> does not allow
        compression.
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 93d948e9161..7d784147032 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3404,8 +3404,8 @@ include_dir 'conf.d'
         A compressed page image will be decompressed during WAL replay.
         The supported methods are <literal>pglz</literal>,
         <literal>lz4</literal> (if <productname>PostgreSQL</productname>
-        was compiled with <option>--with-lz4</option>) and
-        <literal>zstd</literal> (if <productname>PostgreSQL</productname>
+        was compiled with <option>--with-lz4</option>),
+        and <literal>zstd</literal> (if <productname>PostgreSQL</productname>
         was compiled with <option>--with-zstd</option>).
         The default value is <literal>off</literal>.
         Only superusers and users with the appropriate <literal>SET</literal>
@@ -9824,9 +9824,11 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
         the <literal>COMPRESSION</literal> column option in
         <command>CREATE TABLE</command> or
         <command>ALTER TABLE</command>.)
-        The supported compression methods are <literal>pglz</literal> and
-        (if <productname>PostgreSQL</productname> was compiled with
-        <option>--with-lz4</option>) <literal>lz4</literal>.
+        The supported compression methods are <literal>pglz</literal>,
+        <literal>lz4</literal> (if <productname>PostgreSQL</productname>
+        was compiled with <option>--with-lz4</option>),
+        and <literal>zstd</literal> (if <productname>PostgreSQL</productname>
+        was compiled with <option>--with-zstd</option>).
         The default is <literal>pglz</literal>.
        </para>
       </listitem>
diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index 1e4f26c13f6..d4bd847d416 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -443,10 +443,10 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       its existing compression method, rather than being recompressed with the
       compression method of the target column.
       The supported compression
-      methods are <literal>pglz</literal> and <literal>lz4</literal>.
-      (<literal>lz4</literal> is available only if <option>--with-lz4</option>
-      was used when building <productname>PostgreSQL</productname>.)  In
-      addition, <replaceable class="parameter">compression_method</replaceable>
+      methods are <literal>pglz</literal>,
+      <literal>lz4</literal> (if <productname>PostgreSQL</productname> was compiled with <option>--with-lz4</option>),
+      and <literal>zstd</literal> (if <productname>PostgreSQL</productname> was compiled with <option>--with-zstd</option>).
+      In addition, <replaceable class="parameter">compression_method</replaceable>
       can be <literal>default</literal>, which selects the default behavior of
       consulting the <xref linkend="guc-default-toast-compression"/> setting
       at the time of data insertion to determine the method to use.
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index dc000e913c1..4fd68af2a09 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -343,10 +343,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       column storage modes.) Setting this property for a partitioned table
       has no direct effect, because such tables have no storage of their own,
       but the configured value will be inherited by newly-created partitions.
-      The supported compression methods are <literal>pglz</literal> and
-      <literal>lz4</literal>.  (<literal>lz4</literal> is available only if
-      <option>--with-lz4</option> was used when building
-      <productname>PostgreSQL</productname>.)  In addition,
+      The supported compression methods are <literal>pglz</literal>,
+      <literal>lz4</literal> (if <productname>PostgreSQL</productname> was compiled with <option>--with-lz4</option>),
+      and <literal>zstd</literal> (if <productname>PostgreSQL</productname> was compiled with <option>--with-zstd</option>). In addition,
       <replaceable class="parameter">compression_method</replaceable>
       can be <literal>default</literal> to explicitly specify the default
       behavior, which is to consult the
diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
index 34a3f7c6694..0b28a5e7365 100644
--- a/src/backend/access/common/detoast.c
+++ b/src/backend/access/common/detoast.c
@@ -247,10 +247,10 @@ detoast_attr_slice(struct varlena *attr,
 			 * Determine maximum amount of compressed data needed for a prefix
 			 * of a given length (after decompression).
 			 *
-			 * At least for now, if it's LZ4 data, we'll have to fetch the
-			 * whole thing, because there doesn't seem to be an API call to
-			 * determine how much compressed data we need to be sure of being
-			 * able to decompress the required slice.
+			 * At least for now, if it's LZ4 or Zstandard data, we'll have to
+			 * fetch the whole thing, because there doesn't seem to be an API
+			 * call to determine how much compressed data we need to be sure
+			 * of being able to decompress the required slice.
 			 */
 			if (toast_pointer.compression_method == TOAST_PGLZ_COMPRESSION_ID)
 				max_size = pglz_maximum_compressed_size(slicelimit, max_size);
@@ -491,6 +491,8 @@ toast_decompress_datum(struct varlena *attr)
 			return pglz_decompress_datum(attr);
 		case TOAST_LZ4_COMPRESSION_ID:
 			return lz4_decompress_datum(attr);
+		case TOAST_ZSTD_COMPRESSION_ID:
+			return zstd_decompress_datum(attr);
 		default:
 			elog(ERROR, "invalid compression method id %d", cmid);
 			return NULL;		/* keep compiler quiet */
@@ -534,6 +536,8 @@ toast_decompress_datum_slice(struct varlena *attr, int32 slicelength)
 			return pglz_decompress_datum_slice(attr, slicelength);
 		case TOAST_LZ4_COMPRESSION_ID:
 			return lz4_decompress_datum_slice(attr, slicelength);
+		case TOAST_ZSTD_COMPRESSION_ID:
+			return zstd_decompress_datum_slice(attr, slicelength);
 		default:
 			elog(ERROR, "invalid compression method id %d", cmid);
 			return NULL;		/* keep compiler quiet */
diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c
index 94606a58c8f..56b4af251e1 100644
--- a/src/backend/access/common/toast_compression.c
+++ b/src/backend/access/common/toast_compression.c
@@ -17,6 +17,10 @@
 #include <lz4.h>
 #endif
 
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
 #include "access/detoast.h"
 #include "access/toast_compression.h"
 #include "access/toast_external.h"
@@ -246,6 +250,132 @@ lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength)
 #endif
 }
 
+/* Compress datum using ZSTD */
+struct varlena *
+zstd_compress_datum(const struct varlena *value)
+{
+#ifdef USE_ZSTD
+	uint32		valsize = VARSIZE_ANY_EXHDR(value);
+	size_t		max_size = ZSTD_compressBound(valsize);
+	struct varlena *compressed;
+	size_t		cmp_size;
+
+	/* Allocate space for the compressed varlena (header + data) */
+	compressed = (struct varlena *) palloc(max_size + VARHDRSZ_EXTENDED_COMPRESSED);
+
+	cmp_size = ZSTD_compress(VARDATA_EXTENDED_COMPRESSED(compressed),
+							 max_size,
+							 VARDATA_ANY(value),
+							 valsize,
+							 ZSTD_CLEVEL_DEFAULT);
+
+	if (ZSTD_isError(cmp_size))
+		elog(ERROR, "zstd compression failed");
+
+	/**
+	 *  If compression did not reduce size, return NULL so that the uncompressed data is stored
+	 */
+	if (cmp_size > valsize)
+	{
+		pfree(compressed);
+		return NULL;
+	}
+
+	/* Set the compressed size in the varlena header */
+	SET_VARSIZE_COMPRESSED(compressed, cmp_size + VARHDRSZ_EXTENDED_COMPRESSED);
+
+	return compressed;
+
+#else
+	NO_COMPRESSION_SUPPORT("zstd");
+	return NULL;
+#endif
+}
+
+/* Decompression routine */
+struct varlena *
+zstd_decompress_datum(const struct varlena *value)
+{
+#ifdef USE_ZSTD
+	uint32		actual_size_exhdr = VARDATA_COMPRESSED_GET_EXTSIZE(value);
+	uint32		cmplen;
+	struct varlena *result;
+	size_t		ucmplen;
+
+	cmplen = VARSIZE_ANY(value) - VARHDRSZ_EXTENDED_COMPRESSED;
+
+	/* Allocate space for the uncompressed data */
+	result = (struct varlena *) palloc(actual_size_exhdr + VARHDRSZ);
+
+	ucmplen = ZSTD_decompress(VARDATA(result),
+							  actual_size_exhdr,
+							  VARDATA_EXTENDED_COMPRESSED(value),
+							  cmplen);
+
+	if (ZSTD_isError(ucmplen))
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed zstd data is corrupt")));
+
+	/* Set final size in the varlena header */
+	SET_VARSIZE(result, ucmplen + VARHDRSZ);
+	return result;
+
+#else
+	NO_COMPRESSION_SUPPORT("zstd");
+	return NULL;
+#endif
+}
+
+/* Decompress a slice of the datum */
+struct varlena *
+zstd_decompress_datum_slice(const struct varlena *value, int32 slicelength)
+{
+#ifdef USE_ZSTD
+	/* ZSTD no dictionary compression */
+
+	struct varlena *result;
+	ZSTD_inBuffer inBuf;
+	ZSTD_outBuffer outBuf;
+	size_t		ret;
+	ZSTD_DCtx  *zstdDctx = ZSTD_createDCtx();
+
+	if (!zstdDctx)
+		elog(ERROR, "could not create zstd decompression context");
+
+	inBuf.src = VARDATA_EXTENDED_COMPRESSED(value);
+	inBuf.size = VARSIZE_ANY(value) - VARHDRSZ_EXTENDED_COMPRESSED;
+	inBuf.pos = 0;
+
+	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
+	outBuf.dst = (char *) result + VARHDRSZ;
+	outBuf.size = slicelength;
+	outBuf.pos = 0;
+
+	/* Common decompression loop */
+	while (inBuf.pos < inBuf.size && outBuf.pos < outBuf.size)
+	{
+		ret = ZSTD_decompressStream(zstdDctx, &outBuf, &inBuf);
+		if (ZSTD_isError(ret))
+		{
+			ZSTD_freeDCtx(zstdDctx);
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg_internal("compressed zstd data is corrupt")));
+		}
+	}
+
+	ZSTD_freeDCtx(zstdDctx);
+	Assert(outBuf.size == slicelength && outBuf.pos == slicelength);
+	SET_VARSIZE(result, outBuf.pos + VARHDRSZ);
+
+	return result;
+#else
+	NO_COMPRESSION_SUPPORT("zstd");
+	return NULL;
+#endif
+}
+
 /*
  * Extract compression ID from a varlena.
  *
@@ -287,6 +417,13 @@ CompressionNameToMethod(const char *compression)
 #endif
 		return TOAST_LZ4_COMPRESSION;
 	}
+	else if (strcmp(compression, "zstd") == 0)
+	{
+#ifndef USE_ZSTD
+		NO_COMPRESSION_SUPPORT("zstd");
+#endif
+		return TOAST_ZSTD_COMPRESSION;
+	}
 
 	return InvalidCompressionMethod;
 }
@@ -303,6 +440,8 @@ GetCompressionMethodName(char method)
 			return "pglz";
 		case TOAST_LZ4_COMPRESSION:
 			return "lz4";
+		case TOAST_ZSTD_COMPRESSION:
+			return "zstd";
 		default:
 			elog(ERROR, "invalid compression method %c", method);
 			return NULL;		/* keep compiler quiet */
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index 468aae64676..35be926b048 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -72,6 +72,10 @@ toast_compress_datum(Datum value, char cmethod)
 			tmp = lz4_compress_datum((const struct varlena *) value);
 			cmid = TOAST_LZ4_COMPRESSION_ID;
 			break;
+		case TOAST_ZSTD_COMPRESSION:
+			tmp = zstd_compress_datum((const struct varlena *) value);
+			cmid = TOAST_ZSTD_COMPRESSION_ID;
+			break;
 		default:
 			elog(ERROR, "invalid compression method %c", cmethod);
 	}
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 26c720449f7..3b4dd692d37 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -4204,6 +4204,9 @@ pg_column_compression(PG_FUNCTION_ARGS)
 		case TOAST_LZ4_COMPRESSION_ID:
 			result = "lz4";
 			break;
+		case TOAST_ZSTD_COMPRESSION_ID:
+			result = "zstd";
+			break;
 		default:
 			elog(ERROR, "invalid compression method id %d", cmid);
 	}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index be523c9ac09..74c1fe424c4 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -461,6 +461,9 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 	{"pglz", TOAST_PGLZ_COMPRESSION, false},
 #ifdef  USE_LZ4
 	{"lz4", TOAST_LZ4_COMPRESSION, false},
+#endif
+#ifdef  USE_ZSTD
+	{"zstd", TOAST_ZSTD_COMPRESSION, false},
 #endif
 	{NULL, 0, false}
 };
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5f34b14ea39..0f1dc0dc05c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -752,7 +752,7 @@ autovacuum_worker_slots = 16	# autovacuum worker slots to allocate
 #row_security = on
 #default_table_access_method = 'heap'
 #default_tablespace = ''		# a tablespace name, '' uses the default
-#default_toast_compression = 'pglz'	# 'pglz' or 'lz4'
+#default_toast_compression = 'pglz'	# 'pglz' or 'lz4' or 'zstd'
 #default_toast_type = 'oid'		# 'oid' or 'int8'
 #temp_tablespaces = ''			# a list of tablespace names, '' uses
 					# only default tablespace
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7afb0d1a925..5a9353fe15d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -17692,6 +17692,9 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo)
 					case 'l':
 						cmname = "lz4";
 						break;
+					case 'z':
+						cmname = "zstd";
+						break;
 					default:
 						cmname = NULL;
 						break;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index dd25d2fe7b8..e073f6766e8 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2172,8 +2172,9 @@ describeOneTableDetails(const char *schemaname,
 			/* these strings are literal in our syntax, so not translated. */
 			printTableAddCell(&cont, (compression[0] == 'p' ? "pglz" :
 									  (compression[0] == 'l' ? "lz4" :
-									   (compression[0] == '\0' ? "" :
-										"???"))),
+									   (compression[0] == 'z' ? "zstd" :
+										(compression[0] == '\0' ? "" :
+										 "???")))),
 							  false, false);
 		}
 
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 37524364290..9032902a5c6 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2913,7 +2913,7 @@ match_previous_words(int pattern_id,
 	/* ALTER TABLE ALTER [COLUMN] <foo> SET COMPRESSION */
 	else if (Matches("ALTER", "TABLE", MatchAny, "ALTER", "COLUMN", MatchAny, "SET", "COMPRESSION") ||
 			 Matches("ALTER", "TABLE", MatchAny, "ALTER", MatchAny, "SET", "COMPRESSION"))
-		COMPLETE_WITH("DEFAULT", "PGLZ", "LZ4");
+		COMPLETE_WITH("DEFAULT", "PGLZ", "LZ4", "ZSTD");
 	/* ALTER TABLE ALTER [COLUMN] <foo> SET EXPRESSION */
 	else if (Matches("ALTER", "TABLE", MatchAny, "ALTER", "COLUMN", MatchAny, "SET", "EXPRESSION") ||
 			 Matches("ALTER", "TABLE", MatchAny, "ALTER", MatchAny, "SET", "EXPRESSION"))
diff --git a/src/include/access/toast_compression.h b/src/include/access/toast_compression.h
index 494e1b0dce6..accc4746a56 100644
--- a/src/include/access/toast_compression.h
+++ b/src/include/access/toast_compression.h
@@ -13,6 +13,10 @@
 #ifndef TOAST_COMPRESSION_H
 #define TOAST_COMPRESSION_H
 
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
 /*
  * GUC support.
  *
@@ -23,22 +27,25 @@
 extern PGDLLIMPORT int default_toast_compression;
 
 /*
- * Built-in compression method ID.  The toast compression header will store
- * this in the first 2 bits of the raw length.  These built-in compression
- * method IDs are directly mapped to the built-in compression methods.
+ * Built-in compression method ID.
+ *
+ * For TOAST-compressed values:
+ *   - If using a non-extended method, the first 2 bits of the raw length
+ *		field store this ID.
+ *   - If using an extended method, it is stored in the extended 4-byte header.
  *
- * Don't use these values for anything other than understanding the meaning
- * of the raw bits from a varlena; in particular, if the goal is to identify
- * a compression method, use the constants TOAST_PGLZ_COMPRESSION, etc.
- * below. We might someday support more than 4 compression methods, but
- * we can never have more than 4 values in this enum, because there are
- * only 2 bits available in the places where this is stored.
+ * These IDs map directly to the built-in compression methods.
+ *
+ * Note: Do not use these values for anything other than interpreting the
+ * raw bits from a varlena. To identify a compression method in code, use
+ * the named constants (e.g., TOAST_PGLZ_COMPRESSION) instead.
  */
 typedef enum ToastCompressionId
 {
 	TOAST_PGLZ_COMPRESSION_ID = 0,
 	TOAST_LZ4_COMPRESSION_ID = 1,
-	TOAST_INVALID_COMPRESSION_ID = 2,
+	TOAST_ZSTD_COMPRESSION_ID = 2,
+	TOAST_INVALID_COMPRESSION_ID = 3,
 } ToastCompressionId;
 
 /*
@@ -48,6 +55,7 @@ typedef enum ToastCompressionId
  */
 #define TOAST_PGLZ_COMPRESSION			'p'
 #define TOAST_LZ4_COMPRESSION			'l'
+#define TOAST_ZSTD_COMPRESSION			'z'
 #define InvalidCompressionMethod		'\0'
 
 #define CompressionMethodIsValid(cm)  ((cm) != InvalidCompressionMethod)
@@ -71,6 +79,11 @@ extern struct varlena *lz4_decompress_datum(const struct varlena *value);
 extern struct varlena *lz4_decompress_datum_slice(const struct varlena *value,
 												  int32 slicelength);
 
+/* zstd nodict compression/decompression routines */
+extern struct varlena *zstd_compress_datum(const struct varlena *value);
+extern struct varlena *zstd_decompress_datum(const struct varlena *value);
+extern struct varlena *zstd_decompress_datum_slice(const struct varlena *value, int32 slicelength);
+
 /* other stuff */
 extern ToastCompressionId toast_get_compression_id(struct varlena *attr);
 extern char CompressionNameToMethod(const char *compression);
diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h
index 27bc8a0b816..5fb8ca93fdd 100644
--- a/src/include/access/toast_internals.h
+++ b/src/include/access/toast_internals.h
@@ -21,13 +21,21 @@
  * Utilities for manipulation of header information for compressed
  * toast entries.
  */
-#define TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(ptr, len, cm_method) \
-	do { \
-		Assert((len) > 0 && (len) <= VARLENA_EXTSIZE_MASK); \
-		Assert((cm_method) == TOAST_PGLZ_COMPRESSION_ID || \
-			   (cm_method) == TOAST_LZ4_COMPRESSION_ID); \
-		((varattrib_4b *)(ptr))->va_compressed.va_tcinfo = \
-			((uint32)(len)) | ((uint32)(cm_method) << VARLENA_EXTSIZE_BITS); \
+#define TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(ptr, len, cm_method)														\
+	do {																														\
+		Assert((len) > 0 && (len) <= VARLENA_EXTSIZE_MASK);																		\
+		Assert((cm_method) == TOAST_PGLZ_COMPRESSION_ID ||																		\
+				(cm_method) == TOAST_LZ4_COMPRESSION_ID ||																		\
+				(cm_method) == TOAST_ZSTD_COMPRESSION_ID);																		\
+		if (!CompressionMethodIdIsExtended((cm_method)))																		\
+			((varattrib_4b *)(ptr))->va_compressed.va_tcinfo = ((uint32)(len)) | ((uint32)(cm_method) << VARLENA_EXTSIZE_BITS);	\
+		else																													\
+		{																														\
+			/* extended path: mark EXT flag in tcinfo */																		\
+			((varattrib_4b *)(ptr))->va_compressed_ext.va_tcinfo =																\
+				((uint32)(len)) | ((uint32)(VARATT_CE_FLAG) << VARLENA_EXTSIZE_BITS);											\
+			VARATT_CE_SET_COMPRESS_METHOD(((varattrib_4b *)(ptr))->va_compressed_ext.va_ecinfo, (cm_method));					\
+		}																														\
 	} while (0)
 
 extern Datum toast_compress_datum(Datum value, char cmethod);
diff --git a/src/test/regress/expected/compression_zstd.out b/src/test/regress/expected/compression_zstd.out
new file mode 100644
index 00000000000..166ba022541
--- /dev/null
+++ b/src/test/regress/expected/compression_zstd.out
@@ -0,0 +1,249 @@
+-- Tests for TOAST compression with zstd
+SELECT NOT(enumvals @> '{zstd}') AS skip_test FROM pg_settings WHERE
+  name = 'default_toast_compression' \gset
+\if :skip_test
+   \echo '*** skipping TOAST tests with zstd (not supported) ***'
+   \quit
+\endif
+CREATE SCHEMA zstd;
+SET search_path TO zstd, public;
+\set HIDE_TOAST_COMPRESSION false
+-- Ensure we get stable results regardless of the installation's default.
+-- We rely on this GUC value for a few tests.
+SET default_toast_compression = 'pglz';
+-- test creating table with compression method
+CREATE TABLE cmdata_pglz(f1 text COMPRESSION pglz);
+CREATE INDEX idx ON cmdata_pglz(f1);
+INSERT INTO cmdata_pglz VALUES(repeat('1234567890', 1000));
+\d+ cmdata
+CREATE TABLE cmdata_zstd(f1 TEXT COMPRESSION zstd);
+INSERT INTO cmdata_zstd VALUES(repeat('1234567890', 1004));
+\d+ cmdata1
+-- verify stored compression method in the data
+SELECT pg_column_compression(f1) FROM cmdata_zstd;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+-- decompress data slice
+SELECT SUBSTR(f1, 200, 5) FROM cmdata_pglz;
+ substr 
+--------
+ 01234
+(1 row)
+
+SELECT SUBSTR(f1, 2000, 50) FROM cmdata_zstd;
+                       substr                       
+----------------------------------------------------
+ 01234567890123456789012345678901234567890123456789
+(1 row)
+
+-- copy with table creation
+SELECT * INTO cmmove1 FROM cmdata_zstd;
+\d+ cmmove1
+                                         Table "zstd.cmmove1"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ f1     | text |           |          |         | extended |             |              | 
+
+SELECT pg_column_compression(f1) FROM cmmove1;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+-- test LIKE INCLUDING COMPRESSION.  The GUC default_toast_compression
+-- has no effect, the compression method from the table being copied.
+CREATE TABLE cmdata2 (LIKE cmdata_zstd INCLUDING COMPRESSION);
+\d+ cmdata2
+                                         Table "zstd.cmdata2"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ f1     | text |           |          |         | extended | zstd        |              | 
+
+DROP TABLE cmdata2;
+-- copy to existing table
+CREATE TABLE cmmove3(f1 text COMPRESSION pglz);
+INSERT INTO cmmove3 SELECT * FROM cmdata_pglz;
+INSERT INTO cmmove3 SELECT * FROM cmdata_zstd;
+SELECT pg_column_compression(f1) FROM cmmove3;
+ pg_column_compression 
+-----------------------
+ pglz
+ zstd
+(2 rows)
+
+-- update using datum from different table with zstd data.
+CREATE TABLE cmmove2(f1 text COMPRESSION pglz);
+INSERT INTO cmmove2 VALUES (repeat('1234567890', 1004));
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+UPDATE cmmove2 SET f1 = cmdata_zstd.f1 FROM cmdata_zstd;
+SELECT pg_column_compression(f1) FROM cmmove2;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+-- test externally stored compressed data
+CREATE OR REPLACE FUNCTION large_val_zstd() RETURNS TEXT LANGUAGE SQL AS
+'select array_agg(fipshash(g::text))::text from generate_series(1, 256) g';
+CREATE TABLE cmdata2 (f1 text COMPRESSION zstd);
+INSERT INTO cmdata2 SELECT large_val_zstd() || repeat('a', 4000);
+SELECT pg_column_compression(f1) FROM cmdata2;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+SELECT SUBSTR(f1, 200, 5) FROM cmdata2;
+ substr 
+--------
+ 79026
+(1 row)
+
+DROP TABLE cmdata2;
+DROP FUNCTION large_val_zstd;
+-- test compression with materialized view
+CREATE MATERIALIZED VIEW compressmv(x) AS SELECT * FROM cmdata_zstd;
+\d+ compressmv
+                                 Materialized view "zstd.compressmv"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ x      | text |           |          |         | extended |             |              | 
+View definition:
+ SELECT f1 AS x
+   FROM cmdata_zstd;
+
+SELECT pg_column_compression(f1) FROM cmdata_zstd;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+SELECT pg_column_compression(x) FROM compressmv;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+-- test compression with partition
+CREATE TABLE cmpart(f1 text COMPRESSION zstd) PARTITION BY HASH(f1);
+CREATE TABLE cmpart1 PARTITION OF cmpart FOR VALUES WITH (MODULUS 2, REMAINDER 0);
+CREATE TABLE cmpart2(f1 text COMPRESSION pglz);
+ALTER TABLE cmpart ATTACH PARTITION cmpart2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
+INSERT INTO cmpart VALUES (repeat('123456789', 1004));
+INSERT INTO cmpart VALUES (repeat('123456789', 4004));
+SELECT pg_column_compression(f1) FROM cmpart1;
+ pg_column_compression 
+-----------------------
+ zstd
+(1 row)
+
+SELECT pg_column_compression(f1) FROM cmpart2;
+ pg_column_compression 
+-----------------------
+ pglz
+(1 row)
+
+-- test compression with inheritance
+CREATE TABLE cminh() INHERITS(cmdata_pglz, cmdata_zstd); -- error
+NOTICE:  merging multiple inherited definitions of column "f1"
+ERROR:  column "f1" has a compression method conflict
+DETAIL:  pglz versus zstd
+CREATE TABLE cminh(f1 TEXT COMPRESSION zstd) INHERITS(cmdata_pglz); -- error
+NOTICE:  merging column "f1" with inherited definition
+ERROR:  column "f1" has a compression method conflict
+DETAIL:  pglz versus zstd
+CREATE TABLE cmdata3(f1 text);
+CREATE TABLE cminh() INHERITS (cmdata_pglz, cmdata3);
+NOTICE:  merging multiple inherited definitions of column "f1"
+-- test default_toast_compression GUC
+SET default_toast_compression = 'zstd';
+-- test alter compression method
+ALTER TABLE cmdata_pglz ALTER COLUMN f1 SET COMPRESSION zstd;
+INSERT INTO cmdata_pglz VALUES (repeat('123456789', 4004));
+\d+ cmdata
+SELECT pg_column_compression(f1) FROM cmdata_pglz;
+ pg_column_compression 
+-----------------------
+ pglz
+ zstd
+(2 rows)
+
+ALTER TABLE cmdata_pglz ALTER COLUMN f1 SET COMPRESSION pglz;
+-- test alter compression method for materialized views
+ALTER MATERIALIZED VIEW compressmv ALTER COLUMN x SET COMPRESSION zstd;
+\d+ compressmv
+                                 Materialized view "zstd.compressmv"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ x      | text |           |          |         | extended | zstd        |              | 
+View definition:
+ SELECT f1 AS x
+   FROM cmdata_zstd;
+
+-- test alter compression method for partitioned tables
+ALTER TABLE cmpart1 ALTER COLUMN f1 SET COMPRESSION pglz;
+ALTER TABLE cmpart2 ALTER COLUMN f1 SET COMPRESSION zstd;
+-- new data should be compressed with the current compression method
+INSERT INTO cmpart VALUES (repeat('123456789', 1004));
+INSERT INTO cmpart VALUES (repeat('123456789', 4004));
+SELECT pg_column_compression(f1) FROM cmpart1;
+ pg_column_compression 
+-----------------------
+ zstd
+ pglz
+(2 rows)
+
+SELECT pg_column_compression(f1) FROM cmpart2;
+ pg_column_compression 
+-----------------------
+ pglz
+ zstd
+(2 rows)
+
+-- test expression index
+CREATE TABLE cmdata2 (f1 TEXT COMPRESSION pglz, f2 TEXT COMPRESSION zstd);
+CREATE UNIQUE INDEX idx1 ON cmdata2 ((f1 || f2));
+INSERT INTO cmdata2 VALUES((SELECT array_agg(fipshash(g::TEXT))::TEXT FROM
+generate_series(1, 50) g), VERSION());
+-- check data is ok
+SELECT length(f1) FROM cmdata_pglz;
+ length 
+--------
+  10000
+  36036
+(2 rows)
+
+SELECT length(f1) FROM cmdata_zstd;
+ length 
+--------
+  10040
+(1 row)
+
+SELECT length(f1) FROM cmmove1;
+ length 
+--------
+  10040
+(1 row)
+
+SELECT length(f1) FROM cmmove2;
+ length 
+--------
+  10040
+(1 row)
+
+SELECT length(f1) FROM cmmove3;
+ length 
+--------
+  10000
+  10040
+(2 rows)
+
+\set HIDE_TOAST_COMPRESSION true
diff --git a/src/test/regress/expected/compression_zstd_1.out b/src/test/regress/expected/compression_zstd_1.out
new file mode 100644
index 00000000000..5f07342fd51
--- /dev/null
+++ b/src/test/regress/expected/compression_zstd_1.out
@@ -0,0 +1,7 @@
+-- Tests for TOAST compression with zstd
+SELECT NOT(enumvals @> '{zstd}') AS skip_test FROM pg_settings WHERE
+  name = 'default_toast_compression' \gset
+\if :skip_test
+   \echo '*** skipping TOAST tests with zstd (not supported) ***'
+*** skipping TOAST tests with zstd (not supported) ***
+   \quit
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index fbffc67ae60..1ef4797cd10 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -123,7 +123,7 @@ test: plancache limit plpgsql copy2 temp domain rangefuncs prepare conversion tr
 # The stats test resets stats, so nothing else needing stats access can be in
 # this group.
 # ----------
-test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression compression_lz4 memoize stats predicate numa
+test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression compression_lz4 compression_zstd memoize stats predicate numa
 
 # event_trigger depends on create_am and cannot run concurrently with
 # any test that runs DDL
diff --git a/src/test/regress/sql/compression_zstd.sql b/src/test/regress/sql/compression_zstd.sql
new file mode 100644
index 00000000000..134d1d44327
--- /dev/null
+++ b/src/test/regress/sql/compression_zstd.sql
@@ -0,0 +1,129 @@
+-- Tests for TOAST compression with zstd
+
+SELECT NOT(enumvals @> '{zstd}') AS skip_test FROM pg_settings WHERE
+  name = 'default_toast_compression' \gset
+\if :skip_test
+   \echo '*** skipping TOAST tests with zstd (not supported) ***'
+   \quit
+\endif
+
+CREATE SCHEMA zstd;
+SET search_path TO zstd, public;
+
+\set HIDE_TOAST_COMPRESSION false
+
+-- Ensure we get stable results regardless of the installation's default.
+-- We rely on this GUC value for a few tests.
+SET default_toast_compression = 'pglz';
+
+-- test creating table with compression method
+CREATE TABLE cmdata_pglz(f1 text COMPRESSION pglz);
+CREATE INDEX idx ON cmdata_pglz(f1);
+INSERT INTO cmdata_pglz VALUES(repeat('1234567890', 1000));
+\d+ cmdata
+CREATE TABLE cmdata_zstd(f1 TEXT COMPRESSION zstd);
+INSERT INTO cmdata_zstd VALUES(repeat('1234567890', 1004));
+\d+ cmdata1
+
+-- verify stored compression method in the data
+SELECT pg_column_compression(f1) FROM cmdata_zstd;
+
+-- decompress data slice
+SELECT SUBSTR(f1, 200, 5) FROM cmdata_pglz;
+SELECT SUBSTR(f1, 2000, 50) FROM cmdata_zstd;
+
+-- copy with table creation
+SELECT * INTO cmmove1 FROM cmdata_zstd;
+\d+ cmmove1
+SELECT pg_column_compression(f1) FROM cmmove1;
+
+-- test LIKE INCLUDING COMPRESSION.  The GUC default_toast_compression
+-- has no effect, the compression method from the table being copied.
+CREATE TABLE cmdata2 (LIKE cmdata_zstd INCLUDING COMPRESSION);
+\d+ cmdata2
+DROP TABLE cmdata2;
+
+-- copy to existing table
+CREATE TABLE cmmove3(f1 text COMPRESSION pglz);
+INSERT INTO cmmove3 SELECT * FROM cmdata_pglz;
+INSERT INTO cmmove3 SELECT * FROM cmdata_zstd;
+SELECT pg_column_compression(f1) FROM cmmove3;
+
+-- update using datum from different table with zstd data.
+CREATE TABLE cmmove2(f1 text COMPRESSION pglz);
+INSERT INTO cmmove2 VALUES (repeat('1234567890', 1004));
+SELECT pg_column_compression(f1) FROM cmmove2;
+UPDATE cmmove2 SET f1 = cmdata_zstd.f1 FROM cmdata_zstd;
+SELECT pg_column_compression(f1) FROM cmmove2;
+
+-- test externally stored compressed data
+CREATE OR REPLACE FUNCTION large_val_zstd() RETURNS TEXT LANGUAGE SQL AS
+'select array_agg(fipshash(g::text))::text from generate_series(1, 256) g';
+CREATE TABLE cmdata2 (f1 text COMPRESSION zstd);
+INSERT INTO cmdata2 SELECT large_val_zstd() || repeat('a', 4000);
+SELECT pg_column_compression(f1) FROM cmdata2;
+SELECT SUBSTR(f1, 200, 5) FROM cmdata2;
+DROP TABLE cmdata2;
+DROP FUNCTION large_val_zstd;
+
+-- test compression with materialized view
+CREATE MATERIALIZED VIEW compressmv(x) AS SELECT * FROM cmdata_zstd;
+\d+ compressmv
+SELECT pg_column_compression(f1) FROM cmdata_zstd;
+SELECT pg_column_compression(x) FROM compressmv;
+
+-- test compression with partition
+CREATE TABLE cmpart(f1 text COMPRESSION zstd) PARTITION BY HASH(f1);
+CREATE TABLE cmpart1 PARTITION OF cmpart FOR VALUES WITH (MODULUS 2, REMAINDER 0);
+CREATE TABLE cmpart2(f1 text COMPRESSION pglz);
+
+ALTER TABLE cmpart ATTACH PARTITION cmpart2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
+INSERT INTO cmpart VALUES (repeat('123456789', 1004));
+INSERT INTO cmpart VALUES (repeat('123456789', 4004));
+SELECT pg_column_compression(f1) FROM cmpart1;
+SELECT pg_column_compression(f1) FROM cmpart2;
+
+-- test compression with inheritance
+CREATE TABLE cminh() INHERITS(cmdata_pglz, cmdata_zstd); -- error
+CREATE TABLE cminh(f1 TEXT COMPRESSION zstd) INHERITS(cmdata_pglz); -- error
+CREATE TABLE cmdata3(f1 text);
+CREATE TABLE cminh() INHERITS (cmdata_pglz, cmdata3);
+
+-- test default_toast_compression GUC
+SET default_toast_compression = 'zstd';
+
+-- test alter compression method
+ALTER TABLE cmdata_pglz ALTER COLUMN f1 SET COMPRESSION zstd;
+INSERT INTO cmdata_pglz VALUES (repeat('123456789', 4004));
+\d+ cmdata
+SELECT pg_column_compression(f1) FROM cmdata_pglz;
+ALTER TABLE cmdata_pglz ALTER COLUMN f1 SET COMPRESSION pglz;
+
+-- test alter compression method for materialized views
+ALTER MATERIALIZED VIEW compressmv ALTER COLUMN x SET COMPRESSION zstd;
+\d+ compressmv
+
+-- test alter compression method for partitioned tables
+ALTER TABLE cmpart1 ALTER COLUMN f1 SET COMPRESSION pglz;
+ALTER TABLE cmpart2 ALTER COLUMN f1 SET COMPRESSION zstd;
+
+-- new data should be compressed with the current compression method
+INSERT INTO cmpart VALUES (repeat('123456789', 1004));
+INSERT INTO cmpart VALUES (repeat('123456789', 4004));
+SELECT pg_column_compression(f1) FROM cmpart1;
+SELECT pg_column_compression(f1) FROM cmpart2;
+
+-- test expression index
+CREATE TABLE cmdata2 (f1 TEXT COMPRESSION pglz, f2 TEXT COMPRESSION zstd);
+CREATE UNIQUE INDEX idx1 ON cmdata2 ((f1 || f2));
+INSERT INTO cmdata2 VALUES((SELECT array_agg(fipshash(g::TEXT))::TEXT FROM
+generate_series(1, 50) g), VERSION());
+
+-- check data is ok
+SELECT length(f1) FROM cmdata_pglz;
+SELECT length(f1) FROM cmdata_zstd;
+SELECT length(f1) FROM cmmove1;
+SELECT length(f1) FROM cmmove2;
+SELECT length(f1) FROM cmmove3;
+
+\set HIDE_TOAST_COMPRESSION true
-- 
2.47.1

