All,
The code below causes the following error.
However, if I replace JavaHiveContext with HiveContext (see the commented-out
code below) and replace the Java class with a Scala case class, the same code
works fine. Is there any reason why this could be happening?
java.lang.ArithmeticException: / by zero
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:99)
at
parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:92)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:300)
at
org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
at
org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Logging, SparkConf}
================================================================
/**
 * Spark Streaming job that watches an HDFS directory for new text files
 * containing JSON records and inserts every non-empty batch into a
 * Parquet-backed temporary table.
 *
 * All locations and names are currently hard-coded; the `args(n)` markers in
 * the comments show which command-line argument each value is meant to come
 * from once argument handling is re-enabled.
 */
object SparkStreamingToParquet extends Logging {

  /**
   * Checkpoint directory, intended to come from args(6).
   *
   * Defined once so that the directory `main` recovers from and the directory
   * `createContext` checkpoints to cannot drift apart (the two copies
   * previously differed by a trailing slash).
   */
  private val CheckpointDir = "hdfs://127.0.0.1:5555/user/checkpoint"

  /**
   * Entry point: recovers the streaming context from the checkpoint directory
   * if a checkpoint exists, otherwise builds a fresh one via [[createContext]],
   * then starts it and blocks until it terminates.
   *
   * @param args command-line arguments; currently only forwarded to
   *             [[createContext]], which does not read them yet
   */
  def main(args: Array[String]): Unit = {
    // Argument validation is disabled while all values are hard-coded.
    // if (args.length < 6) {
    //   logInfo("Please provide valid parameters: <hdfsFilesLocation: hdfs://ip:8020/user/hdfs/--/> <IMPALAtableloc hdfs://ip:8020/user/hive/--/> "
    //     + "<tablename> <ex: com.philips.BeanClassName> <appname> <no of cores> <checkpoint-dire>");
    //   logInfo("make user you give full folder path with '/' at the end i.e /user/hdfs/abc/");
    //   System.exit(1);
    // }
    val ssc: StreamingContext =
      StreamingContext.getOrCreate(CheckpointDir, () => createContext(args))
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds a new streaming context with a 30 second batch interval that reads
   * text files from the log directory and writes each non-empty batch to the
   * Parquet table.
   *
   * @param args command-line arguments (not read yet; see the `args(n)` markers)
   * @return the configured, not yet started, streaming context with
   *         checkpointing enabled
   */
  def createContext(args: Array[String]): StreamingContext = {
    val hdfsUri = "hdfs://127.0.0.1:5555" // sparkConf.get("spark.philips.hdfsuri")
    val hdfsFileLoc = hdfsUri + "/user/logs/" // args(0): directory watched by the stream
    val impalaTableLoc = hdfsUri + "/user/impala/" // args(1): impala table location
    val tempTableName = "temp_json" // args(2): temp table name for hive context
    val beanClassName = "Person" // args(3)
    val sparkAppName = "Monitor" // args(4)
    val noOfCores = "3" // if (args.length >= 6) args(5) else "3"

    val sparkConf: SparkConf = new SparkConf()
    sparkConf.setAppName(sparkAppName).setMaster("local[2]")
    sparkConf.set("spark.cores.max", noOfCores)

    val ssc: StreamingContext = new StreamingContext(sparkConf, new Duration(30000))
    val stream = ssc.textFileStream(hdfsFileLoc)

    stream.foreachRDD { rdd =>
      if (rdd != null && rdd.count() > 0) {
        val beanClass = Class.forName(beanClassName)
        // Fail on the driver with a readable message instead of letting the
        // write task die later.
        requireBeanProperties(beanClass)

        val hcontext = new JavaHiveContext(rdd.sparkContext)
        hcontext
          .createParquetFile(beanClass, impalaTableLoc, true, org.apache.spark.deploy.SparkHadoopUtil.get.conf)
          .registerTempTable(tempTableName)
        // Scala alternative (HiveContext + case class):
        //hcontext.createParquetFile[Person](impalaTableLoc, true, org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(tempTableName)
        val schRdd = hcontext.jsonRDD(rdd)
        schRdd.insertInto(tempTableName)
      }
    }

    ssc.checkpoint(CheckpointDir)
    ssc
  }

  /**
   * Verifies that `beanClass` exposes at least one JavaBean property besides
   * the implicit `class` property.
   *
   * NOTE(review): the reported `ArithmeticException: / by zero` in
   * `InternalParquetRecordWriter.initStore` is consistent with a Parquet schema
   * that has no columns. Presumably the Java API derives the columns from the
   * bean's getters, so a class without getters (for example a plain Scala
   * class) would produce an empty schema - confirm against the Spark and
   * Parquet versions in use.
   *
   * @param beanClass class whose bean properties define the table columns
   * @throws IllegalArgumentException if no bean properties are found
   */
  private def requireBeanProperties(beanClass: Class[_]): Unit = {
    val properties = java.beans.Introspector
      .getBeanInfo(beanClass)
      .getPropertyDescriptors
      .filterNot(_.getName == "class")
    require(
      properties.nonEmpty,
      s"${beanClass.getName} exposes no JavaBean properties (getters), so the Parquet schema would be empty"
    )
  }
}
________________________________
The information contained in this message may be confidential and legally
protected under applicable law. The message is intended solely for the
addressee(s). If you are not the intended recipient, you are hereby notified
that any use, forwarding, dissemination, or reproduction of this message is
strictly prohibited and may be unlawful. If you are not the intended recipient,
please contact the sender by return e-mail and destroy all copies of the
original message.