[ https://issues.apache.org/jira/browse/HIVE-26035?focusedWorklogId=838555&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-838555 ]

ASF GitHub Bot logged work on HIVE-26035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jan/23 09:11
            Start Date: 11/Jan/23 09:11
    Worklog Time Spent: 10m
      Work Description: VenuReddy2103 commented on code in PR #3905:
URL: https://github.com/apache/hive/pull/3905#discussion_r1066741904


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:
##########

@@ -515,6 +529,803 @@ public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaE
     }
   }
 
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, -1);
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
+  /**
+   * Interface to execute multiple row insert query in batch for direct SQL
+   */
+  interface BatchExecutionContext {
+    void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException;
+  }
+
+  private void insertInBatch(String tableName, String columns, int columnCount, String rowFormat, int rowCount,
+      BatchExecutionContext bec) throws MetaException {
+    if (rowCount == 0 || columnCount == 0) {
+      return;
+    }
+    int maxParamsCount = maxParamsInInsert;
+    if (maxParamsCount < columnCount) {
+      LOG.error("Maximum number of parameters in the direct SQL batch insert query is less than the table: {}"
+          + " columns. Executing single row insert queries.", tableName);
+      maxParamsCount = columnCount;
+    }
+    int maxRowsInBatch = maxParamsCount / columnCount;
+    int maxBatches = rowCount / maxRowsInBatch;
+    int last = rowCount % maxRowsInBatch;
+    String query = "";
+    if (maxBatches > 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, maxRowsInBatch);
+    }
+    int batchParamCount = maxRowsInBatch * columnCount;
+    for (int batch = 0; batch < maxBatches; batch++) {
+      bec.execute(query, maxRowsInBatch, batchParamCount);
+    }
+    if (last != 0) {
+      query = dbType.getBatchInsertQuery(tableName, columns, rowFormat, last);
+      bec.execute(query, last, last * columnCount);
+    }
+  }
+
+  private void insertSerdeInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = serdeIdToSerDeInfo.size();
+    String columns = "(\"SERDE_ID\",\"DESCRIPTION\",\"DESERIALIZER_CLASS\",\"NAME\",\"SERDE_TYPE\",\"SLIB\","
+        + "\"SERIALIZER_CLASS\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> it = serdeIdToSerDeInfo.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MSerDeInfo> entry = it.next();
+          MSerDeInfo serdeInfo = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = serdeInfo.getDescription();
+          params[paramIndex++] = serdeInfo.getDeserializerClass();
+          params[paramIndex++] = serdeInfo.getName();
+          params[paramIndex++] = serdeInfo.getSerdeType();
+          params[paramIndex++] = serdeInfo.getSerializationLib();
+          params[paramIndex++] = serdeInfo.getSerializerClass();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertStorageDescriptorInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor,
+      Map<Long, Long> sdIdToSerdeId, Map<Long, Long> sdIdToCdId) throws MetaException {
+    int rowCount = sdIdToStorageDescriptor.size();
+    String columns = "(\"SD_ID\",\"CD_ID\",\"INPUT_FORMAT\",\"IS_COMPRESSED\",\"IS_STOREDASSUBDIRECTORIES\","
+        + "\"LOCATION\",\"NUM_BUCKETS\",\"OUTPUT_FORMAT\",\"SERDE_ID\")";
+    String row = "(?,?,?,?,?,?,?,?,?)";
+    int columnCount = 9;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> it = sdIdToStorageDescriptor.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MStorageDescriptor> entry = it.next();
+          MStorageDescriptor sd = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = sdIdToCdId.get(entry.getKey());
+          params[paramIndex++] = sd.getInputFormat();
+          params[paramIndex++] = dbType.getBoolean(sd.isCompressed());
+          params[paramIndex++] = dbType.getBoolean(sd.isStoredAsSubDirectories());
+          params[paramIndex++] = sd.getLocation();
+          params[paramIndex++] = sd.getNumBuckets();
+          params[paramIndex++] = sd.getOutputFormat();
+          params[paramIndex++] = sdIdToSerdeId.get(entry.getKey());
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SDS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionInBatch(Map<Long, MPartition> partIdToPartition, Map<Long, Long> partIdToSdId)
+      throws MetaException {
+    int rowCount = partIdToPartition.size();
+    String columns = "(\"PART_ID\",\"CREATE_TIME\",\"LAST_ACCESS_TIME\",\"PART_NAME\",\"SD_ID\",\"TBL_ID\","
+        + "\"WRITE_ID\")";
+    String row = "(?,?,?,?,?,?,?)";
+    int columnCount = 7;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> it = partIdToPartition.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartition> entry = it.next();
+          MPartition partition = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partition.getCreateTime();
+          params[paramIndex++] = partition.getLastAccessTime();
+          params[paramIndex++] = partition.getPartitionName();
+          params[paramIndex++] = partIdToSdId.get(entry.getKey());
+          params[paramIndex++] = partition.getTable().getId();
+          params[paramIndex++] = partition.getWriteId();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITIONS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSerdeParamInBatch(Map<Long, MSerDeInfo> serdeIdToSerDeInfo) throws MetaException {
+    int rowCount = 0;
+    for (MSerDeInfo serDeInfo : serdeIdToSerDeInfo.values()) {
+      rowCount += serDeInfo.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SERDE_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MSerDeInfo>> serdeIt = serdeIdToSerDeInfo.entrySet().iterator();
+      Map.Entry<Long, MSerDeInfo> serdeEntry = serdeIt.next();
+      Iterator<Map.Entry<String, String>> it = serdeEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = serdeEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            serdeEntry = serdeIt.next();
+            it = serdeEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SERDE_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertStorageDescriptorParamInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor)
+      throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<Map.Entry<String, String>> it = sdEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SD_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionParamInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getParameters().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PARAM_KEY\",\"PARAM_VALUE\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<Map.Entry<String, String>> it = partEntry.getValue().getParameters().entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = entry.getKey();
+            params[paramIndex++] = entry.getValue();
+            index++;
+          }
+          if (index < batchRowCount) {
+            partEntry = partIt.next();
+            it = partEntry.getValue().getParameters().entrySet().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_PARAMS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionKeyValInBatch(Map<Long, MPartition> partIdToPartition) throws MetaException {
+    int rowCount = 0;
+    for (MPartition part : partIdToPartition.values()) {
+      rowCount += part.getValues().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"PART_ID\",\"PART_KEY_VAL\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MPartition>> partIt = partIdToPartition.entrySet().iterator();
+      Map.Entry<Long, MPartition> partEntry = partIt.next();
+      Iterator<String> it = partEntry.getValue().getValues().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = partEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            partEntry = partIt.next();
+            it = partEntry.getValue().getValues().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PARTITION_KEY_VALS, columns, columnCount, row, rowCount, bec);
+  }
+
+
+  private void insertColumnDescriptorInBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = cdIdToColumnDescriptor.size();
+    String columns = "(\"CD_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = cdIdToColumnDescriptor.keySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(CDS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertColumnV2InBatch(Map<Long, MColumnDescriptor> cdIdToColumnDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MColumnDescriptor cd : cdIdToColumnDescriptor.values()) {
+      rowCount += cd.getCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"CD_ID\",\"COMMENT\",\"COLUMN_NAME\",\"TYPE_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?,?)";
+    int columnCount = 5;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MColumnDescriptor>> cdIt = cdIdToColumnDescriptor.entrySet().iterator();
+      Map.Entry<Long, MColumnDescriptor> cdEntry = cdIt.next();
+      Iterator<MFieldSchema> it = cdEntry.getValue().getCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MFieldSchema fieldSchema = it.next();
+            params[paramIndex++] = cdEntry.getKey();
+            params[paramIndex++] = fieldSchema.getComment();
+            params[paramIndex++] = fieldSchema.getName();
+            params[paramIndex++] = fieldSchema.getType();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            cdEntry = cdIt.next();
+            it = cdEntry.getValue().getCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(COLUMNS_V2, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertBucketColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getBucketCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"BUCKET_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getBucketCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getBucketCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(BUCKETING_COLS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSortColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSortCols().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"COLUMN_NAME\",\"ORDER\",\"INTEGER_IDX\")";
+    String row = "(?,?,?,?)";
+    int columnCount = 4;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<MOrder> it = sdEntry.getValue().getSortCols().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            MOrder order = it.next();
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = order.getCol();
+            params[paramIndex++] = order.getOrder();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSortCols().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SORT_COLS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedStringListInBatch(List<Long> stringListIds) throws MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"STRING_LIST_ID\")";
+    String row = "(?)";
+    int columnCount = 1;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          params[paramIndex++] = it.next();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedStringListValInBatch(Map<Long, List<String>> stringListIdToStringList) throws MetaException {
+    int rowCount = 0;
+    for (List<String> stringList : stringListIdToStringList.values()) {
+      rowCount += stringList.size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"STRING_LIST_ID\",\"STRING_LIST_VALUE\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, List<String>>> stringListIt = stringListIdToStringList.entrySet().iterator();
+      Map.Entry<Long, List<String>> stringListEntry = stringListIt.next();
+      Iterator<String> it = stringListEntry.getValue().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = stringListEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            stringListEntry = stringListIt.next();
+            it = stringListEntry.getValue().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_STRING_LIST_VALUES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedColInBatch(Map<Long, MStorageDescriptor> sdIdToStorageDescriptor) throws MetaException {
+    int rowCount = 0;
+    for (MStorageDescriptor sd : sdIdToStorageDescriptor.values()) {
+      rowCount += sd.getSkewedColNames().size();
+    }
+    if (rowCount == 0) {
+      return;
+    }
+    String columns = "(\"SD_ID\",\"SKEWED_COL_NAME\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      final Iterator<Map.Entry<Long, MStorageDescriptor>> sdIt = sdIdToStorageDescriptor.entrySet().iterator();
+      Map.Entry<Long, MStorageDescriptor> sdEntry = sdIt.next();
+      Iterator<String> it = sdEntry.getValue().getSkewedColNames().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int index = 0;
+        int paramIndex = 0;
+        do {
+          while (index < batchRowCount && it.hasNext()) {
+            params[paramIndex++] = sdEntry.getKey();
+            params[paramIndex++] = it.next();
+            params[paramIndex++] = colIndex++;
+            index++;
+          }
+          if (index < batchRowCount) {
+            colIndex = 0;
+            sdEntry = sdIt.next();
+            it = sdEntry.getValue().getSkewedColNames().iterator();
+          }
+        } while (index < batchRowCount);
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_NAMES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedValInBatch(List<Long> stringListIds, Map<Long, Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIds.size();
+    String columns = "(\"SD_ID_OID\",\"STRING_LIST_ID_EID\",\"INTEGER_IDX\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      int colIndex = 0;
+      long prevSdId = -1;
+      final Iterator<Long> it = stringListIds.iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Long stringListId = it.next();
+          Long sdId = stringListIdToSdId.get(stringListId);
+          params[paramIndex++] = sdId;
+          params[paramIndex++] = stringListId;
+          if (prevSdId != sdId) {
+            colIndex = 0;
+          }
+          params[paramIndex++] = colIndex++;
+          prevSdId = sdId;
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_VALUES, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertSkewedLocationInBatch(Map<Long, String> stringListIdToLocation, Map<Long, Long> stringListIdToSdId)
+      throws MetaException {
+    int rowCount = stringListIdToLocation.size();
+    String columns = "(\"SD_ID\",\"STRING_LIST_ID_KID\",\"LOCATION\")";
+    String row = "(?,?,?)";
+    int columnCount = 3;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, String>> it = stringListIdToLocation.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, String> entry = it.next();
+          params[paramIndex++] = stringListIdToSdId.get(entry.getKey());
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = entry.getValue();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(SKEWED_COL_VALUE_LOC_MAP, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionPrivilegeInBatch(Map<Long, MPartitionPrivilege> partGrantIdToPrivilege,
+      Map<Long, Long> partGrantIdToPartId) throws MetaException {
+    int rowCount = partGrantIdToPrivilege.size();
+    String columns = "(\"PART_GRANT_ID\",\"AUTHORIZER\",\"CREATE_TIME\",\"GRANT_OPTION\",\"GRANTOR\",\"GRANTOR_TYPE\","
+        + "\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 10;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionPrivilege>> it = partGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionPrivilege> entry = it.next();
+          MPartitionPrivilege partPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partPrivilege.getAuthorizer();
+          params[paramIndex++] = partPrivilege.getCreateTime();
+          params[paramIndex++] = partPrivilege.getGrantOption();
+          params[paramIndex++] = partPrivilege.getGrantor();
+          params[paramIndex++] = partPrivilege.getGrantorType();
+          params[paramIndex++] = partGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partPrivilege.getPrincipalName();
+          params[paramIndex++] = partPrivilege.getPrincipalType();
+          params[paramIndex++] = partPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+
+  private void insertPartitionColPrivilegeInBatch(Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege,
+      Map<Long, Long> partColumnGrantIdToPartId) throws MetaException {
+    int rowCount = partColumnGrantIdToPrivilege.size();
+    String columns = "(\"PART_COLUMN_GRANT_ID\",\"AUTHORIZER\",\"COLUMN_NAME\",\"CREATE_TIME\",\"GRANT_OPTION\","
+        + "\"GRANTOR\",\"GRANTOR_TYPE\",\"PART_ID\",\"PRINCIPAL_NAME\",\"PRINCIPAL_TYPE\",\"PART_COL_PRIV\")";
+    String row = "(?,?,?,?,?,?,?,?,?,?,?)";
+    int columnCount = 11;
+    BatchExecutionContext bec = new BatchExecutionContext() {
+      final Iterator<Map.Entry<Long, MPartitionColumnPrivilege>> it
+          = partColumnGrantIdToPrivilege.entrySet().iterator();
+      @Override
+      public void execute(String batchQueryText, int batchRowCount, int batchParamCount) throws MetaException {
+        Object[] params = new Object[batchParamCount];
+        int paramIndex = 0;
+        for (int index = 0; index < batchRowCount; index++) {
+          Map.Entry<Long, MPartitionColumnPrivilege> entry = it.next();
+          MPartitionColumnPrivilege partColumnPrivilege = entry.getValue();
+          params[paramIndex++] = entry.getKey();
+          params[paramIndex++] = partColumnPrivilege.getAuthorizer();
+          params[paramIndex++] = partColumnPrivilege.getColumnName();
+          params[paramIndex++] = partColumnPrivilege.getCreateTime();
+          params[paramIndex++] = partColumnPrivilege.getGrantOption();
+          params[paramIndex++] = partColumnPrivilege.getGrantor();
+          params[paramIndex++] = partColumnPrivilege.getGrantorType();
+          params[paramIndex++] = partColumnGrantIdToPartId.get(entry.getKey());
+          params[paramIndex++] = partColumnPrivilege.getPrincipalName();
+          params[paramIndex++] = partColumnPrivilege.getPrincipalType();
+          params[paramIndex++] = partColumnPrivilege.getPrivilege();
+        }
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", batchQueryText))) {
+          executeWithArray(query.getInnerQuery(), params, batchQueryText);
+        }
+      }
+    };
+    insertInBatch(PART_COL_PRIVS, columns, columnCount, row, rowCount, bec);
+  }
+
+  /**
+   * Add partitions in batch using direct SQL
+   * @param parts list of partitions
+   * @param partPrivilegesList list of partition privileges
+   * @param partColPrivilegesList list of partition column privileges
+   * @throws MetaException
+   */
+  public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>> partPrivilegesList,
+      List<List<MPartitionColumnPrivilege>> partColPrivilegesList) throws MetaException {
+    Map<Long, MSerDeInfo> serdeIdToSerDeInfo = new HashMap<>();
+    Map<Long, MColumnDescriptor> cdIdToColumnDescriptor = new HashMap<>();
+    Map<Long, MStorageDescriptor> sdIdToStorageDescriptor = new HashMap<>();
+    Map<Long, MPartition> partIdToPartition = new HashMap<>();
+    Map<Long, MPartitionPrivilege> partGrantIdToPrivilege = new HashMap<>();
+    Map<Long, MPartitionColumnPrivilege> partColumnGrantIdToPrivilege = new HashMap<>();
+    Map<Long, Long> sdIdToSerdeId = new HashMap<>();
+    Map<Long, Long> sdIdToCdId = new HashMap<>();
+    Map<Long, Long> partIdToSdId = new HashMap<>();
+    Map<Long, List<String>> stringListIdToStringList = new HashMap<>();
+    Map<Long, Long> stringListIdToSdId = new HashMap<>();
+    Map<Long, String> stringListIdToLocation = new HashMap<>();
+    Map<Long, Long> partGrantIdToPartId = new HashMap<>();
+    Map<Long, Long> partColumnGrantIdToPartId = new HashMap<>();
+    List<Long> stringListIds = new ArrayList<>();
+    int partitionsCount = parts.size();
+    for (int index = 0; index < partitionsCount; index++) {
+      MPartition part = parts.get(index);
+      Long serDeId = getDataStoreId(MSerDeInfo.class);
+      serdeIdToSerDeInfo.put(serDeId, part.getSd().getSerDeInfo());
+
+      Long cdId;
+      DatastoreId storeId = (DatastoreId) pm.getObjectId(part.getSd().getCD());
+      if (storeId == null) {
+        cdId = getDataStoreId(MColumnDescriptor.class);
+        cdIdToColumnDescriptor.put(cdId, part.getSd().getCD());
+      } else {
+        cdId = (Long) storeId.getKeyAsObject();
+      }
+
+      Long sdId = getDataStoreId(MStorageDescriptor.class);
+      sdIdToStorageDescriptor.put(sdId, part.getSd());
+      sdIdToSerdeId.put(sdId, serDeId);
+      sdIdToCdId.put(sdId, cdId);
+
+      Long partId = getDataStoreId(MPartition.class);

Review Comment:
   DataNucleus generates identity values that are unique across different JVMs. We have used the DataNucleus API to get the id.
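
Illustrative aside (editor's note, not part of the PR diff above): the snippet below sketches the batch-splitting arithmetic that insertInBatch performs once a parameter cap is known, and the generic multi-row INSERT ... VALUES (...),(...),... shape that the dbType.getBatchInsertQuery call is expected to return. The class name, the example numbers, and the batchInsertQuery helper are hypothetical; the real statement text is database-specific.

    // Standalone sketch of the batch-splitting arithmetic used by insertInBatch.
    // Assumes a generic "INSERT INTO <table> <columns> VALUES <row>,<row>,..." form;
    // in the PR the actual text comes from dbType.getBatchInsertQuery and varies per database.
    public class BatchInsertMath {
      public static void main(String[] args) {
        int maxParamsInInsert = 1000;      // assumed cap on bind parameters per statement
        int columnCount = 7;               // PARTITIONS insert binds 7 parameters per row
        int rowCount = 2500;               // partitions to insert

        int maxRowsInBatch = maxParamsInInsert / columnCount;  // 142 rows per statement
        int maxBatches = rowCount / maxRowsInBatch;            // 17 full statements
        int last = rowCount % maxRowsInBatch;                  // 86 rows in the trailing statement

        System.out.printf("%d full batches of %d rows, plus a final batch of %d rows%n",
            maxBatches, maxRowsInBatch, last);
        System.out.println(batchInsertQuery("PARTITIONS",
            "(\"PART_ID\",\"PART_NAME\",\"SD_ID\")", "(?,?,?)", 3));
      }

      // Builds the assumed multi-row insert text: one placeholder group per batched row.
      static String batchInsertQuery(String table, String columns, String rowFormat, int rows) {
        StringBuilder sb = new StringBuilder("INSERT INTO \"").append(table).append("\" ")
            .append(columns).append(" VALUES ");
        for (int i = 0; i < rows; i++) {
          sb.append(i == 0 ? "" : ",").append(rowFormat);
        }
        return sb.toString();
      }
    }

With a cap of 1,000 bind parameters and 7 columns per PARTITIONS row, 2,500 partitions are written with 18 prepared statements instead of 2,500 single-row inserts.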
Issue Time Tracking
-------------------

    Worklog Id:     (was: 838555)
    Time Spent: 1h 50m  (was: 1h 40m)

> Explore moving to directsql for ObjectStore::addPartitions
> ----------------------------------------------------------
>
>                 Key: HIVE-26035
>                 URL: https://issues.apache.org/jira/browse/HIVE-26035
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently {{addPartitions}} uses DataNucleus and is super slow for a large
> number of partitions. It would be good to move to direct SQL. Lots of repeated
> SQL statements can be avoided as well (e.g. SDS, SERDE, TABLE_PARAMS).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)