This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new fca4a51f6e PHOENIX-7811 : Create CDC should not drop user supplied
table properties (#2427)
fca4a51f6e is described below
commit fca4a51f6e4bda65329436f7e42e66715950b7f8
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Apr 27 21:59:32 2026 -0700
PHOENIX-7811 : Create CDC should not drop user supplied table properties
(#2427)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/schema/MetaDataClient.java | 1 -
.../apache/phoenix/end2end/CDCDefinitionIT.java | 115 +++++++++++++++++++++
2 files changed, 115 insertions(+), 1 deletion(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 530c5612ac..4fff6826c6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2021,7 +2021,6 @@ public class MetaDataClient {
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null, null, false,
SortOrder.getDefault(), "", null, false));
- tableProps = new HashMap<>();
if (dataTable.getImmutableStorageScheme() ==
SINGLE_CELL_ARRAY_WITH_OFFSETS) {
// CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so
override it.
tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 0aee3c9ab1..92f41c2808 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -18,13 +18,23 @@
package org.apache.phoenix.end2end;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
@@ -33,16 +43,22 @@ import java.util.List;
import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.verification.VerificationMode;
@RunWith(Parameterized.class)
@Category(ParallelStatsDisabledTest.class)
@@ -462,4 +478,103 @@ public class CDCDefinitionIT extends CDCBaseIT {
assertTrue(e.getMessage().endsWith("DUMMY"));
}
}
+
+ /**
+ * Verifies CDC honors UPDATE_CACHE_FREQUENCY when set on CREATE CDC, and falls back to the
+ * connection default when not. With UCF set, a warmed-cache SELECT against the CDC object must
+ * issue zero getTable RPCs for it; without UCF, at least one per query.
+ */
+ @Test
+ public void testCreateCDCWithUpdateCacheFrequency() throws Exception {
+ final long ucfMillis = 300000L;
+
+ // Parent has UCF but CREATE CDC does not: the CDC keeps the connection default, so every
+ // SELECT against it issues getTable RPCs. Phoenix's compilation pipeline resolves the FROM
+ // table multiple times per query, hence atLeast(1) rather than an exact count.
+ String tableA = generateUniqueName();
+ String cdcA = generateUniqueName();
+ try (Connection conn = newConnection()) {
+ createTableAndCDC(conn, tableA, cdcA, "UPDATE_CACHE_FREQUENCY=" +
ucfMillis,
+ /* cdcUcf */ null);
+ assertCdcUpdateCacheFrequency(conn, cdcA, 0L);
+ }
+ assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcA),
atLeast(1));
+
+ // CREATE CDC sets UCF - it must be persisted and suppress getTable RPCs.
+ String tableB = generateUniqueName();
+ String cdcB = generateUniqueName();
+ try (Connection conn = newConnection()) {
+ createTableAndCDC(conn, tableB, cdcB, /* tableProps */ null,
+ "UPDATE_CACHE_FREQUENCY=" + ucfMillis);
+ assertCdcUpdateCacheFrequency(conn, cdcB, ucfMillis);
+ }
+ assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcB), times(0));
+ }
+
+ /**
+ * Create {@code dataTable}, optionally a view over it (when {@link #forView} is true), and a
+ * CDC on the resulting parent (appending each non-null props suffix), then upsert one row.
+ */
+ private void createTableAndCDC(Connection conn, String dataTable, String
cdcName,
+ String tablePropsSuffix, String cdcPropsSuffix) throws Exception {
+ String tableSql = "CREATE TABLE " + dataTable + " (k INTEGER PRIMARY KEY,
v INTEGER)"
+ + (tablePropsSuffix == null ? "" : " " + tablePropsSuffix);
+ conn.createStatement().execute(tableSql);
+ String parentName = dataTable;
+ if (forView) {
+ String viewName = generateUniqueName();
+ conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT *
FROM " + dataTable);
+ parentName = viewName;
+ }
+ String cdcSql = "CREATE CDC " + cdcName + " ON " + parentName
+ + (cdcPropsSuffix == null ? "" : " " + cdcPropsSuffix);
+ createCDC(conn, cdcSql);
+ conn.createStatement().execute("UPSERT INTO " + dataTable + " VALUES (1,
1)");
+ conn.commit();
+ }
+
+ private void assertCdcUpdateCacheFrequency(Connection conn, String cdcName,
long expected)
+ throws SQLException {
+ try (ResultSet rs = conn.createStatement()
+ .executeQuery("SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE
TABLE_NAME = '"
+ + cdcName + "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) {
+ assertTrue("CDC row not found in SYSTEM.CATALOG for " + cdcName,
rs.next());
+ assertEquals("Unexpected UPDATE_CACHE_FREQUENCY on CDC virtual table " +
cdcName, expected,
+ rs.getLong(1));
+ }
+ }
+
+ /**
+ * Open a fresh spied {@link ConnectionQueryServices}, run a CDC select once
to warm the client
+ * metadata cache, reset the spy, run the same select again, and assert that
{@code getTable(...)}
+ * for the CDC virtual table was invoked according to {@code mode}.
+ */
+ private void assertGetTableRpcModeForCdc(String fullCdcName,
VerificationMode mode)
+ throws SQLException {
+ ConnectionQueryServices spied =
+ spy(driver.getConnectionQueryServices(getUrl(),
PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+ Properties cprops = new Properties();
+ cprops.putAll(PhoenixEmbeddedDriver.DEFAULT_PROPS.asMap());
+ String selectSql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
fullCdcName + " LIMIT 1";
+
+ try (Connection conn = spied.connect(getUrl(), cprops)) {
+ try (PreparedStatement ps = conn.prepareStatement(selectSql);
+ ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ }
+ }
+ reset(spied);
+
+ try (PreparedStatement ps = conn.prepareStatement(selectSql);
+ ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ }
+ }
+
+ String cdcSchema = SchemaUtil.getSchemaNameFromFullName(fullCdcName);
+ String cdcTableNameOnly =
SchemaUtil.getTableNameFromFullName(fullCdcName);
+ verify(spied, mode).getTable((PName) any(),
eq(PVarchar.INSTANCE.toBytes(cdcSchema)),
+ eq(PVarchar.INSTANCE.toBytes(cdcTableNameOnly)), anyLong(), anyLong());
+ }
+ }
}