This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/tmp-ec by this push:
new bcca68c9ba addendum
bcca68c9ba is described below
commit bcca68c9ba482e6a21b352245771968cd5628ab7
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Mar 12 22:53:54 2026 -0700
addendum
---
phoenix-core-client/src/main/antlr3/PhoenixSQL.g | 3 +-
.../java/org/apache/phoenix/schema/PTable.java | 5 +
.../org/apache/phoenix/util/CDCChangeBuilder.java | 6 +
.../src/main/protobuf/IndexMutations.proto | 8 +
.../coprocessor/CDCGlobalIndexRegionScanner.java | 137 ++++++++-
.../phoenix/hbase/index/IndexCDCConsumer.java | 316 ++++++++++++++++++---
.../phoenix/hbase/index/IndexRegionObserver.java | 211 +++++++++-----
.../java/org/apache/phoenix/end2end/Bson5IT.java | 18 ++
...ncurrentMutationsCoveredEventualGenerateIT.java | 71 +++++
.../ConcurrentMutationsExtendedGenerateIT.java | 68 +++++
.../end2end/ConcurrentMutationsExtendedIT.java | 4 +
...urrentMutationsUncoveredEventualGenerateIT.java | 71 +++++
...xToolForNonTxGlobalIndexEventualGenerateIT.java | 93 ++++++
.../IndexToolForNonTxGlobalIndexEventualIT.java | 91 ++++++
.../end2end/IndexToolForNonTxGlobalIndexIT.java | 29 +-
.../phoenix/end2end/VarBinaryEncoded2IT.java | 6 +
.../GlobalIndexCheckerEventualGenerateIT.java | 84 ++++++
.../index/GlobalIndexCheckerEventualIT.java | 82 ++++++
.../end2end/index/GlobalIndexCheckerIT.java | 43 ++-
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
20 files changed, 1228 insertions(+), 120 deletions(-)
diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
index 6eaefd5813..bb7d3d8a89 100644
--- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
@@ -86,6 +86,7 @@ tokens
POST='post';
CHANGE='change';
IDX_MUTATIONS='idx_mutations';
+ DATA_ROW_STATE='data_row_state';
LATEST='latest';
ALL='all';
INDEX='index';
@@ -608,7 +609,7 @@ cdc_change_scopes returns [Set<CDCChangeScope> ret]
;
cdc_change_scope returns [CDCChangeScope ret]
- : v=(PRE | POST | CHANGE | IDX_MUTATIONS)
+ : v=(PRE | POST | CHANGE | IDX_MUTATIONS | DATA_ROW_STATE)
{
ret = CDCChangeScope.valueOf(v.getText().toUpperCase());
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index bd314e00c9..effd7773ec 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -1162,5 +1162,10 @@ public interface PTable extends PMetaDataEntity {
* Include index mutations for eventually consistent indexes.
*/
IDX_MUTATIONS,
+
+ /**
+ * Include raw before/after data row states as serialized Puts for index
mutation generation.
+ */
+ DATA_ROW_STATE,
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 34408cfbf1..0229ac8943 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -37,6 +37,7 @@ public class CDCChangeBuilder {
private final boolean isPreImageInScope;
private final boolean isPostImageInScope;
private final boolean isIdxMutationsInScope;
+ private final boolean isDataRowStateInScope;
private final CDCTableInfo cdcDataTableInfo;
private String changeType;
private long lastDeletedTimestamp;
@@ -51,6 +52,7 @@ public class CDCChangeBuilder {
isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE);
isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST);
isIdxMutationsInScope =
changeScopes.contains(PTable.CDCChangeScope.IDX_MUTATIONS);
+ isDataRowStateInScope =
changeScopes.contains(PTable.CDCChangeScope.DATA_ROW_STATE);
}
public void initChange(long ts) {
@@ -164,4 +166,8 @@ public class CDCChangeBuilder {
return isIdxMutationsInScope;
}
+ public boolean isDataRowStateInScope() {
+ return isDataRowStateInScope;
+ }
+
}
diff --git a/phoenix-core-client/src/main/protobuf/IndexMutations.proto
b/phoenix-core-client/src/main/protobuf/IndexMutations.proto
index eb5dc2bbce..cba4360dd7 100644
--- a/phoenix-core-client/src/main/protobuf/IndexMutations.proto
+++ b/phoenix-core-client/src/main/protobuf/IndexMutations.proto
@@ -29,3 +29,11 @@ message IndexMutations {
repeated bytes tables = 1;
repeated bytes mutations = 2;
}
+
+// Raw data row states for generating index mutations.
+// Contains the data row key and serialized MutationProto for the before and
after states.
+message DataRowStates {
+ optional bytes dataRowKey = 1;
+ optional bytes currentDataRowState = 2;
+ optional bytes nextDataRowState = 3;
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index c096c443ac..1b6217d876 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -21,19 +21,24 @@ import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -62,6 +67,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* CDC (Change Data Capture) enabled region scanner for global indexes that
processes uncovered CDC
* index queries by reconstructing CDC events from index and data table rows.
@@ -90,6 +98,7 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
private CDCTableInfo cdcDataTableInfo;
private CDCChangeBuilder changeBuilder;
+ private static final byte[] SEPARATOR = { 0 };
private static final byte[] EMPTY_IDX_MUTATIONS =
PVarchar.INSTANCE.toBytes(Base64.getEncoder()
.encodeToString(IndexMutationsProtos.IndexMutations.getDefaultInstance().toByteArray()));
@@ -110,8 +119,9 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
if (
- changeBuilder.isIdxMutationsInScope() &&
!changeBuilder.isChangeImageInScope()
- && !changeBuilder.isPreImageInScope() &&
!changeBuilder.isPostImageInScope()
+ changeBuilder.isIdxMutationsInScope() &&
!changeBuilder.isDataRowStateInScope()
+ && !changeBuilder.isChangeImageInScope() &&
!changeBuilder.isPreImageInScope()
+ && !changeBuilder.isPostImageInScope()
) {
return null;
}
@@ -125,7 +135,11 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
// scan.getStopRow(), 0, SortOrder.getDefault());
// }
- return CDCUtil.setupScanForCDC(prepareDataTableScan(dataRowKeys, true));
+ Scan dataScan = prepareDataTableScan(dataRowKeys, true);
+ if (dataScan == null) {
+ return null;
+ }
+ return CDCUtil.setupScanForCDC(dataScan);
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
@@ -133,6 +147,9 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
List<Cell> indexRow = indexRowIterator.next();
Cell indexCell = indexRow.get(0);
byte[] indexRowKey =
ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell);
+ if (handleDataRowStateCDCEvent(indexRowKey, indexCell, result)) {
+ return true;
+ }
if (handleIdxMutationsCDCEvent(indexRow, indexRowKey, indexCell,
result)) {
return true;
}
@@ -336,6 +353,120 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
return true;
}
+ /**
+ * Handles CDC event when DATA_ROW_STATE scope is enabled. Partitions the
raw data table cells
+ * into before/after HBase Put objects representing the row state before and
after the change,
+ * serializes them into a DataRowStates protobuf, and returns the
Base64-encoded proto as the CDC
+ * result.
+ * @param indexRowKey The CDC index row key.
+ * @param indexCell The index cell.
+ * @param result The result list to populate.
+ * @return true if DATA_ROW_STATE scope is enabled, false otherwise.
+ * @throws IOException if serialization fails.
+ */
+ private boolean handleDataRowStateCDCEvent(byte[] indexRowKey, Cell
indexCell, List<Cell> result)
+ throws IOException {
+ if (!changeBuilder.isDataRowStateInScope()) {
+ return false;
+ }
+ ImmutableBytesPtr dataRowKeyPtr = new
ImmutableBytesPtr(indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKeyPtr);
+ long changeTS = indexCell.getTimestamp();
+ byte[] dataRowKey = dataRowKeyPtr.copyBytesIfNecessary();
+ Put currentDataRowState = null;
+ Put nextDataRowState = null;
+ if (dataRow != null && dataRow.rawCells().length > 0) {
+ boolean isFullRowDelete = false;
+ long lastFullRowDeleteTimestamp = 0L;
+ Set<ImmutableBytesPtr> deletedColumns = new HashSet<>();
+ Map<ImmutableBytesPtr, Long> deletedColumnsBeforeChange = new
HashMap<>();
+ Map<ImmutableBytesPtr, Cell> latestBeforeChange = new LinkedHashMap<>();
+ Map<ImmutableBytesPtr, Cell> atChange = new LinkedHashMap<>();
+ for (Cell cell : dataRow.rawCells()) {
+ if (cell.getTimestamp() > changeTS) {
+ continue;
+ }
+ byte[] cf = CellUtil.cloneFamily(cell);
+ byte[] cq = CellUtil.cloneQualifier(cell);
+ ImmutableBytesPtr colKey = new ImmutableBytesPtr(Bytes.add(cf,
SEPARATOR, cq));
+
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ if (cell.getTimestamp() == changeTS) {
+ isFullRowDelete = true;
+ } else if (cell.getTimestamp() < changeTS &&
lastFullRowDeleteTimestamp == 0L) {
+ lastFullRowDeleteTimestamp = cell.getTimestamp();
+ }
+ } else if (cell.getType() == Cell.Type.DeleteColumn) {
+ if (cell.getTimestamp() == changeTS) {
+ deletedColumns.add(colKey);
+ } else if (
+ cell.getTimestamp() < changeTS && cell.getTimestamp() >
lastFullRowDeleteTimestamp
+ ) {
+ deletedColumnsBeforeChange.putIfAbsent(colKey,
cell.getTimestamp());
+ }
+ } else if (cell.getType() == Cell.Type.Put) {
+ if (cell.getTimestamp() == changeTS) {
+ if (!atChange.containsKey(colKey)) {
+ atChange.put(colKey, cell);
+ }
+ } else if (
+ cell.getTimestamp() < changeTS && cell.getTimestamp() >
lastFullRowDeleteTimestamp
+ ) {
+ Long colDeleteTs = deletedColumnsBeforeChange.get(colKey);
+ if (
+ (colDeleteTs == null || cell.getTimestamp() > colDeleteTs)
+ && !latestBeforeChange.containsKey(colKey)
+ ) {
+ latestBeforeChange.put(colKey, cell);
+ }
+ }
+ }
+ }
+
+ if (!isFullRowDelete && atChange.isEmpty() && deletedColumns.isEmpty()) {
+ // No cells at changeTS means the data table mutation hasn't been
+ // committed yet. Return only the dataRowKey with no states so the
consumer retries.
+ } else {
+ if (!latestBeforeChange.isEmpty()) {
+ currentDataRowState = new Put(dataRowKey);
+ for (Cell cell : latestBeforeChange.values()) {
+ currentDataRowState.add(cell);
+ }
+ }
+ if (!isFullRowDelete) {
+ Put nextState = new Put(dataRowKey);
+ for (Map.Entry<ImmutableBytesPtr, Cell> entry :
latestBeforeChange.entrySet()) {
+ if (!atChange.containsKey(entry.getKey()) &&
!deletedColumns.contains(entry.getKey())) {
+ nextState.add(entry.getValue());
+ }
+ }
+ for (Cell cell : atChange.values()) {
+ nextState.add(cell);
+ }
+ if (!nextState.isEmpty()) {
+ nextDataRowState = nextState;
+ }
+ }
+ }
+ }
+ IndexMutationsProtos.DataRowStates.Builder builder =
+ IndexMutationsProtos.DataRowStates.newBuilder();
+ builder.setDataRowKey(ByteString.copyFrom(dataRowKey));
+ if (currentDataRowState != null) {
+ builder.setCurrentDataRowState(ByteString.copyFrom(
+ ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT,
currentDataRowState)
+ .toByteArray()));
+ }
+ if (nextDataRowState != null) {
+ builder.setNextDataRowState(ByteString.copyFrom(ProtobufUtil
+ .toMutation(ClientProtos.MutationProto.MutationType.PUT,
nextDataRowState).toByteArray()));
+ }
+ String base64String =
Base64.getEncoder().encodeToString(builder.build().toByteArray());
+ byte[] cdcEventBytes = PVarchar.INSTANCE.toBytes(base64String);
+ addResult(indexRowKey, indexCell, result, indexCell, cdcEventBytes);
+ return true;
+ }
+
/**
* Handles CDC event when IDX_MUTATIONS scope is enabled. Returns the index
mutations as a
* serialized IndexMutations, or an empty proto if no mutations are present.
Skips the data table
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index c940765499..5616887c46 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -30,9 +30,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
@@ -41,6 +41,7 @@ import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
@@ -105,12 +106,26 @@ public class IndexCDCConsumer implements Runnable {
private static final long DEFAULT_POLL_INTERVAL_MS = 1000;
/**
- * The time buffer in milliseconds subtracted from current time when
querying CDC mutations. This
- * buffer helps avoid reading mutations that are too recent.
+ * The time buffer in milliseconds subtracted from current time when
querying CDC mutations to
+ * help avoid reading mutations that are too recent.
*/
public static final String INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS =
"phoenix.index.cdc.consumer.timestamp.buffer.ms";
- private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 1000;
+ private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 25000;
+
+ /**
+ * Maximum number of retries when CDC events exist but the corresponding
data table mutations are
+ * not yet visible (or permanently failed). After exceeding this limit, the
consumer advances past
+ * the unprocessable events to avoid blocking indefinitely. This is only
used for index mutation
+ * generation approach (serializeCDCMutations = false).
+ */
+ public static final String INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES =
+ "phoenix.index.cdc.consumer.max.data.visibility.retries";
+ private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 15;
+
+ public static final String INDEX_CDC_CONSUMER_RETRY_PAUSE_MS =
+ "phoenix.index.cdc.consumer.retry.pause.ms";
+ private static final long DEFAULT_RETRY_PAUSE_MS = 2000;
private final RegionCoprocessorEnvironment env;
private final String dataTableName;
@@ -121,26 +136,31 @@ public class IndexCDCConsumer implements Runnable {
private final int batchSize;
private final long pollIntervalMs;
private final long timestampBufferMs;
+ private final int maxDataVisibilityRetries;
private final Configuration config;
+ private final boolean serializeCDCMutations;
private volatile boolean stopped = false;
private Thread consumerThread;
private boolean hasParentPartitions = false;
private PTable cachedDataTable;
/**
- * Creates a new IndexCDCConsumer for the given region.
- * @param env region coprocessor environment.
- * @param dataTableName name of the data table.
- * @param serverName server name.
+ * Creates a new IndexCDCConsumer for the given region with configurable
serialization mode.
+ * @param env region coprocessor environment.
+ * @param dataTableName name of the data table.
+ * @param serverName server name.
+ * @param serializeCDCMutations when true, consumes pre-serialized index
mutations; when false,
+ * generates index mutations from data row
states.
* @throws IOException if the IndexWriter cannot be created.
*/
- public IndexCDCConsumer(RegionCoprocessorEnvironment env, String
dataTableName, String serverName)
- throws IOException {
+ public IndexCDCConsumer(RegionCoprocessorEnvironment env, String
dataTableName, String serverName,
+ boolean serializeCDCMutations) throws IOException {
this.env = env;
this.dataTableName = dataTableName;
this.encodedRegionName = env.getRegion().getRegionInfo().getEncodedName();
this.config = env.getConfiguration();
- this.pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, 300);
+ this.serializeCDCMutations = serializeCDCMutations;
+ this.pause = config.getLong(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS,
DEFAULT_RETRY_PAUSE_MS);
this.startupDelayMs =
config.getLong(INDEX_CDC_CONSUMER_STARTUP_DELAY_MS,
DEFAULT_STARTUP_DELAY_MS);
this.batchSize = config.getInt(INDEX_CDC_CONSUMER_BATCH_SIZE,
DEFAULT_CDC_BATCH_SIZE);
@@ -148,6 +168,8 @@ public class IndexCDCConsumer implements Runnable {
config.getLong(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS,
DEFAULT_POLL_INTERVAL_MS);
this.timestampBufferMs =
config.getLong(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS,
DEFAULT_TIMESTAMP_BUFFER_MS);
+ this.maxDataVisibilityRetries =
config.getInt(INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES,
+ DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
DelegateRegionCoprocessorEnvironment indexWriterEnv =
new DelegateRegionCoprocessorEnvironment(env,
ConnectionType.INDEX_WRITER_CONNECTION);
this.indexWriter =
@@ -249,8 +271,13 @@ public class IndexCDCConsumer implements Runnable {
while (!stopped) {
try {
long previousTimestamp = lastProcessedTimestamp;
- lastProcessedTimestamp =
- processCDCBatch(encodedRegionName, encodedRegionName,
lastProcessedTimestamp, false);
+ if (serializeCDCMutations) {
+ lastProcessedTimestamp =
+ processCDCBatch(encodedRegionName, encodedRegionName,
lastProcessedTimestamp, false);
+ } else {
+ lastProcessedTimestamp =
processCDCBatchGenerated(encodedRegionName, encodedRegionName,
+ lastProcessedTimestamp, false);
+ }
if (lastProcessedTimestamp == previousTimestamp) {
sleepIfNotStopped(ConnectionUtils.getPauseTime(pause,
++retryCount));
} else {
@@ -605,8 +632,14 @@ public class IndexCDCConsumer implements Runnable {
currentLastProcessedTimestamp = getParentProgress(partitionId);
}
}
- long newTimestamp =
- processCDCBatch(partitionId, ownerPartitionId,
currentLastProcessedTimestamp, true);
+ long newTimestamp;
+ if (serializeCDCMutations) {
+ newTimestamp =
+ processCDCBatch(partitionId, ownerPartitionId,
currentLastProcessedTimestamp, true);
+ } else {
+ newTimestamp = processCDCBatchGenerated(partitionId,
ownerPartitionId,
+ currentLastProcessedTimestamp, true);
+ }
batchCount++;
retryCount = 0;
if (newTimestamp == currentLastProcessedTimestamp) {
@@ -697,18 +730,7 @@ public class IndexCDCConsumer implements Runnable {
dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
try (PhoenixConnection conn =
QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class))
{
- PTable dataTable = getDataTable(conn);
- String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false);
- if (cdcObjectName == null) {
- throw new SQLException("No CDC object found for table " +
dataTableName);
- }
- String schemaName = dataTable.getSchemaName().getString();
- if (schemaName == null || schemaName.isEmpty()) {
- cdcObjectName = "\"" + cdcObjectName + "\"";
- } else {
- cdcObjectName =
- "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" +
cdcObjectName + "\"";
- }
+ String cdcObjectName = getCdcObjectName(conn);
String cdcQuery;
if (isParentReplay) {
cdcQuery = String
@@ -728,15 +750,7 @@ public class IndexCDCConsumer implements Runnable {
int retryCount = 0;
while (hasMoreRows && batchMutations.isEmpty()) {
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
- ps.setString(1, partitionId);
- ps.setDate(2, new Date(newLastTimestamp));
- if (isParentReplay) {
- ps.setInt(3, batchSize);
- } else {
- long currentTime = EnvironmentEdgeManager.currentTimeMillis() -
timestampBufferMs;
- ps.setDate(3, new Date(currentTime));
- ps.setInt(4, batchSize);
- }
+ setStatementParams(partitionId, isParentReplay, newLastTimestamp,
ps);
Pair<Long, Boolean> result =
getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations);
hasMoreRows = result.getSecond();
@@ -778,6 +792,236 @@ public class IndexCDCConsumer implements Runnable {
}
}
+ private String getCdcObjectName(PhoenixConnection conn) throws SQLException {
+ PTable dataTable = getDataTable(conn);
+ String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false);
+ if (cdcObjectName == null) {
+ throw new SQLException("No CDC object found for table " + dataTableName);
+ }
+ String schemaName = dataTable.getSchemaName().getString();
+ if (schemaName == null || schemaName.isEmpty()) {
+ cdcObjectName = "\"" + cdcObjectName + "\"";
+ } else {
+ cdcObjectName =
+ "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" +
cdcObjectName + "\"";
+ }
+ return cdcObjectName;
+ }
+
+ /**
+ * Processes a batch of CDC events for the given partition starting from the
specified timestamp
+ * by generating index mutations from data row states. This method queries
the CDC index with the
+ * DATA_ROW_STATE scope, which triggers a server-side data table scan to
reconstruct the
+ * before-image ({@code currentDataRowState}) and after-image ({@code
nextDataRowState}) for each
+ * change.
+ * @param partitionId the partition (region) ID to process CDC
events for.
+ * @param ownerPartitionId the owner partition ID.
+ * @param lastProcessedTimestamp the timestamp to start processing CDC
events from.
+ * @param isParentReplay true if replaying a closed parent partition.
+ * @return the new last processed timestamp after this batch, or the same
timestamp if no new
+ * records were found.
+ * @throws SQLException if a SQL error occurs.
+ * @throws IOException if an I/O error occurs.
+ * @throws InterruptedException if the thread is interrupted while waiting.
+ */
+ private long processCDCBatchGenerated(String partitionId, String
ownerPartitionId,
+ long lastProcessedTimestamp, boolean isParentReplay)
+ throws SQLException, IOException, InterruptedException {
+ LOG.debug(
+ "Processing CDC batch (generated mode) for table {} partition {} owner
{} from timestamp {}",
+ dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
+ try (PhoenixConnection conn =
+ QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class))
{
+ String cdcObjectName = getCdcObjectName(conn);
+ String cdcQuery;
+ if (isParentReplay) {
+ cdcQuery = String
+ .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+ + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >
? "
+ + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT
?", cdcObjectName);
+ } else {
+ cdcQuery = String
+ .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+ + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >
? "
+ + "AND PHOENIX_ROW_TIMESTAMP() < ? "
+ + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT
?", cdcObjectName);
+ }
+
+ List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new
ArrayList<>();
+ long newLastTimestamp = lastProcessedTimestamp;
+ long[] lastScannedTimestamp = { lastProcessedTimestamp };
+ boolean hasMoreRows = true;
+ int retryCount = 0;
+ while (hasMoreRows && batchStates.isEmpty()) {
+ try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
+ setStatementParams(partitionId, isParentReplay, newLastTimestamp,
ps);
+ Pair<Long, Boolean> result =
+ getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates,
lastScannedTimestamp);
+ hasMoreRows = result.getSecond();
+ if (hasMoreRows) {
+ if (!batchStates.isEmpty()) {
+ newLastTimestamp = result.getFirst();
+ } else if (retryCount >= maxDataVisibilityRetries) {
+ LOG.warn(
+ "Skipping CDC events for table {} partition {} from timestamp
{}"
+ + " to {} after {} retries — data table mutations may have
failed",
+ dataTableName, partitionId, newLastTimestamp,
lastScannedTimestamp[0], retryCount);
+ newLastTimestamp = lastScannedTimestamp[0];
+ break;
+ } else {
+ // CDC index entries are written but the data is not yet visible.
+ // Don't advance newLastTimestamp so the same events are
re-fetched
+ // once the data becomes visible.
+ sleepIfNotStopped(ConnectionUtils.getPauseTime(pause,
++retryCount));
+ }
+ }
+ }
+ }
+ if (newLastTimestamp > lastProcessedTimestamp) {
+ String sameTimestampQuery = String
+ .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+ + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() =
? "
+ + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC",
cdcObjectName);
+ final long timestampToRefetch = newLastTimestamp;
+ batchStates.removeIf(pair -> pair.getFirst() == timestampToRefetch);
+ try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery))
{
+ ps.setString(1, partitionId);
+ ps.setDate(2, new Date(newLastTimestamp));
+ Pair<Long, Boolean> result =
+ getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates,
lastScannedTimestamp);
+ newLastTimestamp = result.getFirst();
+ if (batchStates.isEmpty()) {
+ newLastTimestamp = timestampToRefetch;
+ } else if (newLastTimestamp != timestampToRefetch) {
+ throw new IOException("Unexpected timestamp mismatch: expected " +
timestampToRefetch
+ + " but got " + newLastTimestamp);
+ }
+ }
+ }
+ generateAndApplyIndexMutations(conn, batchStates, partitionId,
ownerPartitionId,
+ newLastTimestamp);
+ if (newLastTimestamp > lastProcessedTimestamp) {
+ updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
+ PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
+ }
+ return newLastTimestamp;
+ }
+ }
+
+ private void setStatementParams(String partitionId, boolean isParentReplay,
long newLastTimestamp,
+ PreparedStatement ps) throws SQLException {
+ ps.setString(1, partitionId);
+ ps.setDate(2, new Date(newLastTimestamp));
+ if (isParentReplay) {
+ ps.setInt(3, batchSize);
+ } else {
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis() -
timestampBufferMs;
+ ps.setDate(3, new Date(currentTime));
+ ps.setInt(4, batchSize);
+ }
+ }
+
+ private static Pair<Long, Boolean>
getDataRowStatesAndTimestamp(PreparedStatement ps,
+ long initialLastTimestamp, List<Pair<Long,
IndexMutationsProtos.DataRowStates>> batchStates,
+ long[] lastScannedTimestamp) throws SQLException, IOException {
+ boolean hasRows = false;
+ long lastTimestamp = initialLastTimestamp;
+ lastScannedTimestamp[0] = initialLastTimestamp;
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ hasRows = true;
+ long rowTimestamp = rs.getDate(1).getTime();
+ lastScannedTimestamp[0] = rowTimestamp;
+ String cdcValue = rs.getString(2);
+ if (cdcValue != null && !cdcValue.isEmpty()) {
+ byte[] protoBytes = Base64.getDecoder().decode(cdcValue);
+ IndexMutationsProtos.DataRowStates dataRowStates =
+ IndexMutationsProtos.DataRowStates.parseFrom(protoBytes);
+ if (
+ dataRowStates.hasDataRowKey()
+ && (dataRowStates.hasCurrentDataRowState() ||
dataRowStates.hasNextDataRowState())
+ ) {
+ batchStates.add(Pair.newPair(rowTimestamp, dataRowStates));
+ lastTimestamp = rowTimestamp;
+ }
+ }
+ }
+ }
+ return Pair.newPair(lastTimestamp, hasRows);
+ }
+
+ private void generateAndApplyIndexMutations(PhoenixConnection conn,
+ List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates, String
partitionId,
+ String ownerPartitionId, long lastProcessedTimestamp) throws SQLException,
IOException {
+ if (batchStates.isEmpty()) {
+ return;
+ }
+ refreshDataTableCache(conn);
+ PTable dataTable = getDataTable(conn);
+ byte[] encodedRegionNameBytes =
env.getRegion().getRegionInfo().getEncodedNameAsBytes();
+ List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new
ArrayList<>();
+ for (PTable index : dataTable.getIndexes()) {
+ IndexConsistency consistency = index.getIndexConsistency();
+ if (consistency != null && consistency.isAsynchronous()) {
+ IndexMaintainer maintainer = index.getIndexMaintainer(dataTable, conn);
+ HTableInterfaceReference tableRef =
+ new HTableInterfaceReference(new
ImmutableBytesPtr(maintainer.getIndexTableName()));
+ indexTables.add(new Pair<>(maintainer, tableRef));
+ }
+ }
+ if (indexTables.isEmpty()) {
+ return;
+ }
+ ListMultimap<HTableInterfaceReference, Mutation> indexUpdates =
ArrayListMultimap.create();
+ int totalMutations = 0;
+ for (Pair<Long, IndexMutationsProtos.DataRowStates> entry : batchStates) {
+ long ts = entry.getFirst();
+ IndexMutationsProtos.DataRowStates dataRowStates = entry.getSecond();
+ byte[] dataRowKey = dataRowStates.getDataRowKey().toByteArray();
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(dataRowKey);
+
+ Put currentDataRowState = null;
+ if (dataRowStates.hasCurrentDataRowState()) {
+ ClientProtos.MutationProto currentProto = ClientProtos.MutationProto
+ .parseFrom(dataRowStates.getCurrentDataRowState().toByteArray());
+ Mutation currentMutation = ProtobufUtil.toMutation(currentProto);
+ if (currentMutation instanceof Put) {
+ currentDataRowState = (Put) currentMutation;
+ }
+ }
+ Put nextDataRowState = null;
+ if (dataRowStates.hasNextDataRowState()) {
+ ClientProtos.MutationProto nextProto =
+
ClientProtos.MutationProto.parseFrom(dataRowStates.getNextDataRowState().toByteArray());
+ Mutation nextMutation = ProtobufUtil.toMutation(nextProto);
+ if (nextMutation instanceof Put) {
+ nextDataRowState = (Put) nextMutation;
+ }
+ }
+ if (currentDataRowState == null && nextDataRowState == null) {
+ continue;
+ }
+ IndexRegionObserver.generateIndexMutationsForRow(rowKeyPtr,
currentDataRowState,
+ nextDataRowState, ts, encodedRegionNameBytes,
QueryConstants.VERIFIED_BYTES, indexTables,
+ indexUpdates);
+ if (indexUpdates.size() >= batchSize) {
+ indexWriter.write(indexUpdates, false,
MetaDataProtocol.PHOENIX_VERSION);
+ totalMutations += indexUpdates.size();
+ indexUpdates.clear();
+ }
+ }
+ if (!indexUpdates.isEmpty()) {
+ indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION);
+ totalMutations += indexUpdates.size();
+ }
+ if (totalMutations > 0) {
+ LOG.debug(
+ "Applied total {} index mutations for table {} partition {} owner {} "
+ + ", last processed timestamp {}",
+ totalMutations, dataTableName, partitionId, ownerPartitionId,
lastProcessedTimestamp);
+ }
+ }
+
private void executeIndexMutations(String partitionId,
List<Pair<Long, IndexMutationsProtos.IndexMutations>> batchMutations,
String ownerPartitionId,
long lastProcessedTimestamp) throws SQLException, IOException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a3934f6b0f..48c823d258 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -178,6 +178,56 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
public static final String PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED =
"phoenix.index.cdc.mutations.compress.enabled";
public static final boolean
DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED = false;
+ /**
+ * Controls which approach is used for implementing eventually consistent
global secondary indexes
+ * via the {@link IndexCDCConsumer}.
+ * <p>
+ * <b>Approach 1: Serialized mutations (default, value = true)</b>
+ * </p>
+ * <p>
+ * During {@code preBatchMutate}, {@link IndexRegionObserver} generates
index mutations for each
+ * data table mutation and serializes them into a Protobuf {@code
IndexMutations} message. This
+ * serialized payload is written as a column value in the CDC index table
row alongside the CDC
+ * event. The {@link IndexCDCConsumer} later reads these pre-computed
mutations from the CDC
+ * index, deserializes them, and applies them directly to the index
table(s). In this approach,
+ * the consumer does not need to understand index structure or re-derive
mutations — it simply
+ * replays what was already computed on the write path. The trade-off is
increased CDC index row
+ * size due to the serialized mutation payload.
+ * </p>
+ * <p>
+ * <b>Approach 2: Generated mutations from data row states (value =
false)</b>
+ * </p>
+ * <p>
+ * During {@code preBatchMutate}, {@link IndexRegionObserver} writes only a
lightweight CDC index
+ * entry without serialized index mutations. Instead, the CDC event is
created with the
+ * {@code DATA_ROW_STATE} scope. When the {@link IndexCDCConsumer} processes
these events, it
+ * reads the CDC index rows which trigger a server-side scan of the data
table (via
+ * {@code CDCGlobalIndexRegionScanner}) to reconstruct the before-image
+ * ({@code currentDataRowState}) and after-image ({@code nextDataRowState})
of the data row at the
+ * change timestamp. These raw row states are returned as a Protobuf {@code
DataRowStates}
+ * message. The consumer then feeds these states into {@code
generateIndexMutationsForRow()} — the
+ * same core utility used by {@link
IndexRegionObserver#prepareIndexMutations} on the write path —
+ * to derive index mutations at consume time. This approach keeps CDC index
rows small and
+ * generates mutations based on the current index definition, but requires
an additional data
+ * table read per CDC event and is sensitive to data visibility timing. Make
sure max lookback age
+ * is long enough to retain before and after images of the row.
+ * </p>
+ * <p>
+ * <b>When to use which approach:</b>
+ * </p>
+ * <ul>
+ * <li>Use <b>Approach 1</b> (serialize = true) when scanning each data
table row at consume time
+ * could be an IO bottleneck, and slightly higher write-path latency due to
index mutation
+ * serialization is acceptable.</li>
+ * <li>Use <b>Approach 2</b> (serialize = false) when uniform and
predictable write latency is a
+ * strict requirement regardless of the number and type (covered or
uncovered) of the eventually
+ * consistent global secondary indexes, and the additional data table
point-lookup with raw scan
+ * per CDC event at consume time is not a big IO concern.</li>
+ * </ul>
+ */
+ public static final String PHOENIX_INDEX_CDC_MUTATION_SERIALIZE =
+ "phoenix.index.cdc.mutation.serialize";
+ public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE =
true;
/**
* Class to represent pending data table rows
@@ -433,6 +483,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
private boolean indexCDCConsumerEnabled =
DEFAULT_PHOENIX_INDEX_CDC_CONSUMER_ENABLED;
private boolean compressCDCMutations =
DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED;
+ private boolean serializeCDCMutations =
DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
private boolean isNamespaceEnabled = false;
private boolean useBloomFilter = false;
private long lastTimestamp = 0;
@@ -494,6 +545,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
this.compressCDCMutations =
env.getConfiguration().getBoolean(PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED,
DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED);
+ this.serializeCDCMutations = env.getConfiguration().getBoolean(
+ PHOENIX_INDEX_CDC_MUTATION_SERIALIZE,
DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE);
this.isNamespaceEnabled =
SchemaUtil.isNamespaceMappingEnabled(PTableType.INDEX,
env.getConfiguration());
TableDescriptor tableDescriptor = env.getRegion().getTableDescriptor();
@@ -504,7 +557,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
this.indexCDCConsumerEnabled &&
!this.dataTableName.startsWith("SYSTEM.")
&& !this.dataTableName.startsWith("SYSTEM:")
) {
- this.indexCDCConsumer = new IndexCDCConsumer(env, this.dataTableName,
serverName);
+ this.indexCDCConsumer =
+ new IndexCDCConsumer(env, this.dataTableName, serverName,
this.serializeCDCMutations);
this.indexCDCConsumer.start();
}
} catch (NoSuchMethodError ex) {
@@ -1208,6 +1262,76 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
}
+ public static void generateIndexMutationsForRow(ImmutableBytesPtr rowKeyPtr,
+ Put currentDataRowState, Put nextDataRowState, long ts, byte[]
encodedRegionName,
+ byte[] emptyColumnValue, List<Pair<IndexMaintainer,
HTableInterfaceReference>> indexTables,
+ ListMultimap<HTableInterfaceReference, Mutation> indexUpdates) throws
IOException {
+ for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+ IndexMaintainer indexMaintainer = pair.getFirst();
+ HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+ if (
+ nextDataRowState != null &&
indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)
+ ) {
+ ValueGetter nextDataRowVG = new
IndexUtil.SimpleValueGetter(nextDataRowState);
+ Put indexPut =
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ nextDataRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName);
+ if (indexPut == null) {
+ // No covered column. Just prepare an index row with the empty column
+ byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG,
rowKeyPtr, null, null, ts,
+ encodedRegionName);
+ indexPut = new Put(indexRowKey);
+ } else {
+ IndexUtil.removeEmptyColumn(indexPut,
+ indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier());
+ }
+ byte[] finalEmptyColumnValue =
+ indexMaintainer.isUncovered() ? QueryConstants.UNVERIFIED_BYTES :
emptyColumnValue;
+
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts,
finalEmptyColumnValue);
+ indexUpdates.put(hTableInterfaceReference, indexPut);
+ if (!ignoreWritingDeleteColumnsToIndex) {
+ Delete deleteColumn =
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
+ if (deleteColumn != null) {
+ indexUpdates.put(hTableInterfaceReference, deleteColumn);
+ }
+ }
+ // Delete the current index row if the new index key is different from
the
+ // current one and the index is not a CDC index
+ if (currentDataRowState != null) {
+ ValueGetter currentDataRowVG = new
IndexUtil.SimpleValueGetter(currentDataRowState);
+ byte[] indexRowKeyForCurrentDataRow =
indexMaintainer.buildRowKey(currentDataRowVG,
+ rowKeyPtr, null, null, ts, encodedRegionName);
+ if (
+ !indexMaintainer.isCDCIndex()
+ && Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow) != 0
+ ) {
+ Mutation del =
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ indexUpdates.put(hTableInterfaceReference, del);
+ }
+ }
+ } else if (
+ currentDataRowState != null
+ && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)
+ ) {
+ if (indexMaintainer.isCDCIndex()) {
+ // CDC Index needs two a delete marker for referencing the data table
+ // delete mutation with the right index row key, that is, the index
row key
+ // starting with ts
+ Put cdcDataRowState = new Put(currentDataRowState.getRow());
+ cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
+ indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
ByteUtil.EMPTY_BYTE_ARRAY);
+ indexUpdates.put(hTableInterfaceReference,
getDeleteIndexMutation(cdcDataRowState,
+ indexMaintainer, ts, rowKeyPtr, encodedRegionName));
+ } else {
+ indexUpdates.put(hTableInterfaceReference,
getDeleteIndexMutation(currentDataRowState,
+ indexMaintainer, ts, rowKeyPtr, encodedRegionName));
+ }
+ }
+ }
+ }
+
/**
* Generate the index update for a data row from the mutation that are
obtained by merging the
* previous data row state with the pending row mutation.
@@ -1220,6 +1344,12 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (indexMaintainer.isLocalIndex()) {
continue;
}
+ if (
+ !serializeCDCMutations && indexMaintainer.getIndexConsistency() != null
+ && indexMaintainer.getIndexConsistency().isAsynchronous()
+ ) {
+ continue;
+ }
HTableInterfaceReference hTableInterfaceReference =
new HTableInterfaceReference(new
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
@@ -1232,73 +1362,12 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (currentDataRowState == null && nextDataRowState == null) {
continue;
}
- for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables)
{
- IndexMaintainer indexMaintainer = pair.getFirst();
- HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
- if (
- nextDataRowState != null &&
indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)
- ) {
- ValueGetter nextDataRowVG = new
IndexUtil.SimpleValueGetter(nextDataRowState);
- Put indexPut =
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
- nextDataRowVG, rowKeyPtr, ts, null, null, false,
encodedRegionName);
- if (indexPut == null) {
- // No covered column. Just prepare an index row with the empty
column
- byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG,
rowKeyPtr, null, null,
- ts, encodedRegionName);
- indexPut = new Put(indexRowKey);
- } else {
- IndexUtil.removeEmptyColumn(indexPut,
- indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
- indexMaintainer.getEmptyKeyValueQualifier());
- }
-
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
- indexMaintainer.getEmptyKeyValueQualifier(), ts,
QueryConstants.UNVERIFIED_BYTES);
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
- if (!ignoreWritingDeleteColumnsToIndex) {
- Delete deleteColumn =
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
- if (deleteColumn != null) {
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation, byte[]>(deleteColumn, rowKeyPtr.get()));
- }
- }
- // Delete the current index row if the new index key is different
from the
- // current one and the index is not a CDC index
- if (currentDataRowState != null) {
- ValueGetter currentDataRowVG = new
IndexUtil.SimpleValueGetter(currentDataRowState);
- byte[] indexRowKeyForCurrentDataRow =
indexMaintainer.buildRowKey(currentDataRowVG,
- rowKeyPtr, null, null, ts, encodedRegionName);
- if (
- !indexMaintainer.isCDCIndex()
- && Bytes.compareTo(indexPut.getRow(),
indexRowKeyForCurrentDataRow) != 0
- ) {
- Mutation del =
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
- IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
- }
- }
- } else if (
- currentDataRowState != null
- && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)
- ) {
- if (indexMaintainer.isCDCIndex()) {
- // CDC Index needs two a delete marker for referencing the data
table
- // delete mutation with the right index row key, that is, the
index row key
- // starting with ts
- Put cdcDataRowState = new Put(currentDataRowState.getRow());
- cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
- indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
- ByteUtil.EMPTY_BYTE_ARRAY);
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation,
byte[]>(getDeleteIndexMutation(cdcDataRowState, indexMaintainer,
- ts, rowKeyPtr, encodedRegionName), rowKeyPtr.get()));
- } else {
- context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation,
byte[]>(getDeleteIndexMutation(currentDataRowState,
- indexMaintainer, ts, rowKeyPtr, encodedRegionName),
rowKeyPtr.get()));
- }
- }
+ ListMultimap<HTableInterfaceReference, Mutation> idxUpdates =
ArrayListMultimap.create();
+ generateIndexMutationsForRow(rowKeyPtr, currentDataRowState,
nextDataRowState, ts,
+ encodedRegionName, QueryConstants.UNVERIFIED_BYTES, indexTables,
idxUpdates);
+ for (Map.Entry<HTableInterfaceReference, Mutation> idxUpdate :
idxUpdates.entries()) {
+ context.indexUpdates.put(idxUpdate.getKey(),
+ new Pair<>(idxUpdate.getValue(), rowKeyPtr.get()));
}
}
}
@@ -1326,8 +1395,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>
create();
prepareIndexMutations(context, maintainers, batchTimestamp);
- prepareEventuallyConsistentIndexMutations(context, batchTimestamp,
maintainers,
- compressCDCMutations);
+ if (serializeCDCMutations) {
+ prepareEventuallyConsistentIndexMutations(context, batchTimestamp,
maintainers,
+ compressCDCMutations);
+ }
context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference,
Mutation> create();
int updateCount = 0;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
index 5ca60cbd9c..7666580b76 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.end2end;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -42,10 +44,13 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonDocument;
@@ -54,10 +59,12 @@ import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
/**
* Tests for BSON with expression field key alias.
@@ -67,6 +74,17 @@ public class Bson5IT extends ParallelStatsDisabledIT {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(60 * 60));
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(false));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
private static String getJsonString(String jsonFilePath) throws IOException {
URL fileUrl = Bson5IT.class.getClassLoader().getResource(jsonFilePath);
Preconditions.checkArgument(fileUrl != null, "File path " + jsonFilePath +
" seems invalid");
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
new file mode 100644
index 0000000000..1bf63c7579
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsCoveredEventualGenerateIT
+ extends ConcurrentMutationsExtendedIndexIT {
+
+ private static final int MAX_LOOKBACK_AGE = 1000000;
+
+ public ConcurrentMutationsCoveredEventualGenerateIT(boolean uncovered,
boolean eventual) {
+ super(uncovered, eventual);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(MAX_LOOKBACK_AGE));
+ props.put("hbase.rowlock.wait.duration", "100");
+ props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+ props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
+ props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { false, true } });
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
new file mode 100644
index 0000000000..ea87bc7c14
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsExtendedGenerateIT extends
ConcurrentMutationsExtendedIT {
+
+ public ConcurrentMutationsExtendedGenerateIT(boolean uncovered, boolean
eventual) {
+ super(uncovered, eventual);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(MAX_LOOKBACK_AGE));
+ props.put("hbase.rowlock.wait.duration", "100");
+ props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
+ props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
+ props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { false, true }, { true, true } });
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index ecbabb2473..aff8b40b92 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -95,6 +97,8 @@ public class ConcurrentMutationsExtendedIT extends
ParallelStatsDisabledIT {
props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
new file mode 100644
index 0000000000..5ecfc1c036
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsUncoveredEventualGenerateIT
+ extends ConcurrentMutationsExtendedIndexIT {
+
+ private static final int MAX_LOOKBACK_AGE = 1000000;
+
+ public ConcurrentMutationsUncoveredEventualGenerateIT(boolean uncovered,
boolean eventual) {
+ super(uncovered, eventual);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(MAX_LOOKBACK_AGE));
+ props.put("hbase.rowlock.wait.duration", "100");
+ props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+ props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
+ props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { true, true } });
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
new file mode 100644
index 0000000000..533427174d
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolForNonTxGlobalIndexEventualGenerateIT extends
IndexToolForNonTxGlobalIndexIT {
+
+ public IndexToolForNonTxGlobalIndexEventualGenerateIT(boolean mutable,
boolean singleCell) {
+ super(mutable, singleCell);
+ if (indexDDLOptions.trim().isEmpty()) {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL";
+ } else {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+ }
+ }
+
+ @Override
+ protected void waitForEventualConsistency() throws Exception {
+ Thread.sleep(18000);
+ }
+
+ @BeforeClass
+ public static synchronized void setup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(13);
+ serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
Long.toString(20));
+
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ Long.toString(5));
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
Long.toString(8));
+
serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Long.toString(MAX_LOOKBACK_AGE));
+ serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+
serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+ Long.toString(0));
+ serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+ serverProps.put("hbase.procedure.remote.dispatcher.delay.msec",
Integer.toString(0));
+ serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS,
Integer.toString(2000));
+ serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ serverProps.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE,
Boolean.FALSE.toString());
+ serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS,
Integer.toString(-1));
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
+ clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(true));
+ clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
Long.toString(5));
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED,
Boolean.TRUE.toString());
+ clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB,
Boolean.TRUE.toString());
+ clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ Boolean.TRUE.toString());
+ destroyDriver();
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
"1");
+ }
+
+ @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] { { true, true }, { true, false }, { false, true }, {
false, false } });
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
new file mode 100644
index 0000000000..287f95c57a
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolForNonTxGlobalIndexEventualIT extends
IndexToolForNonTxGlobalIndexIT {
+
+ public IndexToolForNonTxGlobalIndexEventualIT(boolean mutable, boolean
singleCell) {
+ super(mutable, singleCell);
+ if (indexDDLOptions.trim().isEmpty()) {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL";
+ } else {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+ }
+ }
+
+ @Override
+ protected void waitForEventualConsistency() throws Exception {
+ Thread.sleep(15000);
+ }
+
+ @BeforeClass
+ public static synchronized void setup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(12);
+ serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
Long.toString(20));
+
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ Long.toString(5));
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
Long.toString(8));
+
serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Long.toString(MAX_LOOKBACK_AGE));
+ serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+
serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+ Long.toString(0));
+ serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+ serverProps.put("hbase.procedure.remote.dispatcher.delay.msec",
Integer.toString(0));
+ serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS,
Integer.toString(2000));
+ serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS,
Integer.toString(-1));
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
+ clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(true));
+ clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
Long.toString(5));
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED,
Boolean.TRUE.toString());
+ clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB,
Boolean.TRUE.toString());
+ clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ Boolean.TRUE.toString());
+ destroyDriver();
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
"1");
+ }
+
+ @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] { { true, true }, { true, false }, { false, true }, {
false, false } });
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index d6061a8380..5bf5c905ce 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -128,11 +128,11 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
public static final int MAX_LOOKBACK_AGE = 3600;
- private final String tableDDLOptions;
+ protected final String tableDDLOptions;
private final boolean useSnapshot = false;
- private final boolean mutable;
- private final String indexDDLOptions;
+ protected final boolean mutable;
+ protected String indexDDLOptions;
private boolean singleCell;
@Rule
@@ -194,6 +194,9 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
"1");
}
+ protected void waitForEventualConsistency() throws Exception {
+ }
+
@After
public void cleanup() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -239,15 +242,18 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
assertEquals(NROWS,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertTrue("Index rebuild failed!", indexTool.getJob().isSuccessful());
TestUtil.assertIndexState(conn, indexTableFullName, PIndexState.ACTIVE,
null);
+ waitForEventualConsistency();
long actualRowCount =
IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(NROWS, actualRowCount);
IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt);
conn.commit();
+ waitForEventualConsistency();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(NROWS, actualRowCount);
IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt);
conn.commit();
+ waitForEventualConsistency();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(NROWS, actualRowCount);
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
@@ -444,6 +450,7 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
}
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ waitForEventualConsistency();
long actualRowCount =
IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(NROWS, actualRowCount);
@@ -453,6 +460,7 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
IndexToolIT.upsertRow(stmt1, i);
}
conn.commit();
+ waitForEventualConsistency();
indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName,
dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
assertEquals(2 * NROWS,
@@ -491,6 +499,7 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ waitForEventualConsistency();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(2 * NROWS, actualRowCount);
}
@@ -553,6 +562,7 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+ waitForEventualConsistency();
long actualRowCount =
IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(N_ROWS, actualRowCount);
@@ -579,6 +589,7 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+ waitForEventualConsistency();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName,
indexTableFullName);
assertEquals(N_ROWS, actualRowCount);
}
@@ -586,6 +597,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String schemaName = generateUniqueName();
@@ -629,6 +642,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testIndexToolVerifyAfterOption() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String schemaName = generateUniqueName();
@@ -842,6 +857,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testIndexToolForIncrementalVerify() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -977,6 +994,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testIndexToolForIncrementalVerify_viewIndex() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge();
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -1387,6 +1406,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
if (!mutable) {
return;
}
@@ -1456,6 +1477,8 @@ public class IndexToolForNonTxGlobalIndexIT extends
BaseTest {
@Test
public void testUpdatableNonPkFilterViewIndexRebuild() throws Exception {
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
if (!mutable) {
return;
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
index bddd8d384a..52112ffd0f 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.end2end;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import java.sql.Connection;
@@ -72,6 +75,9 @@ public class VarBinaryEncoded2IT extends
ParallelStatsDisabledIT {
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(60 * 60));
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(false));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
new file mode 100644
index 0000000000..68ba183f72
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class GlobalIndexCheckerEventualGenerateIT extends GlobalIndexCheckerIT
{
+
+ public GlobalIndexCheckerEventualGenerateIT(boolean async, boolean encoded) {
+ super(async, encoded);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void appendEventualConsistency() {
+ if (indexDDLOptions.trim().isEmpty()) {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL";
+ } else {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+ }
+ }
+
+ @Override
+ protected void waitForEventualConsistency() throws Exception {
+ Thread.sleep(18000);
+ }
+
+ @Parameterized.Parameters(name = "async={0},encoded={1}")
+ public static synchronized Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+ boolean[] Booleans = new boolean[] { true, false };
+ for (boolean async : Booleans) {
+ for (boolean encoded : Booleans) {
+ list.add(new Object[] { async, encoded });
+ }
+ }
+ return list;
+ }
+
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
new file mode 100644
index 0000000000..2fc1daeda5
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class GlobalIndexCheckerEventualIT extends GlobalIndexCheckerIT {
+
+ public GlobalIndexCheckerEventualIT(boolean async, boolean encoded) {
+ super(async, encoded);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(8);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void appendEventualConsistency() {
+ if (indexDDLOptions.trim().isEmpty()) {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL";
+ } else {
+ indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+ }
+ }
+
+ @Override
+ protected void waitForEventualConsistency() throws Exception {
+ Thread.sleep(15000);
+ }
+
+ @Parameterized.Parameters(name = "async={0},encoded={1}")
+ public static synchronized Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+ boolean[] Booleans = new boolean[] { true, false };
+ for (boolean async : Booleans) {
+ for (boolean encoded : Booleans) {
+ list.add(new Object[] { async, encoded });
+ }
+ }
+ return list;
+ }
+
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 67ccf665e2..f64149e145 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -85,8 +85,8 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
public class GlobalIndexCheckerIT extends BaseTest {
private final boolean async;
- private String indexDDLOptions;
- private String tableDDLOptions;
+ protected String indexDDLOptions;
+ protected String tableDDLOptions;
private StringBuilder optionBuilder;
private StringBuilder indexOptionBuilder;
private final boolean encoded;
@@ -266,6 +266,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
// Verify that we will read from the index table
assertExplainPlan(conn, query, dataTableName, indexTableName);
+ waitForEventualConsistency();
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("bc", rs.getString(1));
@@ -300,6 +301,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
"SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
" WHERE val1 = 'de'";
// Verify that we will read from the index table
assertExplainPlan(conn, query, dataTableName, indexTableName);
+ waitForEventualConsistency();
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("de", rs.getString(1));
@@ -322,6 +324,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg',
'efgh')");
conn.commit();
+ waitForEventualConsistency();
// Write a query to get all the rows in the order of their timestamps
query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " +
dataTableName + " WHERE "
+ "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString()
@@ -365,6 +368,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
assertExplainPlan(conn, query, dataTableName, indexTableName);
+ waitForEventualConsistency();
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("ab", rs.getString(1));
@@ -565,6 +569,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.commit();
// Now the expected state of the index table is {('ab', 'a', 'abcc' ,
null), ('ab', 'b', null,
// 'bcde')}
+ waitForEventualConsistency();
ResultSet rs = conn.createStatement().executeQuery("SELECT * from " +
indexTableName);
assertTrue(rs.next());
assertEquals("ab", rs.getString(1));
@@ -606,6 +611,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
String dml = "DELETE from " + dataTableName + " WHERE id = 'a'";
assertEquals(1, conn.createStatement().executeUpdate(dml));
conn.commit();
+ waitForEventualConsistency();
// The index rows are actually not deleted yet because
IndexRegionObserver failed delete
// operation. However, they are
@@ -653,6 +659,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
String selectSql = "SELECT * from " + dataTableName + " WHERE val1 =
'ab'";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ waitForEventualConsistency();
ResultSet rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
@@ -666,6 +673,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.commit();
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ waitForEventualConsistency();
rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
@@ -795,6 +803,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
// run the index MR job.
IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
}
+ waitForEventualConsistency();
// Configure IndexRegionObserver to fail the last write phase (i.e., the
post index update
// phase) where the verify flag is set
// to true and/or index rows are deleted and check that this does not
impact the correctness
@@ -805,6 +814,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val1, val2) values
('c', 'cd','cde')");
conn.commit();
+ waitForEventualConsistency();
IndexTool indexTool = IndexToolIT.runIndexTool(false, "", dataTableName,
indexTableName, null,
0, IndexTool.IndexVerifyType.ONLY);
assertEquals(3,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
@@ -824,8 +834,10 @@ public class GlobalIndexCheckerIT extends BaseTest {
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(2, indexTool.getJob().getCounters()
- .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+ if (!indexDDLOptions.contains("CONSISTENCY=EVENTUAL")) {
+ assertEquals(2, indexTool.getJob().getCounters()
+ .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+ }
assertEquals(0, indexTool.getJob().getCounters()
.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters()
@@ -872,6 +884,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val1, val2) values
('a', 'ab','abc')");
conn.commit();
+ waitForEventualConsistency();
// At this moment val3 in the data table row has null value
String selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 =
'ab'";
// Verify that we will read from the index table
@@ -919,6 +932,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " values ('a', 'ab','abc',
'abcd')");
conn.commit();
+ waitForEventualConsistency();
// At this moment val3 in the data table row should not have null value
selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'";
// Verify that we will read from the index table
@@ -932,6 +946,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val1, val3) values
('a', 'ab','abcde')");
commitWithException(conn);
+ waitForEventualConsistency();
// The above upsert will create an unverified index row
// Configure IndexRegionObserver to allow the data write phase
IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
@@ -983,6 +998,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE
val1 = 'cd'";
// Verify that we will read from the first index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName + "1");
+ waitForEventualConsistency();
// Verify the first write is visible but the second one is not
ResultSet rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
@@ -1120,6 +1136,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val3) values ('a',
'abcdd')");
conn.commit();
+ waitForEventualConsistency();
String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE
val1 = 'ab'";
// Verify that we will read from the first index table
assertExplainPlan(conn, selectSql, dataTableName, indexName + "1");
@@ -1326,6 +1343,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " " + "values ('g', 'val1',
'val2g', null)");
conn.commit();
+ waitForEventualConsistency();
// Fail phase 3
IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
String selectSql = "SELECT id from " + dataTableName
@@ -1589,6 +1607,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val3) values ('a',
null)");
conn.commit();
+ waitForEventualConsistency();
String dql =
String.format("select id, val2 from %s where val1='ab' and
val3='abcd'", dataTableName);
@@ -1613,6 +1632,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " values ('a', 'ac', null,
null)");
conn.commit();
+ waitForEventualConsistency();
dql =
String.format("select id, val2 from %s where val1='ac' and val3 is
null", dataTableName);
@@ -1651,6 +1671,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val1, val2) values
('c', 'cd', 'cde')");
conn.commit();
+ waitForEventualConsistency();
IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(false);
IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName,
indexName, null, 0,
IndexTool.IndexVerifyType.BEFORE);
@@ -1689,6 +1710,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
String delete = String.format("DELETE FROM %s where id = 'a'",
dataTableName);
conn.createStatement().execute(delete);
conn.commit();
+ waitForEventualConsistency();
// skip phase2, inserts an unverified row in index
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
String dml = "upsert into " + dataTableName + " (id, val1, val3) values
('a', 'ab', ?)";
@@ -1715,6 +1737,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
conn.createStatement()
.execute("upsert into " + dataTableName + " (id, val1, val3) values
('a', 'ab', null)");
conn.commit();
+ waitForEventualConsistency();
IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName,
indexName, null, 0,
IndexTool.IndexVerifyType.ONLY);
CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
@@ -1737,6 +1760,8 @@ public class GlobalIndexCheckerIT extends BaseTest {
// No need to run the same test twice one for async = true and the other
for async = false
return;
}
+ Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+ indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
try (Connection conn = DriverManager.getConnection(getUrl())) {
// Create a base table
String dataTableName = generateUniqueName();
@@ -1813,6 +1838,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
+ "val1 = val1 || val1, val2 = val2 || val2";
conn.createStatement().execute(upsertSql);
conn.commit();
+ waitForEventualConsistency();
String selectSql = "SELECT * from " + dataTableName + " WHERE val1 =
'abab'";
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
ResultSet rs = conn.createStatement().executeQuery(selectSql);
@@ -1849,6 +1875,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
ps.executeUpdate();
}
conn.commit();
+ waitForEventualConsistency();
String distinctQuery = "SELECT DISTINCT val1 FROM " + dataTableName;
try (ResultSet rs = conn.createStatement().executeQuery(distinctQuery)) {
PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
@@ -1876,14 +1903,18 @@ public class GlobalIndexCheckerIT extends BaseTest {
}
}
- static private void verifyTableHealth(Connection conn, String dataTableName,
- String indexTableName) throws Exception {
+ protected void waitForEventualConsistency() throws Exception {
+ }
+
+ protected void verifyTableHealth(Connection conn, String dataTableName,
String indexTableName)
+ throws Exception {
// Add two rows and check everything is still okay
conn.createStatement()
.execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc',
'abcd')");
conn.createStatement()
.execute("upsert into " + dataTableName + " values ('z', 'za', 'zab',
'zabc')");
conn.commit();
+ waitForEventualConsistency();
String selectSql = "SELECT * from " + dataTableName + " WHERE val1 =
'ab'";
/// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7b14996fba..e2e3afedef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -2070,7 +2070,7 @@ public abstract class BaseTest {
* @throws IOException if something went wrong while connecting to Admin
*/
public synchronized static boolean isAnyStoreRefCountLeaked(Admin admin)
throws IOException {
- int retries = 5;
+ int retries = 15;
while (retries > 0) {
boolean isStoreRefCountLeaked = isStoreRefCountLeaked(admin);
if (!isStoreRefCountLeaked) {