This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 245f441 Provide update cache frequency in Create CDC
245f441 is described below
commit 245f44186a3680c2555d93bd3da844d37aab959e
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue May 26 15:50:38 2026 -0700
Provide update cache frequency in Create CDC
---
.../phoenix/ddb/service/CreateTableService.java | 5 ++--
.../java/org/apache/phoenix/ddb/CreateTableIT.java | 1 +
.../java/org/apache/phoenix/ddb/TestUtils.java | 10 ++++++++
.../org/apache/phoenix/ddb/TableOptionsConfig.java | 27 +++++++++++++++++++++-
4 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/CreateTableService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/CreateTableService.java
index 3ec6c5c..b68712b 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/CreateTableService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/CreateTableService.java
@@ -50,7 +50,7 @@ public class CreateTableService {
private static final Logger LOGGER =
LoggerFactory.getLogger(CreateTableService.class);
- private static final String CREATE_CDC_DDL = "CREATE CDC IF NOT EXISTS \"CDC_%s\" on %s";
+ private static final String CREATE_CDC_DDL = "CREATE CDC IF NOT EXISTS \"CDC_%s\" on %s %s";
private static final String ALTER_TABLE_STREAM_TYPE_DDL =
"ALTER TABLE %s set SCHEMA_VERSION = '%s'";
@@ -209,7 +209,8 @@ public class CreateTableService {
if (StringUtils.isEmpty(streamType)) {
throw new ValidationException("STREAM_VIEW_TYPE attribute is required.");
}
- cdcDDLs.add(String.format(CREATE_CDC_DDL, tableName, PhoenixUtils.getFullTableName(tableName, true)));
+ cdcDDLs.add(String.format(CREATE_CDC_DDL, tableName,
+ PhoenixUtils.getFullTableName(tableName, true), TableOptionsConfig.getCdcOptions()));
cdcDDLs.add(String.format(ALTER_TABLE_STREAM_TYPE_DDL,
PhoenixUtils.getFullTableName(tableName, true), streamType));
}
return cdcDDLs;
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/CreateTableIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/CreateTableIT.java
index 494e542..fa165ed 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/CreateTableIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/CreateTableIT.java
@@ -315,6 +315,7 @@ public class CreateTableIT {
}
TestUtils.validateTableProps(url, tableName, false);
TestUtils.validateTableProps(url, tableName + "_IDX1_" + tableName,
true);
+ TestUtils.validateCdcProps(url, tableName);
}
@Test(timeout = 120000)
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index 5eee162..75cf914 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -517,6 +517,16 @@ public class TestUtils {
}
}
+ public static void validateCdcProps(String url, String tableName) throws SQLException {
+ String cdcFullName = PhoenixUtils.getFullTableName("CDC_" + tableName, false);
+ try (Connection conn = DriverManager.getConnection(url)) {
+ PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+ PTable cdc = phoenixConnection.getTable(
+ new PTableKey(phoenixConnection.getTenantId(), cdcFullName));
+ Assert.assertEquals(60000, cdc.getUpdateCacheFrequency());
+ }
+ }
+
public static void waitForIndexState(DynamoDbClient client, String
tableName, String indexName,
String expectedState) {
DescribeTableRequest dtr =
DescribeTableRequest.builder().tableName(tableName).build();
diff --git a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/TableOptionsConfig.java b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/TableOptionsConfig.java
index 4483dd4..af16df0 100644
--- a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/TableOptionsConfig.java
+++ b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/TableOptionsConfig.java
@@ -12,9 +12,11 @@ public class TableOptionsConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableOptionsConfig.class);
private static final String TABLE_CONFIG_FILE =
"phoenix-table-options.properties";
private static final String INDEX_CONFIG_FILE =
"phoenix-index-options.properties";
+ private static final String UPDATE_CACHE_FREQUENCY_KEY = "UPDATE_CACHE_FREQUENCY";
private static String tableOptionsString;
private static String indexOptionsString;
+ private static String cdcOptionsString;
/**
* Initialize both table and index configurations at startup.
@@ -22,7 +24,8 @@ public class TableOptionsConfig {
public static void initialize() throws IOException {
tableOptionsString = buildOptionsString(TABLE_CONFIG_FILE, "table");
indexOptionsString = buildOptionsString(INDEX_CONFIG_FILE, "index");
- LOGGER.info("Initialized table and index configurations");
+ cdcOptionsString = buildCdcOptionsString(TABLE_CONFIG_FILE);
+ LOGGER.info("Initialized table, index, and CDC configurations");
}
/**
@@ -45,6 +48,28 @@ public class TableOptionsConfig {
return indexOptionsString;
}
+ /**
+ * Get CDC options as formatted string for CREATE CDC statements.
+ */
+ public static String getCdcOptions() {
+ if (cdcOptionsString == null) {
+ throw new IllegalStateException("CDC Options Config not initialized.");
+ }
+ return cdcOptionsString;
+ }
+
+ /**
+ * Build CDC options string using UPDATE_CACHE_FREQUENCY from the table config.
+ */
+ private static String buildCdcOptionsString(String tableConfigFile) throws IOException {
+ Properties props = loadConfiguration(tableConfigFile, "cdc");
+ String updateCacheFrequency = props.getProperty(UPDATE_CACHE_FREQUENCY_KEY);
+ if (updateCacheFrequency == null) {
+ throw new IOException(UPDATE_CACHE_FREQUENCY_KEY + " not found in " + tableConfigFile);
+ }
+ return UPDATE_CACHE_FREQUENCY_KEY + "=" + updateCacheFrequency;
+ }
+
/**
* Load configuration from file and build formatted options string.
*/