[ https://issues.apache.org/jira/browse/HIVE-26035?focusedWorklogId=842309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-842309 ]
ASF GitHub Bot logged work on HIVE-26035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jan/23 15:13
            Start Date: 30/Jan/23 15:13
    Worklog Time Spent: 10m
      Work Description: VenuReddy2103 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1090754911


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java:
##########

@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang3.StringUtils.repeat;
+import static org.apache.hadoop.hive.metastore.Batchable.NO_BATCHING;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jdo.PersistenceManager;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MOrder;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
+import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
+import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
+import org.apache.hadoop.hive.metastore.model.MStringList;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.identity.DatastoreId;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.IdentityType;
+
+/**
+ * This class contains the methods to insert into tables on the underlying database using direct SQL
+ */
+class DirectSqlInsertPart {
+  private final PersistenceManager pm;
+  private final DatabaseProduct dbType;
+  private final int batchSize;
+
+  public DirectSqlInsertPart(PersistenceManager pm, DatabaseProduct dbType, int batchSize) {
+    this.pm = pm;
+    this.dbType = dbType;
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Interface to execute multiple row insert query in batch for direct SQL
+   */
+  interface BatchExecutionContext {
+    void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException;
+  }
+
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1);
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
+  private void insertInBatch(String tableName, String columns, int columnCount, int rowCount,
+      BatchExecutionContext batchExecutionContext) throws MetaException {
+    if (rowCount == 0 || columnCount == 0) {
+      return;
+    }
+    int maxRowsInBatch = (batchSize == NO_BATCHING) ? rowCount : batchSize;
+    int maxBatches = rowCount / maxRowsInBatch;
+    int last = rowCount % maxRowsInBatch;
+    String rowFormat = "(" + repeat(",?", columnCount).substring(1) + ")";
+    String query = "";
+    if (maxBatches > 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch);
+    }
+    int batchParamCount = maxRowsInBatch * columnCount;
+    for (int batch = 0; batch < maxBatches; batch++) {
+      batchExecutionContext.execute(query, maxRowsInBatch, batchParamCount);
+    }
+    if (last != 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+      batchExecutionContext.execute(query, last, last * columnCount);
+    }
+  }
+
+  private void executeQuery(String queryText, Object[] params) throws MetaException {
+    try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+      MetastoreDirectSqlUtils.executeWithArray(query.getInnerQuery(), params, queryText);
+    }
+  }
+
+  private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = serdeIdToSerDeInfo.size();
+    String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+        + "\"SERIALIZER_CLASS\")";
+    int columnCount = 7;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MSerDeInfo> entry = it.next();
+          MSerDeInfo serdeInfo = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = serdeInfo.getDescription();
+          params[paramIndex++] = serdeInfo.getDeserializerClass();
+          params[paramIndex++] = serdeInfo.getName();
+          params[paramIndex++] = serdeInfo.getSerdeType();
+          params[paramIndex++] = serdeInfo.getSerializationLib();
+          params[paramIndex++] = serdeInfo.getSerializerClass();
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"SERDES\"", columns, columnCount, rowCount, batchExecutionContext);
+  }
+
+  private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor,
+      Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException {
+    int rowCount = sdIdToStorageDescriptor.size();
+    String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+        + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+    int columnCount = 9;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MStorageDescriptor> entry = it.next();
+          MStorageDescriptor sd = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+          params[paramIndex++] = sd.getInputFormat();
+          params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+          params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories());
+          params[paramIndex++] = sd.getLocation();
+          params[paramIndex++] = sd.getNumBuckets();
+          params[paramIndex++] = sd.getOutputFormat();
+          params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"SDS\"", columns, columnCount, rowCount, batchExecutionContext);
+  }
+
+  private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId)
+      throws MetaException {
+    int rowCount = partIdToPartition.size();
+    String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+        + "\"WRITE_ID\")";
+    int columnCount = 7;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartition> entry = it.next();
+          MPartition partition = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partition.getCreateTime();
+          params[paramIndex++] = partition.getLastAccessTime();
+          params[paramIndex++] = partition.getPartitionName();
+          params[paramIndex++] = partIdToSdId.get(entry.getKey());
+          params[paramIndex++] = partition.getTable().getId();
+          params[paramIndex++] = partition.getWriteId();
+        }
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"PARTITIONS\"", columns, columnCount, rowCount, batchExecutionContext);
+  }
+
+  private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = 0;
+    for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+      rowCount += serDeInfo.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    int columnCount = 3;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator();
+      Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+      Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = serdeEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            serdeEntry = serdeIt.next();
+            it = serdeEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        executeQuery(batchQueryText, params);
+      }
+    };
+    insertInBatch("\"SERDE_PARAMS\"", columns, columnCount, rowCount, batchExecutionContext);
+  }
+
+  private void insertStorageDescriptorParamInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor)
+      throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    int columnCount = 3;
+    BatchExecutionContext batchExecutionContext = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<Map.Entry<String, String>> it = sdEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            sdEntry = sdIt.next();

Review Comment:
   Same reason as above. Have added a comment now to make it clear.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 842309)
    Time Spent: 6.5h  (was: 6h 20m)

> Explore moving to directsql for ObjectStore::addPartitions
> ----------------------------------------------------------
>
>                 Key: HIVE-26035
>                 URL: https://issues.apache.org/jira/browse/HIVE-26035
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Venugopal Reddy K
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Currently {{addPartitions}} uses DataNucleus and is very slow for a large
> number of partitions. It would be good to move to direct SQL. Lots of repeated
> SQLs can be avoided as well (e.g. SDS, SERDE, TABLE_PARAMS).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
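For context, a minimal sketch of the multi-row insert pattern the issue description refers to. This is not the PR's code: the PR builds its parameter arrays from JDO model objects and gets the statement text from DatabaseProduct.getBatchInsertQuery, while the JDBC URL, table choice, and sample values below are illustrative assumptions only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MultiRowInsertSketch {
  // Builds e.g. INSERT INTO "SERDE_PARAMS" ("SERDE_ID","PARAM_KEY","PARAM_VALUE") VALUES (?,?,?),(?,?,?)
  static String buildInsert(String table, String columns, int columnCount, int rowCount) {
    String row = "(" + String.join(",", Collections.nCopies(columnCount, "?")) + ")";
    return "INSERT INTO " + table + " " + columns + " VALUES "
        + String.join(",", Collections.nCopies(rowCount, row));
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical connection; a real metastore uses its configured backend and connection pool.
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/metastore_db")) {
      // Two sample (SERDE_ID, PARAM_KEY, PARAM_VALUE) rows, purely for illustration.
      List<Map.Entry<Long, String[]>> rows = List.of(
          Map.entry(1L, new String[] {"serialization.format", "1"}),
          Map.entry(2L, new String[] {"field.delim", ","}));
      String sql = buildInsert("\"SERDE_PARAMS\"",
          "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")", 3, rows.size());
      try (PreparedStatement ps = conn.prepareStatement(sql)) {
        int i = 1;
        for (Map.Entry<Long, String[]> e : rows) {
          ps.setLong(i++, e.getKey());        // SERDE_ID
          ps.setString(i++, e.getValue()[0]); // PARAM_KEY
          ps.setString(i++, e.getValue()[1]); // PARAM_VALUE
        }
        ps.executeUpdate(); // all rows inserted in one round trip instead of one INSERT per row
      }
    }
  }
}

The point is simply that one prepared statement carries many rows per round trip; that reduction in per-row statements is the saving the direct-SQL path described in the issue is after.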