This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9466e3f96d8 [improvement](statistics)Log one bdbje record for one load
transaction. #31619
9466e3f96d8 is described below
commit 9466e3f96d886ef9c7793ba395841100c3c1f741
Author: Jibing-Li <[email protected]>
AuthorDate: Sat Mar 2 01:00:37 2024 +0800
[improvement](statistics)Log one bdbje record for one load transaction.
#31619
---
.../org/apache/doris/datasource/ExternalTable.java | 2 +-
.../apache/doris/datasource/InternalCatalog.java | 5 +-
.../org/apache/doris/journal/JournalEntity.java | 12 ++
.../java/org/apache/doris/persist/EditLog.java | 20 ++-
.../apache/doris/statistics/AnalysisManager.java | 62 ++++++++--
.../doris/statistics/NewPartitionLoadedEvent.java | 22 ++--
.../apache/doris/statistics/UpdateRowsEvent.java | 27 ++--
.../doris/transaction/DatabaseTransactionMgr.java | 6 +-
...est_update_rows_and_partition_first_load.groovy | 137 +++++++++++++++++++++
9 files changed, 251 insertions(+), 42 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 6732abf2a58..7f82d0d3876 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -199,7 +199,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
try {
makeSureInitialized();
} catch (Exception e) {
- LOG.warn("Failed to initialize table {}.{}.{}", catalog.name,
dbName, name, e);
+ LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(),
dbName, name, e);
return 0;
}
// All external table should get external row count from cache.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index bc2ffc40efc..95abc9e06c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3116,6 +3116,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
rowsToTruncate += partition.getBaseIndex().getRowCount();
}
} else {
+ rowsToTruncate = olapTable.getRowCount();
for (Partition partition : olapTable.getPartitions()) {
// If absolute correctness is needed, running txns should be checked here.
// But if a txn is in the prepare state, we can't know which partitions had data loaded.
@@ -3279,12 +3280,14 @@ public class InternalCatalog implements
CatalogIf<Database> {
erasePartitionDropBackendReplicas(oldPartitions);
+ HashMap<Long, Long> updateRecords = new HashMap<>();
+ updateRecords.put(olapTable.getId(), rowsToTruncate);
if (truncateEntireTable) {
// Drop the whole table stats after truncate the entire table
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
} else {
// Update the updated rows in table stats after truncate some
partitions.
-
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(olapTable.getId(),
rowsToTruncate);
+
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
}
LOG.info("finished to truncate table {}, partitions: {}",
tblRef.getName().toSql(), tblRef.getPartitionNames());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 2c3e37ed022..8f0d2b54580 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -127,7 +127,9 @@ import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.NewPartitionLoadedEvent;
import org.apache.doris.statistics.TableStatsMeta;
+import org.apache.doris.statistics.UpdateRowsEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -935,6 +937,16 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_LOG_UPDATE_ROWS: {
+ data = UpdateRowsEvent.read(in);
+ isRead = true;
+ break;
+ }
+ case OperationType.OP_LOG_NEW_PARTITION_LOADED: {
+ data = NewPartitionLoadedEvent.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 88675d5cca3..d21012dd658 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -88,7 +88,9 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.NewPartitionLoadedEvent;
import org.apache.doris.statistics.TableStatsMeta;
+import org.apache.doris.statistics.UpdateRowsEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -1181,8 +1183,14 @@ public class EditLog {
env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog)
journal.getData());
break;
}
- case OperationType.OP_LOG_UPDATE_ROWS:
- case OperationType.OP_LOG_NEW_PARTITION_LOADED:
+ case OperationType.OP_LOG_UPDATE_ROWS: {
+
env.getAnalysisManager().replayUpdateRowsRecord((UpdateRowsEvent)
journal.getData());
+ break;
+ }
+ case OperationType.OP_LOG_NEW_PARTITION_LOADED: {
+
env.getAnalysisManager().replayNewPartitionLoadedEvent((NewPartitionLoadedEvent)
journal.getData());
+ break;
+ }
case OperationType.OP_LOG_ALTER_COLUMN_STATS: {
// TODO: implement this while statistics finished related
work.
break;
@@ -2022,6 +2030,14 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
}
+ public void logUpdateRowsRecord(UpdateRowsEvent record) {
+ logEdit(OperationType.OP_LOG_UPDATE_ROWS, record);
+ }
+
+ public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
+ logEdit(OperationType.OP_LOG_NEW_PARTITION_LOADED, event);
+ }
+
public void logDeleteTableStats(TableStatsDeletionLog log) {
logEdit(OperationType.OP_DELETE_TABLE_STATS, log);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index eafcff3e7f2..b265b88f702 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -996,21 +996,31 @@ public class AnalysisManager implements Writable {
}
// Invoke this when load transaction finished.
- public void updateUpdatedRows(long tblId, long rows) {
- TableStatsMeta statsStatus = idToTblStats.get(tblId);
- if (statsStatus != null) {
- statsStatus.updatedRows.addAndGet(rows);
- logCreateTableStats(statsStatus);
+ public void updateUpdatedRows(Map<Long, Long> records) {
+ if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() ||
records == null || records.isEmpty()) {
+ return;
+ }
+ for (Entry<Long, Long> record : records.entrySet()) {
+ TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+ if (statsStatus != null) {
+ statsStatus.updatedRows.addAndGet(record.getValue());
+ }
}
+ logUpdateRowsRecord(new UpdateRowsEvent(records));
}
// Set to true means new partition loaded data
- public void setNewPartitionLoaded(long tblId) {
- TableStatsMeta statsStatus = idToTblStats.get(tblId);
- if (statsStatus != null && Env.getCurrentEnv().isMaster() &&
!Env.isCheckpointThread()) {
- statsStatus.newPartitionLoaded.set(true);
- logCreateTableStats(statsStatus);
+ public void setNewPartitionLoaded(List<Long> tableIds) {
+ if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() ||
tableIds == null || tableIds.isEmpty()) {
+ return;
+ }
+ for (long tableId : tableIds) {
+ TableStatsMeta statsStatus = idToTblStats.get(tableId);
+ if (statsStatus != null) {
+ statsStatus.newPartitionLoaded.set(true);
+ }
}
+ logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
}
public void updateTableStatsStatus(TableStatsMeta tableStats) {
@@ -1026,6 +1036,38 @@ public class AnalysisManager implements Writable {
Env.getCurrentEnv().getEditLog().logCreateTableStats(tableStats);
}
+ public void logUpdateRowsRecord(UpdateRowsEvent record) {
+ Env.getCurrentEnv().getEditLog().logUpdateRowsRecord(record);
+ }
+
+ public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
+ Env.getCurrentEnv().getEditLog().logNewPartitionLoadedEvent(event);
+ }
+
+ public void replayUpdateRowsRecord(UpdateRowsEvent event) {
+ if (event == null || event.getRecords() == null) {
+ return;
+ }
+ for (Entry<Long, Long> record : event.getRecords().entrySet()) {
+ TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+ if (statsStatus != null) {
+ statsStatus.updatedRows.addAndGet(record.getValue());
+ }
+ }
+ }
+
+ public void replayNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
+ if (event == null || event.getTableIds() == null) {
+ return;
+ }
+ for (long tableId : event.getTableIds()) {
+ TableStatsMeta statsStatus = idToTblStats.get(tableId);
+ if (statsStatus != null) {
+ statsStatus.newPartitionLoaded.set(true);
+ }
+ }
+ }
+
public void registerSysJob(AnalysisInfo jobInfo, Map<Long,
BaseAnalysisTask> taskInfos) {
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java
index d09cb2df6c4..891eafd2dda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java
@@ -27,20 +27,25 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
public class NewPartitionLoadedEvent implements Writable {
- @SerializedName("partitionIdToTableId")
- public final Map<Long, Long> partitionIdToTableId = new HashMap<>();
+ @SerializedName("tableIds")
+ private List<Long> tableIds;
@VisibleForTesting
- public NewPartitionLoadedEvent() {}
+ public NewPartitionLoadedEvent(List<Long> tableIds) {
+ this.tableIds = tableIds;
+ }
// No need to be thread safe, only publish thread will call this.
- public void addPartition(long tableId, long partitionId) {
- partitionIdToTableId.put(tableId, partitionId);
+ public void addTableId(long tableId) {
+ tableIds.add(tableId);
+ }
+
+ public List<Long> getTableIds() {
+ return tableIds;
}
@Override
@@ -51,7 +56,6 @@ public class NewPartitionLoadedEvent implements Writable {
public static NewPartitionLoadedEvent read(DataInput dataInput) throws
IOException {
String json = Text.readString(dataInput);
- NewPartitionLoadedEvent newPartitionLoadedEvent =
GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class);
- return newPartitionLoadedEvent;
+ return GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
index 04f185c8b73..8cce3d29391 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
@@ -21,30 +21,24 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
-import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
public class UpdateRowsEvent implements Writable {
- @SerializedName("tableIdToUpdateRows")
- public final Map<Long, Long> tableIdToUpdateRows = new HashMap<>();
+ @SerializedName("records")
+ private Map<Long, Long> records;
- @VisibleForTesting
- public UpdateRowsEvent() {}
+ public UpdateRowsEvent(Map<Long, Long> records) {
+ this.records = records;
+ }
- // No need to be thread safe, only publish thread will call this.
- public void addUpdateRows(long tableId, long rows) {
- if (tableIdToUpdateRows.containsKey(tableId)) {
- tableIdToUpdateRows.put(tableId, tableIdToUpdateRows.get(tableId)
+ rows);
- } else {
- tableIdToUpdateRows.put(tableId, rows);
- }
+ public Map<Long, Long> getRecords() {
+ return records;
}
@Override
@@ -53,9 +47,8 @@ public class UpdateRowsEvent implements Writable {
Text.writeString(out, json);
}
- public static UpdateRowsEvent read(DataInput dataInput) throws IOException
{
- String json = Text.readString(dataInput);
- UpdateRowsEvent updateRowsEvent = GsonUtils.GSON.fromJson(json,
UpdateRowsEvent.class);
- return updateRowsEvent;
+ public static UpdateRowsEvent read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 0e5608791e7..af94917f97d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1912,6 +1912,7 @@ public class DatabaseTransactionMgr {
private boolean updateCatalogAfterVisible(TransactionState
transactionState, Database db) {
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ List<Long> newPartitionLoadedTableIds = new ArrayList<>();
for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
@@ -1973,7 +1974,7 @@ public class DatabaseTransactionMgr {
long versionTime = partitionCommitInfo.getVersionTime();
if (partition.getVisibleVersion() ==
Partition.PARTITION_INIT_VERSION
&& version > Partition.PARTITION_INIT_VERSION) {
- analysisManager.setNewPartitionLoaded(tableId);
+ newPartitionLoadedTableIds.add(tableId);
}
partition.updateVisibleVersionAndTime(version, versionTime);
if (LOG.isDebugEnabled()) {
@@ -2000,7 +2001,8 @@ public class DatabaseTransactionMgr {
if (LOG.isDebugEnabled()) {
LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
}
- tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows);
+ analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds);
+ analysisManager.updateUpdatedRows(tableIdToNumDeltaRows);
return true;
}
diff --git
a/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy
b/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy
new file mode 100644
index 00000000000..5bfa58abd62
--- /dev/null
+++
b/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_update_rows_and_partition_first_load", "p2") {
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String enabled = context.config.otherConfigs.get("enableBrokerLoad")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ sql """DROP DATABASE IF EXISTS
test_update_rows_and_partition_first_load"""
+ sql """CREATE DATABASE test_update_rows_and_partition_first_load"""
+ sql """use test_update_rows_and_partition_first_load"""
+ sql """
+ CREATE TABLE update_rows_test1 (
+ id int NULL,
+ name String NULL
+ )ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ CREATE TABLE update_rows_test2 (
+ id int NULL,
+ name String NULL
+ )ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ CREATE TABLE `partition_test1` (
+ `id` INT NOT NULL,
+ `name` VARCHAR(25) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`id`)
+ (PARTITION p1 VALUES [("0"), ("100")),
+ PARTITION p2 VALUES [("100"), ("200")),
+ PARTITION p3 VALUES [("200"), ("300")))
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+
+ sql """
+ CREATE TABLE `partition_test2` (
+ `id` INT NOT NULL,
+ `name` VARCHAR(25) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`id`)
+ (PARTITION p1 VALUES [("0"), ("100")),
+ PARTITION p2 VALUES [("100"), ("200")),
+ PARTITION p3 VALUES [("200"), ("300")))
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+
+ sql """analyze table update_rows_test1 with sync"""
+ sql """analyze table update_rows_test2 with sync"""
+ sql """analyze table partition_test1 with sync"""
+ sql """analyze table partition_test2 with sync"""
+
+ def label = "part_" + UUID.randomUUID().toString().replace("-", "0")
+ sql """
+ LOAD LABEL ${label} (
+ DATA
INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_1.csv")
+ INTO TABLE update_rows_test1
+ COLUMNS TERMINATED BY ",",
+ DATA
INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_2.csv")
+ INTO TABLE update_rows_test2
+ COLUMNS TERMINATED BY ",",
+ DATA
INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_1.csv")
+ INTO TABLE partition_test1
+ COLUMNS TERMINATED BY ",",
+ DATA
INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_2.csv")
+ INTO TABLE partition_test2
+ COLUMNS TERMINATED BY ","
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+ "AWS_REGION" = "ap-beijing"
+ );
+ """
+
+ boolean finished = false;
+ for(int i = 0; i < 120; i++) {
+ def result = sql """show load where label = "$label" """
+ if (result[0][2] == "FINISHED") {
+ finished = true;
+ break;
+ }
+ logger.info("Load not finished, wait one second.")
+ Thread.sleep(1000)
+ }
+ // Fail instead of silently passing when the load never reaches FINISHED.
+ assertTrue(finished)
+ def result = sql """show table stats update_rows_test1"""
+ assertEquals("5", result[0][0])
+ result = sql """show table stats update_rows_test2"""
+ assertEquals("6", result[0][0])
+ result = sql """show table stats partition_test1"""
+ assertEquals("5", result[0][0])
+ assertEquals("true", result[0][6])
+ result = sql """show table stats partition_test2"""
+ assertEquals("true", result[0][6])
+ assertEquals("6", result[0][0])
+ sql """DROP DATABASE IF EXISTS
test_update_rows_and_partition_first_load"""
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]