[ https://issues.apache.org/jira/browse/HIVE-26242?focusedWorklogId=784521&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-784521 ]
ASF GitHub Bot logged work on HIVE-26242: ----------------------------------------- Author: ASF GitHub Bot Created on: 24/Jun/22 10:17 Start Date: 24/Jun/22 10:17 Worklog Time Spent: 10m Work Description: veghlaci05 commented on code in PR #3303: URL: https://github.com/apache/hive/pull/3303#discussion_r905928478 ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hive.common.util.ShutdownHookManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.hive.common.util.HiveStringUtils.SHUTDOWN_HOOK_PRIORITY; + +/** + * Singleton service responsible for heartbeating the compaction transactions. + */ +class CompactionHeartbeatService { + + + private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class); + + private static volatile CompactionHeartbeatService instance; + + /** + * Return the singleton instance of this class. + * @param conf The {@link HiveConf} used to create the service. Used only during the firsst call + * @return Returns the singleton {@link CompactionHeartbeatService} + * @throws IllegalStateException Thrown when the service has already been shut down. 
+ */ + static CompactionHeartbeatService getInstance(HiveConf conf) { + if (instance == null) { + synchronized (CompactionHeartbeatService.class) { + if (instance == null) { + LOG.debug("Initializing compaction txn heartbeater service."); + instance = new CompactionHeartbeatService(conf); + ShutdownHookManager.addShutdownHook(() -> { + try { + instance.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, SHUTDOWN_HOOK_PRIORITY); + } + } + } + if (instance.heartbeatExecutor.isShutdown()) { + throw new IllegalStateException("The CompactionHeartbeatService is already shut down!"); + } + return instance; + } + + private final ScheduledExecutorService heartbeatExecutor; + private final ObjectPool<IMetaStoreClient> clientPool; + private final long initialDelay; + private final long period; + private final HashMap<Long, TaskWrapper> tasks = new HashMap<>(30); + + /** + * Starts the heartbeat for the given transaction + * @param txnId The id of the compaction txn + * @param lockId The id of the lock associated with the txn + * @param tableName Required for logging only + * @throws IllegalStateException Thrown when the heartbeat for the given txn has already been started. + */ + void startHeartbeat(long txnId, long lockId, String tableName) { + if (tasks.containsKey(txnId)) { + throw new IllegalStateException("Heartbeat already started for TXN " + txnId); + } + LOG.info("Submitting heartbeat task for TXN {}", txnId); + CompactionHeartbeater heartbeater = new CompactionHeartbeater(txnId, lockId, tableName); + Future<?> submittedTask = heartbeatExecutor.scheduleAtFixedRate(heartbeater, initialDelay, period, TimeUnit.MILLISECONDS); + tasks.put(txnId, new TaskWrapper(heartbeater, submittedTask)); + } + + /** + * Stops the heartbeat for the given transaction + * @param txnId The id of the compaction txn + * @throws IllegalStateException Thrown when there is no {@link CompactionHeartbeater} task associated with the + * given txnId. 
+ */ + void stopHeartbeat(long txnId) throws InterruptedException { + LOG.info("Stopping heartbeat task for TXN {}", txnId); + TaskWrapper wrapper = tasks.get(txnId); + if (wrapper == null) { + throw new IllegalStateException("No registered heartbeat found for TXN " + txnId); + } + wrapper.future.cancel(false); + try { + wrapper.heartbeater.waitUntilFinish(initialDelay); + } finally { + tasks.remove(txnId); + } + } + + /** + * Shuts down the service, by closing its underlying resources. Be aware that after shutdown this service is no + * longer usable, there is no way to re-initialize it. + * @throws InterruptedException + */ + void shutdown() throws InterruptedException { + LOG.info("Shutting down compaction txn heartbeater service."); + heartbeatExecutor.shutdownNow(); + try { + heartbeatExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } finally { + tasks.clear(); + clientPool.close(); + } + LOG.info("Compaction txn heartbeater service is successfully stopped."); + } + + private CompactionHeartbeatService(HiveConf conf) { + heartbeatExecutor = Executors.newScheduledThreadPool( + MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS)); + GenericObjectPoolConfig<IMetaStoreClient> config = new GenericObjectPoolConfig<>(); + config.setMinIdle(1); + config.setMaxIdle(2); + config.setMaxTotal(5); Review Comment: You are right, the maxtotal could be equal to that value. 
Issue Time Tracking ------------------- Worklog Id: (was: 784521) Time Spent: 5h (was: 4h 50m) > Compaction heartbeater improvements > ----------------------------------- > > Key: HIVE-26242 > URL: https://issues.apache.org/jira/browse/HIVE-26242 > Project: Hive > Issue Type: Improvement > Reporter: László Végh > Assignee: László Végh > Priority: Major > Labels: pull-request-available > Time Spent: 5h > Remaining Estimate: 0h > > The Compaction heartbeater should be improved in the following ways: > * The metastore clients should be reused between heartbeats and closed only > at the end, when the transaction ends > * Instead of having a dedicated heartbeater thread for each Compaction > transaction, there should be a shared heartbeater executor where the > heartbeat tasks can be scheduled/submitted. -- This message was sent by Atlassian Jira (v8.20.7#820007)