On 26.08.24 08:09, Peter Eisentraut wrote:
This patch allows using text position search functions with nondeterministic collations.  These functions are

- position, strpos
- replace
- split_part
- string_to_array
- string_to_table

which all use common internal infrastructure.

Some exploratory testing could be useful here.  The present test coverage was already quite helpful during development, but there is always the possibility that something was overlooked.

Here is a freshly rebased patch version, no functionality changes.
From 272befa6ab78d94ac35d24203da415545b3ad2d9 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Mon, 2 Dec 2024 10:06:28 +0100
Subject: [PATCH v2] Support POSITION with nondeterministic collations

This allows using text position search functions with nondeterministic
collations.  These functions are

- position, strpos
- replace
- split_part
- string_to_array
- string_to_table

which all use common internal infrastructure.

There was previously no internal implementation of this, so it was met
with a not-supported error.  This adds the internal implementation and
removes the error.

Unlike with deterministic collations, the search cannot use any
byte-by-byte optimized techniques but has to go substring by
substring.  We also need to consider that the found match could have a
different length than the needle and that there could be substrings of
different length matching at a position.  In most cases, we need to
find the longest such substring (greedy semantics).

Discussion: 
https://www.postgresql.org/message-id/flat/582b2613-0900-48ca-8b0d-340c06f4d...@eisentraut.org
---
 src/backend/utils/adt/varlena.c               | 105 ++++++++++--
 .../regress/expected/collate.icu.utf8.out     | 154 +++++++++++++++---
 src/test/regress/sql/collate.icu.utf8.sql     |  36 +++-
 3 files changed, 247 insertions(+), 48 deletions(-)

diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 533bebc1c7b..3fda7a8aeea 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -54,7 +54,9 @@ typedef struct varlena VarString;
  */
 typedef struct
 {
+       pg_locale_t locale;                     /* collation used for substring 
matching */
        bool            is_multibyte_char_in_char;      /* need to check char 
boundaries? */
+       bool            greedy;                 /* find longest possible 
substring? */
 
        char       *str1;                       /* haystack string */
        char       *str2;                       /* needle string */
@@ -65,7 +67,13 @@ typedef struct
        int                     skiptablemask;  /* mask for ANDing with 
skiptable subscripts */
        int                     skiptable[256]; /* skip distance for given 
mismatched char */
 
+       /*
+        * Note that with nondeterministic collations, the length of the last
+        * match is not necessarily equal to the length of the "needle" passed 
in.
+        */
        char       *last_match;         /* pointer to last match in 'str1' */
+       int                     last_match_len; /* length of last match */
+       int                     last_match_len_tmp; /* same but for internal 
use */
 
        /*
         * Sometimes we need to convert the byte position of a match to a
@@ -1178,15 +1186,21 @@ text_position(text *t1, text *t2, Oid collid)
        TextPositionState state;
        int                     result;
 
+       check_collation_set(collid);
+
        /* Empty needle always matches at position 1 */
        if (VARSIZE_ANY_EXHDR(t2) < 1)
                return 1;
 
        /* Otherwise, can't match if haystack is shorter than needle */
-       if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2))
+       if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2) &&
+               pg_newlocale_from_collation(collid)->deterministic)
                return 0;
 
        text_position_setup(t1, t2, collid, &state);
+       /* don't need greedy mode here */
+       state.greedy = false;
+
        if (!text_position_next(&state))
                result = 0;
        else
@@ -1217,18 +1231,17 @@ text_position_setup(text *t1, text *t2, Oid collid, 
TextPositionState *state)
 {
        int                     len1 = VARSIZE_ANY_EXHDR(t1);
        int                     len2 = VARSIZE_ANY_EXHDR(t2);
-       pg_locale_t mylocale;
 
        check_collation_set(collid);
 
-       mylocale = pg_newlocale_from_collation(collid);
+       state->locale = pg_newlocale_from_collation(collid);
 
-       if (!mylocale->deterministic)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("nondeterministic collations are not 
supported for substring searches")));
+       /*
+        * Most callers need greedy mode, but some might want to unset this to
+        * optimize.
+        */
+       state->greedy = true;
 
-       Assert(len1 > 0);
        Assert(len2 > 0);
 
        /*
@@ -1264,8 +1277,11 @@ text_position_setup(text *t1, text *t2, Oid collid, 
TextPositionState *state)
         * point in wasting cycles initializing the table.  We also choose not 
to
         * use B-M-H for needles of length 1, since the skip table can't 
possibly
         * save anything in that case.
+        *
+        * (With nondeterministic collations, the search is already
+        * multibyte-aware, so we don't need this.)
         */
-       if (len1 >= len2 && len2 > 1)
+       if (len1 >= len2 && len2 > 1 && state->locale->deterministic)
        {
                int                     searchlength = len1 - len2;
                int                     skiptablemask;
@@ -1343,7 +1359,7 @@ text_position_next(TextPositionState *state)
 
        /* Start from the point right after the previous match. */
        if (state->last_match)
-               start_ptr = state->last_match + needle_len;
+               start_ptr = state->last_match + state->last_match_len;
        else
                start_ptr = state->str1;
 
@@ -1359,7 +1375,7 @@ text_position_next(TextPositionState *state)
         * multi-byte character, we need to verify that the match was at a
         * character boundary, not in the middle of a multi-byte character.
         */
-       if (state->is_multibyte_char_in_char)
+       if (state->is_multibyte_char_in_char && state->locale->deterministic)
        {
                /* Walk one character at a time, until we reach the match. */
 
@@ -1387,6 +1403,7 @@ text_position_next(TextPositionState *state)
        }
 
        state->last_match = matchptr;
+       state->last_match_len = state->last_match_len_tmp;
        return true;
 }
 
@@ -1408,7 +1425,63 @@ text_position_next_internal(char *start_ptr, 
TextPositionState *state)
 
        Assert(start_ptr >= haystack && start_ptr <= haystack_end);
 
-       if (needle_len == 1)
+       state->last_match_len_tmp = needle_len;
+
+       if (!state->locale->deterministic)
+       {
+               /*
+                * With a nondeterministic collation, we have to use an 
unoptimized
+                * route.  We walk through the haystack and see if at each 
position
+                * there is a substring of the remaining string that is equal 
to the
+                * needle under the given collation.
+                *
+                * Note, the found substring could have a different length than 
the
+                * needle, including being empty.  Callers that want to skip 
over the
+                * found string need to read the length of the found substring 
from
+                * last_match_len rather than just using the length of their 
needle.
+                *
+                * Most callers will require "greedy" semantics, meaning that 
we need
+                * to find the longest such substring, not the shortest.  For 
callers
+                * that don't need greedy semantics, we can finish on the first
+                * match.
+                */
+               const char *result_hptr = NULL;
+
+               hptr = start_ptr;
+               while (hptr < haystack_end)
+               {
+                       /*
+                        * First check the common case that there is a match in 
the
+                        * haystack of exactly the length of the needle.
+                        */
+                       if (!state->greedy &&
+                               haystack_end - hptr >= needle_len &&
+                               pg_strncoll(hptr, needle_len, needle, 
needle_len, state->locale) == 0)
+                               return (char *) hptr;
+
+                       /*
+                        * Else check if any of the possible substrings 
starting at hptr
+                        * are equal to the needle.
+                        */
+                       for (const char *test_end = hptr; test_end < 
haystack_end; test_end += pg_mblen(test_end))
+                       {
+                               if (pg_strncoll(hptr, (test_end - hptr), 
needle, needle_len, state->locale) == 0)
+                               {
+                                       state->last_match_len_tmp = (test_end - 
hptr);
+                                       result_hptr = hptr;
+                                       if (!state->greedy)
+                                               break;
+                               }
+                       }
+                       if (result_hptr)
+                               break;
+
+                       hptr += pg_mblen(hptr);
+               }
+
+               return (char *) result_hptr;
+       }
+       else if (needle_len == 1)
        {
                /* No point in using B-M-H for a one-character needle */
                char            nchar = *needle;
@@ -4055,7 +4128,7 @@ replace_text(PG_FUNCTION_ARGS)
 
                appendStringInfoText(&str, to_sub_text);
 
-               start_ptr = curr_ptr + from_sub_text_len;
+               start_ptr = curr_ptr + state.last_match_len;
 
                found = text_position_next(&state);
                if (found)
@@ -4445,7 +4518,7 @@ split_part(PG_FUNCTION_ARGS)
                /* special case of last field does not require an extra pass */
                if (fldnum == -1)
                {
-                       start_ptr = text_position_get_match_ptr(&state) + 
fldsep_len;
+                       start_ptr = text_position_get_match_ptr(&state) + 
state.last_match_len;
                        end_ptr = VARDATA_ANY(inputstring) + inputstring_len;
                        text_position_cleanup(&state);
                        PG_RETURN_TEXT_P(cstring_to_text_with_len(start_ptr,
@@ -4475,7 +4548,7 @@ split_part(PG_FUNCTION_ARGS)
        while (found && --fldnum > 0)
        {
                /* identify bounds of next field */
-               start_ptr = end_ptr + fldsep_len;
+               start_ptr = end_ptr + state.last_match_len;
                found = text_position_next(&state);
                if (found)
                        end_ptr = text_position_get_match_ptr(&state);
@@ -4691,7 +4764,7 @@ split_text(FunctionCallInfo fcinfo, SplitTextOutputData 
*tstate)
                        if (!found)
                                break;
 
-                       start_ptr = end_ptr + fldsep_len;
+                       start_ptr = end_ptr + state.last_match_len;
                }
 
                text_position_cleanup(&state);
diff --git a/src/test/regress/expected/collate.icu.utf8.out 
b/src/test/regress/expected/collate.icu.utf8.out
index 9f5e57428e8..ebd2cc58fee 100644
--- a/src/test/regress/expected/collate.icu.utf8.out
+++ b/src/test/regress/expected/collate.icu.utf8.out
@@ -1300,39 +1300,109 @@ SELECT 'abc' LIKE 'a\bc' COLLATE ctest_nondet;
 
 CREATE TABLE test6 (a int, b text);
 -- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
 SELECT * FROM test6;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
- a |  b  
----+-----
- 1 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+ a |   b   
+---+-------
+ 1 | zyäbc
 (1 row)
 
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_det;
- a |  b  
----+-----
- 1 | äbc
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+ strpos 
+--------
+      4
+      5
+(2 rows)
+
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+ strpos 
+--------
+      4
+      5
+(2 rows)
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+ replace 
+---------
+ zyXc
+ zyäbc
+(2 rows)
+
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+ replace 
+---------
+ zyXc
+ zyXc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | 
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | zyäbc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+ a | string_to_array 
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zyäbc}
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
+ a | string_to_array 
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zy,c}
+(2 rows)
+
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_det;
+ a |   b   
+---+-------
+ 1 | zyäbc
 (1 row)
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_nondet;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_nondet;
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
 -- same with arrays
@@ -1643,7 +1713,11 @@ CREATE UNIQUE INDEX ON test3ci (x);  -- error
 ERROR:  could not create unique index "test3ci_x_idx"
 DETAIL:  Key (x)=(abc) is duplicated.
 SELECT string_to_array('ABC,DEF,GHI' COLLATE case_insensitive, ',', 'abc');
-ERROR:  nondeterministic collations are not supported for substring searches
+ string_to_array 
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
 SELECT string_to_array('ABCDEFGHI' COLLATE case_insensitive, NULL, 'b');
     string_to_array     
 ------------------------
@@ -1766,7 +1840,11 @@ CREATE UNIQUE INDEX ON test3bpci (x);  -- error
 ERROR:  could not create unique index "test3bpci_x_idx"
 DETAIL:  Key (x)=(abc) is duplicated.
 SELECT string_to_array('ABC,DEF,GHI'::char(11) COLLATE case_insensitive, ',', 
'abc');
-ERROR:  nondeterministic collations are not supported for substring searches
+ string_to_array 
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
 SELECT string_to_array('ABCDEFGHI'::char(9) COLLATE case_insensitive, NULL, 
'b');
     string_to_array     
 ------------------------
@@ -1895,6 +1973,30 @@ SELECT * FROM test4 WHERE b = 'Cote' COLLATE 
case_insensitive;
  1 | cote
 (1 row)
 
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+-- This shows why replace should be greedy.  Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+ a |  b   | replace 
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+ a |  b   | replace 
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
 -- This is a tricky one.  A naive implementation would first test
 -- \00E4 matches \0061, which is true under ignore_accents, but then
 -- the rest of the string won't match anymore.  Therefore, the
diff --git a/src/test/regress/sql/collate.icu.utf8.sql 
b/src/test/regress/sql/collate.icu.utf8.sql
index 5ee2da4e0e0..cd0353fd7c6 100644
--- a/src/test/regress/sql/collate.icu.utf8.sql
+++ b/src/test/regress/sql/collate.icu.utf8.sql
@@ -522,14 +522,28 @@ CREATE COLLATION ctest_nondet (provider = icu, locale = 
'', deterministic = fals
 
 CREATE TABLE test6 (a int, b text);
 -- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
 SELECT * FROM test6;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_det;
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_nondet;
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
+
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_det;
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_nondet;
 
 -- same with arrays
 CREATE TABLE test6a (a int, b text[]);
@@ -696,6 +710,16 @@ CREATE TABLE test4 (a int, b text);
 SELECT * FROM test4 WHERE b = 'Cote' COLLATE ignore_accents;  -- still 
case-sensitive
 SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive;
 
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+
+-- This shows why replace should be greedy.  Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+
 -- This is a tricky one.  A naive implementation would first test
 -- \00E4 matches \0061, which is true under ignore_accents, but then
 -- the rest of the string won't match anymore.  Therefore, the
-- 
2.47.1

Reply via email to