Question about 'maxOffsetsPerTrigger'

2020-06-30 Thread Eric Beabes
While running my Spark (Stateful) Structured Streaming job I am setting 'maxOffsetsPerTrigger' value to 10 Million. I've noticed that messages are processed faster if I use a large value for this property. What I am also noticing is that until the batch is completely processed, no messages are get
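
For reference, a minimal sketch of where this option is applied on the Kafka source; the broker, topic name, and the 10-million figure from the question are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("MaxOffsetsExample").getOrCreate()

    // Cap how many Kafka offsets a single micro-batch may consume.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   // hypothetical broker
      .option("subscribe", "events")                       // hypothetical topic
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 10000000L)
      .load()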

Failure Threshold in Spark Structured Streaming?

2020-07-02 Thread Eric Beabes
Currently my job fails even on a single failure. In other words, even if one incoming message is malformed the job fails. I believe there's a property that allows us to set an acceptable number of failures. I Googled but couldn't find the answer. Can someone please help? Thanks.
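
A sketch of a common workaround rather than a built-in failure threshold: from_json returns null for an unparseable string, so malformed messages can be filtered out instead of failing the query. The schema and column names are hypothetical; `kafkaDf` stands for the streaming DataFrame read from Kafka.

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

    // Hypothetical message schema.
    val schema = new StructType()
      .add("device_id", StringType)
      .add("value", DoubleType)

    val parsed = kafkaDf
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .filter(col("data").isNotNull)     // drop messages that failed to parse
      .select("data.*")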

Load distribution in Structured Streaming

2020-07-06 Thread Eric Beabes
In my structured streaming job I've noticed that a LOT of data keeps going to one executor whereas other executors don't process that much data. As a result, tasks on that executor take a lot of time to complete. In other words, the distribution is skewed. I believe in Structured streaming the Par
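
A minimal sketch of one common mitigation (not necessarily what the thread settled on): repartition on a higher-cardinality key right after the source so downstream tasks no longer follow the skewed Kafka partition layout. `kafkaStreamDf`, the column name, and the partition count are hypothetical.

    import org.apache.spark.sql.functions.col

    // Redistribute incoming records across 200 evenly hashed partitions.
    val balanced = kafkaStreamDf.repartition(200, col("device_id"))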

Submitting Spark Job thru REST API?

2020-09-02 Thread Eric Beabes
Under Spark 2.4 is it possible to submit a Spark job thru REST API - just like the Flink job? Here's the use case: We need to submit a Spark Job to the EMR cluster but our security team is not allowing us to submit a job from the Master node or thru UI. They want us to create a "Docker Container"
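
One option raised later in the thread is Apache Livy, which exposes exactly this kind of REST endpoint. A hedged sketch of submitting a batch through Livy's POST /batches API; the host, port, JAR location and class name are hypothetical, and the JAR must already be reachable by the cluster (e.g. on S3 or HDFS):

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    // JSON payload for Livy's /batches endpoint.
    val payload =
      """{"file": "s3://mybucket/jobs/my-spark-job.jar",
        | "className": "com.example.MyJob",
        | "args": ["2020-09-02"]}""".stripMargin

    val conn = new URL("http://livy-host:8998/batches")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))
    println(s"Livy responded with HTTP ${conn.getResponseCode}")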

Re: Submitting Spark Job thru REST API?

2020-09-03 Thread Eric Beabes
the project is still very active. So highly recommend it to you. > <https://www.upwork.com/fl/huanqingzhu> > Fusion Zhu <https://www.tianlang.tech/> > > ------ > From: Eric B

Re: Submitting Spark Job thru REST API?

2020-09-14 Thread Eric Beabes
Is there a way to upload the JAR file prior to running this? Get the Id of this file & then submit the Spark job. Kinda like how Flink does it. I realize this is an Apache Livy question so I will also ask on their mailing list. Thanks. On Thu, Sep 3, 2020 at 11:47 AM Eric Beabes wrote: > Thank you all

States get dropped in Structured Streaming

2020-10-22 Thread Eric Beabes
We're using Stateful Structured Streaming in Spark 2.4. We are noticing that when the load on the system is heavy & LOTs of messages are coming in some of the states disappear with no error message. Any suggestions on how we can debug this? Any tips for fixing this? Thanks in advance.

Debugging tools for Spark Structured Streaming

2020-10-29 Thread Eric Beabes
We're using Spark 2.4. We recently pushed to production a product that's using Spark Structured Streaming. It's working well most of the time but occasionally, when the load is high, we've noticed that there are only 10+ 'Active Tasks' even though we've provided 128 cores. Would like to debug this

Cannot perform operation after producer has been closed

2020-11-02 Thread Eric Beabes
I know this is related to Kafka but it happens during the Spark Structured Streaming job that's why I am asking on this mailing list. How would you debug this or get around this in Spark Structured Streaming? Any tips would be appreciated. Thanks. java.lang.IllegalStateException: Cannot perform

Re: Cannot perform operation after producer has been closed

2020-11-10 Thread Eric Beabes
I'd like to check > whether your case is bound to the known issue or not. > > https://issues.apache.org/jira/browse/SPARK-21869 > > > On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes > wrote: > >> I know this is related to Kafka but it happens during the Spark >> Structured

Re: Cannot perform operation after producer has been closed

2020-11-10 Thread Eric Beabes
On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes wrote: > Thanks for the reply. We are on Spark 2.4. Is there no way to get this > fixed in Spark 2.4? > > On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim > wrote: > >> Which Spark version do you use? There's a known issue on Kafka produ

Blacklisting in Spark Stateful Structured Streaming

2020-11-10 Thread Eric Beabes
Currently we’ve a “Stateful” Spark Structured Streaming job that computes aggregates for each ID. I need to implement a new requirement which says that if the no. of incoming messages for a particular ID exceeds a certain value then add this ID to a blacklist & remove the state for it. Going forwar
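
A minimal sketch of the state-based approach, assuming a Dataset[DeviceEvent] named `events`, `import spark.implicits._` in scope, and a hypothetical threshold; this is an illustration, not the thread's final design:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    case class DeviceEvent(deviceId: String, value: Double)
    case class DeviceState(count: Long, blacklisted: Boolean)

    // Count messages per ID inside the user-defined state; once the count
    // crosses the threshold, drop the state and mark the ID as blacklisted.
    def updateState(id: String,
                    batch: Iterator[DeviceEvent],
                    state: GroupState[DeviceState]): DeviceState = {
      val current = state.getOption.getOrElse(DeviceState(0L, blacklisted = false))
      val updated = current.copy(count = current.count + batch.size)
      if (updated.count > 1000000L) {          // hypothetical threshold
        state.remove()                         // free the state for this ID
        updated.copy(blacklisted = true)
      } else {
        state.update(updated)
        updated
      }
    }

    val perDevice = events
      .groupByKey(_.deviceId)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)

As the reply below notes, the blacklist itself can live in the user-defined state or in an external cache; with the state simply removed, a reappearing ID would start counting from zero again. The query would run with Update output mode.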

Re: Cannot perform operation after producer has been closed

2020-11-18 Thread Eric Beabes
We've been asked to rewrite the code in Flink. Moving to Spark 3.0 is not an easy option 'cause Cloudera 6.2 doesn't have a Spark 3.0 parcel, so we can't upgrade to 3.0. So sad. Let me ask one more time. *Is there no way to fix this in Spark 2.4?* On Tue, Nov 10, 2020 at 11:33 AM Eric Bea

Re: Cannot perform operation after producer has been closed

2020-11-19 Thread Eric Beabes
enough time to migrate to > Spark 3. > > > On Wed, Nov 18, 2020 at 11:12 PM Eric Beabes > wrote: > >> I must say.. *Spark has let me down in this case*. I am surprised an >> important issue like this hasn't been fixed in Spark 2.4. >> >> I am fighting a batt

Re: Blacklisting in Spark Stateful Structured Streaming

2020-11-20 Thread Eric Beabes
"stateful" SS job, > the blacklisting structure can be put into the user-defined state. > To use a 3rd-party cache should also be a good choice. > > Eric Beabes wrote on Wed, Nov 11, 2020 at 6:54 AM: > >> Currently we’ve a “Stateful” Spark Structured Streaming job that computes >

Re: Cannot perform operation after producer has been closed

2020-12-09 Thread Eric Beabes
On Fri, Nov 20, 2020 at 7:30 AM Gabor Somogyi wrote: > Happy that saved some time for you :) > We've invested quite an effort in the latest releases into streaming and > hope there will be less and less headaches like this. > > On Thu, Nov 19, 2020 at 5:55 PM Eric Beabes > wrote:

Re: Understanding Executors UI

2021-01-07 Thread Eric Beabes
Spark memory management is documented at > https://spark.apache.org/docs/latest/tuning.html#memory-management-overview > > > Additional resource: see also this diagram > https://canali.web.cern.ch/docs/SparkExecutorMemory.png and > https://db-blog.web.cern.ch/blog/luca-canali/2020-08

Data source v2 streaming sinks does not support Update mode

2021-01-11 Thread Eric Beabes
Trying to port my Spark 2.4 based (Structured) streaming application to Spark 3.0. I compiled it using the dependency given below: <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> <version>3.1.0</version> Every time I run it under Spark 3.0, I get this message: *Data source v2 streaming

Re: Understanding Executors UI

2021-01-12 Thread Eric Beabes
>> For example look at the details per executor (the numbers you reported >> are aggregated values), then also look at the “storage tab” for a list of >> cached RDDs with details. >> >> In case, Spark 3.0 has improved memory instrumentation and improved >> instru

Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Eric Beabes
because you've said you've used Spark 3.0 but spark-sql-kafka > dependency pointed to 3.1.0.) > > On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes > wrote: > >> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 >> streaming sinks does not support
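
The build in the thread is Maven-based; a rough sbt equivalent of the alignment the reply is pointing at, keeping the Kafka connector version in lock-step with the Spark version actually installed on the cluster (version numbers illustrative):

    val sparkVersion = "3.1.0"   // must match the Spark installed on the cluster

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
    )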

Re: Data source v2 streaming sinks does not support Update mode

2021-01-13 Thread Eric Beabes
build >> script. >> >> Thanks in advance! >> >> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes >> wrote: >> >>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0. >>> >>> On Wed, Jan 13, 2021 at 11:35 AM Jungta

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
org.scalastyle scalastyle-maven-plugin 1.0.0 false true true false ${project.basedir}/src/main/scala ${project.basedir}/src/test/scala lib/scalastyle_config

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
e a jira and commit the > code into github? > It would speed things up a lot. > > G > > > On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes > wrote: > >> Here's a very simple reproducer app. I've attached 3 files: >> SparkTest.scala, QueryListener.

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
<https://issues.apache.org/jira/projects/SPARK/summary> and >>> regarding the repo, I believe just commit it to your personal repo and that >>> should be it. >>> >>> Regards >>> >>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes >>> wrote: >>> >

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Eric Beabes
Confirmed. The cluster Admin said his team installed the latest version from Cloudera which comes with Spark 3.0.0-preview2. They are going to try to upgrade it with the Community edition Spark 3.1.0. Thanks Jungtaek for the tip. Greatly appreciate it. On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes

Re: Data source v2 streaming sinks does not support Update mode

2021-01-19 Thread Eric Beabes
Will do, thanks! On Tue, Jan 19, 2021 at 1:39 PM Gabor Somogyi wrote: > Thanks for double checking the version. Please report back with 3.1 > version whether it works or not. > > G > > > On Tue, 19 Jan 2021, 07:41 Eric Beabes, wrote: > >> Confirmed. The cluster

Re: Only one Active task in Spark Structured Streaming application

2021-01-21 Thread Eric Beabes
>> https://about.me/JacekLaskowski >> "The Internals Of" Online Books <https://books.japila.pl/> >> Follow me on https://twitter.com/jaceklaskowski >> >> >> On Thu, Jan 21, 2021

Re: Only one Active task in Spark Structured Streaming application

2021-01-21 Thread Eric Beabes
after a long time. Some memory leak in your app > putting GC/memory pressure on the JVM, etc too. > > On Thu, Jan 21, 2021 at 5:13 AM Eric Beabes > wrote: > >> Hello, >> >> My Spark Structured Streaming application was performing well for quite >> some ti

Spark doesn't add _SUCCESS file when 'partitionBy' is used

2021-04-05 Thread Eric Beabes
When I do the following, Spark (2.4) doesn't put a _SUCCESS file in the partition directory:

    val outputPath = s"s3://mybucket/$table"
    df.orderBy(time)
      .coalesce(numFiles)
      .write
      .partitionBy("partitionDate")
      .mode("overwrite")
      .format("parquet")
      .save(outputPath)

But when I remove 'partitionBy'
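
Two things worth checking here, offered as hedged suggestions rather than a confirmed fix: with partitionBy the _SUCCESS marker (when written) normally lands in the root output path rather than inside each partition directory, and its creation is governed by a Hadoop FileOutputCommitter flag:

    // Ensure the committer is allowed to write the _SUCCESS marker at all;
    // then look for it at the root output path (s3://mybucket/<table>/_SUCCESS),
    // not under the partitionDate=... subdirectories.
    spark.sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true")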

Stream which needs to be “joined” with another Stream of “Reference” data.

2021-05-03 Thread Eric Beabes
I would like to develop a Spark Structured Streaming job that reads messages in a Stream which needs to be “joined” with another Stream of “Reference” data. For example, let’s say I’m reading messages from Kafka coming in from (lots of) IOT devices. This message has a ‘device_id’. We have a DEVICE
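
A minimal sketch of the simplest variant, a stream-static join, assuming `iotStream` is the already-parsed streaming DataFrame with a `device_id` column and the DEVICE reference data sits in Parquet; all names and paths are hypothetical:

    // Static ("reference") side, loaded as a regular batch DataFrame.
    val devices = spark.read.parquet("s3://mybucket/reference/devices")

    // Enrich each incoming IOT message with its device attributes.
    val enriched = iotStream.join(devices, Seq("device_id"), "left_outer")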

Re: Stream which needs to be “joined” with another Stream of “Reference” data.

2021-05-03 Thread Eric Beabes

Re: Stream which needs to be “joined” with another Stream of “Reference” data.

2021-05-03 Thread Eric Beabes
>> On Mon, 3 May 2021 at 18:27, "Yuri Oleynikov (יורי אולייניקוב)" <yur...@gmail.com> wrote: >> >>> You

NullPointerException in SparkSession while reading Parquet files on S3

2021-05-25 Thread Eric Beabes
I keep getting the following exception when I am trying to read a Parquet file from a Path on S3 in Spark/Scala. Note: I am running this on EMR.

    java.lang.NullPointerException
      at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
      at org.apache.spark

Reading parquet files in parallel on the cluster

2021-05-25 Thread Eric Beabes
I've a use case in which I need to read Parquet files in parallel from over 1000+ directories. I am doing something like this:

    val df = list.toList.toDF()
    df.foreach(c => {
      val config = getConfigs()
      doSomething(spark, config)
    })

In the doSomething method, when I try to

Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Eric Beabes
> > val df = spark.read.option("mergeSchema", "true").load(listOfPaths) > > > > *From: *Eric Beabes > *Date: *Tuesday, May 25, 2021 at 1:24 PM > *To: *spark-user > *Subject: *Reading parquet files in parallel on the cluster > > > > I've a use case in which

Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Eric Beabes
Parquet, for > example. You would just have 10s or 100s of those jobs running at the same > time. You have to write a bit of async code to do it, but it's pretty easy > with Scala Futures. > > On Tue, May 25, 2021 at 3:31 PM Eric Beabes > wrote: > >> Here's
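
A sketch of the Futures approach suggested above, launching one Spark job per directory from the driver so they run concurrently on the cluster; `paths` and `doSomething` are hypothetical:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Each Future triggers an independent Spark job; the driver only coordinates.
    val jobs: Seq[Future[Unit]] = paths.map { path =>
      Future {
        val df = spark.read.parquet(path)
        doSomething(df)
      }
    }
    Await.result(Future.sequence(jobs), 2.hours)

With the default global execution context, concurrency is roughly capped at the driver's core count; a dedicated thread pool can raise that limit.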

Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Eric Beabes
may be significant. But it seems like the > simplest thing and will probably work fine. > > On Tue, May 25, 2021 at 4:34 PM Eric Beabes > wrote: > >> Right... but the problem is still the same, no? Those N Jobs (aka Futures >> or Threads) will all be running on the Driver.

Naming files while saving a Dataframe

2021-07-16 Thread Eric Beabes
We've two (or more) jobs that write data into the same directory via a Dataframe.save method. We need to be able to figure out which job wrote which file. Maybe provide a 'prefix' to the file names. I was wondering if there's any 'option' that allows us to do this. Googling didn't come up with any

Re: Naming files while saving a Dataframe

2021-07-17 Thread Eric Beabes
yan guha wrote: > IMHO - this is a bad idea esp in failure scenarios. > > How about creating a subfolder each for the jobs? > > On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes > wrote: > >> We've two (or more) jobs that write data into the same directory via a >> Da

Re: Naming files while saving a Dataframe

2021-07-17 Thread Eric Beabes

Re: Naming files while saving a Dataframe

2021-07-17 Thread Eric Beabes

Replacing BroadcastNestedLoopJoin

2021-08-12 Thread Eric Beabes
We’ve two datasets that look like this: Dataset A: App specific data that contains (among other fields): ip_address Dataset B: Location data that contains start_ip_address_int, end_ip_address_int, latitude, longitude We’re (left) joining these two datasets as: A.ip_address >= B.start_ip_address
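
For context, a minimal sketch of the non-equi (range) join described above, which is the shape Spark typically plans as a BroadcastNestedLoopJoin; dataset and column names loosely follow the thread and are otherwise hypothetical:

    // Left join on an IP range: no equality predicate, hence no hash or
    // sort-merge join is possible and Spark falls back to a nested loop.
    val joined = dfA.join(
      dfB,
      dfA("ip_address_int") >= dfB("start_ip_address_int") &&
        dfA("ip_address_int") <= dfB("end_ip_address_int"),
      "left_outer")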

Re: Naming files while saving a Dataframe

2021-08-12 Thread Eric Beabes
This depends on Hadoop writing files. You can try to set the > Hadoop property: mapreduce.output.basename > > https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html#hadoopConfiguration-- > > > On 18.07.2021 at 01:15, Eric Beabes wrote: > >
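
A hedged sketch of that suggestion; the prefix value is hypothetical, and whether the DataFrame Parquet writer honours this Hadoop property should be verified on the Spark/Hadoop versions in use:

    // Ask the Hadoop output layer to use a job-specific basename for part files.
    spark.sparkContext.hadoopConfiguration
      .set("mapreduce.output.basename", "jobA")

    df.write.mode("append").parquet("s3://mybucket/shared-output")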

Re: Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Eric Beabes
My apologies... After I set the 'maxOffsetsPerTrigger' to a value such as '20' it started working. Hopefully this will help someone. Thanks. On Fri, Jun 26, 2020 at 2:12 PM Something Something < mailinglist...@gmail.com> wrote: > My Spark Structured Streaming job works fine when I set "start