Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
Unfortunately, the listeners process is async and can't guarantee a happens-before association with the microbatch to commit offsets to external storage. But still they will work. Is there a way to access lastProgress in foreachBatch? On Wed, May 22, 2024 at 7:35 AM Tathagata D

Re: Dstream HasOffsetRanges equivalent in Structured streaming

2024-05-22 Thread Tathagata Das
If you want to find what offset ranges are present in a microbatch in Structured Streaming, you have to look at the StreamingQuery.lastProgress or use the QueryProgressListener . Both of these
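
A minimal Scala sketch of the lastProgress approach, assuming `query` is an already-started StreamingQuery; for the Kafka source the start/end offsets are JSON strings of the form {"topic":{"partition":offset}}:

    // Inspect the offset ranges of the most recently completed micro-batch.
    val progress = query.lastProgress        // null until the first batch completes
    if (progress != null) {
      progress.sources.foreach { s =>
        println(s"source=${s.description} start=${s.startOffset} end=${s.endOffset}")
      }
    }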

Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Tathagata Das
Hello Rachana, Getting exactly-once semantics on files and making it scale to a very large number of files are very hard problems to solve. While Structured Streaming + built-in file sink solves the exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling in

Re: how can i write spark addListener metric to kafka

2020-06-09 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Jun 9, 2020 at 4:42 PM a s wrote: > hi Guys, > > I am building a structured streaming app for google analytics data > > i want to capture the numbe
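
A hedged Scala sketch of forwarding those progress metrics to Kafka from a StreamingQueryListener; the topic name, bootstrap servers, and producer settings are placeholders, and kafka-clients is assumed to be on the driver classpath:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Placeholder producer config; the listener runs on the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // event.progress.json is the same JSON exposed by query.lastProgress
        producer.send(new ProducerRecord("metrics-topic", event.progress.name, event.progress.json))
      }
    })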

Re: Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Tathagata Das
Have you looked at the suggestion made by the error by searching for "Structured Streaming + Kafka Integration Guide" in Google? It should be the first result. The last section in the "Structured Streaming

Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Tathagata Das
Make sure that you are continuously feeding data into the query to trigger the batches; only then are timeouts processed. See the timeout behavior details here - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState On Wed, Mar 4, 2020 at 2:51 PM Somet
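
A minimal sketch of a processing-time timeout with mapGroupsWithState, assuming `events` is a Dataset[Event] and spark.implicits._ is in scope; the case classes and the 30-second duration are illustrative, and the timeout is only evaluated when a micro-batch actually runs:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    case class Event(user: String, value: Long)
    case class SessionCount(user: String, count: Long, expired: Boolean)

    // Per-user counter that expires after 30s of processing-time inactivity.
    def update(user: String, events: Iterator[Event],
               state: GroupState[Long]): SessionCount = {
      if (state.hasTimedOut) {
        val c = state.get
        state.remove()                       // drop the expired state
        SessionCount(user, c, expired = true)
      } else {
        val c = state.getOption.getOrElse(0L) + events.size
        state.update(c)
        state.setTimeoutDuration("30 seconds")
        SessionCount(user, c, expired = false)
      }
    }

    val counts = events
      .groupByKey(_.user)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(update)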

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
as when executing a simple group by. > > Regards, > > Bryan Jeffrey > > Get Outlook for Android <https://aka.ms/ghei36> > > -- > *From:* Tathagata Das > *Sent:* Friday, February 28, 2020 4:56:07 PM > *To:* Bryan Jeffrey > *Cc:* user &

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
You are deserializing by explicitly specifying UTC timezone, but when serializing you are not specifying it. Maybe that is reason? Also, if you can encode it using just long, then I recommend just saving the value as long and eliminating some of the serialization overheads. Spark will probably bet

Re: dropDuplicates and watermark in structured streaming

2020-02-28 Thread Tathagata Das
lec ssmi wrote: > Such as : df.withWarmark("time","window size").dropDulplicates("id").withWatermark("time","real watermark").groupBy(window("time","window size","window size")).agg(count(

Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread Tathagata Das
1. Yes. All times in event time, not processing time. So you may get 10AM event time data at 11AM processing time, but it will still be compared again all data within 9-10AM event times. 2. Show us your code. On Thu, Feb 27, 2020 at 2:30 AM lec ssmi wrote: > Hi: > I'm new to structured stre

Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches. On Thu, Feb 27, 2020 at 3:17 PM Something Something < mailinglist...@gmail.com> wrote: > We've a Spark Streaming job that calculates some values in each batch. > What we need to do now is aggregate values across ALL batc
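
A minimal sketch of such a cross-batch aggregation; `events` and the "key" column are placeholders for the streaming input:

    // A streaming count per key; the running totals span all micro-batches.
    val totals = events.groupBy("key").count()

    val query = totals.writeStream
      .outputMode("complete")    // emit the full, continuously updated totals each trigger
      .format("console")
      .start()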

Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread Tathagata Das
You can use *from_json* built-in SQL function to parse json. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column- On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote: > hi,all : > I'm using Structured
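
A short sketch of from_json applied to the Kafka value column; the schema fields are illustrative and kafkaDF is an assumed DataFrame read from the kafka source:

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    // Fixed schema for the JSON payload (placeholder fields).
    val schema = new StructType()
      .add("id", StringType)
      .add("ts", TimestampType)
      .add("amount", DoubleType)

    val parsed = kafkaDF
      .selectExpr("CAST(value AS STRING) AS json")        // Kafka value bytes -> string
      .select(from_json(col("json"), schema).as("data"))  // string -> struct
      .select("data.*")                                   // flatten into columns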

Re: Structured Streaming: How to add a listener for when a batch is complete

2019-09-03 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Sep 3, 2019, 3:26 PM Natalie Ruiz wrote: > Hello all, > > > > I’m a beginner, new to Spark and wanted to know if there was an equivalent > to Spark

Re: Structured Streaming Dataframe Size

2019-08-29 Thread Tathagata Das
) * Then *select * from **myAggTable* This will give awesome ACID transactional guarantees between reads and writes. Read more on the linked website (full disclosure, I work on that project as well). > Thank you very much for your help! > > > On Tue, Aug 27, 2019, 6:42 PM Tath

Re: Structured Streaming Dataframe Size

2019-08-27 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts *Note that Structured Streaming does not materialize the entire table*. It > reads the latest available data from the streaming data source, processes > it incrementally to update the result, and then d

Announcing Delta Lake 0.3.0

2019-08-01 Thread Tathagata Das
Hello everyone, We are excited to announce the availability of Delta Lake 0.3.0 which introduces new programmatic APIs for manipulating and managing data in Delta Lake tables. Here are the main features: - Scala/Java APIs for DML commands - You can now modify data in Delta Lake tables

Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Tathagata Das
@ayan guha @Gourav Sengupta Delta Lake is OSS currently does not support defining tables in Hive metastore using DDL commands. We are hoping to add the necessary compatibility fixes in Apache Spark to make Delta Lake work with tables and DDL commands. So we will support them in a future release.

Re: How to execute non-timestamp-based aggregations in spark structured streaming?

2019-04-22 Thread Tathagata Das
select row_number() over (partition by Origin order by OnTimeDepPct desc) OnTimeDepRank, * from flights -- This will *not* work in *structured streaming*; the culprit is: partition by Origin. The requirement is to use a timestamp-typed field such as

Re: Iterator of KeyValueGroupedDataset.flatMapGroupsWithState function

2018-10-31 Thread Tathagata Das
It is okay to collect the iterator. That will not break Spark. However, collecting it requires memory in the executor, so you may cause OOMs if a group has a LOT of new data. On Wed, Oct 31, 2018 at 3:44 AM Antonio Murgia - antonio.murg...@studio.unibo.it wrote: > Hi all, > > I'm currently devel

Re: [Structured Streaming] Two watermarks and StreamingQueryListener

2018-08-10 Thread Tathagata Das
Structured Streaming internally maintains one global watermark by taking a min of the two watermarks. That's why one gets reported. In Spark 2.4, there will be the option of choosing max instead of min. Just curious. Why do you have two watermarks? What's the query like. TD On Thu, Aug 9, 2018
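
A sketch of what a two-watermark query can look like, plus the Spark 2.4+ conf that switches the global policy from min to max; the stream names, columns, and intervals are placeholders:

    import org.apache.spark.sql.functions.expr

    // Assumption: set before the query starts; default is "min".
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")

    val left  = impressions.withWatermark("impressionTime", "1 hour")
    val right = clicks.withWatermark("clickTime", "2 hours")

    // Each side carries its own watermark; the engine reports one global watermark.
    val joined = left.join(right, expr(
      """impressionId = clickId AND
         clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""))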

Re: How to read json data from kafka and store to hdfs with spark structued streaming?

2018-07-26 Thread Tathagata Das
Are you writing multiple streaming query output to the same location? If so, I can see this error occurring. Multiple streaming queries writing to the same directory is not supported. On Tue, Jul 24, 2018 at 3:38 PM, dddaaa wrote: > I'm trying to read json messages from kafka and store them in h

Re: Exceptions with simplest Structured Streaming example

2018-07-26 Thread Tathagata Das
Unfortunately, your output is not visible in the email that we see. Was it an image that somehow got removed? Maybe best to copy the output text (i.e. the error message) into the email. On Thu, Jul 26, 2018 at 5:41 AM, Jonathan Apple wrote: > Hello, > > There is a streaming World Count example at t

Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Tathagata Das
Yes. Yes you can. On Tue, Jul 17, 2018 at 11:42 AM, Sathi Chowdhury wrote: > Hi, > My question is about ability to integrate spark streaming with multiple > clusters.Is it a supported use case. An example of that is that two topics > owned by different group and they have their own kakka infra .

Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Tathagata Das
Note that this is not public API yet. Hence this is not very documented. So use it at your own risk :) On Tue, Jul 10, 2018 at 11:04 AM, subramgr wrote: > Hi, > > This looks very daunting *trait* is there some blog post or some articles > which explains on how to implement this *trait* > > Thank

Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread Tathagata Das
Only the stream metadata (e.g., streamid, offsets) are stored as json. The stream state data is stored in an internal binary format. On Mon, Jul 9, 2018 at 4:07 PM, subramgr wrote: > Hi, > > I read somewhere that with Structured Streaming all the checkpoint data is > more readable (Json) like. I

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Tathagata Das
Hey all, In Spark 2.4.0, there will be a new feature called *foreachBatch* which will expose the output rows of every micro-batch as a dataframe, on which you apply a user-defined function. With that, you can reuse existing batch sources for writing results as well as write results to multiple loc
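
A hedged sketch of foreachBatch fanning one micro-batch out to two batch sinks; the paths, JDBC settings, and checkpoint directory are placeholders:

    import org.apache.spark.sql.DataFrame

    // Reuse existing batch writers inside the user-defined function (Spark 2.4+).
    def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()                                        // avoid recomputing per sink
      batchDF.write.mode("append").parquet("/data/out/parquet")
      batchDF.write.mode("append")
        .format("jdbc")
        .option("url", "jdbc:postgresql://host/db")            // placeholder connection info
        .option("dbtable", "results")
        .save()
      batchDF.unpersist()
    }

    val query = streamingDF.writeStream
      .foreachBatch(writeBatch _)
      .option("checkpointLocation", "/data/ckpt/fanout")
      .start()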

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-28 Thread Tathagata Das
Its all documented - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Str

Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Tathagata Das
The fundamental conceptual difference between the windowing in DStream vs Structured Streaming is that DStream used the arrival time of the record in Spark (aka processing time) and Structured Streaming using event time. If you want to exactly replicate DStream's processing time windows in Structur

Re: Recommendation of using StreamSinkProvider for a custom KairosDB Sink

2018-06-25 Thread Tathagata Das
This is interface is actually unstable. The v2 of DataSource APIs is being designed right now which will be public and stable in a release or two. So unfortunately there is no stable interface right now that I can officially recommend. That said, you could always use the ForeachWriter interface (s

Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support jdbc sink yet. The blog post was just an example :) I agree it is misleading in hindsight. On Wed, Jun 20, 2018 at 6:09 PM, kant kodali wrote: > Hi All, > > Does Spark Structured Streaming have a JDBC sink or Do I need to use > ForEachWriter? I see the following code
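
A sketch of the ForeachWriter route for JDBC; the connection URL, table, and row layout are placeholders, and it does not handle retries or exactly-once delivery:

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.spark.sql.{ForeachWriter, Row}

    // Hand-rolled JDBC sink (pre-foreachBatch style).
    class JdbcWriter(url: String) extends ForeachWriter[Row] {
      var conn: Connection = _
      var stmt: PreparedStatement = _

      override def open(partitionId: Long, version: Long): Boolean = {
        conn = DriverManager.getConnection(url)
        stmt = conn.prepareStatement("INSERT INTO results (key, value) VALUES (?, ?)")
        true                                        // true = process this partition
      }
      override def process(row: Row): Unit = {
        stmt.setString(1, row.getString(0))
        stmt.setLong(2, row.getLong(1))
        stmt.executeUpdate()
      }
      override def close(errorOrNull: Throwable): Unit = {
        if (conn != null) conn.close()
      }
    }

    val query = outputDF.writeStream
      .foreach(new JdbcWriter("jdbc:postgresql://host/db"))
      .option("checkpointLocation", "/data/ckpt/jdbc")
      .start()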

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das wrote: > Structured Streaming does not maintain a queue of batch like DStream. > DStreams used to cut off batches at a fixed interval and put in a queue, > and a different thread processed queued batches. In contrast, S

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Structured Streaming does not maintain a queue of batches like DStreams. DStreams used to cut off batches at a fixed interval and put them in a queue, and a different thread processed queued batches. In contrast, Structured Streaming simply cuts off and immediately processes a batch after the previous batc

Re: [structured-streaming][parquet] readStream files order in Parquet

2018-06-15 Thread Tathagata Das
The files are processed in the order the file last modified timestamp. The path and partitioning scheme are not used for ordering. On Thu, Jun 14, 2018 at 6:59 AM, karthikjay wrote: > My parquet files are first partitioned by environment and then by date > like: > > env=testing/ >date=2018-

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Glad that it worked out! It's unfortunate that there exist such pitfalls. And there is no easy way to get around it. If you can, let us know how your experience with mapGroupsWithState has been. TD On Fri, Jun 8, 2018 at 1:49 PM, frankdede wrote: > You are exactly right! A few hours ago, I trie

Re: Reset the offsets, Kafka 0.10 and Spark

2018-06-08 Thread Tathagata Das
Structured Streaming really makes this easy. You can simply specify the option of whether the start the query from earliest or latest. Check out - https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming - https://spark.apache.org/docs/latest/structured-streaming-kafka-integratio
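
A minimal sketch of choosing the starting position for the Kafka source (this applies only when the query has no checkpoint yet); the topic and servers are placeholders:

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "earliest")   // or "latest", or a JSON string of per-partition offsets
      .load()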

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Try to define the watermark on the right column immediately before calling `groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark and then doing a bunch of opaque transformation (user-defined flatMap that the planner has no visibility into). This prevents the planner from propaga

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the driver. So this is nothing to do with streaming aggregation state which are kept in the memory of the executors, not the driver. On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim wrote: > 1. Could you share your Spark versi

Re: can we use mapGroupsWithState in raw sql?

2018-04-16 Thread Tathagata Das
Unfortunately no. Honestly it does not make sense as for type-aware operations like map, mapGroups, etc., you have to provide an actual JVM function. That does not fit in with the SQL language structure. On Mon, Apr 16, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > can we use mapGroupsWithSt

Re: Structured Streaming on Kubernetes

2018-04-13 Thread Tathagata Das
Structured streaming is stable in production! At Databricks, we and our customers collectively process almost 100s of billions of records per day using SS. However, we are not using kubernetes :) Though I don't think it will matter too much as long as kubes are correctly provisioned+configured and

Re: Does partition by and order by works only in stateful case?

2018-04-12 Thread Tathagata Das
The traditional SQL windows with `over` is not supported in streaming. Only time-based windows, that is, `window("timestamp", "10 minutes")` is supported in streaming. On Thu, Apr 12, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > Does partition by and order by works only in stateful case? >
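
A minimal sketch of the supported time-based window; the "timestamp" and "userId" columns are assumed to exist in the streaming input:

    import org.apache.spark.sql.functions.{col, window}

    // Only time-based windows work on streams; SQL `over` windows do not.
    val counts = events
      .groupBy(window(col("timestamp"), "10 minutes"), col("userId"))
      .count()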

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
.start() > > > But what i need to do in this step is only transforming json string data > to Dataset . How to fix it? > > Thanks! > > > Regard, > Junfeng Chen > > On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: >

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
It's not very surprising that doing this sort of RDD to DF conversion inside DStream.foreachRDD has weird corner cases like this. In fact, you are going to have additional problems with partial parquet files (when there are failures) in this approach. I strongly suggest that you use Structured Stre

Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is same for Structured Streaming. In fact, the Kafka Structured Streaming source is based on the same principle as DStream's Kafka Direct, hence it has very similar behavior. On Tue, Apr 10, 2018 at 11:03 PM, SRK wrote: > hi, > > We have code based on Spark Kafka Direct in produ

Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint directory that you provide. And when you start the query again with the same directory it will just pick up where it left off. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failur
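
A minimal sketch of supplying the checkpoint directory; the paths are placeholders, and restarting the query with the same checkpointLocation resumes from the saved offsets:

    val query = parsedDF.writeStream
      .format("parquet")
      .option("path", "/data/out")
      .option("checkpointLocation", "/data/checkpoints/my-query")  // offsets + state live here
      .start()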

Re: [Structured Streaming] Application Updates in Production

2018-03-22 Thread Tathagata Das
om the old > checkpoints, one would need to keep the old app and the new app running > until new app catches up on data processing with the old app. > > > - Original message - > From: Tathagata Das > To: Priyank Shrivastava > Cc: user > Subject: Re: [Structured

Re: [Structured Streaming] Application Updates in Production

2018-03-21 Thread Tathagata Das
Why do you want to start the new code in parallel to the old one? Why not stop the old one, and then start the new one? Structured Streaming ensures that all checkpoint information (offsets and state) are future-compatible (as long as state schema is unchanged), hence new code should be able to pic

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
-with-tathagata-das On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris wrote: > You need to tell Spark about the structure of the data, it doesn't know > ahead of time if you put avro, json, protobuf, etc. in kafka for the > message format. If the messages are in json, Spark provides

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html This is true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :) TD On Wed, Mar 14, 2018 at 12:07

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
I believe just spitting out nulls for every > trigger until there is a match and when there is match spitting out the > joined rows should suffice isn't it? > > Sorry if my thoughts are too naive! > > > > > > > > > > > On Thu, Mar 8, 2018 at 6:14 PM, Tat

Re: Upgrades of streaming jobs

2018-03-09 Thread Tathagata Das
Yes, all checkpoints are forward compatible. However, you do need to restart the query if you want to update the code of the query. This downtime can be in less than a second (if you just restart the query without stopping the application/Spark driver) or 10s of seconds (if you have to stop the ap

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
d is >>>> it two implement both of these? It turns out the update mode and full outer >>>> join is very useful and required in my case, therefore, I'm just asking. >>>> >>>> Thanks! >>>> >>>> On Tue, Mar 6, 2018 at

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-07 Thread Tathagata Das
ny this line: > >> CachedKafkaConsumer: CachedKafkaConsumer is not running in >> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are >> interrupted because of KAFKA-1894. > > > > Regard, > Junfeng Chen > > On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
-distribution.sh --name custom-spark --pip --r --tgz -Psparkr > -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn > > > On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hey, >> >> Thanks for testing ou

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Tathagata Das
Which version of Spark are you using? And can you give us the full stack trace of the exception? On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen wrote: > I am trying to read kafka and save the data as parquet file on hdfs > according to this https://stackoverflow.com/questions/45827664/read-from >

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-03-02 Thread Tathagata Das
way we get at least > once semantic and partial file write issue. > > Thoughts ? > > > Sunil Parmar > > On Wed, Feb 28, 2018 at 1:59 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> There is no good way to save to parquet without causing downstre

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-28 Thread Tathagata Das
I made a JIRA for it - https://issues.apache.org/jira/browse/SPARK-23539 Unfortunately it is blocked by Kafka version upgrade, which has a few nasty issues related to Kafka bugs - https://issues.apache.org/jira/browse/SPARK-18057 On Wed, Feb 28, 2018 at 3:17 PM, karthikus wrote: > TD, > > Thanks

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Tathagata Das
There is no good way to save to parquet without causing downstream consistency issues. You could use foreachRDD to get each RDD, convert it to DataFrame/Dataset, and write out as parquet files. But you will later run into issues with partial files caused by failures, etc. On Wed, Feb 28, 2018 at

Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine that an event is late. We simply track the maximum event time the engine has seen in the data it has processed till now. And any data that has event time less than the max is basically "late" (as it is out-of-order). Now, in

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Tathagata Das
Unfortunately, exposing Kafka headers is not yet supported in Structured Streaming. The community is more than welcome to add support for it :) On Tue, Feb 27, 2018 at 2:51 PM, Karthik Jayaraman wrote: > Hi all, > > I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I > wou

Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-25 Thread Tathagata Das
The continuous one is our new low latency continuous processing engine in Structured Streaming (to be released in 2.3). Here is the pre-release doc - https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing On Sun, Feb 25,

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey, Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this. TD On Tue, Feb 20, 2018 at 8:20 PM, kant kodali wrote: > if I change it to the below code it works. However, I don't believe it is > the solution I am looking for. I want to be able t

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-14 Thread Tathagata Das
Of course, you can write to multiple Kafka topics from a single query. If your dataframe that you want to write has a column named "topic" (along with "key", and "value" columns), it will write the contents of a row to the topic in that row. This automatically works. So the only thing you need to f
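
A sketch of routing rows to different topics with a per-row "topic" column; the topic names, input columns, and servers are placeholders:

    import org.apache.spark.sql.functions.{col, when}

    // Each row's "topic" column decides where it is written.
    val routed = outputDF.select(
      when(col("type") === "error", "errors").otherwise("events").as("topic"),
      col("id").cast("string").as("key"),
      col("payload").cast("string").as("value"))

    val query = routed.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("checkpointLocation", "/tmp/ckpt/kafka-routed")
      .start()    // with no "topic" option set, the per-row column determines the destination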

Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
estart queries? > > Should i just wait for 2.3 where i'll be able to join two structured > streams ( if the release is just a few weeks away ) > > Appreciate all the help! > > thanks > App > > > > On 14 February 2018 at 4:41:52 PM, Tathagata Das ( > t

Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
Let me fix my mistake :) What I suggested in that earlier thread does not work. The streaming query that joins a streaming dataset with a batch view, does not correctly pick up when the view is updated. It works only when you restart the query. That is, - stop the query - recreate the dataframes, -

Re: Spark Streaming withWatermark

2018-02-06 Thread Tathagata Das
That may very well be possible. The watermark delay guarantees that any record newer than or equal to watermark (that is, max event time seen - 20 seconds), will be considered and never be ignored. It does not guarantee the other way, that is, it does NOT guarantee that records older than the wate

Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
    val socketDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", )
      .load()

Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
Hello Divya, To add further clarification, the Apache Bahir does not have any Structured Streaming support for Twitter. It only has support for Twitter + DStreams. TD On Wed, Jan 31, 2018 at 2:44 AM, vermanurag wrote: > Twitter functionality is not part of Core Spark. We have successfully us

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output? On Wed, Jan 31, 2018 at 3:35 PM, M Singh wrote: > Hi Folks: > > I have to add a column to a structured *streaming* dataframe but when I > do that (using select or wit

Re: mapGroupsWithState in Python

2018-01-31 Thread Tathagata Das
Hello Ayan, From what I understand, mapGroupsWithState (probably the more general flatMapGroupsWithState) is the best way forward (not available in python). However, you need to figure out your desired semantics of when you want to output the deduplicated data from the streaming query. For exampl

Re: Max number of streams supported ?

2018-01-31 Thread Tathagata Das
Just to clarify a subtle difference between DStreams and Structured Streaming. Multiple input streams in a DStreamGraph is likely to mean they are all being processed/computed in the same way as there can be only one streaming query / context active in the StreamingContext. However, in the case of

Re: Apache Spark - Custom structured streaming data source

2018-01-25 Thread Tathagata Das
Hello Mans, The streaming DataSource APIs are still evolving and are not public yet. Hence there is no official documentation. In fact, there is a new DataSourceV2 API (in Spark 2.3) that we are migrating towards. So at this point of time, it's hard to make any concrete suggestion. You can take a

Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

2018-01-22 Thread Tathagata Das
For computing mapGroupsWithState, can you check the following. - How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, that is computing mapGroupsWithState) - How long each task is taking? - How many cores does the cluster have? On Thu, Jan 18, 2018 at 11:28

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
an > ​ > > > On Fri, Jan 12, 2018 at 4:39 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hello Dan, >> >> From your code, it seems like you are setting the timeout timestamp based >> on the current processing-time / wall-clock-time, while

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
Hello Dan, From your code, it seems like you are setting the timeout timestamp based on the current processing-time / wall-clock-time, while the watermark is being calculated on the event-time ("when" column). The semantics of the EventTimeTimeout is that when the last set timeout timestamp of a

Re: Spark structured streaming time series forecasting

2018-01-09 Thread Tathagata Das
Spark-ts has been under development for a while. So I doubt there is any integration with Structured Streaming. That said, Structured Streaming uses DataFrames and Datasets, and a lot of existing libraries build on Datasets/DataFrames should work directly, especially if they are map-like functions.

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread Tathagata Das
1. It is all the result data in that trigger. Note that it takes a DataFrame which is a purely logical representation of data and has no association with partitions, etc. which are physical representations. 2. If you want to limit the amount of data that is processed in a trigger, then you should
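
The reply is cut off here; one common way to cap per-trigger input with the Kafka source is the maxOffsetsPerTrigger option (not necessarily what the original answer went on to say). A sketch with placeholder values:

    // Cap how many records a single micro-batch reads, spread across partitions.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "events")
      .option("maxOffsetsPerTrigger", "100000")
      .load()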

Re: Can we pass the Calcite streaming sql queries to spark sql?

2017-11-09 Thread Tathagata Das
I dont think so. Calcite's SQL is an extension of standard SQL (keywords like STREAM, etc.) which we dont support; we just support regular SQL, so queries like "SELECT STREAM " will not work. On Thu, Nov 9, 2017 at 11:50 AM, kant kodali wrote: > Can we pass the Calcite streaming sql queries

Re: Writing custom Structured Streaming receiver

2017-11-01 Thread Tathagata Das
Structured Streaming source APIs are not yet public, so there isnt a guide. However, if you are adventurous enough, you can take a look at the source code in Spark. Source API: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala

Re: Structured Stream in Spark

2017-10-25 Thread Tathagata Das
Please do not confuse old Spark Streaming (DStreams) with Structured Streaming. Structured Streaming's offset and checkpoint management is far more robust than DStreams. Take a look at my talk - https://spark-summit.org/2017/speakers/tathagata-das/ On Wed, Oct 25, 2017 at 9:29 PM, KhajaA

Re: Cases when to clear the checkpoint directories.

2017-10-09 Thread Tathagata Das
Any changes in the Java code (to be specific, the generated bytecode) in the functions you pass to Spark (i.e., map function, reduce function, as well as its closure dependencies) counts as "application code change", and will break the recovery from checkpoints. On Sat, Oct 7, 2017 at 11:53 AM, Joh

Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread Tathagata Das
Are you sure the code is correct? A Dataset does not have a method "trigger". Rather I believe the correct code should be StreamingQuery query = resultDataSet.writeStream.trigger(ProcessingTime(1000)).format("kafka").start(); You can do all the same things you can do with Structured Streaming a

Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all partitions? The main usage of watermark is to drop state. If you loosen the watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more state with older data, but you are guaranteed that you will not drop important da

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
llections library >> and I keep adding to to this heap for 24 hours and at some point I will run >> out of Java heap space right? Do I need to store TrainHistory as a >> DataSet or DataFrame instead of in memory max heap object from Java >> Collections library? >> >>

Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Tathagata Das
1. Generally, adding columns, etc. will not affect performance because the Spark's optimizer will automatically figure out columns that are not needed and eliminate in the optimization step. So that should never be a concern. 2. Again, this is generally not a concern as the optimizer will take care

Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello, This is an unfortunate design on my part when I was building DStreams :) Fortunately, we learnt from our mistakes and built Structured Streaming the correct way. Checkpointing in Structured Streaming stores only the progress information (offsets, etc.), and the user can change their applic

Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a sql function called current_timestamp() which is self-explanatory. So I believe you should be able to do something like

    import org.apache.spark.sql.functions._
    ds.withColumn("processingTime", current_timestamp())
      .groupBy(window("processingTime", "1 minute"))
      .count

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
dn't return the > exact solution, since it also groups by on destination. I would say the > easiest solution would be to use flatMapGroupsWithState, where you: > .groupByKey(_.train) > > and keep in state the row with the maximum time. > > On Tue, Aug 29, 2017 at

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
and in my case I need to hold state for 24 hours which I forgot to mention in my previous email. can I do ? trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time")

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say, trainTimesDataset is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp]

    Scala: trainTimesDataset.groupBy("train", "dest").max("time")
    SQL:   "select train, dest, max(time) from trainTimesView group by train, dest"   // after calling trainTimesData.crea

Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the StreamingQuery stayed alive, or the whole process stayed alive? The StreamingQuery should be terminated immediately. And the stream execution threads are all daemon threads, so it should not affect the termination of the application whe

Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
commend it. As stated above, it is > quite natural to chain processes via kafka. > > On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Responses inline. >> >> On Thu, Aug 24, 2017 at 7:16 PM, cbowden wrote: >> &

Re: Structured Streaming: multiple sinks

2017-08-24 Thread Tathagata Das
Responses inline. On Thu, Aug 24, 2017 at 7:16 PM, cbowden wrote: > 1. would it not be more natural to write processed to kafka and sink > processed from kafka to s3? > I am sorry i dont fully understand this question. Could you please elaborate further, as in, what is more natural than what?

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
Both works. The asynchronous method with listener will have less of down time, just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer as it has to reload the data. On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep wrote: > Thanks tathagata

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.

    def startQuery(): StreamingQuery = {
      // create your streaming dataframes
      // start the query with the same checkpoint directory
    }

    // handle to the active query
    var activeQuery: StreamingQuery = null

    while (!stopped) {
      if (activeQuery == null) {
        // if quer
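
A hypothetical completion of that loop (the snippet above is cut off); the stop flag, source, sink, paths, and the one-hour restart interval are all placeholders:

    import org.apache.spark.sql.streaming.StreamingQuery

    @volatile var stopped = false

    def startQuery(): StreamingQuery = {
      // Placeholder: rebuild the streaming DataFrame(s) here.
      val streamingDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:9092")
        .option("subscribe", "events")
        .load()
      streamingDF.writeStream
        .format("parquet")
        .option("path", "/data/out")
        .option("checkpointLocation", "/ckpt/my-query")   // same directory across restarts
        .start()
    }

    var activeQuery: StreamingQuery = null
    while (!stopped) {
      if (activeQuery == null) {
        activeQuery = startQuery()              // (re)start the query if it is not running
      }
      activeQuery.awaitTermination(60 * 60 * 1000)  // let it run for ~1 hour
      activeQuery.stop()                            // stop so the next iteration restarts it
      activeQuery = null
    }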

Re: Spark 2.2 streaming with append mode: empty output

2017-08-14 Thread Tathagata Das
In append mode, the aggregation outputs a row only when the watermark has been crossed and the corresponding aggregate is *final*, that is, will not be updated any more. See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking On Mon,
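
A minimal sketch of a watermarked windowed aggregate in append mode; the column names and intervals are placeholders:

    import org.apache.spark.sql.functions.{col, window}

    // In append mode a window's count is emitted only after the watermark
    // passes the end of that window; until then the sink stays empty.
    val agg = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window(col("eventTime"), "5 minutes"))
      .count()

    val query = agg.writeStream
      .outputMode("append")
      .format("console")
      .start()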

Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this. On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju wrote: > Hi, > > We've built a batch application on Spark 1.6.1. I'm looking into how to > run the same code as a streaming (DStream based) application. This is using > pyspark. > > In the batch ap
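
A hedged sketch of the DStream.transform pattern; `processWithDataFrames` stands in for the existing batch logic, and the RDD element type is assumed to be a case class so the schema can be inferred:

    // Reuse existing batch DataFrame logic on every micro-batch's RDD.
    val outputStream = inputDStream.transform { rdd =>
      val df = sqlContext.createDataFrame(rdd)   // Spark 1.6-era entry point, as in the question
      processWithDataFrames(df).rdd              // run the batch logic, return an RDD to the DStream
    }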

Re: Multiple queries on same stream

2017-08-09 Thread Tathagata Das
It's important to note that running multiple streaming queries, as of today, would read the input data that many times. So there is a trade off between the two approaches. So even though scenario 1 won't get great catalyst optimization, it may be more efficient overall in terms of resource u

Re: [Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Tathagata Das
Writing streams into some sink (preferably fault-tolerant, exactly once sink, see docs) and then joining is definitely a possible way. But you will likely incur higher latency. If you want lower latency, then stream-stream joins is the best approach, which we are working on right now. Spark 2.3 is

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
> > > On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das wrote: > Its best to use DataFrames. You can read from as streaming or as batch. More details here. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#cr

Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
Its best to use DataFrames. You can read from as streaming or as batch. More details here. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structur
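
A minimal sketch of the batch-mode Kafka read described in those links; the topic, servers, and offset bounds are placeholders:

    // Bounded read of a Kafka topic as a batch DataFrame.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "mytopic")
      .option("startingOffsets", "earliest")   // optional; defaults to earliest for batch
      .option("endingOffsets", "latest")       // optional; defaults to latest for batch
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")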
