Okay, so you are saying that 2.0.0 doesn't have that patch? @dev cc -- I don't like changing the service versions every time!
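If we do end up bumping, I assume the only change on our side would be the Spark dependency versions in the build file. A minimal sketch of that change (the `sparkVersion` value, module list, and "provided" scope below are assumptions, not our actual build definition):

```scala
// build.sbt -- hypothetical sketch of the version bump being discussed.
// Adjust module names and scopes to whatever the project actually pulls in.
val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
)
```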
Thanks.

On Mon, Jan 30, 2017 at 1:10 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I think you have to upgrade to 2.1.0. There have been a few changes related to that ERROR since.
>
> Jacek
>
> On 29 Jan 2017 9:24 a.m., "Chetan Khatri" <chetan.opensou...@gmail.com> wrote:
>
> Hello Spark Users,
>
> I am getting an error while saving a Spark DataFrame to a Hive table:
> Hive 1.2.1
> Spark 2.0.0
> Local environment.
> Note: The job executes successfully and produces the result I want, but the exception is still raised.
>
> *Source Code:*
>
> package com.chetan.poc.hbase
>
> /**
>   * Created by chetan on 24/1/17.
>   */
> import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.KeyValue.Type
> import org.apache.spark.sql.SparkSession
> import scala.collection.JavaConverters._
> import java.util.Date
> import java.text.SimpleDateFormat
>
> object IncrementalJob {
>   val APP_NAME: String = "SparkHbaseJob"
>   var HBASE_DB_HOST: String = null
>   var HBASE_TABLE: String = null
>   var HBASE_COLUMN_FAMILY: String = null
>   var HIVE_DATA_WAREHOUSE: String = null
>   var HIVE_TABLE_NAME: String = null
>
>   def main(args: Array[String]) {
>     // Initializing HBase configuration variables
>     HBASE_DB_HOST = "127.0.0.1"
>     HBASE_TABLE = "university"
>     HBASE_COLUMN_FAMILY = "emp"
>     // Initializing Hive metastore configuration
>     HIVE_DATA_WAREHOUSE = "/usr/local/hive/warehouse"
>     // Initializing Hive table name - target table
>     HIVE_TABLE_NAME = "employees"
>
>     // Setting up the Spark application
>     // val sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local")
>     // Initialize the Spark context
>     // val sparkContext = new SparkContext(sparkConf)
>     // val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
>
>     // Enable Hive support with the Hive warehouse location in SparkSession
>     val spark = SparkSession.builder()
>       .appName(APP_NAME)
>       .config("hive.metastore.warehouse.dir", HIVE_DATA_WAREHOUSE)
>       .config("spark.sql.warehouse.dir", HIVE_DATA_WAREHOUSE)
>       .enableHiveSupport()
>       .getOrCreate()
>     import spark.implicits._
>     import spark.sql
>
>     val conf = HBaseConfiguration.create()
>     conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE)
>     conf.set(TableInputFormat.SCAN_COLUMNS, HBASE_COLUMN_FAMILY)
>
>     // Load an RDD of (ImmutableBytesWritable, Result) tuples from the table
>     val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf,
>       classOf[TableInputFormat],
>       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>       classOf[org.apache.hadoop.hbase.client.Result])
>
>     println(hBaseRDD.count())
>     // hBaseRDD.foreach(println)
>
>     // keyValue is an RDD[java.util.List[hbase.KeyValue]]
>     val keyValue = hBaseRDD.map(x => x._2).map(_.list)
>
>     // outPut is a DataFrame in which each row represents a cell in HBase
>     val outPut = keyValue.flatMap(x => x.asScala.map(cell =>
>       HBaseResult(
>         Bytes.toInt(CellUtil.cloneRow(cell)),
>         Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
>         Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
>         cell.getTimestamp,
>         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date(cell.getTimestamp.toLong)),
>         Bytes.toStringBinary(CellUtil.cloneValue(cell)),
>         Type.codeToType(cell.getTypeByte).toString
>       )
>     )).toDF()
>     // Output DataFrame
>     outPut.show
>
>     // Get the timestamp threshold
>     val datetimestamp_threshold = "2016-08-25 14:27:02:001"
>     val datetimestampformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
>       .parse(datetimestamp_threshold).getTime()
>
>     // Result set filtering based on timestamp
>     val filtered_output_timestamp = outPut.filter($"colDatetime" >= datetimestampformat)
>     // Result set filtering based on rowkey
>     val filtered_output_row = outPut.filter($"colDatetime".between(1668493360, 1668493365))
>
>     // Saving the DataFrame to the Hive table succeeds.
>     filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
>   }
>
>   case class HBaseResult(rowkey: Int, colFamily: String, colQualifier: String,
>     colDatetime: Long, colDatetimeStr: String, colValue: String, colType: String)
> }
>
> Error:
>
> 17/01/29 13:51:53 INFO metastore.HiveMetaStore: 0: create_database: Database(name:default, description:default database, locationUri:hdfs://localhost:9000/usr/local/hive/warehouse, parameters:{})
> 17/01/29 13:51:53 INFO HiveMetaStore.audit: ugi=hduser ip=unknown-ip-addr cmd=create_database: Database(name:default, description:default database, locationUri:hdfs://localhost:9000/usr/local/hive/warehouse, parameters:{})
> 17/01/29 13:51:53 ERROR metastore.RetryingHMSHandler: AlreadyExistsException(message:Database default already exists)
>         at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
>         at com.sun.proxy.$Proxy21.create_database(Unknown Source)
>         at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
>         at com.sun.proxy.$Proxy22.createDatabase(Unknown Source)
>         at org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)
>         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:309)
>         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
>         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
>         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:280)
>         at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
>         at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
>         at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:269)
>         at org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:308)
>         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:99)
>         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
>         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
>         at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
>         at org.apache.spark.sql.hive.HiveExternalCatalog.createDatabase(HiveExternalCatalog.scala:98)
>         at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
>         at org.apache.spark.sql.catalyst.catalog.SessionCatalog.<init>(SessionCatalog.scala:89)
>         at org.apache.spark.sql.hive.HiveSessionCatalog.<init>(HiveSessionCatalog.scala:51)
>         at org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:49)
>         at org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
>         at org.apache.spark.sql.hive.HiveSessionState$$anon$1.<init>(HiveSessionState.scala:63)
>         at org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
>         at org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
>         at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
>         at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
>         at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>         at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
>         at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:441)
>         at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:395)
>         at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
>         at com.chetan.poc.hbase.IncrementalJob$.main(IncrementalJob.scala:58)
>         at com.chetan.poc.hbase.IncrementalJob.main(IncrementalJob.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks.
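For what it's worth, since the quoted job reportedly completes and the ERROR in the stack trace is logged by the metastore's RetryingHMSHandler while the SparkSession sets up the default database, a quick sanity check on our side could be to read the table back after the write. A minimal sketch, assuming a Hive-enabled SparkSession and the same target table name ("employees") as in the quoted code:

```scala
import org.apache.spark.sql.SparkSession

// Sanity-check sketch: confirm the Hive append actually landed, independent of
// the AlreadyExistsException logged during metastore setup.
// Assumptions: Hive support is available and the table name matches the quoted job.
object VerifyHiveWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("VerifyHiveWrite")
      .enableHiveSupport()
      .getOrCreate()

    val hiveTableName = "employees" // same target table as in the quoted job

    // If the append succeeded, the table is readable and non-empty.
    val df = spark.table(hiveTableName)
    println(s"Rows in $hiveTableName: ${df.count()}")
    df.show(10)

    spark.stop()
  }
}
```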