This patch allows using text position search functions with
nondeterministic collations. These functions are
- position, strpos
- replace
- split_part
- string_to_array
- string_to_table
which all use common internal infrastructure.
(This complements the patch "Support LIKE with nondeterministic
collations" but is independent.)
Some exploratory testing could be useful here. The present test
coverage was already quite helpful during development, but there is
always the possibility that something was overlooked.
From 699240b1d9ef64bfb52b8636965213ac5586de9f Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Mon, 26 Aug 2024 08:02:54 +0200
Subject: [PATCH v1] Support POSITION with nondeterministic collations
This allows using text position search functions with nondeterministic
collations. These functions are
- position, strpos
- replace
- split_part
- string_to_array
- string_to_table
which all use common internal infrastructure.
Previously there was no internal implementation of this, so such calls
failed with a not-supported error. This adds the internal implementation
and removes the error.
Unlike with deterministic collations, the search cannot use any
byte-by-byte optimized techniques but has to go substring by
substring. We also need to consider that the found match could have a
different length than the needle and that there could be substrings of
different length matching at a position. In most cases, we need to
find the longest such substring (greedy semantics).
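To make the greedy behavior concrete, here is a small self-contained
sketch (illustration only, not the patch's code). It is byte-oriented
rather than multibyte-aware, and a toy coll_eq() function, which compares
case-insensitively and treats '-' as ignorable, stands in for pg_strncoll()
with a nondeterministic collation; the names greedy_search() and coll_eq()
are invented for the example.

/*
 * Illustrative sketch only: a byte-oriented greedy substring search.
 * coll_eq() is a toy stand-in for "equal under a nondeterministic
 * collation": it compares case-insensitively and treats '-' as
 * ignorable, so a match can be longer or shorter than the needle.
 */
#include <ctype.h>
#include <stddef.h>
#include <stdio.h>

static int
coll_eq(const char *a, size_t alen, const char *b, size_t blen)
{
	size_t		i = 0,
				j = 0;

	while (i < alen || j < blen)
	{
		if (i < alen && a[i] == '-')
		{
			i++;				/* ignorable character on the left */
			continue;
		}
		if (j < blen && b[j] == '-')
		{
			j++;				/* ignorable character on the right */
			continue;
		}
		if (i >= alen || j >= blen)
			return 0;			/* one side exhausted, the other not */
		if (tolower((unsigned char) a[i]) != tolower((unsigned char) b[j]))
			return 0;
		i++;
		j++;
	}
	return 1;
}

/*
 * Return the offset of the first position in haystack where some
 * substring is "equal" to the needle, or -1 if there is none.  Among
 * the substrings starting at that position, the longest one wins
 * (greedy semantics); its length is returned in *match_len and can
 * differ from the needle's length.
 */
static int
greedy_search(const char *haystack, size_t hlen,
			  const char *needle, size_t nlen, size_t *match_len)
{
	for (size_t pos = 0; pos < hlen; pos++)
	{
		int			found = 0;
		size_t		best = 0;

		for (size_t len = 0; pos + len <= hlen; len++)
		{
			if (coll_eq(haystack + pos, len, needle, nlen))
			{
				found = 1;
				best = len;		/* keep looking for a longer match */
			}
		}
		if (found)
		{
			*match_len = best;
			return (int) pos;
		}
	}
	return -1;
}

int
main(void)
{
	size_t		len;
	int			pos = greedy_search("zco-ter", 7, "co", 2, &len);

	printf("match at %d, length %zu\n", pos, len);	/* match at 1, length 3 */
	return 0;
}

With these toy semantics, searching "zco-ter" for "co" reports a match at
offset 1 of length 3 ("co-"). A non-greedy search would report length 2,
and a caller like replace() that skipped only two bytes would leave the
ignorable '-' in the result, which is analogous to the decomposed-accent
(NFD) case exercised by the tests below.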
---
src/backend/utils/adt/varlena.c | 105 +++++++++++---
.../regress/expected/collate.icu.utf8.out | 136 +++++++++++++++---
src/test/regress/sql/collate.icu.utf8.sql | 32 ++++-
3 files changed, 236 insertions(+), 37 deletions(-)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 7c6391a2760..5e72cdcdd27 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -54,7 +54,9 @@ typedef struct varlena VarString;
*/
typedef struct
{
+ pg_locale_t locale; /* collation used for substring matching */
bool is_multibyte_char_in_char; /* need to check char boundaries? */
+ bool greedy; /* find longest possible substring? */
char *str1; /* haystack string */
char *str2; /* needle string */
@@ -65,7 +67,13 @@ typedef struct
int skiptablemask; /* mask for ANDing with skiptable subscripts */
int skiptable[256]; /* skip distance for given mismatched char */
+ /*
+ * Note that with nondeterministic collations, the length of the last
+ * match is not necessarily equal to the length of the "needle" passed in.
+ */
char *last_match; /* pointer to last match in 'str1' */
+ int last_match_len; /* length of last match */
+ int last_match_len_tmp; /* same but for internal use */
/*
* Sometimes we need to convert the byte position of a match to a
@@ -1178,15 +1186,21 @@ text_position(text *t1, text *t2, Oid collid)
TextPositionState state;
int result;
+ check_collation_set(collid);
+
/* Empty needle always matches at position 1 */
if (VARSIZE_ANY_EXHDR(t2) < 1)
return 1;
/* Otherwise, can't match if haystack is shorter than needle */
- if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2))
+ if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2) &&
+ pg_newlocale_from_collation(collid)->deterministic)
return 0;
text_position_setup(t1, t2, collid, &state);
+ /* don't need greedy mode here */
+ state.greedy = false;
+
if (!text_position_next(&state))
result = 0;
else
@@ -1217,18 +1231,17 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state)
{
int len1 = VARSIZE_ANY_EXHDR(t1);
int len2 = VARSIZE_ANY_EXHDR(t2);
- pg_locale_t mylocale;
check_collation_set(collid);
- mylocale = pg_newlocale_from_collation(collid);
+ state->locale = pg_newlocale_from_collation(collid);
- if (!pg_locale_deterministic(mylocale))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("nondeterministic collations are not
supported for substring searches")));
+ /*
+ * Most callers need greedy mode, but some might want to unset this to
+ * optimize.
+ */
+ state->greedy = true;
- Assert(len1 > 0);
Assert(len2 > 0);
/*
@@ -1264,8 +1277,11 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state)
* point in wasting cycles initializing the table. We also choose not to
* use B-M-H for needles of length 1, since the skip table can't possibly
* save anything in that case.
+ *
+ * (With nondeterministic collations, the search was already
+ * multibyte-aware, so we don't need this.)
*/
- if (len1 >= len2 && len2 > 1)
+ if (len1 >= len2 && len2 > 1 && state->locale->deterministic)
{
int searchlength = len1 - len2;
int skiptablemask;
@@ -1343,7 +1359,7 @@ text_position_next(TextPositionState *state)
/* Start from the point right after the previous match. */
if (state->last_match)
- start_ptr = state->last_match + needle_len;
+ start_ptr = state->last_match + state->last_match_len;
else
start_ptr = state->str1;
@@ -1359,7 +1375,7 @@ text_position_next(TextPositionState *state)
* multi-byte character, we need to verify that the match was at a
* character boundary, not in the middle of a multi-byte character.
*/
- if (state->is_multibyte_char_in_char)
+ if (state->is_multibyte_char_in_char && state->locale->deterministic)
{
/* Walk one character at a time, until we reach the match. */
@@ -1387,6 +1403,7 @@ text_position_next(TextPositionState *state)
}
state->last_match = matchptr;
+ state->last_match_len = state->last_match_len_tmp;
return true;
}
@@ -1408,7 +1425,63 @@ text_position_next_internal(char *start_ptr, TextPositionState *state)
Assert(start_ptr >= haystack && start_ptr <= haystack_end);
- if (needle_len == 1)
+ state->last_match_len_tmp = needle_len;
+
+ if (!state->locale->deterministic)
+ {
+ /*
+ * With a nondeterministic collation, we have to use an unoptimized
+ * route. We walk through the haystack and see if at each position
+ * there is a substring of the remaining string that is equal to the
+ * needle under the given collation.
+ *
+ * Note, the found substring could have a different length than the
+ * needle, including being empty. Callers that want to skip over the
+ * found string need to read the length of the found substring from
+ * last_match_len rather than just using the length of their needle.
+ *
+ * Most callers will require "greedy" semantics, meaning that we need
+ * to find the longest such substring, not the shortest. For callers
+ * that don't need greedy semantics, we can finish on the first
+ * match.
+ */
+ const char *result_hptr = NULL;
+
+ hptr = start_ptr;
+ while (hptr < haystack_end)
+ {
+ /*
+ * First check the common case that there is a match in the
+ * haystack of exactly the length of the needle.
+ */
+ if (!state->greedy &&
+ haystack_end - hptr >= needle_len &&
+ pg_strncoll(hptr, needle_len, needle, needle_len, state->locale) == 0)
+ return (char *) hptr;
+
+ /*
+ * Else check if any of the possible substrings starting at hptr
+ * are equal to the needle.
+ */
+ for (const char *test_end = hptr; test_end < haystack_end; test_end += pg_mblen(test_end))
+ {
+ if (pg_strncoll(hptr, (test_end - hptr), needle, needle_len, state->locale) == 0)
+ {
+ state->last_match_len_tmp = (test_end - hptr);
+ result_hptr = hptr;
+ if (!state->greedy)
+ break;
+ }
+ }
+ if (result_hptr)
+ break;
+
+ hptr += pg_mblen(hptr);
+ }
+
+ return (char *) result_hptr;
+ }
+ else if (needle_len == 1)
{
/* No point in using B-M-H for a one-character needle */
char nchar = *needle;
@@ -4022,7 +4095,7 @@ replace_text(PG_FUNCTION_ARGS)
appendStringInfoText(&str, to_sub_text);
- start_ptr = curr_ptr + from_sub_text_len;
+ start_ptr = curr_ptr + state.last_match_len;
found = text_position_next(&state);
if (found)
@@ -4412,7 +4485,7 @@ split_part(PG_FUNCTION_ARGS)
/* special case of last field does not require an extra pass */
if (fldnum == -1)
{
- start_ptr = text_position_get_match_ptr(&state) + fldsep_len;
+ start_ptr = text_position_get_match_ptr(&state) + state.last_match_len;
end_ptr = VARDATA_ANY(inputstring) + inputstring_len;
text_position_cleanup(&state);
PG_RETURN_TEXT_P(cstring_to_text_with_len(start_ptr,
@@ -4442,7 +4515,7 @@ split_part(PG_FUNCTION_ARGS)
while (found && --fldnum > 0)
{
/* identify bounds of next field */
- start_ptr = end_ptr + fldsep_len;
+ start_ptr = end_ptr + state.last_match_len;
found = text_position_next(&state);
if (found)
end_ptr = text_position_get_match_ptr(&state);
@@ -4658,7 +4731,7 @@ split_text(FunctionCallInfo fcinfo, SplitTextOutputData *tstate)
if (!found)
break;
- start_ptr = end_ptr + fldsep_len;
+ start_ptr = end_ptr + state.last_match_len;
}
text_position_cleanup(&state);
diff --git a/src/test/regress/expected/collate.icu.utf8.out b/src/test/regress/expected/collate.icu.utf8.out
index 31345295c11..319e3eea476 100644
--- a/src/test/regress/expected/collate.icu.utf8.out
+++ b/src/test/regress/expected/collate.icu.utf8.out
@@ -1276,26 +1276,96 @@ CREATE COLLATION ctest_nondet (provider = icu, locale = '', deterministic = fals
NOTICE: using standard form "und" for ICU locale ""
CREATE TABLE test6 (a int, b text);
-- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
SELECT * FROM test6;
- a | b
----+-----
- 1 | äbc
- 2 | äbc
+ a | b
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
(2 rows)
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
- a | b
----+-----
- 1 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+ a | b
+---+-------
+ 1 | zyäbc
(1 row)
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
- a | b
----+-----
- 1 | äbc
- 2 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
+ a | b
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
+(2 rows)
+
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+ strpos
+--------
+ 4
+ 5
+(2 rows)
+
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+ strpos
+--------
+ 4
+ 5
+(2 rows)
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+ replace
+---------
+ zyXc
+ zyäbc
+(2 rows)
+
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+ replace
+---------
+ zyXc
+ zyXc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+ a | split_part
+---+------------
+ 1 | c
+ 2 |
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+ a | split_part
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+ a | split_part
+---+------------
+ 1 | c
+ 2 | zyäbc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+ a | split_part
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+ a | string_to_array
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zyäbc}
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
+ a | string_to_array
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zy,c}
(2 rows)
-- same with arrays
@@ -1601,7 +1671,11 @@ CREATE UNIQUE INDEX ON test3ci (x); -- error
ERROR: could not create unique index "test3ci_x_idx"
DETAIL: Key (x)=(abc) is duplicated.
SELECT string_to_array('ABC,DEF,GHI' COLLATE case_insensitive, ',', 'abc');
-ERROR: nondeterministic collations are not supported for substring searches
+ string_to_array
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
SELECT string_to_array('ABCDEFGHI' COLLATE case_insensitive, NULL, 'b');
string_to_array
------------------------
@@ -1719,7 +1793,11 @@ CREATE UNIQUE INDEX ON test3bpci (x); -- error
ERROR: could not create unique index "test3bpci_x_idx"
DETAIL: Key (x)=(abc) is duplicated.
SELECT string_to_array('ABC,DEF,GHI'::char(11) COLLATE case_insensitive, ',', 'abc');
-ERROR: nondeterministic collations are not supported for substring searches
+ string_to_array
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
SELECT string_to_array('ABCDEFGHI'::char(9) COLLATE case_insensitive, NULL, 'b');
string_to_array
------------------------
@@ -1840,6 +1918,30 @@ SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive;
1 | cote
(1 row)
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+-- This shows why replace should be greedy. Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+ a | b | replace
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+ a | b | replace
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
-- foreign keys (should use collation of primary key)
-- PK is case-sensitive, FK is case-insensitive
CREATE TABLE test10pk (x text COLLATE case_sensitive PRIMARY KEY);
diff --git a/src/test/regress/sql/collate.icu.utf8.sql b/src/test/regress/sql/collate.icu.utf8.sql
index 80f28a97d78..ffbb3897af0 100644
--- a/src/test/regress/sql/collate.icu.utf8.sql
+++ b/src/test/regress/sql/collate.icu.utf8.sql
@@ -516,11 +516,25 @@ CREATE COLLATION ctest_nondet (provider = icu, locale = '', deterministic = fals
CREATE TABLE test6 (a int, b text);
-- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
SELECT * FROM test6;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
+
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
-- same with arrays
CREATE TABLE test6a (a int, b text[]);
@@ -687,6 +701,16 @@ CREATE TABLE test4 (a int, b text);
SELECT * FROM test4 WHERE b = 'Cote' COLLATE ignore_accents; -- still case-sensitive
SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive;
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+
+-- This shows why replace should be greedy. Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+
-- foreign keys (should use collation of primary key)
-- PK is case-sensitive, FK is case-insensitive
base-commit: 8daa62a10c911c851f7e9ec5ef7b90cfd4b73212
--
2.46.0