Hi Peter,

> Basically I need to find a way to set the batch-interval in (b), similar
> to how it's done in (a) below.

That's the trigger method on DataStreamWriter. Structured Streaming has no
batch interval as such; you control how often a micro-batch starts with a
trigger instead.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

import org.apache.spark.sql.streaming.Trigger

// start a new micro-batch every second
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
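
Putting it together with the Kafka source from your example (b), here's a
minimal Scala sketch (assuming the spark-sql-kafka-0-10 artifact is on the
classpath; the app name and the console sink are mine, just for
illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("nest-logs-demo")  // hypothetical app name
  .getOrCreate()

// same Kafka source as in your (b)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "nest-logs")
  .load()

// trigger() is where the batch-interval of (a) goes:
// a new micro-batch is started every second
val query = df.writeStream
  .format("console")  // demo sink only
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

query.awaitTermination()

Note that without an explicit trigger, Structured Streaming simply starts
the next micro-batch as soon as the previous one finishes.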

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote:

> Hi there,
>
> from the Apache Spark streaming documentation (see links below),
>
>    - the batch-interval is set when a Spark StreamingContext is
>    constructed (see example (a) quoted below);
>    - the StreamingContext is available in both older and newer Spark
>    versions (v1.6 through v2.3.0; see
>    https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
>    and https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html );
>    - however, example (b) below doesn't use a StreamingContext, but a
>    SparkSession object to set up a streaming flow.
>
> What does the usage difference between (a) and (b) mean? I was wondering
> if this means a different streaming approach ("traditional" streaming vs.
> structured streaming)?
>
> Basically I need to find a way to set the batch-interval in (b), similar
> to how it's done in (a) below.
>
> Would be great if someone can please share some insights here.
>
> Thanks!
>
> Peter
>
> (a)
> (from https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html)
>
> import org.apache.spark._
> import org.apache.spark.streaming._
>
> val conf = new SparkConf().setAppName(appName).setMaster(master)
> val ssc = new StreamingContext(conf, Seconds(1))
>
>
> (b)
> (from Databricks'
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)
>
> spark = SparkSession.builder \
>     .appName(appName) \
>     .getOrCreate()
> ...
>
> jsonOptions = { "timestampFormat": nestTimestampFormat }
> parsed = spark \
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "nest-logs") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema,
> jsonOptions).alias("parsed_value"))