Hi Jacek,

This is exact what i'm looking for. Thanks!!

Also thanks for the link. I just noticed that I can unfold the link of
trigger and see the examples in java and scala languages - what a general
help for a new comer :-)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
<https://urldefense.proofpoint.com/v2/url?u=http-3A__spark.apache.org_docs_latest_api_scala_index.html-23org.apache.spark.sql.streaming.DataStreamWriter&d=DwMFaQ&c=jf_iaSHvJObTbx-siA1ZOg&r=9YF85k6Q86ELSbXl40mkGw&m=m2sU8kZLTdkMnwBSeE0_Zas-dlFoPDb3AeWH4V62vRo&s=RYwav9pkXP6vR0vMTl8w1BFABs-EQPuJ-mY376ARQPA&e=>
def trigger(trigger: Trigger
<http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/Trigger.html>
): DataStreamWriter
<http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html>
[T]

Set the trigger for the stream query. The default value is ProcessingTime(0)
and it will run the query as fast as possible.

Scala Example:
df.writeStream.trigger(ProcessingTime("10 seconds"))

import scala.concurrent.duration._
df.writeStream.trigger(ProcessingTime(10.seconds))

Java Example:
df.writeStream().trigger(ProcessingTime.create("10 seconds"))

import java.util.concurrent.TimeUnit
df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))

Muchly appreciated!

Peter


On Fri, May 25, 2018 at 9:11 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Peter,
>
> > Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> That's trigger method on DataStreamWriter.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#
> org.apache.spark.sql.streaming.DataStreamWriter
>
> import org.apache.spark.sql.streaming.Trigger
> df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
>
> See http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#triggers
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote:
>
>> Hi there,
>>
>> from my apache spark streaming website (see links below),
>>
>>    - the batch-interval is set when a spark StreamingContext is
>>    constructed (see example (a) quoted below)
>>    - the StreamingContext is available in older and new Spark version
>>    (v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>>    1.6.0/streaming-programming-guide.html
>>    <https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html>
>>    and https://spark.apache.org/docs/2.3.0/streaming-programming-gu
>>    ide.html )
>>    - however, example (b) below  doesn't use StreamingContext, but
>>    StreamingSession object to setup a streaming flow;
>>
>> What does the usage difference in (a) and (b) mean? I was wondering if
>> this would mean a different streaming approach ("traditional" streaming vs
>> structured streaming?
>>
>> Basically I need to find a way to set the batch-interval in (b), similar
>> as in (a) below.
>>
>> Would be great if someone can please share some insights here.
>>
>> Thanks!
>>
>> Peter
>>
>> (a)
>> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>>
>> import org.apache.spark._import org.apache.spark.streaming._
>> val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= 
>> new StreamingContext(conf, Seconds(1))
>>
>>
>> (b)
>> ( from databricks' https://databricks.com/blog/20
>> 17/04/26/processing-data-in-apache-kafka-with-structured-str
>> eaming-in-apache-spark-2-2.html)
>>
>>    val *spark *= SparkSession.builder()
>>         .appName(appName)
>>       .getOrCreate()
>> ...
>>
>> jsonOptions = { "timestampFormat": nestTimestampFormat }
>> parsed = *spark *\
>>   .readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "nest-logs") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"), schema, 
>> jsonOptions).alias("parsed_value"))
>>
>>
>>
>>
>>
>

Reply via email to