This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.3 by this push:
     new fca4a51f6e PHOENIX-7811 : Create CDC should not drop user supplied 
table properties (#2427)
fca4a51f6e is described below

commit fca4a51f6e4bda65329436f7e42e66715950b7f8
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Apr 27 21:59:32 2026 -0700

    PHOENIX-7811 : Create CDC should not drop user supplied table properties 
(#2427)
    
    Co-authored-by: Palash Chauhan 
<[email protected]>
---
 .../org/apache/phoenix/schema/MetaDataClient.java  |   1 -
 .../apache/phoenix/end2end/CDCDefinitionIT.java    | 115 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 530c5612ac..4fff6826c6 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2021,7 +2021,6 @@ public class MetaDataClient {
     
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
       PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null, null, false,
       SortOrder.getDefault(), "", null, false));
-    tableProps = new HashMap<>();
     if (dataTable.getImmutableStorageScheme() == 
SINGLE_CELL_ARRAY_WITH_OFFSETS) {
       // CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so 
override it.
       tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 0aee3c9ab1..92f41c2808 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -18,13 +18,23 @@
 package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -33,16 +43,22 @@ import java.util.List;
 import java.util.Properties;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.verification.VerificationMode;
 
 @RunWith(Parameterized.class)
 @Category(ParallelStatsDisabledTest.class)
@@ -462,4 +478,103 @@ public class CDCDefinitionIT extends CDCBaseIT {
       assertTrue(e.getMessage().endsWith("DUMMY"));
     }
   }
+
+  /**
+   * Verifies CDC honors UPDATE_CACHE_FREQUENCY when set on CREATE CDC, and 
falls back to the
+   * connection default when not. With UCF set, a warmed-cache SELECT against 
the CDC object must
+   * issue zero getTable RPCs for it; without UCF, atleast one per query.
+   */
+  @Test
+  public void testCreateCDCWithUpdateCacheFrequency() throws Exception {
+    final long ucfMillis = 300000L;
+
+    // parent has UCF, CREATE CDC does not - CDC keeps the connection default 
and every SELECT
+    // against it RPCs. Phoenix's compilation pipeline resolves the FROM table 
multiple times
+    // per query
+    String tableA = generateUniqueName();
+    String cdcA = generateUniqueName();
+    try (Connection conn = newConnection()) {
+      createTableAndCDC(conn, tableA, cdcA, "UPDATE_CACHE_FREQUENCY=" + 
ucfMillis,
+        /* cdcUcf */ null);
+      assertCdcUpdateCacheFrequency(conn, cdcA, 0L);
+    }
+    assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcA), 
atLeast(1));
+
+    // CREATE CDC sets UCF - it must be persisted and suppress getTable RPCs.
+    String tableB = generateUniqueName();
+    String cdcB = generateUniqueName();
+    try (Connection conn = newConnection()) {
+      createTableAndCDC(conn, tableB, cdcB, /* tableProps */ null,
+        "UPDATE_CACHE_FREQUENCY=" + ucfMillis);
+      assertCdcUpdateCacheFrequency(conn, cdcB, ucfMillis);
+    }
+    assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcB), times(0));
+  }
+
+  /**
+   * Create {@code dataTable}, optionally with a view on it (when {@link 
#forView} is true), a CDC
+   * over the resulting parent, and upsert one row.
+   */
+  private void createTableAndCDC(Connection conn, String dataTable, String 
cdcName,
+    String tablePropsSuffix, String cdcPropsSuffix) throws Exception {
+    String tableSql = "CREATE TABLE " + dataTable + " (k INTEGER PRIMARY KEY, 
v INTEGER)"
+      + (tablePropsSuffix == null ? "" : " " + tablePropsSuffix);
+    conn.createStatement().execute(tableSql);
+    String parentName = dataTable;
+    if (forView) {
+      String viewName = generateUniqueName();
+      conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * 
FROM " + dataTable);
+      parentName = viewName;
+    }
+    String cdcSql = "CREATE CDC " + cdcName + " ON " + parentName
+      + (cdcPropsSuffix == null ? "" : " " + cdcPropsSuffix);
+    createCDC(conn, cdcSql);
+    conn.createStatement().execute("UPSERT INTO " + dataTable + " VALUES (1, 
1)");
+    conn.commit();
+  }
+
+  private void assertCdcUpdateCacheFrequency(Connection conn, String cdcName, 
long expected)
+    throws SQLException {
+    try (ResultSet rs = conn.createStatement()
+      .executeQuery("SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE 
TABLE_NAME = '"
+        + cdcName + "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) {
+      assertTrue("CDC row not found in SYSTEM.CATALOG for " + cdcName, 
rs.next());
+      assertEquals("Unexpected UPDATE_CACHE_FREQUENCY on CDC virtual table " + 
cdcName, expected,
+        rs.getLong(1));
+    }
+  }
+
+  /**
+   * Open a fresh spied {@link ConnectionQueryServices}, run a CDC select once 
to warm the client
+   * metadata cache, reset the spy, run the same select again, and assert that 
{@code getTable(...)}
+   * for the CDC virtual table was invoked according to {@code mode}.
+   */
+  private void assertGetTableRpcModeForCdc(String fullCdcName, 
VerificationMode mode)
+    throws SQLException {
+    ConnectionQueryServices spied =
+      spy(driver.getConnectionQueryServices(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES)));
+    Properties cprops = new Properties();
+    cprops.putAll(PhoenixEmbeddedDriver.DEFAULT_PROPS.asMap());
+    String selectSql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
fullCdcName + " LIMIT 1";
+
+    try (Connection conn = spied.connect(getUrl(), cprops)) {
+      try (PreparedStatement ps = conn.prepareStatement(selectSql);
+        ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+        }
+      }
+      reset(spied);
+
+      try (PreparedStatement ps = conn.prepareStatement(selectSql);
+        ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+        }
+      }
+
+      String cdcSchema = SchemaUtil.getSchemaNameFromFullName(fullCdcName);
+      String cdcTableNameOnly = 
SchemaUtil.getTableNameFromFullName(fullCdcName);
+      verify(spied, mode).getTable((PName) any(), 
eq(PVarchar.INSTANCE.toBytes(cdcSchema)),
+        eq(PVarchar.INSTANCE.toBytes(cdcTableNameOnly)), anyLong(), anyLong());
+    }
+  }
 }

Reply via email to