Support for group aggregate pandas UDF in streaming aggregation for Spark 3.0 Python

2020-08-11 Thread Aesha Dhar Roy
Hi, Is there any plan to remove the limitation mentioned below? "Streaming aggregation doesn't support group aggregate pandas UDF." We want to run our data modelling jobs in real time using Spark 3.0 and Kafka 2.4 and need support for custom aggregate pandas UDFs on stream windows. Is
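For readers unfamiliar with the limitation being discussed: a "group aggregate pandas UDF" is a function reducing one or more pandas Series per group to a scalar, registered via `pyspark.sql.functions.pandas_udf` and used inside `groupBy(...).agg(...)`. The sketch below (assuming pandas is available; the weighted-mean function and column names are illustrative, not from the thread) shows the aggregation logic itself on a static pandas frame — in Spark 3.0 this pattern works on static DataFrames but is rejected when the DataFrame is a stream.

```python
import pandas as pd

# A group-aggregate function of the kind one would wrap with
# pandas_udf("double", PandasUDFType.GROUPED_AGG) and use in
# groupBy(...).agg(...). Streaming aggregation rejects such UDFs
# in Spark 3.0, which is the limitation raised in this thread.
def weighted_mean(values: pd.Series, weights: pd.Series) -> float:
    return float((values * weights).sum() / weights.sum())

# Illustrating the aggregation on plain (static) pandas data:
df = pd.DataFrame({
    "key": ["a", "a", "b"],
    "v": [1.0, 3.0, 10.0],
    "w": [1.0, 1.0, 2.0],
})
result = df.groupby("key").apply(lambda g: weighted_mean(g["v"], g["w"]))
```

On a static DataFrame the equivalent Spark query runs fine; it is only the combination with `readStream` that raises the "Streaming aggregation doesn't support group aggregate pandas UDF" error.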

Re: How to prevent and track data loss/dropped due to watermark during structured streaming aggregation

2020-02-07 Thread stevech.hu
Thanks Jungtaek. I could not remove the watermark but setting 0 works for me. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: How to prevent and track data loss/dropped due to watermark during structured streaming aggregation

2020-02-02 Thread Jungtaek Lim
Have you try out printing timestamp for rows in each batch and watermark while you add artificial delay on processing batch? First of all, you're technically using "processing time" in your query, where you will never have "late events" theoretically. Watermark is to handle out-of-order events and

Re: How to prevent and track data loss/dropped due to watermark during structured streaming aggregation

2020-01-23 Thread stevech.hu
Anyone know the answers or pointers? Thanks.

How to prevent and track data loss/dropped due to watermark during structured streaming aggregation

2020-01-18 Thread stevech.hu
We have a scenario to group raw records by correlation id every 3 minutes and append the grouped result to an HDFS store. Below is an example of our query: val df = records.readStream.format("SomeDataSource") .selectExpr("current_timestamp() as CurrentTime", "*") .withWatermark("Curr
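The drop behaviour this thread asks about can be sketched without Spark. A watermark is derived from the maximum event time seen minus the declared delay; rows falling behind it are excluded from stateful aggregation. The simulation below is a deliberately simplified model (single batch, delay and timestamps are made up), not Spark's actual per-batch watermark update:

```python
from datetime import datetime, timedelta

# Toy model of watermark-based dropping: Spark tracks the max event
# time seen and subtracts the declared delay; rows older than that
# watermark are ignored by the aggregation and their state evicted.
delay = timedelta(minutes=3)

def filter_late(rows, watermark):
    """Keep only rows whose event time is at or after the watermark."""
    return [r for r in rows if r["event_time"] >= watermark]

t0 = datetime(2020, 1, 18, 12, 0, 0)
rows = [
    {"id": 1, "event_time": t0},
    {"id": 2, "event_time": t0 + timedelta(minutes=10)},  # advances the max
    {"id": 3, "event_time": t0 + timedelta(minutes=1)},   # > 3 min behind max
]
watermark = max(r["event_time"] for r in rows) - delay
kept = filter_late(rows, watermark)  # only id 2 survives
```

Note the relevance to the query above: because it watermarks `current_timestamp()` (processing time), rows only become "late" when batch processing itself is delayed, which is exactly the scenario Jungtaek suggests reproducing with an artificial delay.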

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Koert Kuipers
…The clock time means nothing. From: Koert Kuipers. Date: Monday, May 28, 2018 at 6:17 PM. To: user. Subject: trying to understand structured streaming aggregation with watermark and append outputmode. hello all,

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Lalwani, Jayesh
…it will not output 1 exactly 1 second after 1 arrives. The clock time means nothing. From: Koert Kuipers. Date: Monday, May 28, 2018 at 6:17 PM. To: user. Subject: trying to understand structured streaming aggregation with watermark and append outputmode. hello all, just playing with structured

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-29 Thread Koert Kuipers
Let me ask this another way: if I run this program and then feed it a single value (on nc), it returns a single result, which is an empty batch. It will not return anything else after that, no matter how long I wait. This only happens with watermarking and append output mode. What do I do to corr

trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-28 Thread Koert Kuipers
Hello all, just playing with structured streaming aggregations for the first time. This is my little program I run inside sbt: import org.apache.spark.sql.functions._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", )
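The "single empty batch" behaviour reported above follows from append-mode semantics, which can be modelled in a few lines of plain Python (window length and delay here are arbitrary assumptions; this is not Spark code). In append mode a window's aggregate is emitted only once the watermark passes the window's end, and the watermark only advances when newer events arrive:

```python
# Toy model of append-mode emission. A window is final (and emitted)
# only when the watermark -- max event time seen minus the allowed
# delay -- has passed the window's end. A single input row can never
# push the watermark past its own window, so nothing is ever emitted:
# the "empty batch, then silence" behaviour described above.
WINDOW = 10  # window length in seconds (assumed)
DELAY = 5    # watermark delay in seconds (assumed)

def emitted_windows(event_times):
    if not event_times:
        return []
    watermark = max(event_times) - DELAY
    windows = {t // WINDOW for t in event_times}
    # Append mode: only windows entirely behind the watermark are final.
    return sorted(w for w in windows if (w + 1) * WINDOW <= watermark)

one_value = emitted_windows([3])        # watermark = -2: nothing emitted
later_value = emitted_windows([3, 40])  # watermark = 35: window 0 emitted
```

So the practical answer to "what do I do": feed a later event (or wait for one) so the watermark crosses the first window's end; with only a single event the query is behaving as designed.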

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-23 Thread Jungtaek Lim
BTW: we are using 2.3.0. Shall I file a new Jira for that memory leak in the Spark UI? Only found https://issues.apache.org/jira/browse/SPARK-15716 but seems something different. Trying with spark.ui.enabled=false in the meantime. Tathagata Das

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-23 Thread weand
Only found https://issues.apache.org/jira/browse/SPARK-15716 but seems something different. Trying with spark.ui.enabled=false in the meantime. Tathagata Das wrote: Just to be clear, these screenshots are about the memory consumption of the driver. So this is nothing to do with streaming

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the driver. So this is nothing to do with streaming aggregation state, which is kept in the memory of the executors, not the driver. On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim wrote: 1. Could you share your Sp

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Jungtaek Lim
1. Could you share your Spark version? 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it helps? This configuration is available in 2.3.0, and the default value is 1000. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, May 22, 2018 at 4:29 PM, weand wrote: > You can see it even better on th
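The two mitigations discussed in this thread can be applied at session construction time. A minimal sketch (the values chosen are illustrative, not recommendations from the thread; only the two configuration keys are taken from the messages above):

```python
from pyspark.sql import SparkSession

# Lower how many SQL executions the UI retains (default 1000 in 2.3.0),
# or disable the UI entirely, as weand tried with spark.ui.enabled=false.
spark = (SparkSession.builder
         .appName("streaming-agg")  # hypothetical app name
         .config("spark.sql.ui.retainedExecutions", "100")
         # .config("spark.ui.enabled", "false")  # the heavier hammer
         .getOrCreate())
```

Reducing `spark.sql.ui.retainedExecutions` keeps the driver-side `SparkPlanGraphWrapper` accumulation (seen in the heap dumps above) bounded without losing the UI altogether.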

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
You can see it even better on this screenshot: TOP Entries Collapsed #2. Sorry for the spam, attached a not so perfect screen in the mail before.

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
Instances of org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper are not cleaned up, see TOP Entries Collapsed #2.

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-19 Thread Ted Yu
Hi, w.r.t. ElementTrackingStore, since it is backed by KVStore, there should be other classes which occupy significant memory. Can you pastebin the top 10 entries among the heap dump ? Thanks

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-19 Thread weand
Nobody has any idea...? Is filtering after aggregation in structured streaming supported but maybe buggy? See the following line in the example from the earlier mail: .where(F.expr("distinct_username >= 2"))

OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-16 Thread weand
We implemented a streaming query with aggregation on event-time with watermark. I'm wondering why aggregation state is not cleaned up. According to the documentation, old aggregation state should be cleared when using watermarks. We also don't see any condition [1] for why state should not be cleaned up

Spark structured streaming aggregation within microbatch

2018-05-15 Thread Koert Kuipers
I have a streaming dataframe where I insert a uuid in every row, then join with a static dataframe (after which the uuid column is no longer unique), then group by uuid and do a simple aggregation. So I know all rows with the same uuid will be in the same micro batch, guaranteed, correct? How do I express it
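The pattern described above can be sketched in plain Python (not Spark; the static lookup table and the sum aggregation are made-up stand-ins): tag each incoming row with a fresh uuid, fan it out against static data, then fold the fan-out back together by grouping on the uuid. In Spark this per-micro-batch regrouping is commonly expressed with `foreachBatch` (available from 2.4), since each micro-batch is then an ordinary DataFrame:

```python
import uuid

# Hypothetical static side: each key matches multiple rows, so after
# the "join" the uuid column is no longer unique -- as in the question.
static = {"x": [10, 20], "y": [30]}

def process_batch(rows):
    """Model one micro-batch: tag, join, then group back by uuid."""
    tagged = [{"uid": str(uuid.uuid4()), **r} for r in rows]
    joined = [{"uid": t["uid"], "val": v}
              for t in tagged for v in static[t["key"]]]
    out = {}
    for j in joined:                    # group by uuid and sum
        out[j["uid"]] = out.get(j["uid"], 0) + j["val"]
    return sorted(out.values())

result = process_batch([{"key": "x"}, {"key": "y"}])
```

Because the uuid is minted inside the batch, all rows sharing it are indeed confined to that batch, so a per-batch aggregation (rather than a stateful streaming aggregation) is sufficient.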

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-22 Thread Jacek Laskowski
On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau wrote: So performing complete output without an aggregation would require building up a table of the entire input

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Burak Yavuz
So performing complete output without an aggregation would require building up a table of the entire input to write out at each micro batch. This would get prohibitively expensive quickly.

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
With an aggregation we just need to keep track of the aggregates and update them every batch, so the memory requirement is more reasonable. (Note: I don't do a lot of work in streaming so there may be

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
With an aggregation we just need to keep track of the aggregates and update them every batch, so the memory requirement is more reasonable. (Note: I don't do a lot of work in streaming so there may be additional

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
…every batch, so the memory requirement is more reasonable. (Note: I don't do a lot of work in streaming so there may be additional reasons, but these are the ones I remember from when I was working on looking at integrating ML with SS).

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski wrote: Hi, Why is the requirement for a streaming aggregation in a streaming query? What would happen if Spark allowed Complete without a single aggregation? This is the latest master.

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
…the requirement for a streaming aggregation in a streaming query? What would happen if Spark allowed Complete without a single aggregation? This is the latest master. scala> val q = ids. | writeStream. | format("memory").

[SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
Hi, Why is the requirement for a streaming aggregation in a streaming query? What would happen if Spark allowed Complete without a single aggregation? This is the latest master. scala> val q = ids. | writeStream. | format("memory"). | queryName("dups"
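Holden's cost argument later in this thread can be made concrete with a toy model (plain Python, not Spark internals; batches and keys are invented): in complete mode the sink must receive the full result at every trigger, so without an aggregation the engine would have to retain every input row forever, whereas with one it only keeps bounded aggregate state:

```python
# State the engine would need under complete output mode:
all_rows = []   # no aggregation: every input row, retained forever
counts = {}     # count aggregation: one entry per distinct key

def on_batch(batch):
    all_rows.extend(batch)                 # grows without bound
    for k in batch:
        counts[k] = counts.get(k, 0) + 1   # bounded by number of keys

for batch in ([1, 1, 2], [2, 2], [1]):
    on_batch(batch)
```

After three micro-batches the unaggregated state holds all six rows, while the aggregated state holds just two counters; this asymmetry is why Complete mode requires an aggregation rather than risking unbounded retention.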

Streaming aggregation

2014-06-24 Thread john levingston
I have a use case where I cannot figure out the Spark Streaming way to do it. Given two Kafka topics corresponding to two different types of events A and B, each element from topic A corresponds to an element from topic B. Unfortunately, elements can arrive separated by hours. The aggregation
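The buffering such a pairing requires can be sketched in plain Python (the keying scheme and event values are assumptions for illustration): hold unmatched A and B events in state and emit a pair when the partner arrives. In modern Spark this is a stream-stream inner join (supported since 2.3), with watermarks bounding how long state for an unmatched element is kept:

```python
# Minimal model of the state a stream-stream join keeps: unmatched
# events wait in a buffer, possibly for hours, until their partner
# from the other topic arrives.
buffer_a, buffer_b, pairs = {}, {}, []

def on_event(topic, key, value):
    other = buffer_b if topic == "A" else buffer_a
    mine = buffer_a if topic == "A" else buffer_b
    if key in other:  # partner already arrived: emit the joined pair
        a, b = (value, other.pop(key)) if topic == "A" else (other.pop(key), value)
        pairs.append((key, a, b))
    else:             # no partner yet: buffer and wait
        mine[key] = value

on_event("A", "k1", "a1")
on_event("B", "k2", "b2")
on_event("B", "k1", "b1")  # matches the buffered A event for k1
```

Without a watermark the buffers grow without bound, which is why the join's allowed lateness (here, hours) has to be declared explicitly.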