This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f25b7fb4ebb [fix](binlog) Add is temp for UpsertRecord (#35774)
f25b7fb4ebb is described below
commit f25b7fb4ebb65958ee3a5ef27660ae3634bf3a29
Author: walter <[email protected]>
AuthorDate: Mon Jun 3 12:41:29 2024 +0800
[fix](binlog) Add is temp for UpsertRecord (#35774)
Cherry-pick #35636.
The ccr-syncer does not support syncing temporary partitions, so this PR
adds a field to record whether this upsert record comes from a temporary
partition.
---
.../src/main/java/org/apache/doris/binlog/UpsertRecord.java | 4 ++++
.../src/main/java/org/apache/doris/catalog/OlapTable.java | 4 ++++
.../main/java/org/apache/doris/catalog/TempPartitions.java | 4 ++++
.../apache/doris/transaction/DatabaseTransactionMgr.java | 6 ++++--
.../org/apache/doris/transaction/PartitionCommitInfo.java | 13 +++++++++++--
5 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index f42c7031cf0..cdfe8550d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -42,6 +42,9 @@ public class UpsertRecord {
@SerializedName(value = "version")
public long version;
+
+ @SerializedName(value = "isTempPartition")
+ public boolean isTemp;
}
@SerializedName(value = "partitionRecords")
@@ -60,6 +63,7 @@ public class UpsertRecord {
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
+ partitionRecord.isTemp = partitionCommitInfo.isTempPartition();
partitionRecords.add(partitionRecord);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0d3477b3a26..d901d2340fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -246,6 +246,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return getOrCreatTableProperty().isBeingSynced();
}
+ public boolean isTemporaryPartition(long partitionId) {
+ return tempPartitions.hasPartition(partitionId);
+ }
+
public void setTableProperty(TableProperty tableProperty) {
this.tableProperty = tableProperty;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
index 9cd2d61bf91..b3f93661396 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
@@ -104,6 +104,10 @@ public class TempPartitions implements Writable, GsonPostProcessable {
return nameToPartition.containsKey(partName);
}
+ public boolean hasPartition(long partitionId) {
+ return idToPartition.containsKey(partitionId);
+ }
+
public boolean isEmpty() {
return idToPartition.isEmpty();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 43e2ca038bc..3f82461ee48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1363,7 +1363,8 @@ public class DatabaseTransactionMgr {
|| tblPartitionInfo.getType() == PartitionType.LIST) {
partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString();
}
- PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1);
+ PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1,
+ table.isTemporaryPartition(partitionId));
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@@ -1406,7 +1407,8 @@ public class DatabaseTransactionMgr {
}
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange,
partition.getNextVersion(),
- System.currentTimeMillis() /* use as partition visible time */);
+ System.currentTimeMillis() /* use as partition visible time */,
+ table.isTemporaryPartition(partitionId));
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
index 3f35c1d2954..e372a1fb2ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
@@ -37,17 +37,21 @@ public class PartitionCommitInfo implements Writable {
private long version;
@SerializedName(value = "versionTime")
private long versionTime;
+ @SerializedName(value = "isTempPartition")
+ private boolean isTempPartition;
public PartitionCommitInfo() {
}
- public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime) {
+ public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime,
+ boolean isTempPartition) {
super();
this.partitionId = partitionId;
this.range = partitionRange;
this.version = version;
this.versionTime = visibleTime;
+ this.isTempPartition = isTempPartition;
}
@Override
@@ -85,12 +89,17 @@ public class PartitionCommitInfo implements Writable {
this.versionTime = versionTime;
}
+ public boolean isTempPartition() {
+ return this.isTempPartition;
+ }
+
@Override
public String toString() {
- StringBuilder sb = new StringBuilder("partitionid=");
+ StringBuilder sb = new StringBuilder("partitionId=");
sb.append(partitionId);
sb.append(", version=").append(version);
sb.append(", versionTime=").append(versionTime);
+ sb.append(", isTemp=").append(isTempPartition);
return sb.toString();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]