[ 
https://issues.apache.org/jira/browse/HIVE-26509?focusedWorklogId=812299&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-812299
 ]

ASF GitHub Bot logged work on HIVE-26509:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Sep/22 00:24
            Start Date: 27/Sep/22 00:24
    Worklog Time Spent: 10m 
      Work Description: saihemanth-cloudera commented on code in PR #3567:
URL: https://github.com/apache/hive/pull/3567#discussion_r975629801


##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java:
##########
@@ -1523,6 +1531,11 @@ public enum ConfVars {
     TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel", "hive.txn.use.minhistorylevel", true,
         "Set this to false, for the TxnHandler and Cleaner to not use MinHistoryLevel table and take advantage of openTxn optimisation.\n"
             + "If the table is dropped HMS will switch this flag to false."),
+    LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100,

Review Comment:
   I assume this lock is polled to see whether it is available, so that the
instance can become the leader; is that correct? If so, should we rename this
setting to make clear that it is for leader election? Also, why is the number
of retries set to 100? Can you please explain?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java:
##########
@@ -891,33 +855,34 @@ public void run() {
             startCondition.await();
           }
 
-          if (isLeader) {
-            startCompactorInitiator(conf);
-            startCompactorCleaner(conf);
-            startRemoteOnlyTasks(conf);
-            startStatsUpdater(conf);
-            HMSHandler.startAlwaysTaskThreads(conf);
-          }
-
-          // The leader HMS may not necessarily have sufficient compute capacity required to run
-          // actual compaction work. So it can run on a non-leader HMS with sufficient capacity
-          // or a configured HS2 instance.
-          if (MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) {
-            LOG.warn("Running compaction workers on HMS side is not suggested because compaction pools are not supported in HMS " +
-                "(HIVE-26443). Consider removing the hive.metastore.runworker.in configuration setting, as it will be " +
-                "comletely removed in future releases.");
-            startCompactorWorkers(conf);
+         LeaderElectionContext context = new LeaderElectionContext.ContextBuilder(conf)
+             .setHMSHandler(thriftServer.getHandler()).servHost(getServerHostName())
+             // always tasks
+             .setTType(LeaderElectionContext.TTYPE.ALWAYS_TASKS)
+             .addListener(new HouseKeepingTasks(conf, false))
+             // housekeeping tasks
+             .setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING)
+             .addListener(new CMClearer(conf))
+             .addListener(new StatsUpdaterTask(conf))
+             .addListener(new CompactorTasks(conf, false))
+             .addListener(new CompactorPMF())
+             .addListener(new HouseKeepingTasks(conf, true))

Review Comment:
   Do we need to include the partition discovery task here?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java:
##########
@@ -185,5 +188,20 @@ void searchHousekeepingThreads() throws Exception {
       }
     }
   }
+
+  private void resetThreadStatus() {
+    Map<String, Boolean> newThreadNames = new HashMap<>();
+    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+      newThreadNames.put(entry.getKey(), false);

Review Comment:
   Can we set the values of threadNames to false directly instead of copying
them into a new map?
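
   A minimal sketch of the in-place reset, assuming threadNames is a mutable
Map<String, Boolean> as in the hunk above. The class name ThreadStatusReset
and the sample keys are illustrative only and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class ThreadStatusReset {
  // Hypothetical stand-in for the test's resetThreadStatus(); the map is passed in
  // so the sketch stays self-contained.
  static void resetThreadStatus(Map<String, Boolean> threadNames) {
    // Map.replaceAll rewrites every value in place; no key is added or removed.
    threadNames.replaceAll((name, found) -> false);
  }

  public static void main(String[] args) {
    Map<String, Boolean> threadNames = new HashMap<>();
    threadNames.put("CompactorInitiator", true); // sample keys, not from the PR
    threadNames.put("CompactorCleaner", false);
    resetThreadStatus(threadNames);
    System.out.println(threadNames);
  }
}
```

   Iterating over entrySet() and calling entry.setValue(false) would work as
well; replaceAll simply avoids the explicit loop and the temporary map.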



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestMetastoreLeaseNonLeader {
+
+  LeaderElection election;
+
+  TestMetastoreHousekeepingLeader hms;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    TestTxnDbUtil.setConfValues(conf);
+    TestTxnDbUtil.prepDb(conf);
+    election = new LeaseLeaderElection();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
+    TableName tableName = (TableName) LeaderElectionContext.getLeaderMutex(conf,
+        LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
+    election.tryBeLeader(conf, tableName);
+    assertTrue("The elector should hold the lease now", election.isLeader());
+    // start the non-leader hms now
+    hms = new TestMetastoreHousekeepingLeader();
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
+    hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
+    hms.internalSetup("", false);

Review Comment:
   Maybe we need an assertFalse or a LOG.info here to verify that hms is not
the leader now.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Hive Lock based leader election.
+ * If wins, the current instance becomes the leader,
+ * and a heartbeat daemon will be started to renew the lock before timeout.
+ * If loses, a non-leader watcher will also be started to check the
+ * lock periodically to see if he can grab the lock in order to be the leader.
+ * The change of Leadership can be received by registering the
+ * listeners through {@link LeaderElection#addStateListener}.
+ */
+public class LeaseLeaderElection implements LeaderElection<TableName> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseLeaderElection.class);
+
+  private static final AtomicLong ID = new AtomicLong();
+
+  // Result of election
+  private volatile boolean isLeader;
+
+  private TxnStore store;
+
+  // Initial sleep time for locking the table at retrying.
+  private long nextSleep = 50;
+
+  // A daemon used for renewing the lock before timeout,
+  // this happens when the current instance wins the election.
+  private LeaseWatcher heartbeater;
+
+  // For non-leader instances to check the lock periodically to
+  // see if there is a chance to take over the leadership.
+  // At any time, only one of heartbeater and nonLeaderWatcher is alive.
+  private LeaseWatcher nonLeaderWatcher;
+
+  // Current lock id
+  private volatile long lockId = -1;
+
+  // Leadership change listeners
+  private List<LeadershipStateListener> listeners = new ArrayList<>();
+
+  // Property for testing only
+  public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";
+
+  private String name;
+
+  private void doWork(LockResponse resp, Configuration conf,
+      TableName tableName) throws LeaderException {
+    lockId = resp.getLockid();
+    assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING;
+    shutdownWatcher();
+
+    switch (resp.getState()) {
+    case ACQUIRED:
+      boolean renewLease = conf.getBoolean(METASTORE_RENEW_LEASE, true);
+      heartbeater = renewLease ?
+          new Heartbeater(conf, tableName) : new ReleaseAndRequireWatcher(conf, tableName);
+      heartbeater.perform();
+      if (!isLeader) {
+        isLeader = true;
+        notifyListener();
+      }
+      break;
+    case WAITING:
+      nonLeaderWatcher = new NonLeaderWatcher(conf, tableName);
+      nonLeaderWatcher.perform();
+      if (isLeader) {
+        isLeader = false;
+        notifyListener();
+      }
+      break;
+    default:
+      throw new IllegalStateException("Unexpected lock state: " + resp.getState());
+    }
+  }
+
+  private void notifyListener() {
+    listeners.forEach(listener -> {
+      try {
+        if (isLeader) {
+          listener.takeLeadership(this);
+        } else {
+          listener.lossLeadership(this);

Review Comment:
   If the current instance isn't a leader, we are unnecessarily telling the
listener to lose its leadership while it is still waiting to become the
leader. I think we should only call this when it is actually losing its
leadership role.
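
   One way to make that guarantee explicit is to route every state change
through a single method that compares the old and new state. A minimal sketch;
TransitionNotifier, its Listener interface, and setLeader are hypothetical
names, and the listener signature is simplified compared with the PR's
LeadershipStateListener.

```java
import java.util.ArrayList;
import java.util.List;

class TransitionNotifier {
  // Hypothetical, simplified listener; not the PR's LeadershipStateListener.
  interface Listener {
    void takeLeadership();
    void lossLeadership();
  }

  private final List<Listener> listeners = new ArrayList<>();
  private boolean isLeader;

  void addListener(Listener listener) {
    listeners.add(listener);
  }

  // Notifies listeners only when the leadership state actually changes.
  synchronized void setLeader(boolean nowLeader) {
    if (nowLeader == isLeader) {
      return; // e.g. a waiting non-leader stays a non-leader: nothing to report
    }
    isLeader = nowLeader;
    for (Listener listener : listeners) {
      if (nowLeader) {
        listener.takeLeadership();
      } else {
        listener.lossLeadership();
      }
    }
  }

  public static void main(String[] args) {
    TransitionNotifier notifier = new TransitionNotifier();
    List<String> events = new ArrayList<>();
    notifier.addListener(new Listener() {
      public void takeLeadership() { events.add("take"); }
      public void lossLeadership() { events.add("loss"); }
    });
    notifier.setLeader(false); // still a non-leader: no event
    notifier.setLeader(true);  // non-leader -> leader
    notifier.setLeader(true);  // still the leader: no event
    notifier.setLeader(false); // leader -> non-leader
    System.out.println(events); // prints [take, loss]
  }
}
```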



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/AuditLeaderListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static java.util.Objects.requireNonNull;
+
+public class AuditLeaderListener implements LeaderElection.LeadershipStateListener {
+  private final Configuration configuration;
+
+  private final Path tableLocation;
+
+  private final static String SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
+  private final static String INPUTFORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+  private final static String OUTPUTFORMAT = "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat";
+
+  public AuditLeaderListener(TableName tableName, IHMSHandler handler) throws Exception {
+    requireNonNull(tableName, "tableName is null");
+    requireNonNull(handler, "handler is null");
+    this.configuration = handler.getConf();
+    try {
+      // store the leader info as json + text for human-readable
+      Table table = new TableBuilder()
+          .setCatName(tableName.getCat())
+          .setDbName(tableName.getDb())
+          .setTableName(tableName.getTable())
+          .addCol("leader", ColumnType.STRING_TYPE_NAME)
+          .addCol("name", ColumnType.STRING_TYPE_NAME)
+          .addCol("elected_time", ColumnType.BIGINT_TYPE_NAME)
+          .setSerdeLib(SERDE)
+          .setInputFormat(INPUTFORMAT)
+          .setOutputFormat(OUTPUTFORMAT)
+          .build(handler.getConf());
+      handler.create_table(table);

Review Comment:
   I think we need to set the owner of this table, right? Also, is there a way
to recreate this table if it is dropped for some reason?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/AuditLeaderListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static java.util.Objects.requireNonNull;
+
+public class AuditLeaderListener implements LeaderElection.LeadershipStateListener {
+  private final Configuration configuration;
+
+  private final Path tableLocation;
+
+  private final static String SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
+  private final static String INPUTFORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+  private final static String OUTPUTFORMAT = "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat";
+
+  public AuditLeaderListener(TableName tableName, IHMSHandler handler) throws Exception {
+    requireNonNull(tableName, "tableName is null");
+    requireNonNull(handler, "handler is null");
+    this.configuration = handler.getConf();
+    try {
+      // store the leader info as json + text for human-readable
+      Table table = new TableBuilder()
+          .setCatName(tableName.getCat())
+          .setDbName(tableName.getDb())
+          .setTableName(tableName.getTable())
+          .addCol("leader", ColumnType.STRING_TYPE_NAME)
+          .addCol("name", ColumnType.STRING_TYPE_NAME)
+          .addCol("elected_time", ColumnType.BIGINT_TYPE_NAME)
+          .setSerdeLib(SERDE)
+          .setInputFormat(INPUTFORMAT)
+          .setOutputFormat(OUTPUTFORMAT)
+          .build(handler.getConf());
+      handler.create_table(table);
+    } catch (AlreadyExistsException e) {
+      // ignore
+    }
+
+    Table table = handler.getMS().getTable(tableName.getCat(),
+        tableName.getDb(), tableName.getTable());
+    this.tableLocation = new Path(table.getSd().getLocation());
+    String serde = table.getSd().getSerdeInfo().getSerializationLib();
+    String input = table.getSd().getInputFormat();
+    String output = table.getSd().getOutputFormat();
+    if (!SERDE.equals(serde) || !INPUTFORMAT.equals(input)
+        || !OUTPUTFORMAT.equals(output)) {
+      throw new RuntimeException(tableName + " should be in json + text format");
+    }
+
+  }
+
+  @Override
+  public void takeLeadership(LeaderElection election) throws Exception {
+    String hostName = getHostname();
+    String message = "{\"leader\": \"" + hostName + "\", \"name\": \""
+        + election.getName() + "\", \"elected_time\": " + System.currentTimeMillis() + "} \n";
+    Path path = new Path(tableLocation, "leader.json");
+    try {
+      FileSystem fs = Warehouse.getFs(path, configuration);
+      try (OutputStream outputStream = fs.exists(path) ?
+          fs.append(path) :
+          fs.create(path, false)) {
+        outputStream.write(message.getBytes(StandardCharsets.UTF_8));

Review Comment:
   Why are we trying to persist leader election info under the table directory 
instead of creating an entry into the table?



##########
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java:
##########
@@ -159,6 +159,7 @@ public void run() {
         return;
       }
     }
+    stopWorkers();

Review Comment:
   Why do we need this, when we already have L158?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java:
##########
@@ -58,7 +58,8 @@ public void testHouseKeepingThreadExistence() throws Exception {
         if (entry.getValue()) {
           LOG.info("Thread found for " + entry.getKey().getSimpleName());
         }
-        Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());
+        // Worker now runs on a leader hms
+        // Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());

Review Comment:
   Nit: Can you remove the commented-out code?



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java:
##########
@@ -562,6 +562,14 @@ public enum ConfVars {
         "match that configuration. Otherwise it should be same as the hostname returned by " +
         "InetAddress#getLocalHost#getHostName(). Given the uncertainty in the later " +
         "it is desirable to configure metastore.thrift.bind.host on the intended leader HMS."),
+    METASTORE_HOUSEKEEPING_LEADER_ELECTION("metastore.housekeeping.leader.election",
+        "metastore.housekeeping.leader.election",

Review Comment:
   Should this be hive.metastore.leader.election? Similarly, should the 'hive.'
prefix be added to L571?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMetastoreLeaseLeader {
+
+  LeaderElection election;
+
+  TestMetastoreHousekeepingLeader hms;
+
+  @Before
+  public void setUp() throws Exception {
+    hms = new TestMetastoreHousekeepingLeader();
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
+    hms.conf.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);

Review Comment:
   Should we add this config to MetastoreConf instead of the
LeaseLeaderElection class?



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java:
##########
@@ -58,7 +58,8 @@ public void testHouseKeepingThreadExistence() throws Exception {
         if (entry.getValue()) {
           LOG.info("Thread found for " + entry.getKey().getSimpleName());
         }
-        Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());
+        // Worker now runs on a leader hms
+        // Assert.assertTrue("No thread found for " + entry.getKey().getSimpleName(), entry.getValue());

Review Comment:
   You can remove L62, as it is a commented-out line.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Hive Lock based leader election.
+ * If wins, the current instance becomes the leader,
+ * and a heartbeat daemon will be started to renew the lock before timeout.
+ * If loses, a non-leader watcher will also be started to check the
+ * lock periodically to see if he can grab the lock in order to be the leader.
+ * The change of Leadership can be received by registering the
+ * listeners through {@link LeaderElection#addStateListener}.
+ */
+public class LeaseLeaderElection implements LeaderElection<TableName> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseLeaderElection.class);
+
+  private static final AtomicLong ID = new AtomicLong();
+
+  // Result of election
+  private volatile boolean isLeader;
+
+  private TxnStore store;
+
+  // Initial sleep time for locking the table at retrying.
+  private long nextSleep = 50;
+
+  // A daemon used for renewing the lock before timeout,
+  // this happens when the current instance wins the election.
+  private LeaseWatcher heartbeater;
+
+  // For non-leader instances to check the lock periodically to
+  // see if there is a chance to take over the leadership.
+  // At any time, only one of heartbeater and nonLeaderWatcher is alive.
+  private LeaseWatcher nonLeaderWatcher;
+
+  // Current lock id
+  private volatile long lockId = -1;
+
+  // Leadership change listeners
+  private List<LeadershipStateListener> listeners = new ArrayList<>();
+
+  // Property for testing only
+  public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";

Review Comment:
   I don't think this property is just for testing; it is used in the actual
feature. Is that correct?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.leader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection.LeadershipStateListener;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class LeaderElectionContext {
+
+  /**
+   * Types of tasks that exactly have one instance in a given warehouse.
+   * For those tasks which belong to the same type, they will be running in the same leader.
+   */
+  public enum TTYPE {
+    HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_housekeeping_leader"), "housekeeping"),
+    WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_worker_leader"), "compactor_worker"),
+    ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
+        "metastore_always_tasks_leader"), "always_tasks");
+    // Mutex of TTYPE, which can be a nonexistent table
+    private final TableName mutex;
+    // Name of TTYPE
+    private final String name;
+
+    TTYPE(TableName tableName, String name) {
+      this.mutex = tableName;
+      this.name  = name;
+    }
+    public TableName getTableName() {
+      return mutex;
+    }
+    public String getName() {
+      return name;
+    }
+  }
+
+  private final Configuration conf;
+  private final String servHost;
+  // Whether the context should be started as a daemon
+  private final boolean startAsDaemon;
+  // Audit the event of election
+  private AuditLeaderListener auditLeaderListener;
+  // State change listeners group by type
+  private final Map<TTYPE, List<LeadershipStateListener>> listeners;
+  // Collection of leader candidates
+  private final List<LeaderElection> leaderElections = new ArrayList<>();
+  // Property for testing, a single leader will be created
+  public final static String LEADER_IN_TEST = "metastore.leader.election.in.test";
+
+  private LeaderElectionContext(String servHost, Configuration conf,
+      Map<TTYPE, List<LeadershipStateListener>> listeners,
+      boolean startAsDaemon, IHMSHandler handler) throws Exception {
+    requireNonNull(conf, "conf is null");
+    requireNonNull(listeners, "listeners is null");
+    this.servHost = servHost;
+    this.conf = new Configuration(conf);
+    this.startAsDaemon = startAsDaemon;
+    String tableName = MetastoreConf.getVar(conf,
+        MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE);
+    if (StringUtils.isNotEmpty(tableName)) {
+      TableName table = TableName.fromString(tableName, MetaStoreUtils.getDefaultCatalog(conf),
+          Warehouse.DEFAULT_DATABASE_NAME);

Review Comment:
   Should the audit table be created in the sys db instead of the default
database?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 812299)
    Time Spent: 2h 20m  (was: 2h 10m)

> Introduce dynamic leader election in HMS
> ----------------------------------------
>
>                 Key: HIVE-26509
>                 URL: https://issues.apache.org/jira/browse/HIVE-26509
>             Project: Hive
>          Issue Type: New Feature
>          Components: Standalone Metastore
>            Reporter: Zhihua Deng
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> HIVE-21841 introduced a leader HMS that is selected by configuring 
> metastore.housekeeping.leader.hostname on startup. This approach saves us 
> from running duplicate HMS housekeeping tasks cluster-wide. 
> In this jira, we introduce a dynamic alternative: leader election 
> implemented with a hive lock. Once an HMS owns the lock, it becomes the 
> leader, carries out the housekeeping tasks, and sends heartbeats to renew 
> the lock before it times out. If the leader fails to reclaim the lock, it 
> stops any tasks it has already started. Election events are audited. This 
> gives us a more dynamic leader when the original one goes down, or in the 
> public cloud where the property may not be well configured, and it reduces 
> each leader's burden by running these tasks among different leaders.
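
The election described above can be sketched with an in-memory lease. This is
only an illustration under stated assumptions: LeaseSketch, tryAcquire, and
renew are hypothetical names, the clock is passed in explicitly to keep the
example deterministic, and in the PR the lease is a hive lock held through the
metastore's transaction store rather than a field in memory.

```java
class LeaseSketch {
  // Hypothetical in-memory lease standing in for the hive lock.
  private String holder;
  private long expiresAt;
  private final long timeoutMs;

  LeaseSketch(long timeoutMs) {
    this.timeoutMs = timeoutMs;
  }

  // Grants the lease if it is free, expired, or already held by the candidate.
  synchronized boolean tryAcquire(String candidate, long nowMs) {
    if (holder == null || nowMs >= expiresAt || holder.equals(candidate)) {
      holder = candidate;
      expiresAt = nowMs + timeoutMs;
      return true;
    }
    return false;
  }

  // Heartbeat: extends the lease only if the caller still holds an unexpired lease.
  synchronized boolean renew(String candidate, long nowMs) {
    if (candidate.equals(holder) && nowMs < expiresAt) {
      expiresAt = nowMs + timeoutMs;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    LeaseSketch lease = new LeaseSketch(3000);
    System.out.println(lease.tryAcquire("hms-1", 0));    // true: lease was free
    System.out.println(lease.tryAcquire("hms-2", 1000)); // false: hms-1 holds it
    System.out.println(lease.renew("hms-1", 2000));      // true: now expires at 5000
    System.out.println(lease.tryAcquire("hms-2", 5000)); // true: lease has expired
    System.out.println(lease.renew("hms-1", 5500));      // false: hms-1 lost the lease
  }
}
```

When renew returns false, the former leader would stop its housekeeping tasks
and fall back to polling tryAcquire, which mirrors the heartbeater and
non-leader watcher roles described in the PR.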



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
