[ https://issues.apache.org/jira/browse/HIVE-26242?focusedWorklogId=777429&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777429 ]
ASF GitHub Bot logged work on HIVE-26242: ----------------------------------------- Author: ASF GitHub Bot Created on: 02/Jun/22 11:41 Start Date: 02/Jun/22 11:41 Worklog Time Spent: 10m Work Description: deniskuzZ commented on code in PR #3303: URL: https://github.com/apache/hive/pull/3303#discussion_r887856299 ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Singleton service responsible for heartbeating the compaction transactions. + */ +class CompactionHeartbeatService { + + + private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class); + + private static volatile CompactionHeartbeatService INSTANCE; + + /** + * Return the singleton instance of this class. + * @param conf The {@link HiveConf} used to create the service. Used only during the first call + * @return Returns the singleton {@link CompactionHeartbeatService} + * @throws IllegalStateException Thrown when the service has already been shut down. 
+ */ + static CompactionHeartbeatService getInstance(HiveConf conf) { + if (INSTANCE == null) { + synchronized (CompactionHeartbeatService.class) { + if (INSTANCE == null) { + LOG.debug("Initializing compaction txn heartbeater service."); + INSTANCE = new CompactionHeartbeatService(conf); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + INSTANCE.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + })); + } + } + } + if (INSTANCE.heartbeatExecutor.isShutdown()) { + throw new IllegalStateException("The CompactionHeartbeatService is already shut down!"); + } + return INSTANCE; + } + + private final ScheduledExecutorService heartbeatExecutor; + private final ObjectPool<IMetaStoreClient> clientPool; + private final long initialDelay; + private final long period; + private final ConcurrentHashMap<Long, TaskWrapper> tasks = new ConcurrentHashMap<>(30); Review Comment: why should it be a concurrent map, I don't see any usage of it in the code ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionHeartbeatService.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Singleton service responsible for heartbeating the compaction transactions. + */ +class CompactionHeartbeatService { + + + private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeatService.class); + + private static volatile CompactionHeartbeatService INSTANCE; + + /** + * Return the singleton instance of this class. + * @param conf The {@link HiveConf} used to create the service. Used only during the first call + * @return Returns the singleton {@link CompactionHeartbeatService} + * @throws IllegalStateException Thrown when the service has already been shut down. 
+ */ + static CompactionHeartbeatService getInstance(HiveConf conf) { + if (INSTANCE == null) { + synchronized (CompactionHeartbeatService.class) { + if (INSTANCE == null) { + LOG.debug("Initializing compaction txn heartbeater service."); + INSTANCE = new CompactionHeartbeatService(conf); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + INSTANCE.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + })); + } + } + } + if (INSTANCE.heartbeatExecutor.isShutdown()) { + throw new IllegalStateException("The CompactionHeartbeatService is already shut down!"); + } + return INSTANCE; + } + + private final ScheduledExecutorService heartbeatExecutor; + private final ObjectPool<IMetaStoreClient> clientPool; + private final long initialDelay; + private final long period; + private final ConcurrentHashMap<Long, TaskWrapper> tasks = new ConcurrentHashMap<>(30); Review Comment: why should it be a concurrent map, I don't see any usage of it in the code? Issue Time Tracking ------------------- Worklog Id: (was: 777429) Time Spent: 50m (was: 40m) > Compaction heartbeater improvements > ----------------------------------- > > Key: HIVE-26242 > URL: https://issues.apache.org/jira/browse/HIVE-26242 > Project: Hive > Issue Type: Improvement > Reporter: László Végh > Assignee: László Végh > Priority: Major > Labels: pull-request-available > Time Spent: 50m > Remaining Estimate: 0h > > The Compaction heartbeater should be improved in the following ways: > * The metastore clients should be reused between heartbeats and closed only > at the end, when the transaction ends > * Instead of having a dedicated heartbeater thread for each Compaction > transaction, there should be a shared heartbeater executor where the > heartbeat tasks can be scheduled/submitted. -- This message was sent by Atlassian Jira (v8.20.7#820007)