This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 3a667b5 Use IndexBuildingActivator to build index via Index Tool
3a667b5 is described below
commit 3a667b5a62474884da4995a6a3051608833ff910
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Dec 5 02:51:27 2025 -0800
Use IndexBuildingActivator to build index via Index Tool
---
.../org/apache/phoenix/ddb/rest/RESTServer.java | 5 +-
.../java/org/apache/phoenix/ddb/TestUtils.java | 21 -------
.../org/apache/phoenix/ddb/UpdateTable2IT.java | 8 +--
phoenix-ddb-utils/pom.xml | 5 ++
.../phoenix/ddb/utils/IndexBuildingActivator.java | 41 ++++++++++++-
.../org/apache/phoenix/ddb/utils/PhoenixUtils.java | 71 ++++++++++++++++++++++
6 files changed, 119 insertions(+), 32 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/rest/RESTServer.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/rest/RESTServer.java
index 4a0de52..b9b16ad 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/rest/RESTServer.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/rest/RESTServer.java
@@ -298,11 +298,12 @@ public class RESTServer {
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
try (Connection connection =
ConnectionUtil.getConnection(jdbcUrl)) {
- IndexBuildingActivator.activateIndexesForBuilding(connection,
1800000);
+ IndexBuildingActivator.activateIndexesForBuilding(connection,
1800010);
+ IndexBuildingActivator.runIndexTool(connection, 1860000);
} catch (SQLException e) {
LOG.info("Error while running IndexBuildingActivator. ", e);
}
- }, 0, 15, TimeUnit.MINUTES);
+ }, 0, 5, TimeUnit.MINUTES);
LOG.info("Scheduled IndexBuildingActivator.");
return scheduler;
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index 7afecc9..bac8967 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -22,7 +22,6 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,14 +73,11 @@ import org.apache.phoenix.ddb.utils.ApiMetadata;
import org.apache.phoenix.ddb.utils.PhoenixUtils;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
-import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
-import static org.apache.phoenix.end2end.IndexToolIT.getArgValues;
-import static org.junit.Assert.assertEquals;
import static
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.TRIM_HORIZON;
public class TestUtils {
@@ -531,21 +527,4 @@ public class TestUtils {
while (++nTries < maxTries);
Assert.fail("Timed out waiting for index state to become: " +
expectedState);
}
-
- public static IndexTool runIndexTool(Configuration conf, boolean
useSnapshot, String schemaName,
- String dataTableName, String indexTableName, String tenantId, int
expectedStatus,
- IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType,
- String... additionalArgs) throws Exception {
- IndexTool indexingTool = new IndexTool();
- indexingTool.setConf(conf);
- final String[] cmdArgs = getArgValues(useSnapshot, schemaName,
dataTableName, indexTableName,
- tenantId, verifyType, disableLoggingType);
- List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
- cmdArgList.addAll(Arrays.asList(additionalArgs));
- LOGGER.info("Running IndexTool with {}",
Arrays.toString(cmdArgList.toArray()),
- new Exception("Stack Trace"));
- int status = indexingTool.run(cmdArgList.toArray(new
String[cmdArgList.size()]));
- assertEquals(expectedStatus, status);
- return indexingTool;
- }
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/UpdateTable2IT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/UpdateTable2IT.java
index f40b909..7c2ce5d 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/UpdateTable2IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/UpdateTable2IT.java
@@ -22,11 +22,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.ddb.rest.RESTServer;
import org.apache.phoenix.ddb.utils.IndexBuildingActivator;
-import org.apache.phoenix.ddb.utils.PhoenixUtils;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
-import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
@@ -181,11 +179,7 @@ public class UpdateTable2IT {
// background thread activates index to BUILDING state
IndexBuildingActivator.activateIndexesForBuilding(connection, 0);
// run MR tool to build index and set state to ACTIVE
- Configuration conf = new Configuration(utility.getConfiguration());
- TestUtils.runIndexTool(conf, false, "DDB",
- PhoenixUtils.getEscapedArgument(tableName),
- PhoenixUtils.getEscapedArgument(tableName + "_" +
indexName),
- null, 0, IndexTool.IndexVerifyType.NONE,
IndexTool.IndexDisableLoggingType.NONE);
+ IndexBuildingActivator.runIndexTool(connection, 0);
}
// make sure Index is active
diff --git a/phoenix-ddb-utils/pom.xml b/phoenix-ddb-utils/pom.xml
index 936454e..178e329 100644
--- a/phoenix-ddb-utils/pom.xml
+++ b/phoenix-ddb-utils/pom.xml
@@ -46,6 +46,11 @@
<artifactId>aws-sdk-java</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-core-server</artifactId>
+ <version>${phoenix.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/IndexBuildingActivator.java
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/IndexBuildingActivator.java
index f5dce11..ce4403c 100644
---
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/IndexBuildingActivator.java
+++
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/IndexBuildingActivator.java
@@ -1,7 +1,10 @@
package org.apache.phoenix.ddb.utils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -18,12 +21,23 @@ public class IndexBuildingActivator {
private static final String SELECT_CREATE_DISABLE_INDEX = "SELECT DISTINCT
TABLE_SCHEM, TABLE_NAME "
+ "FROM SYSTEM.CATALOG "
- + "WHERE COLUMN_NAME IS NULL "
+ + "WHERE TABLE_SCHEM='DDB'"
+ + "AND COLUMN_NAME IS NULL "
+ "AND COLUMN_FAMILY IS NULL "
+ "AND TABLE_TYPE = 'i'"
+ "AND INDEX_STATE = 'c' "
+ "AND TENANT_ID IS NULL "
- + "AND TO_NUMBER(CURRENT_TIME()) - LAST_DDL_TIMESTAMP > %d";
+ + "AND (TO_NUMBER(CURRENT_TIME()) - LAST_DDL_TIMESTAMP) > %d";
+
+ private static final String SELECT_BUILDING_INDEX = "SELECT DISTINCT
TABLE_SCHEM, TABLE_NAME "
+ + "FROM SYSTEM.CATALOG "
+ + "WHERE TABLE_SCHEM='DDB'"
+ + "AND COLUMN_NAME IS NULL "
+ + "AND COLUMN_FAMILY IS NULL "
+ + "AND TABLE_TYPE = 'i'"
+ + "AND INDEX_STATE = 'b' "
+ + "AND TENANT_ID IS NULL "
+ + "AND (TO_NUMBER(CURRENT_TIME()) - LAST_DDL_TIMESTAMP) > %d";
public static void activateIndexesForBuilding(Connection conn, int
minAgeMs) throws SQLException {
ResultSet rs =
conn.createStatement().executeQuery(String.format(SELECT_CREATE_DISABLE_INDEX,
minAgeMs));
@@ -37,4 +51,27 @@ public class IndexBuildingActivator {
PIndexState.BUILDING,
EnvironmentEdgeManager.currentTimeMillis());
}
}
+
+ public static void runIndexTool(Connection conn, int minAgeMs) throws
SQLException {
+ try {
+ ResultSet rs =
conn.createStatement().executeQuery(String.format(SELECT_BUILDING_INDEX,
minAgeMs));
+ while (rs.next()) {
+ String schemaName = rs.getString(1);
+ String indexName = rs.getString(2);
+ String fullIndexName = SchemaUtil.getTableName(schemaName,
indexName);
+ PTable ptable =
conn.unwrap(PhoenixConnection.class).getTable(fullIndexName);
+ String tableName =
PhoenixUtils.getTableNameFromFullName(ptable.getParentName().getString(),
false);
+ LOGGER.info("Found index " + fullIndexName + " to build");
+ Configuration conf =
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
+ PhoenixUtils.runIndexTool(conf, false, "DDB",
+ PhoenixUtils.getEscapedArgument(tableName),
+ PhoenixUtils.getEscapedArgument(indexName),
+ null, IndexTool.IndexVerifyType.NONE);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in building indexes", e);
+ throw new SQLException(e);
+ }
+ }
+
}
diff --git
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixUtils.java
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixUtils.java
index cb01d22..d2f52a6 100644
---
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixUtils.java
+++
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixUtils.java
@@ -1,12 +1,15 @@
package org.apache.phoenix.ddb.utils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.PhoenixRuntime;
import org.slf4j.Logger;
@@ -17,8 +20,10 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* Helper methods for Phoenix based functionality.
@@ -167,4 +172,70 @@ public class PhoenixUtils {
.replaceAll("'", "") // remove single quotes
.trim();
}
+
+ /**
+ * Run IndexTool to build indexes.
+ */
+ public static IndexTool runIndexTool(Configuration conf, boolean
useSnapshot, String schemaName,
+ String dataTableName, String
indexTableName, String tenantId,
+ IndexTool.IndexVerifyType verifyType)
throws Exception {
+ IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(conf);
+ final String[] cmdArgs = getArgValues(useSnapshot, schemaName,
dataTableName, indexTableName,
+ tenantId, verifyType);
+ List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
+ LOGGER.info("Running IndexTool with {}",
Arrays.toString(cmdArgList.toArray()));
+ int status = indexingTool.run(cmdArgList.toArray(new
String[cmdArgList.size()]));
+ LOGGER.info("IndexTool status = {}", status);
+ return indexingTool;
+ }
+
+    /**
+     * Builds the IndexTool argument array for a plain build: delegates to
+     * {@code getArgList} with no start time, no end time, no incremental-verify
+     * value and {@code useIndexTableAsSource} set to false.
+     */
+    private static String[] getArgValues(boolean useSnapshot, String schemaName, String dataTable,
+            String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
+        List<String> argList = getArgList(useSnapshot, schemaName, dataTable, indexTable,
+                tenantId, verifyType,
+                null,   // startTime
+                null,   // endTime
+                null,   // incrementalVerify
+                false); // useIndexTableAsSource
+        return argList.toArray(new String[argList.size()]);
+    }
+
+    /**
+     * Assembles the IndexTool command-line arguments in a fixed order:
+     * optional {@code --schema=}, then {@code --data-table=}, {@code --index-table=},
+     * {@code -v <verifyType value>}, {@code -runfg}, followed by the optional
+     * {@code -snap}, {@code -tenant}, {@code -st}, {@code -et}, {@code -rv} and
+     * {@code -fi} switches, and finally {@code -op} with a fresh random path
+     * under {@code /tmp/}.
+     *
+     * <p>Optional switches are emitted only when their argument is non-null
+     * (or true, for the boolean flags).
+     */
+    private static List<String> getArgList(boolean useSnapshot, String schemaName, String dataTable,
+            String indxTable, String tenantId, IndexTool.IndexVerifyType verifyType, Long startTime,
+            Long endTime, Long incrementalVerify, boolean useIndexTableAsSource) {
+        List<String> cmd = new ArrayList<>();
+        if (schemaName != null) {
+            cmd.add("--schema=" + schemaName);
+        }
+
+        // Mandatory part of the command line.
+        cmd.addAll(Arrays.asList(
+                "--data-table=" + dataTable,
+                "--index-table=" + indxTable,
+                "-v", verifyType.getValue(),
+                "-runfg"));
+
+        if (useSnapshot) {
+            cmd.add("-snap");
+        }
+        if (tenantId != null) {
+            cmd.addAll(Arrays.asList("-tenant", tenantId));
+        }
+        if (startTime != null) {
+            cmd.addAll(Arrays.asList("-st", startTime.toString()));
+        }
+        if (endTime != null) {
+            cmd.addAll(Arrays.asList("-et", endTime.toString()));
+        }
+        if (incrementalVerify != null) {
+            cmd.addAll(Arrays.asList("-rv", incrementalVerify.toString()));
+        }
+        if (useIndexTableAsSource) {
+            cmd.add("-fi");
+        }
+
+        // A new random path on every call.
+        cmd.add("-op");
+        cmd.add("/tmp/" + UUID.randomUUID());
+        return cmd;
+    }
}