Hi,

To move data from Hive to Google BigQuery, one needs to create a staging table in Hive in a storage format that can be read by BigQuery. Both the AVRO and ORC file formats in Hive work, but the files cannot be compressed.
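As a side note, ORC tables in Hive are compressed with ZLIB by default, so if genuinely uncompressed files are needed, compression can be switched off explicitly when the staging table is created. Below is only a minimal sketch, shown as Hive DDL run through spark.sql (the same statement can be run directly in beeline); the table names are placeholders, and the concrete DDL I actually used follows further down:

// Minimal sketch with hypothetical table names. Setting "orc.compress" = "NONE"
// turns off the default ZLIB compression, so the staging table's ORC files are
// written uncompressed. Assumes a SparkSession built with enableHiveSupport().
spark.sql("""
  CREATE TABLE accounts.staging_table_orc
  STORED AS ORC
  TBLPROPERTIES ("orc.compress" = "NONE")
  AS SELECT * FROM accounts.source_table
""")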
In addition, to handle both Date and Double types, it is best to convert these into String in the staging table. For example, I have the following Hive table (ORC compressed):

0: jdbc:hive2://rhes75:10099/default> desc ll_18201960;
+-------------------------+------------+----------+
| col_name                | data_type  | comment  |
+-------------------------+------------+----------+
| transactiondate         | date       |          |
| transactiontype         | string     |          |
| sortcode                | string     |          |
| accountnumber           | string     |          |
| transactiondescription  | string     |          |
| debitamount             | double     |          |
| creditamount            | double     |          |
| balance                 | double     |          |
+-------------------------+------------+----------+

Note the Date and Double types. I now create a normal ORC table in Hive as follows:

drop table if exists accounts.ll_18201960_gcp;
CREATE TABLE accounts.ll_18201960_gcp
STORED AS ORC
AS
SELECT
      CAST(transactiondate AS String) AS transactiondate
    , transactiontype
    , sortcode
    , accountnumber
    , transactiondescription
    , CAST(debitamount AS String) AS debitamount
    , CAST(creditamount AS String) AS creditamount
    , CAST(balance AS String) AS balance
FROM ll_18201960;

Note the casting to String type.

I then extract the table files from HDFS:

hdfs dfs -get /user/hive/warehouse/accounts.db/ll_18201960_gcp .

and move them to the GCP bucket:

gsutil rm -r gs://xxxx/accounts/ll_18201960_gcp
gsutil cp -r ll_18201960_gcp gs://xxxx/accounts/ll_18201960_gcp

I then create a BigQuery table with autodetect and load the data:

bq load --autodetect --replace=true --source_format=ORC accounts.ll_18201960_gcp "gs://xxxx/accounts/ll_18201960_gcp/*"
bq query --use_legacy_sql=false "select * from accounts.ll_18201960_gcp"

So a staging table in BigQuery is created. I then use Spark as an ETL tool to move data from the BigQuery staging table into its final format, taking care of casting etc.
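The Spark job below is packaged as an uber JAR with SBT (as mentioned at the end of this post). The original build file is not included here, so the following is only a minimal sbt-assembly sketch of how such a build might look; the versions, dependency coordinates and merge strategy are my assumptions rather than the actual project settings:

// build.sbt -- minimal sketch; versions and coordinates are assumptions
name := "simple"
version := "1.0"
scalaVersion := "2.11.12"

// Spark itself is provided by the cluster at run time
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2" % "provided"
// BigQuery connector that provides com.samelamin.spark.bigquery._
// (the GCS/BigQuery Hadoop connectors are usually already on the Dataproc classpath)
libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.5"

// keep a single copy of files duplicated across dependencies when building the fat JAR
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

Running "sbt assembly" then produces a single fat JAR under target/ that can be passed to spark-submit.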
For Spark I create an uber JAR file (a build sketch is shown above). The code is as follows:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.fs.Path
import scala.util.Random
import org.apache.spark.sql.functions._
//
import org.apache.log4j.Logger
import org.apache.log4j.Level
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.avro.generic.GenericData
import com.google.cloud.hadoop.io.bigquery.AvroBigQueryInputFormat
import com.samelamin.spark.bigquery._

object simple {

  private var sparkAppName = "simple"
  private var sparkDefaultParllelism = null
  private var sparkDefaultParallelismValue = "12"
  private var sparkSerializer = null
  private var sparkSerializerValue = "org.apache.spark.serializer.KryoSerializer"
  private var sparkNetworkTimeOut = null
  private var sparkNetworkTimeOutValue = "3600"
  private var sparkStreamingUiRetainedBatches = null
  private var sparkStreamingUiRetainedBatchesValue = "5"
  private var sparkWorkerUiRetainedDrivers = null
  private var sparkWorkerUiRetainedDriversValue = "5"
  private var sparkWorkerUiRetainedExecutors = null
  private var sparkWorkerUiRetainedExecutorsValue = "30"
  private var sparkWorkerUiRetainedStages = null
  private var sparkWorkerUiRetainedStagesValue = "100"
  private var sparkUiRetainedJobs = null
  private var sparkUiRetainedJobsValue = "100"
  private var sparkJavaStreamingDurationsInSeconds = "10"
  private var sparkNumberOfSlaves = 14
  private var sparkRequestTopicShortName = null
  private var sparkImpressionTopicShortName = null
  private var sparkClickTopicShortName = null
  private var sparkConversionTopicShortName = null
  private var sparkNumberOfPartitions = 30
  private var sparkClusterDbIp = null
  private var clusterDbPort = null
  private var insertQuery = null
  private var insertOnDuplicateQuery = null
  private var sqlDriverName = null
  private var memorySet = "F"
  private var enableHiveSupport = null
  private var enableHiveSupportValue = "true"

  def main(args: Array[String]) {

    var startTimeQuery = System.currentTimeMillis

    // Create a SparkSession. There is no need to create a SparkContext: in Spark 2.0
    // the same effects can be achieved through SparkSession, without explicitly creating
    // SparkConf, SparkContext or SQLContext, as they are encapsulated within the SparkSession.
    val spark = SparkSession.
      builder().
      appName(sparkAppName).
      config("spark.driver.allowMultipleContexts", "true").
      config("spark.hadoop.validateOutputSpecs", "false").
      getOrCreate()

    // change the values accordingly.
spark.conf.set("sparkDefaultParllelism", sparkDefaultParallelismValue) spark.conf.set("sparkSerializer", sparkSerializerValue) spark.conf.set("sparkNetworkTimeOut", sparkNetworkTimeOutValue) import spark.implicits._ spark.sparkContext.setLogLevel("ERROR") println ("\nStarted at"); spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println) val HadoopConf = spark.sparkContext.hadoopConfiguration //get and set the env variables val bucket = HadoopConf.get("fs.gs.system.bucket") val projectId = HadoopConf.get("fs.gs.project.id") val jobName = "simplejob" HadoopConf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId) HadoopConf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket) val inputTable = "accounts.ll_18201960_gcp" val fullyQualifiedInputTableId = projectId+":"+inputTable val outputTable = "test.ll_18201960" val fullyQualifiedOutputTableId = projectId+":"+outputTable val jsonKeyFile="xxxxxx.json" val datasetLocation="xxxxx" val writedisposition="WRITE_TRUNCATE" // Set up GCP credentials spark.sqlContext.setGcpJsonKeyFile(jsonKeyFile) // Set up BigQuery project and bucket spark.sqlContext.setBigQueryProjectId(projectId) spark.sqlContext.setBigQueryGcsBucket(bucket) spark.sqlContext.setBigQueryDatasetLocation(datasetLocation) // Delete existing data in BigQuery table if any println("\nDeleting data from output table " + outputTable) var sqltext = "DELETE from " + outputTable + " WHERE True" spark.sqlContext.runDMLQuery(sqltext) //read data from the staging (input) Table println("\nreading data from " + inputTable) val df = spark.sqlContext .read .format("com.samelamin.spark.bigquery") .option("tableReferenceSource",fullyQualifiedInputTableId) .load() println("\nInput table schema") df.printSchema // Create a new DF based on CAST columns // val df2 = df.select('transactiondate.cast("DATE").as("transactiondate"), ' transactiontype.as("transactiontype"), substring('sortcode,2,8).as("sortcode"), 'accountnumber.as("accountnumber"), 'transactiondescription.as("transactiondescription"), 'debitamount.cast("DOUBLE").as("debitamount"), 'creditamount.cast("DOUBLE").as("creditamount"), 'balance.cast("DOUBLE").as("balance")) println("\nModified table schema for output storage") df2.printSchema // Save data to a BigQuery table println("\nsaving data to " + outputTable) df2.saveAsBigQueryTable(fullyQualifiedOutputTableId) println("\nreading data from " + outputTable + ", and counting rows") // Load everything from the table and count the number of rows val ll_18201960 = spark.sqlContext.bigQueryTable(fullyQualifiedOutputTableId) ll_18201960.agg(count("*").as("Rows in the output table " + outputTable)).show I have tested this and it works fine with Uber Jar file created using SBT. I would like to share the views and experience with other members. I am sure there may be better ways of making this work so happy to hear the views. Regards, Mich LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.