[ https://issues.apache.org/jira/browse/HIVE-24893?focusedWorklogId=696523&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-696523 ]
ASF GitHub Bot logged work on HIVE-24893:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Dec/21 10:49
            Start Date: 15/Dec/21 10:49
    Worklog Time Spent: 10m
      Work Description: wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-994669275

This is an example of how Spark implements `DownloadDataOperation`:

```scala
private[hive] case class DownloadDataBlock(
    path: Option[Path] = None,
    offset: Option[Long] = None,
    schema: Option[String] = None,
    dataSize: Long)

private[hive] class SparkDownloadDataOperation(
    val sqlContext: SQLContext,
    parentSession: HiveSession,
    tableName: String,
    query: String,
    format: String,
    options: JMap[String, String],
    runInBackground: Boolean)
  extends Operation(
    parentSession,
    Map.empty[String, String].asJava,
    OperationType.UNKNOWN_OPERATION,
    runInBackground)
  with SparkOperation
  with QueryLogging
  with Logging {

  private var result: DataFrame = _

  private lazy val resultSchema: TableSchema = {
    if (result == null || result.schema.isEmpty) {
      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
    } else {
      logInfo(s"Result Schema: ${result.schema}")
      SparkExecuteStatementOperation.getTableSchema(result.schema)
    }
  }

  private val pathFilter = new PathFilter {
    override def accept(path: Path): Boolean =
      !path.getName.equals("_SUCCESS") && !path.getName.endsWith("crc")
  }

  private val defaultBlockSize = 10 * 1024 * 1024

  // Please see CSVOptions for more details.
  private val defaultOptions = Map(
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
    "dateFormat" -> "yyyy-MM-dd",
    "delimiter" -> ",",
    "escape" -> "\"",
    "compression" -> "gzip",
    "header" -> "true",
    "maxRecordsPerFile" -> "0",
    // To avoid Zeta client timeout
    "fetchBlockSize" -> defaultBlockSize.toString,
    "maxFetchBlockTime" -> "30000",
    // To avoid coalesce
    "minFileSize" -> (defaultBlockSize - 1 * 1024 * 1024).toString)

  private val writeOptions =
    defaultOptions ++ Option(options).map(_.asScala).getOrElse(Map.empty[String, String]).toMap

  private val numFiles = writeOptions.get("numFiles").map(_.toInt)
  private val fetchSize = writeOptions("fetchBlockSize").toLong
  private val maxFetchBlockTime = writeOptions("maxFetchBlockTime").toLong
  private val minFileSize = writeOptions("minFileSize").toLong

  private val downloadQuery = s"Generating download files with arguments " +
    s"[${tableName}, ${query}, ${format}, ${writeOptions}]"

  private val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
  private val scratchDir = sqlContext.conf.getConf(StaticSQLConf.SPARK_SCRATCH_DIR)
  private val pathPrefix = new Path(scratchDir + File.separator + "DownloadData" +
    File.separator + parentSession.getUserName + File.separator +
    parentSession.getSessionHandle.getSessionId)
  private val fs: FileSystem = pathPrefix.getFileSystem(hadoopConf)

  private var iter: JIterator[DownloadDataBlock] = _
  private var schemaStr: String = _
  private var totalDataSize: Long = 0

  override def close(): Unit = {
    // CARMEL-4662 Fix Download query state is incorrect.
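    // Report the statement as finished only when the operation actually reached the
    // FINISHED state, then record the query together with the total downloaded bytes
    // before transitioning to CLOSED and clearing the Spark job group.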
    if (getStatus.getState eq OperationState.FINISHED) {
      HiveThriftServer2.eventManager.onStatementFinish(statementId)
    }
    HiveThriftServer2.eventManager.onQueryExist(
      statementId,
      QueryLogObjectList(Option(result).map(_.queryExecution)),
      QueryLogExtInfo(false, totalDataSize))
    logInfo(s"CLOSING $statementId")
    cleanup(OperationState.CLOSED)
    sqlContext.sparkContext.clearJobGroup()
  }

  override def runInternal(): Unit = {
    setState(OperationState.PENDING)
    setHasResultSet(true)
    if (!runInBackground) {
      execute()
    } else {
      val sparkServiceUGI = HiveShimsUtils.getUGI()
      val backgroundOperation = new Runnable() {
        override def run(): Unit = {
          val doAsAction = new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              try {
                execute()
              } catch {
                case e: HiveSQLException =>
                  setOperationException(e)
                  log.error("Error generating download file: ", e)
              }
            }
          }
          try {
            sparkServiceUGI.doAs(doAsAction)
          } catch {
            case e: Exception =>
              setOperationException(new HiveSQLException(e))
              logError("Error generating download file as user : " +
                sparkServiceUGI.getShortUserName(), e)
          }
        }
      }
      try {
        // This submit blocks if no background threads are available to run this operation
        val backgroundHandle =
          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          throw new HiveSQLException("The background threadpool cannot accept" +
            " new task for execution, please retry the operation", rejected)
        case NonFatal(e) =>
          logError("Error generating download file in background", e)
          setState(OperationState.ERROR)
          throw new HiveSQLException(e)
      }
    }
  }

  private def execute(): Unit = {
    statementId = getHandle.getHandleIdentifier.getPublicId.toString
    setState(OperationState.RUNNING)
    try {
      // Always use the latest class loader provided by executionHive's state.
      val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
      // Use parent session's SessionState in this operation because such SessionState
      // keeps some shared info per session e.g. authorization information.
      SessionState.setCurrentSessionState(parentSession.getSessionState)
      Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
      sqlContext.sparkContext.listenerBus.
        post(StatementStart(statementId, System.currentTimeMillis(),
          sqlContext.sparkContext.getLocalProperty("spark.hive.session.id")))
      HiveThriftServer2.eventManager.onStatementStart(
        statementId,
        parentSession.getSessionHandle.getSessionId.toString,
        downloadQuery,
        statementId,
        parentSession.getUsername)

      assert(fetchSize >= 1L * 1024 * 1024 && fetchSize <= 20L * 1024 * 1024,
        s"fetchBlockSize(${fetchSize}) should be greater than 1M and less than 20M.")
      if (StringUtils.isNotEmpty(tableName) && StringUtils.isNotEmpty(query)) {
        throw new HiveSQLException("Both table name and query are specified.")
      }

      sqlContext.sparkContext.setLocalProperty(
        SparkContext.SPARK_USER_RESOURCE_CONSUMER_ID, parentSession.getUserName)
      if (parentSession.getUserInfo != null) {
        sqlContext.sparkContext.setLocalProperty(
          SparkContext.SPARK_USER_RESOURCE_CONSUMER_PROFILE, parentSession.getUserInfo.profile)
      }
      sqlContext.sparkContext.setJobGroup(statementId, downloadQuery)

      logInfo(s"Running query [$statementId] in session " +
        s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")
      val resultPath = writeData(new Path(pathPrefix, statementId))
      logQueryInfo(s"Running query [$statementId] in session " +
        s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")

      val dataSize = fs.getContentSummary(resultPath).getLength
      logInfo(s"Try to download ${dataSize} bytes data from thriftserver.")
      totalDataSize = dataSize

      val list: JList[DownloadDataBlock] = new JArrayList[DownloadDataBlock]()
      // Add total data size to first row.
      list.add(DownloadDataBlock(schema = Some(schemaStr), dataSize = dataSize))
      // and then add data.
      fs.listStatus(resultPath, pathFilter).map(_.getPath).sortBy(_.getName).foreach { path =>
        val dataLen = fs.getFileStatus(path).getLen
        // Cast to BigDecimal to avoid overflowing
        val fetchBatchs = BigDecimal(dataLen)./(BigDecimal(fetchSize))
          .setScale(0, RoundingMode.CEILING).longValue()
        assert(fetchBatchs < Int.MaxValue, "The fetch batch too large.")
        (0 until fetchBatchs.toInt).foreach { i =>
          val fetchSizeInBatch = if (i == fetchBatchs - 1) dataLen - i * fetchSize else fetchSize
          list.add(DownloadDataBlock(
            path = Some(path),
            offset = Some(i * fetchSize),
            dataSize = fetchSizeInBatch))
        }
        list.add(DownloadDataBlock(path = Some(path), dataSize = -1))
      }
      iter = list.iterator()
      logInfo(s"Add ${list.size()} data blocks to be fetched.")
      setState(OperationState.FINISHED)
      logQueryInfo(s"Finished query [$statementId].")
    } catch {
      case NonFatal(e) =>
        logQueryError(s"Error executing query [$statementId]", e)
        setState(OperationState.ERROR)
        HiveThriftServer2.eventManager.onStatementError(
          statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
        val exception = new HiveSQLException(e)
        setOperationException(exception)
    }
  }

  private def writeData(path: Path): Path = withRetry {
    result = (Option(tableName), Option(query), Option(format), Option(options)) match {
      case (Some(t), None, _, _) => sqlContext.table(t)
      case (None, Some(q), _, _) => sqlContext.sql(q)
      case _ =>
        throw new HiveSQLException(s"Invalid arguments: ($tableName, $query, $format, $options).")
    }
    schemaStr = result.schema.map(_.name).mkString(writeOptions("delimiter"))

    val needRepartition = result.queryExecution.sparkPlan match {
      case _: SortExec => false
      case _: TakeOrderedAndProjectExec => false
      case ProjectExec(_, _: SortExec) => false
      case AdaptiveSparkPlanExec(_: SortExec, _, _, _) => false
      case AdaptiveSparkPlanExec(_: TakeOrderedAndProjectExec, _, _, _) => false
      case AdaptiveSparkPlanExec(ProjectExec(_, _: SortExec),
        _, _, _) => false
      case _: ShuffleExchangeExec => false
      case ProjectExec(_, _: ShuffleExchangeExec) => false
      case _: CollectLimitExec => false
      case _: LimitExec => false
      case _ => true
    }

    // Background: according to the official Hadoop FileSystem API spec,
    // rename op's destination path must have a parent that exists,
    // otherwise we may get unexpected result on the rename API.
    // When downloading dataset as parquet format, if we configure a
    // quota-free path and adopt FileOutputCommitter V1 algorithm, we will
    // get the "IOException: Failed to rename FileStatus".
    // Hence, the parent path should exist (see CARMEL-5150).
    if (!fs.exists(path) && !fs.mkdirs(path)) {
      logWarning(s"Failed to create parent download path ${path}")
    }
    val step1Path = new Path(path, "step1")
    val outputFormat = Option(format).getOrElse("csv")

    val (castCols, readSchema) = if (outputFormat.equalsIgnoreCase("csv")) {
      // Support duplicate columns
      val names = result.schema.map(_.name)
      val renameDuplicateNames = if (names.distinct.length != names.length) {
        val duplicateColumns = names.groupBy(identity).collect {
          case (x, ys) if ys.length > 1 => x
        }
        result.logicalPlan.output.zipWithIndex.map {
          case (col, index) if duplicateColumns.exists(_.equals(col.name)) =>
            col.withName(col.name + index)
          case (col, _) => col
        }
      } else {
        result.logicalPlan.output
      }
      // Support Complex types for csv file
      val output = renameDuplicateNames.map { col =>
        col.dataType match {
          case BinaryType => Column(col).cast(StringType).alias(col.name)
          case NullType => Column(col).cast(StringType).alias(col.name)
          case CalendarIntervalType => Column(col).cast(StringType).alias(col.name)
          case ArrayType(_, _) => Column(col).cast(StringType).alias(col.name)
          case MapType(_, _, _) => Column(col).cast(StringType).alias(col.name)
          case StructType(_) => Column(col).cast(StringType).alias(col.name)
          case _ => Column(col).alias(col.name)
        }
      }
      (output, StructType(StructType.fromAttributes(renameDuplicateNames)
        .map(_.copy(dataType = StringType))))
    } else if (outputFormat.equalsIgnoreCase("parquet")) {
      val output = result.logicalPlan.output.map { col =>
        col.dataType match {
          case BooleanType | ByteType | ShortType | IntegerType | LongType |
               FloatType | DoubleType | BinaryType => Column(col).alias(col.name)
          case _ => Column(col).cast(StringType).alias(col.name)
        }
      }
      val newSchema = result.schema.map(s => s.dataType match {
        case BooleanType | ByteType | ShortType | IntegerType | LongType |
             FloatType | DoubleType | BinaryType => s
        case _ => s.copy(dataType = StringType)
      })
      (output, StructType(newSchema))
    } else {
      val output = result.logicalPlan.output.map(col => Column(col).alias(col.name))
      (output, result.schema)
    }

    val writePlan = if (!needRepartition) {
      result.select(castCols: _*)
    } else if (numFiles.nonEmpty) {
      result.select(castCols: _*).repartition(numFiles.get)
    } else {
      result.select(castCols: _*).repartition(Column(Rand(1)))
    }

    writePlan.write
      .options(writeOptions)
      .format(outputFormat)
      .mode(SaveMode.Overwrite)
      .save(step1Path.toString)

    val contentSummary = fs.getContentSummary(step1Path)
    val dataSize = contentSummary.getLength
    if (dataSize > sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE)) {
      throw QueryLevelRestrictionErrors.downloadDataSizeExceeded(
        dataSize, sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE))
    }
    step1Path
  }

  // Limit download speed.
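  // getNextRowSet below paces client fetches: based on the bytes served in the
  // previous batch it computes an expected minimum elapsed time (capped at
  // maxFetchBlockTime, roughly a 50 MB/s ceiling) and sleeps for the remainder.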
  private var lastFetchTime = System.currentTimeMillis()
  private var downloadedDataSize = 0L

  override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): RowSet = {
    val expectedMaxTime = math.min(maxFetchBlockTime,
      (downloadedDataSize.toDouble / (50L * 1024 * 1024)) * 1000L).toLong
    val downloadTime = System.currentTimeMillis() - lastFetchTime
    if (downloadTime < expectedMaxTime) {
      logInfo(s"Limit download speed ${downloadTime}, " +
        s"expected max download time ${expectedMaxTime}")
      Thread.sleep(expectedMaxTime - downloadTime)
    }
    lastFetchTime = System.currentTimeMillis()

    if (getStatus.getState ne OperationState.FINISHED) {
      throw getStatus.getOperationException
    }
    assertState(OperationState.FINISHED)
    validateFetchOrientation(orientation, JEnumSet.of(FetchOrientation.FETCH_NEXT))
    val rowSet: RowSet = RowSetFactory.create(getDownloadSchema, getProtocolVersion, false)
    if (!iter.hasNext) {
      rowSet
    } else {
      val maxRows = maxRowsL.toInt
      var curRow = 0
      while (curRow < maxRows && iter.hasNext) {
        val dataBlock = iter.next()
        val dataSize = dataBlock.dataSize
        dataBlock.path match {
          case Some(path) =>
            if (dataSize >= 0) {
              val buffer: Array[Byte] = new Array[Byte](dataSize.toInt)
              Utils.tryWithResource(fs.open(path)) { is =>
                is.seek(dataBlock.offset.get)
                is.readFully(buffer)
              }
              // data row
              rowSet.addRow(Array[AnyRef](path.getName, buffer, null, Long.box(dataSize)))
              downloadedDataSize = dataSize
            } else {
              // End of file row
              rowSet.addRow(Array[AnyRef](path.getName, null, null, Long.box(dataSize)))
            }
          case _ =>
            // Schema row and total data size row
            rowSet.addRow(Array[AnyRef](null, null, dataBlock.schema.get, Long.box(dataSize)))
        }
        curRow += 1
      }
      rowSet
    }
  }

  override def getResultSetSchema: TableSchema = {
    if (writeOptions.get("useRealSchema").nonEmpty &&
        writeOptions("useRealSchema").equalsIgnoreCase("true")) {
      resultSchema
    } else {
      val ret = new TableSchema()
        .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to be transferred.")
        .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be transferred.")
        .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be transferred.")
        .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be transferred in this fetch.")
      ret
    }
  }

  private def getDownloadSchema: TableSchema = {
    new TableSchema()
      .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to be transferred.")
      .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be transferred.")
      .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be transferred.")
      .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be transferred in this fetch.")
  }

  override def cancel(): Unit = {
    if (statementId != null) {
      HiveThriftServer2.eventManager.onStatementCanceled(statementId)
    }
    HiveThriftServer2.eventManager.onQueryExist(
      statementId,
      QueryLogObjectList(Option(result).map(_.queryExecution)),
      QueryLogExtInfo(false, totalDataSize))
    cleanup(OperationState.CANCELED)
  }

  private def withRetry[T](f: => T): T = {
    val maxRetry = 2
    var retryNum = 0

    def retriable(t: Throwable): Boolean = {
      var cur = t
      while (retryNum < maxRetry && cur != null) {
        Utils.findFirstCause(cur) match {
          case f: FileNotFoundException if !f.getMessage.contains("shuffle_") =>
            // Some commands may fail while the dataset is being initialized, because
            // initialization already triggers execution. In that case we manually build
            // a QueryExecution to get the optimized plan.
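            // Refresh the cached table metadata and file listings of every relation in
            // the optimized plan, so the retried write sees the current files rather
            // than the stale listing that caused the FileNotFoundException.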
            val qe = if (result != null) {
              result.queryExecution
            } else {
              val parsed = sqlContext.sessionState.sqlParser.parsePlan(query)
              new QueryExecution(sqlContext.sparkSession, parsed)
            }
            qe.optimizedPlan.foreach {
              case LogicalRelation(_, _, Some(table), _) =>
                qe.sparkSession.sessionState.refreshTable(table.identifier.toString)
              case HiveTableRelation(tableMeta, _, _, _, _, _) =>
                qe.sparkSession.sessionState.refreshTable(tableMeta.identifier.toString)
              case _ =>
            }
            return true
          case c => cur = cur.getCause()
        }
      }
      false
    }

    var res: Option[T] = None
    do {
      if (retryNum > 0) {
        logInfo(s"Start to retry query $statementId.")
      }
      try {
        res = Some(f)
      } catch {
        case e if retriable(e) =>
          logError(s"Query $statementId failed out of error ${e.getCause.getMessage}")
          retryNum += 1
        case e: Throwable => throw e
      }
    } while (res.isEmpty)
    res.get
  }

  private def cleanup(state: OperationState) {
    setState(state)
    if (runInBackground) {
      val backgroundHandle = getBackgroundHandle()
      if (backgroundHandle != null) {
        backgroundHandle.cancel(true)
      }
    }
    if (statementId != null) {
      sqlContext.sparkContext.cancelJobGroup(statementId, Some("Clean up SparkDownloadData"))
      sqlContext.queryLoadLimitationManager.clean(statementId)
    }
    // Delete temp files
    try {
      fs.delete(pathPrefix, true)
    } catch {
      case e: IOException => log.warn("Failed to remove download temp files.", e)
    }
    sqlContext.sparkContext.closeJobGroup(statementId)
  }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 696523)
    Time Spent: 20m  (was: 10m)

> Download data from Thriftserver through JDBC
> --------------------------------------------
>
>                 Key: HIVE-24893
>                 URL: https://issues.apache.org/jira/browse/HIVE-24893
>             Project: Hive
>          Issue Type: New Feature
>          Components: HiveServer2, JDBC
>    Affects Versions: 4.0.0
>            Reporter: Yuming Wang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> It is very useful to support downloading large amounts of data (such as more than 50GB) through JDBC.
> Snowflake has similar support:
> https://docs.snowflake.com/en/user-guide/jdbc-using.html#label-jdbc-download-from-stage-to-stream
> https://github.com/snowflakedb/snowflake-jdbc/blob/95a7d8a03316093430dc3960df6635643208b6fd/src/main/java/net/snowflake/client/jdbc/SnowflakeConnectionV1.java#L886

--
This message was sent by Atlassian Jira
(v8.20.1#820001)