Why does this not work? Is insertInto broken in Spark 1.3.1? It does not
fail or throw any exceptions. It simply does not work.
val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()
Or this?
val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}
ssc.start()
Or this?
val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()