[ 
https://issues.apache.org/jira/browse/HIVE-24893?focusedWorklogId=696523&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-696523
 ]

ASF GitHub Bot logged work on HIVE-24893:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Dec/21 10:49
            Start Date: 15/Dec/21 10:49
    Worklog Time Spent: 10m 
      Work Description: wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-994669275


   This is an example of how Spark implements `DownloadDataOperation`:
   ```scala
   private[hive] case class DownloadDataBlock(
       path: Option[Path] = None,
       offset: Option[Long] = None,
       schema: Option[String] = None,
       dataSize: Long)
   
   private[hive] class SparkDownloadDataOperation(
       val sqlContext: SQLContext,
       parentSession: HiveSession,
       tableName: String,
       query: String,
       format: String,
       options: JMap[String, String],
       runInBackground: Boolean)
     extends Operation(
       parentSession,
       Map.empty[String, String].asJava,
       OperationType.UNKNOWN_OPERATION,
       runInBackground) with SparkOperation with QueryLogging with Logging {
   
     private var result: DataFrame = _
   
     private lazy val resultSchema: TableSchema = {
       if (result == null || result.schema.isEmpty) {
         new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
       } else {
         logInfo(s"Result Schema: ${result.schema}")
         SparkExecuteStatementOperation.getTableSchema(result.schema)
       }
     }
   
     private val pathFilter = new PathFilter {
       override def accept(path: Path): Boolean =
         !path.getName.equals("_SUCCESS") && !path.getName.endsWith("crc")
     }
   
     private val defaultBlockSize = 10 * 1024 * 1024
   
     // Please see CSVOptions for more details.
     private val defaultOptions = Map(
       "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
       "dateFormat" -> "yyyy-MM-dd",
       "delimiter" -> ",",
       "escape" -> "\"",
       "compression" -> "gzip",
       "header" -> "true",
       "maxRecordsPerFile" ->"0",
       // To avoid Zeta client timeout
       "fetchBlockSize" -> defaultBlockSize.toString,
       "maxFetchBlockTime" -> "30000",
       // To avoid coalesce
       "minFileSize" -> (defaultBlockSize - 1 * 1024 * 1024).toString)
   
     private val writeOptions =
       defaultOptions ++ Option(options).map(_.asScala).getOrElse(Map.empty[String, String]).toMap
     private val numFiles = writeOptions.get("numFiles").map(_.toInt)
   
     private val fetchSize = writeOptions("fetchBlockSize").toLong
   
     private val maxFetchBlockTime = writeOptions("maxFetchBlockTime").toLong
   
     private val minFileSize = writeOptions("minFileSize").toLong
   
     private val downloadQuery = s"Generating download files with arguments " +
       s"[${tableName}, ${query}, ${format}, ${writeOptions}]"
   
     private val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
     private val scratchDir = sqlContext.conf.getConf(StaticSQLConf.SPARK_SCRATCH_DIR)
     private val pathPrefix = new Path(scratchDir + File.separator + "DownloadData" + File.separator +
       parentSession.getUserName + File.separator + parentSession.getSessionHandle.getSessionId)
     private val fs: FileSystem = pathPrefix.getFileSystem(hadoopConf)
   
     private var iter: JIterator[DownloadDataBlock] = _
     private var schemaStr: String = _
     private var totalDataSize: Long = 0
   
     override def close(): Unit = {
       // CARMEL-4662: fix incorrect download query state.
       if (getStatus.getState eq OperationState.FINISHED) {
         HiveThriftServer2.eventManager.onStatementFinish(statementId)
       }
       HiveThriftServer2.eventManager.onQueryExist(
         statementId,
         QueryLogObjectList(Option(result).map(_.queryExecution)),
         QueryLogExtInfo(false, totalDataSize))
       logInfo(s"CLOSING $statementId")
       cleanup(OperationState.CLOSED)
       sqlContext.sparkContext.clearJobGroup()
     }
   
     override def runInternal(): Unit = {
       setState(OperationState.PENDING)
       setHasResultSet(true)
   
       if (!runInBackground) {
         execute()
       } else {
         val sparkServiceUGI = HiveShimsUtils.getUGI()
   
         val backgroundOperation = new Runnable() {
   
           override def run(): Unit = {
             val doAsAction = new PrivilegedExceptionAction[Unit]() {
               override def run(): Unit = {
                 try {
                   execute()
                 } catch {
                   case e: HiveSQLException =>
                     setOperationException(e)
                     log.error("Error generating download file: ", e)
                 }
               }
             }
   
             try {
               sparkServiceUGI.doAs(doAsAction)
             } catch {
               case e: Exception =>
                 setOperationException(new HiveSQLException(e))
                 logError("Error generating download file as user : " +
                   sparkServiceUGI.getShortUserName(), e)
             }
           }
         }
         try {
           // This submit blocks if no background threads are available to run this operation
           val backgroundHandle =
             parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
           setBackgroundHandle(backgroundHandle)
         } catch {
           case rejected: RejectedExecutionException =>
             setState(OperationState.ERROR)
             throw new HiveSQLException("The background threadpool cannot 
accept" +
               " new task for execution, please retry the operation", rejected)
           case NonFatal(e) =>
             logError("Error generating download file in background", e)
             setState(OperationState.ERROR)
             throw new HiveSQLException(e)
         }
       }
     }
   
     private def execute(): Unit = {
       statementId = getHandle.getHandleIdentifier.getPublicId.toString
       setState(OperationState.RUNNING)
       try {
         // Always use the latest class loader provided by executionHive's state.
         val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
         // Use parent session's SessionState in this operation because such SessionState
         // keeps some shared info per session e.g. authorization information.
         SessionState.setCurrentSessionState(parentSession.getSessionState)
         Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
   
         sqlContext.sparkContext.listenerBus.
           post(StatementStart(statementId, System.currentTimeMillis(),
             sqlContext.sparkContext.getLocalProperty("spark.hive.session.id")))
         HiveThriftServer2.eventManager.onStatementStart(
           statementId,
           parentSession.getSessionHandle.getSessionId.toString,
           downloadQuery,
           statementId,
           parentSession.getUsername)
   
         assert(fetchSize >= 1L * 1024 * 1024 && fetchSize <= 20L * 1024 * 1024,
           s"fetchBlockSize(${fetchSize}) should be between 1M and 20M.")
   
         if (StringUtils.isNotEmpty(tableName) && StringUtils.isNotEmpty(query)) {
           throw new HiveSQLException("Both table name and query are specified.")
         }
   
         sqlContext.sparkContext.setLocalProperty(
           SparkContext.SPARK_USER_RESOURCE_CONSUMER_ID, parentSession.getUserName)
         if (parentSession.getUserInfo != null) {
           sqlContext.sparkContext.setLocalProperty(
             SparkContext.SPARK_USER_RESOURCE_CONSUMER_PROFILE, parentSession.getUserInfo.profile)
         }
         sqlContext.sparkContext.setJobGroup(statementId, downloadQuery)
   
         logInfo(s"Running query [$statementId] in session " +
           s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD 
'$query'")
         val resultPath = writeData(new Path(pathPrefix, statementId))
   
         logQueryInfo(s"Running query [$statementId] in session " +
           s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD 
'$query'")
         val dataSize = fs.getContentSummary(resultPath).getLength
         logInfo(s"Try to download ${dataSize} bytes data from thriftserver.")
         totalDataSize = dataSize
   
         val list: JList[DownloadDataBlock] = new JArrayList[DownloadDataBlock]()
         // Add total data size to first row.
         list.add(DownloadDataBlock(schema = Some(schemaStr), dataSize = dataSize))
         // and then add data.
         fs.listStatus(resultPath, pathFilter).map(_.getPath).sortBy(_.getName).foreach { path =>
           val dataLen = fs.getFileStatus(path).getLen
           // Cast to BigDecimal to avoid overflowing
           val fetchBatchs =
             (BigDecimal(dataLen) / BigDecimal(fetchSize)).setScale(0, RoundingMode.CEILING).longValue()
           assert(fetchBatchs < Int.MaxValue, "The number of fetch batches is too large.")
   
           (0 until fetchBatchs.toInt).foreach { i =>
             val fetchSizeInBatch = if (i == fetchBatchs - 1) dataLen - i * fetchSize else fetchSize
             list.add(DownloadDataBlock(
               path = Some(path), offset = Some(i * fetchSize), dataSize = fetchSizeInBatch))
           }
   
           list.add(DownloadDataBlock(path = Some(path), dataSize = -1))
         }
   
         iter = list.iterator()
         logInfo(s"Add ${list.size()} data blocks to be fetched.")
   
         setState(OperationState.FINISHED)
         logQueryInfo(s"Finished query [$statementId].")
       } catch {
         case NonFatal(e) =>
           logQueryError(s"Error executing query [$statementId]", e)
           setState(OperationState.ERROR)
           HiveThriftServer2.eventManager.onStatementError(
             statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
           val exception = new HiveSQLException(e)
           setOperationException(exception)
       }
     }
   
     private def writeData(path: Path): Path = withRetry {
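       // Materialize the table/query result, cast columns to types the chosen output
       // format can carry, optionally repartition, write the files to a per-statement
       // scratch path, and enforce the configured download size limit.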
       result = (Option(tableName), Option(query), Option(format), Option(options)) match {
         case (Some(t), None, _, _) =>
           sqlContext.table(t)
         case (None, Some(q), _, _) =>
           sqlContext.sql(q)
         case _ =>
           throw new HiveSQLException(s"Invalid arguments: ($tableName, $query, $format, $options).")
       }
   
       schemaStr = result.schema.map(_.name).mkString(writeOptions("delimiter"))
       val needRepartition = result.queryExecution.sparkPlan match {
         case _: SortExec => false
         case _: TakeOrderedAndProjectExec => false
         case ProjectExec(_, _: SortExec) => false
         case AdaptiveSparkPlanExec(_: SortExec, _, _, _) => false
         case AdaptiveSparkPlanExec(_: TakeOrderedAndProjectExec, _, _, _) => false
         case AdaptiveSparkPlanExec(ProjectExec(_, _: SortExec), _, _, _) => false
         case _: ShuffleExchangeExec => false
         case ProjectExec(_, _: ShuffleExchangeExec) => false
         case _: CollectLimitExec => false
         case _: LimitExec => false
         case _ => true
       }
       // Background: according to the official Hadoop FileSystem API spec, the
       // destination path of a rename must have an existing parent, otherwise the
       // rename may produce unexpected results.
       // When downloading a dataset in parquet format, if we configure a quota-free
       // path and use the FileOutputCommitter V1 algorithm, we get an
       // "IOException: Failed to rename FileStatus".
       // Hence, the parent path should exist (see CARMEL-5150).
       if (!fs.exists(path) && !fs.mkdirs(path)) {
         logWarning(s"Failed to create parent download path ${path}")
       }
       val step1Path = new Path(path, "step1")
       val outputFormat = Option(format).getOrElse("csv")
       val (castCols, readSchema) = if (outputFormat.equalsIgnoreCase("csv")) {
         // Support duplicate columns
         val names = result.schema.map(_.name)
         val renameDuplicateNames = if (names.distinct.length != names.length) {
           val duplicateColumns = names.groupBy(identity).collect {
             case (x, ys) if ys.length > 1 => x
           }
           result.logicalPlan.output.zipWithIndex.map {
             case (col, index) if duplicateColumns.exists(_.equals(col.name)) =>
               col.withName(col.name + index)
             case (col, _) => col
           }
         } else {
           result.logicalPlan.output
         }
         // Support Complex types for csv file
         val output = renameDuplicateNames.map { col =>
           col.dataType match {
             case BinaryType => Column(col).cast(StringType).alias(col.name)
             case NullType => Column(col).cast(StringType).alias(col.name)
             case CalendarIntervalType => Column(col).cast(StringType).alias(col.name)
             case ArrayType(_, _) => Column(col).cast(StringType).alias(col.name)
             case MapType(_, _, _) => Column(col).cast(StringType).alias(col.name)
             case StructType(_) => Column(col).cast(StringType).alias(col.name)
             case _ => Column(col).alias(col.name)
           }
         }
         (output, StructType(StructType.fromAttributes(renameDuplicateNames)
           .map(_.copy(dataType = StringType))))
       } else if (outputFormat.equalsIgnoreCase("parquet")) {
         val output = result.logicalPlan.output.map { col =>
           col.dataType match {
             case BooleanType | ByteType | ShortType | IntegerType
                  | LongType | FloatType | DoubleType | BinaryType => Column(col).alias(col.name)
             case _ => Column(col).cast(StringType).alias(col.name)
           }
         }
         val newSchema = result.schema.map(s => s.dataType match {
           case BooleanType | ByteType | ShortType | IntegerType
                | LongType | FloatType | DoubleType | BinaryType => s
           case _ => s.copy(dataType = StringType)
         })
         (output, StructType(newSchema))
       } else {
         val output = result.logicalPlan.output.map(col => Column(col).alias(col.name))
         (output, result.schema)
       }
   
       val writePlan = if (!needRepartition) {
         result.select(castCols: _*)
       } else if (numFiles.nonEmpty) {
         result.select(castCols: _*).repartition(numFiles.get)
       } else {
         result.select(castCols: _*).repartition(Column(Rand(1)))
       }
   
       writePlan.write
         .options(writeOptions)
         .format(outputFormat)
         .mode(SaveMode.Overwrite)
         .save(step1Path.toString)
   
       val contentSummary = fs.getContentSummary(step1Path)
       val dataSize = contentSummary.getLength
       if (dataSize > sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE)) {
         throw QueryLevelRestrictionErrors.downloadDataSizeExceeded(
           dataSize,
           sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE))
       }
   
       step1Path
     }
   
     // Limit download speed.
     private var lastFetchTime = System.currentTimeMillis()
     private var downloadedDataSize = 0L
   
     // Rows follow the layout built in execute(): first a header row carrying the
     // delimiter-joined column names and the total data size, then one row per block
     // with the file name, the raw bytes and the block size, and finally an
     // end-of-file marker row per file with dataSize = -1.
     override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): RowSet = {
       val expectedMaxTime = math.min(maxFetchBlockTime,
         (downloadedDataSize.toDouble / (50L * 1024 * 1024)) * 1000L).toLong
       val downloadTime = System.currentTimeMillis() - lastFetchTime
       if (downloadTime < expectedMaxTime) {
         logInfo(s"Limit download speed ${downloadTime}, " +
           s"expected max download time ${expectedMaxTime}")
         Thread.sleep(expectedMaxTime - downloadTime)
       }
       lastFetchTime = System.currentTimeMillis()
   
       if (getStatus.getState ne OperationState.FINISHED) {
         throw getStatus.getOperationException
       }
       assertState(OperationState.FINISHED)
       validateFetchOrientation(orientation, JEnumSet.of(FetchOrientation.FETCH_NEXT))
   
       val rowSet: RowSet = RowSetFactory.create(getDownloadSchema, getProtocolVersion, false)
   
       if (!iter.hasNext) {
         rowSet
       } else {
         val maxRows = maxRowsL.toInt
         var curRow = 0
         while (curRow < maxRows && iter.hasNext) {
           val dataBlock = iter.next()
           val dataSize = dataBlock.dataSize
           dataBlock.path match {
             case Some(path) =>
               if (dataSize >= 0) {
                 val buffer: Array[Byte] = new Array[Byte](dataSize.toInt)
                 Utils.tryWithResource(fs.open(path)) { is =>
                   is.seek(dataBlock.offset.get)
                   is.readFully(buffer)
                 }
                 // Data row
                 rowSet.addRow(Array[AnyRef](path.getName, buffer, null, Long.box(dataSize)))
                 downloadedDataSize = dataSize
               } else {
                 // End-of-file row
                 rowSet.addRow(Array[AnyRef](path.getName, null, null, Long.box(dataSize)))
               }
             case _ =>
               // Schema row and total data size row
               rowSet.addRow(Array[AnyRef](null, null, dataBlock.schema.get, Long.box(dataSize)))
           }
           curRow += 1
         }
         rowSet
       }
     }
   
     override def getResultSetSchema: TableSchema = {
       if (writeOptions.get("useRealSchema").nonEmpty
         && writeOptions("useRealSchema").equalsIgnoreCase("true")) {
         resultSchema
       } else {
         val ret = new TableSchema()
           .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to 
be transferred.")
           .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be 
transferred.")
           .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to 
be transferred.")
           .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be 
transferred in this fetch.")
         ret
       }
     }
   
     private def getDownloadSchema: TableSchema = {
       new TableSchema()
         .addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to 
be transferred.")
         .addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be 
transferred.")
         .addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be 
transferred.")
         .addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be 
transferred in this fetch.")
     }
   
     override def cancel(): Unit = {
       if (statementId != null) {
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
       HiveThriftServer2.eventManager.onQueryExist(
         statementId,
         QueryLogObjectList(Option(result).map(_.queryExecution)),
         QueryLogExtInfo(false, totalDataSize))
       cleanup(OperationState.CANCELED)
     }
   
     private def withRetry[T](f: => T): T = {
       val maxRetry = 2
       var retryNum = 0
   
       def retriable(t: Throwable): Boolean = {
         var cur = t
         while (retryNum < maxRetry && cur != null) {
           Utils.findFirstCause(cur) match {
             case fnf: FileNotFoundException if !fnf.getMessage.contains("shuffle_") =>
               // Some commands may fail while the dataset is being initialized, since
               // initialization triggers execution. We need to manually build a
               // QueryExecution to get the optimized plan.
               val qe = if (result != null) {
                 result.queryExecution
               } else {
                 val parsed = sqlContext.sessionState.sqlParser.parsePlan(query)
                 new QueryExecution(sqlContext.sparkSession, parsed)
               }
               qe.optimizedPlan.foreach {
                 case LogicalRelation(_, _, Some(table), _) =>
                   qe.sparkSession.sessionState.refreshTable(table.identifier.toString)
                 case HiveTableRelation(tableMeta, _, _, _, _, _) =>
                   qe.sparkSession.sessionState.refreshTable(tableMeta.identifier.toString)
                 case _ =>
               }
               return true
             case _ => cur = cur.getCause()
           }
         }
         false
       }
   
       var res: Option[T] = None
       do {
         if (retryNum > 0) {
           logInfo(s"Start to retry query $statementId.")
         }
         try {
           res = Some(f)
         } catch {
           case e if retriable(e) =>
             logError(s"Query $statementId failed out of error 
${e.getCause.getMessage}")
             retryNum += 1
           case e: Throwable =>
             throw e
         }
       } while (res.isEmpty)
       res.get
     }
   
     private def cleanup(state: OperationState): Unit = {
       setState(state)
       if (runInBackground) {
         val backgroundHandle = getBackgroundHandle()
         if (backgroundHandle != null) {
           backgroundHandle.cancel(true)
         }
       }
       if (statementId != null) {
         sqlContext.sparkContext.cancelJobGroup(statementId, Some("Clean up SparkDownloadData"))
         sqlContext.queryLoadLimitationManager.clean(statementId)
       }
   
       // Delete temp files
       try {
         fs.delete(pathPrefix, true)
       } catch {
         case e: IOException =>
           log.warn("Failed to remove download temp files.", e)
       }
       sqlContext.sparkContext.closeJobGroup(statementId)
     }
   }
   ```
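   
   For reference, a client only needs the four columns above (FILE_NAME, DATA, SCHEMA, SIZE) to reassemble the downloaded files. The sketch below is illustrative and not part of this PR: it assumes a `java.sql.ResultSet` with that shape has already been obtained through whatever client API ends up exposing the download operation, and it simply replays the row layout produced by `getNextRowSet`.
   ```scala
   import java.io.{File, FileOutputStream}
   import java.sql.ResultSet
   
   // Illustrative sketch only: reassemble downloaded files from a result set with
   // columns (FILE_NAME, DATA, SCHEMA, SIZE). Assumes targetDir starts out empty.
   def saveDownloadedFiles(rs: ResultSet, targetDir: File): Unit = {
     targetDir.mkdirs()
     var out: FileOutputStream = null
     try {
       while (rs.next()) {
         val fileName = rs.getString("FILE_NAME")
         val size = rs.getLong("SIZE")
         if (fileName == null) {
           // Header row: carries the schema string and the total data size.
           println(s"schema=${rs.getString("SCHEMA")}, totalBytes=$size")
         } else if (size >= 0) {
           // Data block row: append the bytes to the current file.
           if (out == null) out = new FileOutputStream(new File(targetDir, fileName), true)
           out.write(rs.getBytes("DATA"))
         } else {
           // End-of-file marker row (SIZE = -1): close the current file.
           if (out != null) { out.close(); out = null }
         }
       }
     } finally {
       if (out != null) out.close()
     }
   }
   ```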


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 696523)
    Time Spent: 20m  (was: 10m)

> Download data from Thriftserver through JDBC
> --------------------------------------------
>
>                 Key: HIVE-24893
>                 URL: https://issues.apache.org/jira/browse/HIVE-24893
>             Project: Hive
>          Issue Type: New Feature
>          Components: HiveServer2, JDBC
>    Affects Versions: 4.0.0
>            Reporter: Yuming Wang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> It is very useful to support downloading large amounts of data (such as more 
> than 50GB) through JDBC.
> Snowflake has similar support:
> https://docs.snowflake.com/en/user-guide/jdbc-using.html#label-jdbc-download-from-stage-to-stream
> https://github.com/snowflakedb/snowflake-jdbc/blob/95a7d8a03316093430dc3960df6635643208b6fd/src/main/java/net/snowflake/client/jdbc/SnowflakeConnectionV1.java#L886



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
