This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 543ed46 (#5390)fix NPE when replay colocate group (#5391) 543ed46 is described below commit 543ed46bc3d65bc9c050dd346ff72f5cad9e27ad Author: wangbo <506340...@qq.com> AuthorDate: Fri Mar 12 14:24:34 2021 +0800 (#5390)fix NPE when replay colocate group (#5391) * (#5390)fix NPE when replay colocate group * remove table id from colocate group when duplicate create table * remove tablet id when duplicate create table,just like ddlexception * add ut --- .../java/org/apache/doris/catalog/Catalog.java | 51 ++++++++++++++-------- .../apache/doris/catalog/ColocateTableIndex.java | 6 +++ .../java/org/apache/doris/catalog/Database.java | 8 +++- .../org/apache/doris/catalog/InfoSchemaDb.java | 5 ++- .../apache/doris/catalog/TabletInvertedIndex.java | 20 +++++++++ .../org/apache/doris/catalog/CreateTableTest.java | 34 +++++++++++++++ .../org/apache/doris/catalog/InfoSchemaDbTest.java | 2 +- 7 files changed, 102 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 7f79320..cd54710 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3791,22 +3791,35 @@ public class Catalog { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } - if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) { + Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists()); + if (!result.first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists"); } - // we have added these index to memory, only need to persist here - if (getColocateTableIndex().isColocateTable(tableId)) { - GroupId groupId = getColocateTableIndex().getGroup(tableId); - List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId); - ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq); - editLog.logColocateAddTable(info); - } - LOG.info("successfully create table[{};{}]", tableName, tableId); - // register or remove table from DynamicPartition after table created - DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false); - dynamicPartitionScheduler.createOrUpdateRuntimeInfo( - tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime()); + if (result.second) { + if (getColocateTableIndex().isColocateTable(tableId)) { + // if this is a colocate join table, its table id is already added to colocate group + // so we should remove the tableId here + getColocateTableIndex().removeTable(tableId); + } + for (Long tabletId : tabletIdSet) { + Catalog.getCurrentInvertedIndex().deleteTablet(tabletId); + } + LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId); + } else { + // we have added these index to memory, only need to persist here + if (getColocateTableIndex().isColocateTable(tableId)) { + GroupId groupId = getColocateTableIndex().getGroup(tableId); + List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId); + ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq); + editLog.logColocateAddTable(info); + } + LOG.info("successfully create table[{};{}]", tableName, tableId); + // register or remove table from DynamicPartition after table created + DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false); + dynamicPartitionScheduler.createOrUpdateRuntimeInfo( + tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime()); + } } catch (DdlException e) { for (Long tabletId : tabletIdSet) { Catalog.getCurrentInvertedIndex().deleteTablet(tabletId); @@ -3829,7 +3842,7 @@ public class Catalog { long tableId = Catalog.getCurrentCatalog().getNextId(); MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties()); mysqlTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); } LOG.info("successfully create table[{}-{}]", tableName, tableId); @@ -3844,7 +3857,7 @@ public class Catalog { long tableId = Catalog.getCurrentCatalog().getNextId(); OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties()); odbcTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); } LOG.info("successfully create table[{}-{}]", tableName, tableId); @@ -3875,7 +3888,7 @@ public class Catalog { EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo); esTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); } LOG.info("successfully create table{} with id {}", tableName, tableId); @@ -3892,7 +3905,7 @@ public class Catalog { brokerTable.setComment(stmt.getComment()); brokerTable.setBrokerProperties(stmt.getExtProperties()); - if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); } LOG.info("successfully create table[{}-{}]", tableName, tableId); @@ -3906,7 +3919,7 @@ public class Catalog { long tableId = getNextId(); HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties()); hiveTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist"); } LOG.info("successfully create table[{}-{}]", tableName, tableId); @@ -5553,7 +5566,7 @@ public class Catalog { throw new DdlException("failed to init view stmt", e); } - if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists())) { + if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists()).first) { throw new DdlException("Failed to create view[" + tableName + "]."); } LOG.info("successfully create view[" + tableName + "-" + newView.getId() + "]"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 7c5fd98..dc6bf25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -700,4 +700,10 @@ public class ColocateTableIndex implements Writable { writeUnlock(); } } + + // just for ut + public Map<Long, GroupId> getTable2Group() { + return table2Group; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 202c8eb..c1d6f97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -281,13 +281,17 @@ public class Database extends MetaObject implements Writable { checkReplicaQuota(); } - public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) { + public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) { boolean result = true; + // if a table is already exists, then edit log won't be executed + // some caller of this method may need to know this message + boolean isTableExist = false; writeLock(); try { String tableName = table.getName(); if (nameToTable.containsKey(tableName)) { result = setIfNotExist; + isTableExist = true; } else { idToTable.put(table.getId(), table); nameToTable.put(table.getName(), table); @@ -301,7 +305,7 @@ public class Database extends MetaObject implements Writable { Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table); } } - return result; + return Pair.create(result, isTableExist); } finally { writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java index e577d4f..78666f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Pair; import org.apache.doris.common.SystemIdGenerator; import java.io.DataInput; @@ -39,8 +40,8 @@ public class InfoSchemaDb extends Database { } @Override - public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) { - return false; + public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) { + return Pair.create(false, false); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index edad1e0..3e424e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -695,5 +695,25 @@ public class TabletInvertedIndex { this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount); } } + + // just for ut + public Table<Long, Long, Replica> getReplicaMetaTable() { + return replicaMetaTable; + } + + // just for ut + public Table<Long, Long, Replica> getBackingReplicaMetaTable() { + return backingReplicaMetaTable; + } + + // just for ut + public Table<Long, Long, TabletMeta> getTabletMetaTable() { + return tabletMetaTable; + } + + // just for ut + public Map<Long, TabletMeta> getTabletMetaMap() { + return tabletMetaMap; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index dab5fbd..6bfdb87 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -32,6 +32,8 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.File; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; public class CreateTableTest { @@ -63,6 +65,38 @@ public class CreateTableTest { } @Test + public void testDuplicateCreateTable() throws Exception{ + // test + Catalog catalog = Catalog.getCurrentCatalog(); + String sql = "create table if not exists test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n" + + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1','colocate_with'='test'); "; + createTable(sql); + Set<Long> tabletIdSetAfterCreateFirstTable = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet(); + Set<TabletMeta> tabletMetaSetBeforeCreateFirstTable = new HashSet<>(); + catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletMetaSetBeforeCreateFirstTable.add(tabletMeta);}); + Set<Long> colocateTableIdBeforeCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet(); + Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0); + Assert.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0); + + createTable(sql); + // check whether tablet is cleared after duplicate create table + Set<Long> tabletIdSetAfterDuplicateCreateTable1 = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet(); + Set<Long> tabletIdSetAfterDuplicateCreateTable2 = catalog.getTabletInvertedIndex().getBackingReplicaMetaTable().columnKeySet(); + Set<Long> tabletIdSetAfterDuplicateCreateTable3 = catalog.getTabletInvertedIndex().getTabletMetaMap().keySet(); + Set<TabletMeta> tabletIdSetAfterDuplicateCreateTable4 = new HashSet<>(); + catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletIdSetAfterDuplicateCreateTable4.add(tabletMeta);}); + + Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable1)); + Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable2)); + Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable3)); + Assert.assertTrue(tabletMetaSetBeforeCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable4)); + + // check whether table id is cleared from colocate group after duplicate create table + Set<Long> colocateTableIdAfterCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet(); + Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.equals(colocateTableIdAfterCreateFirstTable)); + } + + @Test public void testNormal() throws DdlException { ExceptionChecker.expectThrowsNoException( () -> createTable("create table test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java index 08a47a8..1cf7fdd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java @@ -28,7 +28,7 @@ public class InfoSchemaDbTest { Database db = new InfoSchemaDb(); Assert.assertFalse(db.createTable(null)); - Assert.assertFalse(db.createTableWithLock(null, false, false)); + Assert.assertFalse(db.createTableWithLock(null, false, false).first); db.dropTable("authors"); db.write(null); Assert.assertNull(db.getTable("authors")); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org