ASF GitHub Bot logged work on HIVE-24893: ----------------------------------------- Author: ASF GitHub Bot Created on: 15/Dec/21 10:49 Start Date: 15/Dec/21 10:49 Worklog Time Spent: 10m Work Description: wangyum commented on pull request #2878: URL: https://github.com/apache/hive/pull/2878#issuecomment-994669275 This is an example of how Spark implements `DownloadDataOperation`: ```scala private[hive] case class DownloadDataBlock( path: Option[Path] = None, offset: Option[Long] = None, schema: Option[String] = None, dataSize: Long) private[hive] class SparkDownloadDataOperation( val sqlContext: SQLContext, parentSession: HiveSession, tableName: String, query: String, format: String, options: JMap[String, String], runInBackground: Boolean) extends Operation( parentSession, Map.empty[String, String].asJava, OperationType.UNKNOWN_OPERATION, runInBackground) with SparkOperation with QueryLogging with Logging { private var result: DataFrame = _ private lazy val resultSchema: TableSchema = { if (result == null || result.schema.isEmpty) { new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) } else { logInfo(s"Result Schema: ${result.schema}") SparkExecuteStatementOperation.getTableSchema(result.schema) } } private val pathFilter = new PathFilter { override def accept(path: Path): Boolean = !path.getName.equals("_SUCCESS") && !path.getName.endsWith("crc") } private val defaultBlockSize = 10 * 1024 * 1024 // Please see CSVOptions for more details. 
/**
 * Default writer and fetch options. Entries supplied by the caller through `options`
 * take precedence (see `writeOptions`).
 */
private val defaultOptions = Map(
  "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
  "dateFormat" -> "yyyy-MM-dd",
  "delimiter" -> ",",
  "escape" -> "\"",
  "compression" -> "gzip",
  "header" -> "true",
  "maxRecordsPerFile" -> "0",
  // To avoid Zeta client timeout
  "fetchBlockSize" -> defaultBlockSize.toString,
  "maxFetchBlockTime" -> "30000",
  // To avoid coalesce
  "minFileSize" -> (defaultBlockSize - 1 * 1024 * 1024).toString)

/** Effective options: the defaults overlaid with the caller's `options` (which may be null). */
private val writeOptions =
  defaultOptions ++ Option(options).map(_.asScala).getOrElse(Map.empty[String, String]).toMap

private val numFiles = writeOptions.get("numFiles").map(_.toInt)
private val fetchSize = writeOptions("fetchBlockSize").toLong
private val maxFetchBlockTime = writeOptions("maxFetchBlockTime").toLong
private val minFileSize = writeOptions("minFileSize").toLong

/** Description used for the job group and for the statement-start event. */
private val downloadQuery = s"Generating download files with arguments " +
  s"[${tableName}, ${query}, ${format}, ${writeOptions}]"

private val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
private val scratchDir = sqlContext.conf.getConf(StaticSQLConf.SPARK_SCRATCH_DIR)

/** Per-user, per-session directory: <scratchDir>/DownloadData/<user>/<sessionId>. */
private val pathPrefix = new Path(
  scratchDir + File.separator + "DownloadData" + File.separator +
    parentSession.getUserName + File.separator + parentSession.getSessionHandle.getSessionId)

private val fs: FileSystem = pathPrefix.getFileSystem(hadoopConf)

/** Blocks still to be served by `getNextRowSet`; assigned by `execute`. */
private var iter: JIterator[DownloadDataBlock] = _
/** Column names of the result joined with the configured delimiter; assigned by `writeData`. */
private var schemaStr: String = _
private var totalDataSize: Long = 0

/** Reports the final statement events, cleans up the operation and clears the job group. */
override def close(): Unit = {
  // CARMEL-4662 Fix Download query state is incorrect.
  if (getStatus.getState eq OperationState.FINISHED) {
    HiveThriftServer2.eventManager.onStatementFinish(statementId)
  }
  HiveThriftServer2.eventManager.onQueryExist(
    statementId,
    QueryLogObjectList(Option(result).map(_.queryExecution)),
    QueryLogExtInfo(false, totalDataSize))
  logInfo(s"CLOSING $statementId")
  cleanup(OperationState.CLOSED)
  sqlContext.sparkContext.clearJobGroup()
}

/**
 * Entry point of the operation: runs `execute()` inline, or submits it to the session
 * manager's background pool when `runInBackground` is set.
 *
 * @throws HiveSQLException if the background task cannot be submitted
 */
override def runInternal(): Unit = {
  setState(OperationState.PENDING)
  setHasResultSet(true)

  if (!runInBackground) {
    execute()
  } else {
    val sparkServiceUGI = HiveShimsUtils.getUGI()

    val backgroundOperation = new Runnable() {
      override def run(): Unit = {
        val doAsAction = new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            try {
              execute()
            } catch {
              case e: HiveSQLException =>
                setOperationException(e)
                logError("Error generating download file: ", e)
            }
          }
        }

        try {
          sparkServiceUGI.doAs(doAsAction)
        } catch {
          case e: Exception =>
            setOperationException(new HiveSQLException(e))
            logError("Error generating download file as user : " +
              sparkServiceUGI.getShortUserName(), e)
        }
      }
    }

    try {
      // This submit blocks if no background threads are available to run this operation
      val backgroundHandle =
        parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
      setBackgroundHandle(backgroundHandle)
    } catch {
      case rejected: RejectedExecutionException =>
        setState(OperationState.ERROR)
        throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected)
      case NonFatal(e) =>
        logError("Error generating download file in background", e)
        setState(OperationState.ERROR)
        throw new HiveSQLException(e)
    }
  }
}

/**
 * Writes the requested data under `pathPrefix/<statementId>` and prepares the list of
 * blocks to be fetched. Any non-fatal failure moves the operation to ERROR and is
 * recorded as the operation exception instead of being rethrown.
 */
private def execute(): Unit = {
  statementId = getHandle.getHandleIdentifier.getPublicId.toString
  setState(OperationState.RUNNING)
  try {
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader

    // Use parent session's SessionState in this operation because such SessionState
    // keeps some shared info per session e.g. authorization information.
    SessionState.setCurrentSessionState(parentSession.getSessionState)

    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    sqlContext.sparkContext.listenerBus.post(
      StatementStart(
        statementId,
        System.currentTimeMillis(),
        sqlContext.sparkContext.getLocalProperty("spark.hive.session.id")))
    HiveThriftServer2.eventManager.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      downloadQuery,
      statementId,
      parentSession.getUsername)

    // fetchBlockSize comes from client options, so validate it with a real exception:
    // `assert` is elidable at compile time and is meant for internal invariants only.
    if (fetchSize < 1L * 1024 * 1024 || fetchSize > 20L * 1024 * 1024) {
      throw new HiveSQLException(
        s"fetchBlockSize(${fetchSize}) should be greater than 1M and less than 20M.")
    }

    if (StringUtils.isNotEmpty(tableName) && StringUtils.isNotEmpty(query)) {
      throw new HiveSQLException("Both table name and query are specified.")
    }

    sqlContext.sparkContext.setLocalProperty(
      SparkContext.SPARK_USER_RESOURCE_CONSUMER_ID, parentSession.getUserName)
    if (parentSession.getUserInfo != null) {
      sqlContext.sparkContext.setLocalProperty(
        SparkContext.SPARK_USER_RESOURCE_CONSUMER_PROFILE, parentSession.getUserInfo.profile)
    }

    sqlContext.sparkContext.setJobGroup(statementId, downloadQuery)
    logInfo(s"Running query [$statementId] in session " +
      s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")
    val resultPath = writeData(new Path(pathPrefix, statementId))
    logQueryInfo(s"Running query [$statementId] in session " +
      s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")

    val dataSize = fs.getContentSummary(resultPath).getLength
    logInfo(s"Try to download ${dataSize} bytes data from thriftserver.")
    totalDataSize = dataSize

    val list = planDownloadBlocks(resultPath, dataSize)
    iter = list.iterator()
    logInfo(s"Add ${list.size()} data blocks to be fetched.")

    setState(OperationState.FINISHED)
    logQueryInfo(s"Finished query [$statementId].")
  } catch {
    case NonFatal(e) =>
      logQueryError(s"Error executing query [$statementId]", e)
      setState(OperationState.ERROR)
      HiveThriftServer2.eventManager.onStatementError(
        statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
      val exception = new HiveSQLException(e)
      setOperationException(exception)
  }
}

/**
 * Builds the ordered list of blocks served to the client: one header block carrying the
 * schema string and the total size, then for every output file (sorted by name) its
 * data blocks of at most `fetchSize` bytes followed by an end-of-file block whose
 * `dataSize` is -1.
 */
private def planDownloadBlocks(resultPath: Path, dataSize: Long): JList[DownloadDataBlock] = {
  val blocks: JList[DownloadDataBlock] = new JArrayList[DownloadDataBlock]()
  // Add total data size to first row.
  blocks.add(DownloadDataBlock(schema = Some(schemaStr), dataSize = dataSize))
  // and then add data.
  fs.listStatus(resultPath, pathFilter).map(_.getPath).sortBy(_.getName).foreach { path =>
    val dataLen = fs.getFileStatus(path).getLen
    // Cast to BigDecimal to avoid overflowing
    val fetchBatchs =
      BigDecimal(dataLen)./(BigDecimal(fetchSize)).setScale(0, RoundingMode.CEILING).longValue()
    assert(fetchBatchs < Int.MaxValue, "The fetch batch too large.")
    (0 until fetchBatchs.toInt).foreach { i =>
      // The last batch of a file carries whatever is left over.
      val fetchSizeInBatch = if (i == fetchBatchs - 1) dataLen - i * fetchSize else fetchSize
      blocks.add(DownloadDataBlock(
        path = Some(path), offset = Some(i * fetchSize), dataSize = fetchSizeInBatch))
    }
    blocks.add(DownloadDataBlock(path = Some(path), dataSize = -1))
  }
  blocks
}

/**
 * Resolves the source DataFrame (table or query), casts columns the target format cannot
 * hold natively to string, and writes the result to `<path>/step1`. The body runs under
 * `withRetry`.
 *
 * Empty `tableName`, `query` and `format` values are treated as "not specified", which
 * matches the `StringUtils.isNotEmpty` check performed in `execute`.
 *
 * @param path parent directory for this statement's output
 * @return the directory that holds the written files
 * @throws HiveSQLException if neither or both of table name and query are given
 */
private def writeData(path: Path): Path = withRetry {
  val requestedTable = Option(tableName).filter(_.nonEmpty)
  val requestedQuery = Option(query).filter(_.nonEmpty)
  result = (requestedTable, requestedQuery) match {
    case (Some(t), None) =>
      sqlContext.table(t)
    case (None, Some(q)) =>
      sqlContext.sql(q)
    case _ =>
      throw new HiveSQLException(
        s"Invalid arguments: ($tableName, $query, $format, $options).")
  }

  schemaStr = result.schema.map(_.name).mkString(writeOptions("delimiter"))

  // Plans that already end in a sort, shuffle or limit are written without repartitioning.
  val needRepartition = result.queryExecution.sparkPlan match {
    case _: SortExec => false
    case _: TakeOrderedAndProjectExec => false
    case ProjectExec(_, _: SortExec) => false
    case AdaptiveSparkPlanExec(_: SortExec, _, _, _) => false
    case AdaptiveSparkPlanExec(_: TakeOrderedAndProjectExec, _, _, _) => false
    case AdaptiveSparkPlanExec(ProjectExec(_, _: SortExec), _, _, _) => false
    case _: ShuffleExchangeExec => false
    case ProjectExec(_, _: ShuffleExchangeExec) => false
    case _: CollectLimitExec => false
    case _: LimitExec => false
    case _ => true
  }

  // Background: according to the official Hadoop FileSystem API spec,
  // rename op's destination path must have a parent that exists,
  // otherwise we may get unexpected result on the rename API.
  // When downloading dataset as parquet format, if we configure a
  // quota-free path and adopt FileOutputCommitter V1 algorithm, we will
  // get the "IOException: Failed to rename FileStatus".
  // Hence, the parent path should exist (see CARMEL-5150).
  if (!fs.exists(path) && !fs.mkdirs(path)) {
    logWarning(s"Failed to create parent download path ${path}")
  }

  val step1Path = new Path(path, "step1")
  val outputFormat = Option(format).filter(_.nonEmpty).getOrElse("csv")

  val castCols = if (outputFormat.equalsIgnoreCase("csv")) {
    // Support duplicate columns: a duplicated name gets its column index appended.
    val names = result.schema.map(_.name)
    val renameDuplicateNames = if (names.distinct.length != names.length) {
      val duplicateColumns =
        names.groupBy(identity).collect { case (x, ys) if ys.length > 1 => x }.toSet
      result.logicalPlan.output.zipWithIndex.map {
        case (col, index) if duplicateColumns.contains(col.name) =>
          col.withName(col.name + index)
        case (col, _) => col
      }
    } else {
      result.logicalPlan.output
    }
    // Support Complex types for csv file
    renameDuplicateNames.map { col =>
      col.dataType match {
        case BinaryType | NullType | CalendarIntervalType |
             _: ArrayType | _: MapType | _: StructType =>
          Column(col).cast(StringType).alias(col.name)
        case _ =>
          Column(col).alias(col.name)
      }
    }
  } else if (outputFormat.equalsIgnoreCase("parquet")) {
    result.logicalPlan.output.map { col =>
      col.dataType match {
        case BooleanType | ByteType | ShortType | IntegerType
             | LongType | FloatType | DoubleType | BinaryType =>
          Column(col).alias(col.name)
        case _ =>
          Column(col).cast(StringType).alias(col.name)
      }
    }
  } else {
    result.logicalPlan.output.map(col => Column(col).alias(col.name))
  }

  val projected = result.select(castCols: _*)
  val writePlan = (needRepartition, numFiles) match {
    case (false, _) => projected
    case (true, Some(n)) => projected.repartition(n)
    case (true, None) => projected.repartition(Column(Rand(1)))
  }

  writePlan.write
    .options(writeOptions)
    .format(outputFormat)
    .mode(SaveMode.Overwrite)
    .save(step1Path.toString)

  val contentSummary = fs.getContentSummary(step1Path)
  val dataSize = contentSummary.getLength
  val maxDownloadSize = sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE)
  if (dataSize > maxDownloadSize) {
    throw QueryLevelRestrictionErrors.downloadDataSizeExceeded(dataSize, maxDownloadSize)
  }

  step1Path
}

// Limit download speed.
/** Wall-clock time (ms) at which the previous `getNextRowSet` call passed the throttle. */
private var lastFetchTime = System.currentTimeMillis()
/** Number of data bytes handed out by the previous `getNextRowSet` call. */
private var downloadedDataSize = 0L

/**
 * Returns up to `maxRowsL` download rows (schema row, data rows, end-of-file rows).
 *
 * Before serving, the call sleeps so that the data returned by the previous fetch is
 * paced at 50 MiB per second, capped at `maxFetchBlockTime` milliseconds.
 *
 * @param orientation only `FETCH_NEXT` is accepted
 * @param maxRowsL    maximum number of rows to return
 * @throws HiveSQLException the recorded operation exception when the operation did not
 *                          finish, or the state-assertion failure when none was recorded
 */
override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): RowSet = {
  val expectedMaxTime = math.min(
    maxFetchBlockTime,
    (downloadedDataSize.toDouble / (50L * 1024 * 1024)) * 1000L).toLong
  val downloadTime = System.currentTimeMillis() - lastFetchTime
  if (downloadTime < expectedMaxTime) {
    logInfo(s"Limit download speed ${downloadTime}, " +
      s"expected max download time ${expectedMaxTime}")
    Thread.sleep(expectedMaxTime - downloadTime)
  }
  lastFetchTime = System.currentTimeMillis()

  if (getStatus.getState ne OperationState.FINISHED) {
    // The exception may be absent (e.g. the operation is still running); `throw null`
    // would surface as a NullPointerException, so fall through to assertState instead.
    Option(getStatus.getOperationException).foreach(e => throw e)
  }
  assertState(OperationState.FINISHED)
  validateFetchOrientation(orientation, JEnumSet.of(FetchOrientation.FETCH_NEXT))

  val rowSet: RowSet = RowSetFactory.create(getDownloadSchema, getProtocolVersion, false)
  var fetchedBytes = 0L
  // Count in Long: narrowing maxRowsL to Int could wrap to a negative limit.
  var curRow = 0L
  while (curRow < maxRowsL && iter.hasNext) {
    val dataBlock = iter.next()
    val dataSize = dataBlock.dataSize
    dataBlock.path match {
      case Some(path) if dataSize >= 0 =>
        // data row
        val buffer: Array[Byte] = new Array[Byte](dataSize.toInt)
        Utils.tryWithResource(fs.open(path)) { is =>
          is.seek(dataBlock.offset.getOrElse(0L))
          is.readFully(buffer)
        }
        rowSet.addRow(Array[AnyRef](path.getName, buffer, null, Long.box(dataSize)))
        fetchedBytes += dataSize
      case Some(path) =>
        // End of file row
        rowSet.addRow(Array[AnyRef](path.getName, null, null, Long.box(dataSize)))
      case None =>
        // Schema row and total data size row
        rowSet.addRow(Array[AnyRef](null, null, dataBlock.schema.orNull, Long.box(dataSize)))
    }
    curRow += 1
  }
  // Throttle the next call on everything served here, not just on the last block.
  downloadedDataSize = fetchedBytes
  rowSet
}

/**
 * Schema reported to the client: the real result schema when the `useRealSchema` option
 * is "true" (case-insensitive), otherwise the fixed four-column download schema.
 */
override def getResultSetSchema: TableSchema = {
  if (writeOptions.get("useRealSchema").exists(_.equalsIgnoreCase("true"))) {
    resultSchema
  } else {
    getDownloadSchema
  }
}

/** Fixed schema of the rows produced by `getNextRowSet`. */
private def getDownloadSchema: TableSchema = {
  new TableSchema()
    .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to be transferred.")
    .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be transferred.")
    .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be transferred.")
    .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be transferred in this fetch.")
}

/** Reports cancellation to the event manager and cleans the operation up as CANCELED. */
override def cancel(): Unit = {
  if (statementId != null) {
    HiveThriftServer2.eventManager.onStatementCanceled(statementId)
  }
  HiveThriftServer2.eventManager.onQueryExist(
    statementId,
    QueryLogObjectList(Option(result).map(_.queryExecution)),
    QueryLogExtInfo(false, totalDataSize))
  cleanup(OperationState.CANCELED)
}

/**
 * Evaluates `f`, re-running it up to two more times when the failure is rooted in a
 * `FileNotFoundException` whose message does not mention "shuffle_". Before each retry
 * the tables referenced by the optimized plan are refreshed.
 *
 * @param f the computation to run; it is re-evaluated on every attempt
 * @return the value of the first successful evaluation
 */
private def withRetry[T](f: => T): T = {
  val maxRetry = 2
  var retryNum = 0

  // Decides whether the failure is worth a retry and, if so, refreshes the tables involved.
  def retriable(t: Throwable): Boolean = {
    var cur = t
    var refreshed = false
    while (!refreshed && retryNum < maxRetry && cur != null) {
      Utils.findFirstCause(cur) match {
        case notFound: FileNotFoundException
            if !Option(notFound.getMessage).exists(_.contains("shuffle_")) =>
          // For some commands, they may failed when initiating dataset, since it will trigger
          // execution on dataset initialization. We need manually build a QueryExecution to
          // get the optimized plan.
          // NOTE(review): when `result` is still null and the operation was started with a
          // table name (query == null), parsePlan receives null -- confirm how it behaves.
          val qe = if (result != null) {
            result.queryExecution
          } else {
            val parsed = sqlContext.sessionState.sqlParser.parsePlan(query)
            new QueryExecution(sqlContext.sparkSession, parsed)
          }
          qe.optimizedPlan.foreach {
            case LogicalRelation(_, _, Some(table), _) =>
              qe.sparkSession.sessionState.refreshTable(table.identifier.toString)
            case HiveTableRelation(tableMeta, _, _, _, _, _) =>
              qe.sparkSession.sessionState.refreshTable(tableMeta.identifier.toString)
            case _ =>
          }
          refreshed = true
        case _ =>
          cur = cur.getCause()
      }
    }
    refreshed
  }

  var res: Option[T] = None
  while (res.isEmpty) {
    if (retryNum > 0) {
      logInfo(s"Start to retry query $statementId.")
    }
    try {
      res = Some(f)
    } catch {
      case e if retriable(e) =>
        // The retriable exception may itself be the root, i.e. have no cause.
        val reason = Option(e.getCause).getOrElse(e).getMessage
        logError(s"Query $statementId failed out of error ${reason}")
        retryNum += 1
    }
  }
  // The loop only exits once `res` has been assigned.
  res.get
}

/**
 * Moves the operation to `state`, cancels the background task and the Spark job group,
 * and removes the download temp files.
 */
private def cleanup(state: OperationState): Unit = {
  setState(state)
  if (runInBackground) {
    Option(getBackgroundHandle()).foreach(_.cancel(true))
  }
  if (statementId != null) {
    sqlContext.sparkContext.cancelJobGroup(statementId, Some("Clean up SparkDownloadData"))
    sqlContext.queryLoadLimitationManager.clean(statementId)
  }
  // Delete temp files
  // NOTE(review): this removes the whole per-session directory, not only this statement's
  // sub-directory -- confirm that no other download of the same session can still be fetching.
  try {
    fs.delete(pathPrefix, true)
  } catch {
    case e: IOException =>
      log.warn("Failed to remove download temp files.", e)
  }
  // NOTE(review): statementId may still be null here -- confirm closeJobGroup tolerates it.
  sqlContext.sparkContext.closeJobGroup(statementId)
}
}
```
-- This is an automated message from the Apache Git Service.