Hi Peter,

> Basically I need to find a way to set the batch-interval in (b),
> similar to (a) below.

That's the trigger method on DataStreamWriter:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
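If it helps, here's a minimal end-to-end sketch against a Kafka source like the one in your (b) snippet. The broker address, topic name, console sink and the 10-second interval are just placeholders, and you'd need the spark-sql-kafka-0-10 package on the classpath:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-streaming-trigger-demo")
      .getOrCreate()

    // Kafka source, mirroring your (b) example (placeholder broker/topic).
    val parsed = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "nest-logs")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")

    // The trigger on the writer plays the role of the DStream batch interval:
    // a new micro-batch starts every 10 seconds.
    val query = parsed.writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}

If you leave the trigger out, Structured Streaming simply starts the next micro-batch as soon as the previous one finishes.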
Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote:
> Hi there,
>
> from the Apache Spark streaming website (see links below):
>
> - the batch-interval is set when a Spark StreamingContext is
>   constructed (see example (a) quoted below)
> - the StreamingContext is available in older and newer Spark versions
>   (v1.6, v2.2 to v2.3.0), see
>   https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
>   and https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
> - however, example (b) below doesn't use a StreamingContext, but a
>   SparkSession object to set up a streaming flow;
>
> What does the usage difference between (a) and (b) mean? I was wondering
> if this means a different streaming approach ("traditional" streaming vs
> structured streaming?).
>
> Basically I need to find a way to set the batch-interval in (b), similar
> to (a) below.
>
> Would be great if someone can please share some insights here.
>
> Thanks!
>
> Peter
>
> (a)
> (from https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html)
>
> import org.apache.spark._
> import org.apache.spark.streaming._
>
> val conf = new SparkConf().setAppName(appName).setMaster(master)
> val ssc = new StreamingContext(conf, Seconds(1))
>
> (b)
> (from Databricks'
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)
>
> val spark = SparkSession.builder()
>   .appName(appName)
>   .getOrCreate()
> ...
>
> jsonOptions = { "timestampFormat": nestTimestampFormat }
> parsed = spark \
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "nest-logs") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema,
>     jsonOptions).alias("parsed_value"))