commit 3a2a00bad8d0050bed66ebc7972b0517dc164ae2
Author: Robert Haas <robertmhaas@gmail.com>
Date:   Fri Jun 18 12:20:20 2010 -0400

    Compact numeric format, with 2-byte header in common cases.

diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 5766a8b..d720922 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -234,9 +234,19 @@ static void dump_var(const char *str, NumericVar *var);
 
 #define init_var(v)		MemSetAligned(v, 0, sizeof(NumericVar))
 
-#define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data)
+/*
+ * If this is a "short" numeric, then the n_weight field is not present in
+ * the packed representation, and the data begins at that offset.  Otherwise
+ * that's still part of the header, and the data begins at n_data.
+ */
+#define NUMERIC_DIGITS(num) (NUMERIC_IS_SHORT(num) ? \
+	(NumericDigit *)&((num)->n_weight) : (NumericDigit *)(num)->n_data)
 #define NUMERIC_NDIGITS(num) \
-	((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit))
+	((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
+#define NUMERIC_CAN_BE_SHORT(scale,weight) \
+	((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
+	(weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
+	(weight) >= NUMERIC_SHORT_WEIGHT_MIN)
 
 static void alloc_var(NumericVar *var, int ndigits);
 static void free_var(NumericVar *var);
@@ -589,15 +599,23 @@ numeric		(PG_FUNCTION_ARGS)
 	/*
 	 * If the number is certainly in bounds and due to the target scale no
 	 * rounding could be necessary, just make a copy of the input and modify
-	 * its scale fields.  (Note we assume the existing dscale is honest...)
+	 * its scale fields, unless the larger scale forces us to abandon the
+	 * short representation.  (Note we assume the existing dscale is honest...)
 	 */
-	ddigits = (num->n_weight + 1) * DEC_DIGITS;
-	if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
+	ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS;
+	if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)
+		&& (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num))
+		|| !NUMERIC_IS_SHORT(num)))
 	{
 		new = (Numeric) palloc(VARSIZE(num));
 		memcpy(new, num, VARSIZE(num));
-		new->n_sign_dscale = NUMERIC_SIGN(new) |
-			((uint16) scale & NUMERIC_DSCALE_MASK);
+		if (NUMERIC_IS_SHORT(num))
+			new->n_sign_dscale =
+				(num->n_sign_dscale & ~NUMERIC_SHORT_DSCALE_MASK)
+				| (scale << NUMERIC_SHORT_DSCALE_SHIFT);
+		else
+			new->n_sign_dscale = NUMERIC_SIGN(new) |
+				((uint16) scale & NUMERIC_DSCALE_MASK);
 		PG_RETURN_NUMERIC(new);
 	}
 
@@ -703,7 +721,10 @@ numeric_abs(PG_FUNCTION_ARGS)
 	res = (Numeric) palloc(VARSIZE(num));
 	memcpy(res, num, VARSIZE(num));
 
-	res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
+	if (NUMERIC_IS_SHORT(num))
+		res->n_sign_dscale = num->n_sign_dscale & ~NUMERIC_SHORT_SIGN_MASK;
+	else
+		res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
 
 	PG_RETURN_NUMERIC(res);
 }
@@ -732,10 +753,12 @@ numeric_uminus(PG_FUNCTION_ARGS)
 	 * we can identify a ZERO by the fact that there are no digits at all.	Do
 	 * nothing to a zero.
 	 */
-	if (VARSIZE(num) != NUMERIC_HDRSZ)
+	if (NUMERIC_NDIGITS(num) != 0)
 	{
 		/* Else, flip the sign */
-		if (NUMERIC_SIGN(num) == NUMERIC_POS)
+		if (NUMERIC_IS_SHORT(num))
+			res->n_sign_dscale = num->n_sign_dscale ^ NUMERIC_SHORT_SIGN_MASK;
+		else if (NUMERIC_SIGN(num) == NUMERIC_POS)
 			res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num);
 		else
 			res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
@@ -782,7 +805,7 @@ numeric_sign(PG_FUNCTION_ARGS)
 	 * The packed format is known to be totally zero digit trimmed always. So
 	 * we can identify a ZERO by the fact that there are no digits at all.
 	 */
-	if (VARSIZE(num) == NUMERIC_HDRSZ)
+	if (NUMERIC_NDIGITS(num) == 0)
 		set_var_from_var(&const_zero, &result);
 	else
 	{
@@ -1220,9 +1243,9 @@ cmp_numerics(Numeric num1, Numeric num2)
 	else
 	{
 		result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
-								num1->n_weight, NUMERIC_SIGN(num1),
+								NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1),
 								NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
-								num2->n_weight, NUMERIC_SIGN(num2));
+								NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2));
 	}
 
 	return result;
@@ -1239,12 +1262,13 @@ hash_numeric(PG_FUNCTION_ARGS)
 	int			end_offset;
 	int			i;
 	int			hash_len;
+	NumericDigit   *digits;
 
 	/* If it's NaN, don't try to hash the rest of the fields */
 	if (NUMERIC_IS_NAN(key))
 		PG_RETURN_UINT32(0);
 
-	weight = key->n_weight;
+	weight = NUMERIC_WEIGHT(key);
 	start_offset = 0;
 	end_offset = 0;
 
@@ -1254,9 +1278,10 @@ hash_numeric(PG_FUNCTION_ARGS)
 	 * zeros are suppressed, but we're paranoid. Note that we measure the
 	 * starting and ending offsets in units of NumericDigits, not bytes.
 	 */
+	digits = NUMERIC_DIGITS(key);
 	for (i = 0; i < NUMERIC_NDIGITS(key); i++)
 	{
-		if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+		if (digits[i] != (NumericDigit) 0)
 			break;
 
 		start_offset++;
@@ -1277,7 +1302,7 @@ hash_numeric(PG_FUNCTION_ARGS)
 
 	for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
 	{
-		if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0)
+		if (digits[i] != (NumericDigit) 0)
 			break;
 
 		end_offset++;
@@ -2473,7 +2498,7 @@ numeric_avg(PG_FUNCTION_ARGS)
 
 	/* SQL92 defines AVG of no values to be NULL */
 	/* N is zero iff no digits (cf. numeric_uminus) */
-	if (VARSIZE(N) == NUMERIC_HDRSZ)
+	if (NUMERIC_NDIGITS(N) == 0)
 		PG_RETURN_NULL();
 
 	PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
@@ -2911,7 +2936,8 @@ dump_numeric(const char *str, Numeric num)
 
 	ndigits = NUMERIC_NDIGITS(num);
 
-	printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
+	printf("%s: NUMERIC w=%d d=%d ", str,
+		   NUMERIC_WEIGHT(num), NUMERIC_DSCALE(num));
 	switch (NUMERIC_SIGN(num))
 	{
 		case NUMERIC_POS:
@@ -3202,11 +3228,11 @@ set_var_from_num(Numeric num, NumericVar *dest)
 
 	alloc_var(dest, ndigits);
 
-	dest->weight = num->n_weight;
+	dest->weight = NUMERIC_WEIGHT(num);
 	dest->sign = NUMERIC_SIGN(num);
 	dest->dscale = NUMERIC_DSCALE(num);
 
-	memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
+	memcpy(dest->digits, NUMERIC_DIGITS(num), ndigits * sizeof(NumericDigit));
 }
 
 
@@ -3498,11 +3524,11 @@ make_result(NumericVar *var)
 
 	if (sign == NUMERIC_NAN)
 	{
-		result = (Numeric) palloc(NUMERIC_HDRSZ);
+		result = (Numeric) palloc(NUMERIC_HDRSZ_SHORT);
 
-		SET_VARSIZE(result, NUMERIC_HDRSZ);
-		result->n_weight = 0;
+		SET_VARSIZE(result, NUMERIC_HDRSZ_SHORT);
 		result->n_sign_dscale = NUMERIC_NAN;
+		/* short header, mustn't touch n_weight */
 
 		dump_numeric("make_result()", result);
 		return result;
@@ -3529,20 +3555,39 @@ make_result(NumericVar *var)
 	}
 
 	/* Build the result */
-	len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
-	result = (Numeric) palloc(len);
-	SET_VARSIZE(result, len);
-	result->n_weight = weight;
-	result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
+	if (NUMERIC_CAN_BE_SHORT(var->dscale, weight))
+	{
+		len = NUMERIC_HDRSZ_SHORT + n * sizeof(NumericDigit);
+		result = (Numeric) palloc(len);
+		SET_VARSIZE(result, len);
+		result->n_sign_dscale = 
+			(sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK)
+				: NUMERIC_SHORT)
+			| (var->dscale << NUMERIC_SHORT_DSCALE_SHIFT)
+			| (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0)
+			| (weight & NUMERIC_SHORT_WEIGHT_MASK);
+		/* short header, mustn't touch result->n_weight */
+	}
+	else
+	{
+		len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
+		result = (Numeric) palloc(len);
+		SET_VARSIZE(result, len);
+		result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
+		result->n_weight = weight;
+	}
 
-	memcpy(result->n_data, digits, n * sizeof(NumericDigit));
+	memcpy(NUMERIC_DIGITS(result), digits, n * sizeof(NumericDigit));
+	Assert(NUMERIC_NDIGITS(result) == n);
 
 	/* Check for overflow of int16 fields */
-	if (result->n_weight != weight ||
+	if (NUMERIC_WEIGHT(result) != weight ||
 		NUMERIC_DSCALE(result) != var->dscale)
 		ereport(ERROR,
 				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
-				 errmsg("value overflows numeric format")));
+				 errmsg("value overflows numeric format %x w=%d s=%u",
+					result->n_sign_dscale,
+					NUMERIC_WEIGHT(result), NUMERIC_DSCALE(result))));
 
 	dump_numeric("make_result()", result);
 	return result;
diff --git a/src/include/utils/numeric.h b/src/include/utils/numeric.h
index 73c1ee1..bf903c1 100644
--- a/src/include/utils/numeric.h
+++ b/src/include/utils/numeric.h
@@ -44,12 +44,54 @@
 #define NUMERIC_SIGN_MASK	0xC000
 #define NUMERIC_POS			0x0000
 #define NUMERIC_NEG			0x4000
+#define NUMERIC_SHORT		0x8000
 #define NUMERIC_NAN			0xC000
-#define NUMERIC_DSCALE_MASK 0x3FFF
-#define NUMERIC_SIGN(n)		((n)->n_sign_dscale & NUMERIC_SIGN_MASK)
-#define NUMERIC_DSCALE(n)	((n)->n_sign_dscale & NUMERIC_DSCALE_MASK)
-#define NUMERIC_IS_NAN(n)	(NUMERIC_SIGN(n) != NUMERIC_POS &&	\
-							 NUMERIC_SIGN(n) != NUMERIC_NEG)
+
+#define NUMERIC_DSCALE_MASK 			0x3FFF
+
+/*
+ * If the high bits of n_sign_dscale are NUMERIC_SHORT, then the numeric will
+ * have only a 2-byte header instead of the usual 4-byte header.  The remaining
+ * 14 bits are allocated as follows: 1 for sign (positive or negative), 6 for
+ * dynamic scale, and 7 for weight.  In practice, most commonly encountered
+ * values can be represented this way.
+ *
+ * If the high bits of n_scale_dscale are NUMERIC_NAN, the two-byte header
+ * format is also used, but the low bits of n_scale_dscale are discarded in
+ * this case.
+ *
+ * Either way, when the two-byte header format is used, n_weight must not be
+ * referenced; if there are any NumericDigits they will begin at that offset
+ * rather than at the start of n_data.
+ */
+#define NUMERIC_SHORT_SIGN_MASK			0x2000
+#define NUMERIC_SHORT_DSCALE_MASK		0x1F80
+#define NUMERIC_SHORT_DSCALE_SHIFT		7
+#define NUMERIC_SHORT_DSCALE_MAX		\
+	(NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
+#define NUMERIC_SHORT_WEIGHT_SIGN_MASK	0x0040
+#define NUMERIC_SHORT_WEIGHT_MASK		0x003F
+#define NUMERIC_SHORT_WEIGHT_MAX		NUMERIC_SHORT_WEIGHT_MASK
+#define NUMERIC_SHORT_WEIGHT_MIN		(-(NUMERIC_SHORT_WEIGHT_MASK+1))
+
+#define NUMERIC_FLAGBITS(n) ((n)->n_sign_dscale & NUMERIC_SIGN_MASK)
+#define NUMERIC_SIGN(n) \
+	(NUMERIC_IS_SHORT(n) ? (((n)->n_sign_dscale & NUMERIC_SHORT_SIGN_MASK) ? \
+		NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
+#define NUMERIC_IS_NAN(n)		(NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
+#define NUMERIC_IS_SHORT(n)		(NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
+#define NUMERIC_HEADER_SIZE(n) \
+	(VARHDRSZ + sizeof(uint16) + (NUMERIC_FLAGBITS(n) == NUMERIC_POS || \
+		NUMERIC_FLAGBITS(n) == NUMERIC_NEG ? sizeof(int16) : 0))
+#define NUMERIC_DSCALE(n)	(NUMERIC_IS_SHORT((n)) ? \
+	((n)->n_sign_dscale & NUMERIC_SHORT_DSCALE_MASK) \
+		>> NUMERIC_SHORT_DSCALE_SHIFT \
+	: ((n)->n_sign_dscale & NUMERIC_DSCALE_MASK))
+#define NUMERIC_WEIGHT(n)	(NUMERIC_IS_SHORT((n)) ? \
+	(((n)->n_sign_dscale & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
+		~NUMERIC_SHORT_WEIGHT_MASK : 0) \
+	 | ((n)->n_sign_dscale & NUMERIC_SHORT_WEIGHT_MASK)) \
+	: ((n)->n_weight))
 
 
 /*
@@ -71,6 +113,7 @@ typedef struct NumericData
 typedef NumericData *Numeric;
 
 #define NUMERIC_HDRSZ	(VARHDRSZ + sizeof(uint16) + sizeof(int16))
+#define NUMERIC_HDRSZ_SHORT	(VARHDRSZ + sizeof(uint16))
 
 
 /*
