This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 9d9e735f0c7 [fix](cache) Follower FE sql cache not invalidated on
table metadata replay (#63612) (#63657)
9d9e735f0c7 is described below
commit 9d9e735f0c703b1db0c95adc035909a5bfc2e0cc
Author: 924060929 <[email protected]>
AuthorDate: Wed May 27 17:31:39 2026 +0800
[fix](cache) Follower FE sql cache not invalidated on table metadata replay
(#63612) (#63657)
Cherry-pick of #63612
---
.../main/java/org/apache/doris/common/Config.java | 12 ++
.../main/java/org/apache/doris/alter/Alter.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 42 +++++-
.../doris/common/cache/NereidsSqlCacheManager.java | 28 ++--
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../org/apache/doris/nereids/SqlCacheContext.java | 8 +-
.../java/org/apache/doris/persist/EditLog.java | 9 ++
.../org/apache/doris/persist/OperationType.java | 5 +
.../org/apache/doris/persist/TableMetaChange.java | 153 +++++++++++++++++++++
9 files changed, 249 insertions(+), 18 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 63d8134757f..6a9694c5469 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2413,6 +2413,18 @@ public class Config extends ConfigBase {
)
public static int sql_cache_manage_num = 100;
+ @ConfField(
+ mutable = true,
+ description = {
+ "是否在 DDL 时写入 OP_TABLE_META_CHANGE edit log 通知 follower FE
清理 sql cache。"
+ + "默认 false,开启后 master DDL 会广播表元数据变更信号到所有
follower",
+ "Whether to write OP_TABLE_META_CHANGE edit log on DDL to
notify follower FEs "
+ + "to invalidate sql cache. Default false. When
enabled, master DDL broadcasts "
+ + "table metadata change signal to all followers"
+ }
+ )
+ public static boolean enable_write_op_table_meta_change = false;
+
@ConfField(
mutable = true,
callbackClassString =
"org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 53681831b34..4383fda5df5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -72,7 +72,6 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.cache.NereidsSqlCacheManager;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
@@ -363,8 +362,7 @@ public class Alter {
olapTable.writeLock();
try {
- NereidsSqlCacheManager sqlCacheManager =
Env.getCurrentEnv().getSqlCacheManager();
- sqlCacheManager.invalidateAboutTable(olapTable);
+ Env.getCurrentEnv().notifyTableMetaChange(olapTable);
} finally {
olapTable.writeUnlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index bcaee56c377..a9c6277863a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -219,6 +219,7 @@ import org.apache.doris.persist.SetTableStatusOperationLog;
import org.apache.doris.persist.Storage;
import org.apache.doris.persist.StorageInfo;
import org.apache.doris.persist.TableInfo;
+import org.apache.doris.persist.TableMetaChange;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
@@ -6860,7 +6861,9 @@ public class Env {
LOG.warn("ignore set same state {} for table {}. is replay:
{}.",
olapTable.getState(), tableName, isReplay);
}
-
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(olapTable);
+ if (!isReplay) {
+ notifyTableMetaChange(olapTable);
+ }
} finally {
olapTable.writeUnlock();
}
@@ -6968,7 +6971,9 @@ public class Env {
LOG.info("set replica {} of tablet {} on backend {} as version
{}, last success version {}, "
+ "last failed version {}, update time {}. is replay:
{}", replica.getId(), tabletId,
backendId, version, lastSuccessVersion,
lastFailedVersion, updateTime, isReplay);
-
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(table);
+ if (!isReplay) {
+ notifyTableMetaChange(table);
+ }
} finally {
table.writeUnlock();
}
@@ -7049,7 +7054,9 @@ public class Env {
+ " {}.", partitionId, oldVersion, visibleVersion,
database, table, isReplay);
}
-
Env.getCurrentEnv().getSqlCacheManager().invalidateAboutTable(olapTable);
+ if (!isReplay) {
+ notifyTableMetaChange(olapTable);
+ }
} finally {
olapTable.writeUnlock();
}
@@ -7278,6 +7285,35 @@ public class Env {
return sortedPartitionsCacheManager;
}
+ public void notifyTableMetaChange(TableIf table) {
+ if (table == null) {
+ return;
+ }
+ TableMetaChange change =
+ TableMetaChange.fromTable(table);
+ fanOutTableMetaChange(change);
+ if (isMaster() && editLog != null &&
Config.enable_write_op_table_meta_change) {
+ editLog.logTableMetaChange(change);
+ }
+ }
+
+ public void replayTableMetaChange(TableMetaChange change) {
+ if (change == null) {
+ return;
+ }
+ fanOutTableMetaChange(change);
+ }
+
+ private void fanOutTableMetaChange(TableMetaChange change) {
+ if (sqlCacheManager != null) {
+ sqlCacheManager.invalidateAboutTable(change);
+ }
+ if (sortedPartitionsCacheManager != null) {
+ sortedPartitionsCacheManager.invalidateTable(
+ change.getCatalogName(), change.getDbName(),
change.getTableName());
+ }
+ }
+
public SplitSourceManager getSplitSourceManager() {
return splitSourceManager;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
index fe8f01b7254..af44c008b2e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
@@ -59,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.persist.TableMetaChange;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext;
@@ -112,29 +113,31 @@ public class NereidsSqlCacheManager {
}
public void invalidateAboutTable(TableIf tableIf) {
- Set<String> invalidateKeys = new LinkedHashSet<>();
+ invalidateAboutTable(TableMetaChange.fromTable(tableIf));
+ }
+
+ public void invalidateAboutTable(TableMetaChange event) {
FullTableName invalidateTableName = null;
- DatabaseIf database = tableIf.getDatabase();
- if (database != null) {
- CatalogIf catalog = database.getCatalog();
- if (catalog != null) {
- invalidateTableName = new FullTableName(
- database.getCatalog().getName(),
database.getFullName(), tableIf.getName()
- );
- }
+ if (event.getCatalogName() != null && event.getDbName() != null &&
event.getTableName() != null) {
+ invalidateTableName = new FullTableName(
+ event.getCatalogName(), event.getDbName(),
event.getTableName());
}
+ Set<String> invalidateKeys = new LinkedHashSet<>();
for (Entry<String, SqlCacheContext> kv : sqlCaches.asMap().entrySet())
{
String key = kv.getKey();
SqlCacheContext context = kv.getValue();
+ if (context == null) {
+ continue;
+ }
for (Entry<FullTableName, TableVersion> nameToVersion :
context.getUsedTables().entrySet()) {
FullTableName tableName = nameToVersion.getKey();
TableVersion tableVersion = nameToVersion.getValue();
- if (tableVersion.id == tableIf.getId()) {
+ if (tableVersion.id == event.getTableId()) {
invalidateKeys.add(key);
break;
}
- if (tableName.equals(invalidateTableName)) {
+ if (invalidateTableName != null &&
tableName.equals(invalidateTableName)) {
invalidateKeys.add(key);
break;
}
@@ -465,6 +468,9 @@ public class NereidsSqlCacheManager {
if (currentTableVersion != cacheTableVersion) {
return IsChanged.CHANGED_AND_INVALIDATE_CACHE;
}
+ if (olapTable.getBaseSchemaVersion() !=
tableVersion.schemaVersion) {
+ return IsChanged.CHANGED_AND_INVALIDATE_CACHE;
+ }
if (tableIf instanceof MTMV) {
// mtmv maybe access old data when grace_period > 0, we
should disable cache at this case
long gracePeriod = ((MTMV) tableIf).getGracePeriod();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index cb6dee000de..69e27b65e78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -124,6 +124,7 @@ import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
import org.apache.doris.persist.TableBranchOrTagInfo;
import org.apache.doris.persist.TableInfo;
+import org.apache.doris.persist.TableMetaChange;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TableStatsDeletionLog;
@@ -992,6 +993,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_TABLE_META_CHANGE: {
+ data = TableMetaChange.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index e532ce611fe..8828c5b711b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -203,12 +203,17 @@ public class SqlCacheContext {
LOG.warn("table {}, can not get version", tableIf.getName(), e);
}
+ int schemaVersion = 0;
+ if (tableIf instanceof OlapTable) {
+ schemaVersion = ((OlapTable) tableIf).getBaseSchemaVersion();
+ }
usedTables.put(
new FullTableName(database.getCatalog().getName(),
database.getFullName(), tableIf.getName()),
new TableVersion(
tableIf.getId(),
version,
- tableIf.getType()
+ tableIf.getType(),
+ schemaVersion
)
);
}
@@ -593,6 +598,7 @@ public class SqlCacheContext {
public final long id;
public final long version;
public final TableType type;
+ public final int schemaVersion;
}
/** CacheKeyType */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 95885f92286..0aad4b44ea6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1418,6 +1418,11 @@ public class EditLog {
// TODO: implement
break;
}
+ case OperationType.OP_TABLE_META_CHANGE: {
+ TableMetaChange op = (TableMetaChange) journal.getData();
+ env.replayTableMetaChange(op);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode,
logId, e);
@@ -2504,4 +2509,8 @@ public class EditLog {
public long logBeginSnapshot(SnapshotState snapshotState) {
return logEdit(OperationType.OP_BEGIN_SNAPSHOT, snapshotState);
}
+
+ public void logTableMetaChange(TableMetaChange op) {
+ logEdit(OperationType.OP_TABLE_META_CHANGE, op);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index fc42ce65163..e4f009274f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -425,6 +425,11 @@ public class OperationType {
public static final short OP_BEGIN_SNAPSHOT = 1100;
+ // Generic "an operation modified this table's metadata" signal broadcast
from
+ // master to followers so that every FE-local cache keyed by table can be
+ // invalidated (NereidsSqlCacheManager,
NereidsSortedPartitionsCacheManager, …).
+ public static final short OP_TABLE_META_CHANGE = 1102;
+
/**
* Get opcode name by op code.
**/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java
new file mode 100644
index 00000000000..cc62417d92f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableMetaChange.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Persist payload for {@link OperationType#OP_TABLE_META_OPERATION}.
+ * Generic "an operation modified this table's metadata" signal that follower
+ * FEs use to invalidate any local FE caches keyed by the table (sql cache,
+ * sorted partition cache, and future per-table caches). This is about
+ * metadata mutations (schema/properties/partitions/etc.), not data writes.
+ * Carries both ids and names of catalog / database / table so each subscriber
+ * can match by id (preferred) or by full name (fallback, e.g. when the table
+ * has been concurrently dropped/recreated and the id no longer matches but
+ * the name still does). Also carries the master-side timestamp so subscribers
+ * and audit tooling can correlate the event with the originating DDL.
+ */
+public class TableMetaChange implements Writable {
+ @SerializedName("ci")
+ private long catalogId;
+ @SerializedName("cn")
+ private String catalogName;
+ @SerializedName("di")
+ private long dbId;
+ @SerializedName("dn")
+ private String dbName;
+ @SerializedName("ti")
+ private long tableId;
+ @SerializedName("tn")
+ private String tableName;
+ // master-side millis-since-epoch when this event was emitted
+ @SerializedName("ts")
+ private long eventTimeMs;
+
+ public TableMetaChange() {
+ // for persist
+ }
+
+ /** Build a TableMetaChange from a TableIf (master-side helper). */
+ public static TableMetaChange fromTable(TableIf table) {
+ long catalogId = -1L;
+ String catalogName = "";
+ long dbId = -1L;
+ String dbName = "";
+ DatabaseIf<?> db = table.getDatabase();
+ if (db != null) {
+ dbId = db.getId();
+ dbName = db.getFullName();
+ CatalogIf<?> catalog = db.getCatalog();
+ if (catalog != null) {
+ catalogId = catalog.getId();
+ catalogName = catalog.getName();
+ }
+ }
+ return new TableMetaChange(catalogId, catalogName, dbId, dbName,
+ table.getId(), table.getName());
+ }
+
+ public TableMetaChange(long catalogId, String catalogName,
+ long dbId, String dbName,
+ long tableId, String tableName) {
+ this(catalogId, catalogName, dbId, dbName, tableId, tableName,
System.currentTimeMillis());
+ }
+
+ public TableMetaChange(long catalogId, String catalogName,
+ long dbId, String dbName,
+ long tableId, String tableName,
+ long eventTimeMs) {
+ this.catalogId = catalogId;
+ this.catalogName = catalogName;
+ this.dbId = dbId;
+ this.dbName = dbName;
+ this.tableId = tableId;
+ this.tableName = tableName;
+ this.eventTimeMs = eventTimeMs;
+ }
+
+ public long getCatalogId() {
+ return catalogId;
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public long getEventTimeMs() {
+ return eventTimeMs;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static TableMetaChange read(DataInput in) throws IOException {
+ return GsonUtils.GSON.fromJson(Text.readString(in),
TableMetaChange.class);
+ }
+
+ @Override
+ public String toString() {
+ return "TableMetaChange{catalogId=" + catalogId
+ + ", catalogName='" + catalogName + '\''
+ + ", dbId=" + dbId
+ + ", dbName='" + dbName + '\''
+ + ", tableId=" + tableId
+ + ", tableName='" + tableName + '\''
+ + ", eventTimeMs=" + eventTimeMs
+ + '}';
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]