Hi Jacek,

This is exactly what I'm looking for. Thanks!!
Also, thanks for the link. I just noticed that I can unfold the trigger link and
see the examples in Java and Scala - what a great help for a newcomer :-)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

def trigger(trigger: Trigger): DataStreamWriter[T]

  Set the trigger for the stream query. The default value is ProcessingTime(0)
  and it will run the query as fast as possible.

  Scala Example:

    df.writeStream.trigger(ProcessingTime("10 seconds"))

    import scala.concurrent.duration._
    df.writeStream.trigger(ProcessingTime(10.seconds))

  Java Example:

    df.writeStream().trigger(ProcessingTime.create("10 seconds"))

    import java.util.concurrent.TimeUnit
    df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))

Much appreciated!

Peter

On Fri, May 25, 2018 at 9:11 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Peter,
>
> > Basically I need to find a way to set the batch-interval in (b), similar
> > to (a) below.
>
> That's the trigger method on DataStreamWriter.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
>
> import org.apache.spark.sql.streaming.Trigger
> df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
>
> See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote:
>
>> Hi there,
>>
>> From the Apache Spark streaming documentation (see links below):
>>
>> - the batch-interval is set when a Spark StreamingContext is
>>   constructed (see example (a) quoted below)
>> - the StreamingContext is available in both older and newer Spark versions
>>   (v1.6, v2.2 to v2.3.0), see
>>   https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html and
>>   https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
>> - however, example (b) below doesn't use a StreamingContext, but a
>>   SparkSession object to set up the streaming flow;
>>
>> What does the usage difference between (a) and (b) mean? I was wondering
>> whether this implies a different streaming approach ("traditional"
>> streaming vs. structured streaming?)
>>
>> Basically I need to find a way to set the batch-interval in (b), similar
>> to (a) below.
>>
>> Would be great if someone could share some insights here.
>>
>> Thanks!
>>
>> Peter
>>
>> (a)
>> ( https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>>
>> import org.apache.spark._
>> import org.apache.spark.streaming._
>>
>> val conf = new SparkConf().setAppName(appName).setMaster(master)
>> val ssc = new StreamingContext(conf, Seconds(1))
>>
>>
>> (b)
>> ( from Databricks' https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html )
>>
>> val spark = SparkSession.builder()
>>   .appName(appName)
>>   .getOrCreate()
>> ...
>>
>> jsonOptions = { "timestampFormat": nestTimestampFormat }
>> parsed = spark \
>>   .readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "nest-logs") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"), schema,
>>     jsonOptions).alias("parsed_value"))
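
For reference, a minimal end-to-end sketch in Scala that ties the two together:
it reads from Kafka as in example (b) and uses trigger() to play the role of
the batch interval from example (a). The broker address, topic name, and
console sink below are assumptions for illustration only - adjust them for
your environment.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("kafka-trigger-example")
  .getOrCreate()

// Source: Kafka, as in example (b); broker and topic are placeholders.
val parsed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "nest-logs")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// trigger() takes the place of the batch interval from example (a):
// a new micro-batch is started every 10 seconds.
val query = parsed.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

This should run in spark-shell (Spark 2.2+, with the spark-sql-kafka-0-10
package on the classpath); in a standalone application the same code would go
inside a main method.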