Why is .remember not ideal?
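For reference, the thread never shows the exact `.remember` call. Below is a minimal sketch of where it would sit in the first example quoted below. It assumes the fix was `StreamingContext.remember` applied to `ssc` before `ssc.start()`. The 60-minute window and the `RememberSketch` wrapper are hypothetical and do not come from the thread. Per the Spark API docs, `remember` makes every DStream in the context keep the RDDs it generated over that window, instead of releasing them for garbage collection once the streaming computation no longer needs them.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, StreamingContext}

object RememberSketch {
  // Hypothetical retention window; the thread does not say which duration was used.
  val RememberWindow = Minutes(60)

  def run(sc: SparkContext, sqlContext: SQLContext): StreamingContext = {
    val ssc = new StreamingContext(sc, Minutes(10))
    // Ask every DStream in this context to hold on to the RDDs it generated
    // during the last RememberWindow rather than releasing them after each batch.
    ssc.remember(RememberWindow)

    // Same setup as the first quoted example: snapshot the previous day to Parquet
    // and expose it as the "rideaccepted" temp table.
    val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
    dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
    sqlContext.parquetFile("/tmp/cache/dayBefore.parquet").registerTempTable("rideaccepted")

    // Each 10-minute batch of JSON lines is parsed and inserted into the table.
    val currentStream = ssc.textFileStream("s3://textFileDirectory/")
    currentStream.foreachRDD { rdd =>
      val df = sqlContext.jsonRDD(rdd)
      df.insertInto("rideaccepted")
    }

    ssc.start()
    ssc
  }
}
```

In a driver where `sc` and `sqlContext` already exist, this would be started with `RememberSketch.run(sc, sqlContext).awaitTermination()`.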
On Sun, Jul 12, 2015 at 7:22 PM, Brandon White <bwwintheho...@gmail.com> wrote:

> Hi Yin,
>
> Yes, there were no new rows. I fixed it by doing a .remember on the
> context. Obviously, this is not ideal.
>
> On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yh...@databricks.com> wrote:
>
>> Hi Brandon,
>>
>> Can you explain what you meant by "It simply does not work"? Did you
>> not see new data files?
>>
>> Thanks,
>>
>> Yin
>>
>> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwintheho...@gmail.com>
>> wrote:
>>
>>> Why does this not work? Is insert into broken in 1.3.1? It does not
>>> throw any errors, fail, or throw exceptions. It simply does not work.
>>>
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>
>>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>>
>>> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
>>> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
>>> parquetFile.registerTempTable("rideaccepted")
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.insertInto("rideaccepted")
>>> }
>>>
>>> ssc.start()
>>>
>>>
>>> Or this?
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>> val currentStream = ssc.textFileStream("s3://textFileDirectory")
>>> val day = sqlContext.jsonFile("s3://textFileDirectory")
>>> day.registerTempTable("rideaccepted")
>>>
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.registerTempTable("tmp_rideaccepted")
>>>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
>>> }
>>>
>>> ssc.start()
>>>
>>>
>>> Or this?
>>>
>>> val ssc = new StreamingContext(sc, Minutes(10))
>>>
>>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>>
>>> dayBefore.registerTempTable("rideaccepted")
>>>
>>> currentStream.foreachRDD { rdd =>
>>>   val df = sqlContext.jsonRDD(rdd)
>>>   df.insertInto("rideaccepted")
>>> }
>>>
>>> ssc.start()