Re: Are all transformations lazy?

2014-07-25 Thread Rico
It may be confusing at first, but there is also an important difference
between the reduce and reduceByKey operations.

reduce is an action on an RDD. Hence, it triggers evaluation of the
transformations that produced the RDD.

In contrast, reduceByKey is a transformation on pair RDDs, not an action.
That is why distinct, which uses reduceByKey internally, is implemented as a chain of transformations:

map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
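
To illustrate the difference, here is a minimal, self-contained sketch in Java
(not from the original thread; the data and names are made up):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class LazyVsAction {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("lazy-vs-action").setMaster("local[*]"));

        JavaPairRDD<String, Integer> pairs = sc
                .parallelize(Arrays.asList("a", "b", "a"))
                .mapToPair(s -> new Tuple2<String, Integer>(s, 1));

        // reduceByKey is a transformation: it only extends the lineage, nothing runs yet.
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);

        // reduce is an action: this call forces evaluation of the whole chain above.
        int total = counts.values().reduce(Integer::sum);
        System.out.println(total); // 3

        sc.stop();
    }
}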



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Are-all-transformations-lazy-tp2582p10675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist

2014-07-25 Thread Rico
I figured out the issue. I had not realized that the data is deserialized
when it is loaded into memory. As a result, what looks like a 21 GB dataset
occupies 77 GB in memory.

Details about this are clearly explained in the tuning guide, in the sections
on data serialization and memory tuning.
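
For readers hitting the same surprise, here is a small sketch of the two knobs
discussed there, Kryo serialization and a serialized storage level (the path
and class name are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class SerializedCaching {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("serialized-caching")
                .setMaster("local[*]")
                // Kryo usually produces a much more compact serialized form than Java serialization.
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> data = sc.textFile("hdfs:///path/to/dataset"); // placeholder path

        // MEMORY_ONLY_SER keeps the cached blocks serialized in memory, trading CPU
        // for a much smaller footprint than the default deserialized MEMORY_ONLY.
        data.persist(StorageLevel.MEMORY_ONLY_SER());

        System.out.println(data.count());
        sc.stop();
    }
}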




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248p10677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: memory leak query

2014-07-25 Thread Rico
Hi Michael, 

I asked a similar question before. My problem was that my data was too large
to be cached in memory once deserialized.

But I tried to reproduce your test and did not experience any memory
problem. First, since count operates on the same RDD, it should not increase
memory usage. Second, since you do not cache the RDD, each new action such as
count simply reloads the data.

I am not sure how much memory your machine has, but by default Spark
allocates 512 MB per executor and spark.memory.fraction is set to 0.6, which
means you effectively have only about 300 MB available. If you are running
your app on a local machine, you can monitor it in the web UI at
localhost:4040.
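
As a rough illustration (values are examples only, not a recommendation), the
executor memory can be raised via configuration, and the usage can then be
checked in the web UI:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MemoryCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("memory-check")
                .setMaster("local[*]")
                // Takes effect on a real cluster; in local mode the driver JVM size
                // (spark.driver.memory / --driver-memory) is what actually matters.
                .set("spark.executor.memory", "2g");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // While the application is running, the web UI at http://localhost:4040
        // shows per-executor memory usage and cached block sizes.
        System.out.println(sc.parallelize(Arrays.asList(1, 2, 3)).count());

        sc.stop();
    }
}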



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/memory-leak-query-tp8961p10679.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Rico Bergmann

Hi all!

A simplified code snippet of what my Spark pipeline written in Java does:

public class MyPojo implements Serializable {

... // some fields with Getter and Setter

}


a custom Aggregator (defined in the Driver class):

public static class MyAggregator extends
org.apache.spark.sql.expressions.Aggregator { ... }



in my Driver I do:

Dataset inputDF = ... //some calculations before

inputDF.groupBy("col1", "col2", "col3").agg(new
MyAggregator().toColumn().name("aggregated"));



When executing this part I get a CompileException complaining about an
unknown variable or type "MyPojo$.MODULE$". To me it looks like the
CodeGenerator generates Scala-specific code (as far as I know, .MODULE$
refers to a Scala companion object). I tried it with Spark 3.1.1 and Spark 3.0.1.


Does anyone have an idea what's going wrong here?


Best,

Rico.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Rico Bergmann
s.scala:559)
   at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3(AggregationIterator.scala:199)
   at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3$adapted(AggregationIterator.scala:199)
   at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:213)
   at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
   at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:159)
   at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:78)
   at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:129)
   at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
   at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
   at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
   at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
   at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
   at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   at org.apache.spark.scheduler.Task.run(Task.scala:127)
   at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
   at 
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
   at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
   at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 35, Column 78: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 35, 
Column 78: Unknown variable or type 
"com.asml.sara.foundation.data.waferDomainModel.LotDataRecord$.MODULE$"
   at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1382)
   at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1467)
   at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1464)
   at 
org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
   at 
org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
   ... 37 more
 
 


> Am 29.04.2021 um 15:30 schrieb Sean Owen :
> 
> 
> I don't know this code well, but yes seems like something is looking for 
> members of a companion object when there is none here. Can you show any more 
> of the stack trace or generated code?
> 
>> On Thu, Apr 29, 2021 at 7:40 AM Rico Bergmann  wrote:
>> Hi all!
>> 
>> A simplified code snippet of what my Spark pipeline written in Java does:
>> 
>> public class MyPojo implements Serializable {
>> 
>> ... // some fields with Getter and Setter
>> 
>> }
>> 
>> 
>> a custom Aggregator (defined in the Driver class):
>> 
>> public static MyAggregator extends 
>> org.apache.spark.sql.expressions.Aggregator { ... }
>> 
>> 
>> in my Driver I do:
>> 
>> Dataset inputDF = ... //some calculations before
>> 
>> inputDF.groupBy("col1", "col2", "col3").agg(new 
>> MyAggregator().toColumn().name("aggregated");
>> 
>> 
>> When executing this part I get a CompileException complaining about an 
>> unknown variable or type "MyPojo$.MODULE$". For me it looks like the 
>> CodeGenerator generates code for Scala (since as far as I know .MODULE$ 
>> is a scala specific variable). I tried it with Spark 3.1.1 and Spark 3.0.1.
>> 
>> Does anyone have an idea what's going wrong here?
>> 
>> 
>> Best,
>> 
>> Rico.
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Rico Bergmann
It didn't have one. So I added public no-args and all-args constructors, but I
still get the same error.



> Am 29.04.2021 um 17:47 schrieb Sean Owen :
> 
> 
> From tracing the code a bit, it might do this if the POJO class has no public 
> constructors - does it?
> 
>> On Thu, Apr 29, 2021 at 9:55 AM Rico Bergmann  wrote:
>> Here is the relevant generated code and the Exception stacktrace. 
>> 
>> The problem in the generated code is at line 35. 
>> 


Re: Spark DataFrame CodeGeneration in Java generates Scala specific code?

2021-04-29 Thread Rico Bergmann
Indeed adding public constructors solved the problem...

Thanks a lot!
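
For anyone running into the same MODULE$ error: a minimal sketch of the bean
shape that ended up working here (the field names are placeholders, not the
real POJO):

import java.io.Serializable;

public class MyPojo implements Serializable {
    private String col1;
    private long value;

    // An explicit public no-args constructor (plus getters/setters) is what the
    // generated encoder code needed in this case.
    public MyPojo() { }

    public MyPojo(String col1, long value) {
        this.col1 = col1;
        this.value = value;
    }

    public String getCol1() { return col1; }
    public void setCol1(String col1) { this.col1 = col1; }
    public long getValue() { return value; }
    public void setValue(long value) { this.value = value; }
}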


> Am 29.04.2021 um 18:53 schrieb Rico Bergmann :
> 
> 
> It didn’t have it. So I added public no args and all args constructors. But I 
> still get the same error
> 
> 
> 
>>> Am 29.04.2021 um 17:47 schrieb Sean Owen :
>>> 
>> 
>> From tracing the code a bit, it might do this if the POJO class has no 
>> public constructors - does it?
>> 
>>> On Thu, Apr 29, 2021 at 9:55 AM Rico Bergmann  wrote:
>>> Here is the relevant generated code and the Exception stacktrace. 
>>> 
>>> The problem in the generated code is at line 35. 
>>> 


Cast int to string not possible?

2022-02-16 Thread Rico Bergmann
Hi!

I am reading a partitioned DataFrame into Spark using automatic type inference
for the partition columns. For one partition column the data contains an
integer, therefore Spark infers IntegerType for this column. In general this
is supposed to be a StringType column, so I tried to cast it to StringType.
But this fails with an AnalysisException: "cannot cast int to string".

Is this a bug? Or is it really not allowed to cast an int to a string?

I’m using Spark 3.1.1

Best regards

Rico. 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Cast int to string not possible?

2022-02-16 Thread Rico Bergmann
Here is the code snippet:

var df = session.read().parquet(basepath);
for(Column partition : partitionColumnsList){
  df = df.withColumn(partition.getName(), 
df.col(partition.getName()).cast(partition.getType()));
}

Column here is our own class containing schema information, for example the
name of the column and its data type.

Best, Rico.

> Am 17.02.2022 um 03:17 schrieb Morven Huang :
> 
> Hi Rico, you have any code snippet? I have no problem casting int to string.
> 
>> 2022年2月17日 上午12:26,Rico Bergmann  写道:
>> 
>> Hi!
>> 
>> I am reading a partitioned dataFrame into spark using automatic type 
>> inference for the partition columns. For one partition column the data 
>> contains an integer, therefor Spark uses IntegerType for this column. In 
>> general this is supposed to be a StringType column. So I tried to cast this 
>> column to StringType. But this fails with AnalysisException “cannot cast int 
>> to string”.
>> 
>> Is this a bug? Or is it really not allowed to cast an int to a string?
>> 
>> I’m using Spark 3.1.1
>> 
>> Best regards
>> 
>> Rico. 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Cast int to string not possible?

2022-02-17 Thread Rico Bergmann
Hi!

Casting another int column that is not a partition column fails with the same
error.

The Schema before the cast (column names are anonymized):

root
 |-- valueObject: struct (nullable = true)
 |    |-- value1: string (nullable = true)
 |    |-- value2: string (nullable = true)
 |    |-- value3: timestamp (nullable = true)
 |    |-- value4: string (nullable = true)
 |-- partitionColumn2: string (nullable = true)
 |-- partitionColumn3: timestamp (nullable = true)
 |-- partitionColumn1: integer (nullable = true)

I wanted to cast partitionColumn1 to String which gives me the described error. 

Best,
Rico


> Am 17.02.2022 um 09:56 schrieb ayan guha :
> 
> Can you try to cast any other Int field which is NOT a partition column? 
> 
> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta  
> wrote:
>> Hi,
>> 
>> This appears interesting, casting INT to STRING has never been an issue for 
>> me.
>> 
>> Can you just help us with the output of df.printSchema()?
>> 
>> I prefer to use SQL, and the method I use for casting is:
>> CAST(<column name> AS STRING) <column name>.
>> 
>> Regards,
>> Gourav
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  wrote:
>>> Here is the code snippet:
>>> 
>>> var df = session.read().parquet(basepath);
>>> for(Column partition : partitionColumnsList){
>>>   df = df.withColumn(partition.getName(), 
>>> df.col(partition.getName()).cast(partition.getType()));
>>> }
>>> 
>>> Column is a class containing Schema Information, like for example the name 
>>> of the column and the data type of the column. 
>>> 
>>> Best, Rico.
>>> 
>>> > Am 17.02.2022 um 03:17 schrieb Morven Huang :
>>> > 
>>> > Hi Rico, you have any code snippet? I have no problem casting int to 
>>> > string.
>>> > 
>>> >> 2022年2月17日 上午12:26,Rico Bergmann  写道:
>>> >> 
>>> >> Hi!
>>> >> 
>>> >> I am reading a partitioned dataFrame into spark using automatic type 
>>> >> inference for the partition columns. For one partition column the data 
>>> >> contains an integer, therefor Spark uses IntegerType for this column. In 
>>> >> general this is supposed to be a StringType column. So I tried to cast 
>>> >> this column to StringType. But this fails with AnalysisException “cannot 
>>> >> cast int to string”.
>>> >> 
>>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>>> >> 
>>> >> I’m using Spark 3.1.1
>>> >> 
>>> >> Best regards
>>> >> 
>>> >> Rico. 
>>> >> 
>>> >> -
>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >> 
>>> > 
>>> > 
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> > 
>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> -- 
> Best Regards,
> Ayan Guha


Re: Cast int to string not possible?

2022-02-17 Thread Rico Bergmann
I found the reason why it did not work:

When returning the Spark data type I was calling new StringType(). When 
changing it to DataTypes.StringType it worked. 

Greets,
Rico. 
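
In other words, a minimal sketch of the working variant (placeholder path and
column name; in the real code the type comes from partition.getType(), which
now returns DataTypes.StringType):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class CastPartitionColumn {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("cast-example").master("local[*]").getOrCreate();

        Dataset<Row> df = session.read().parquet("/path/to/basepath"); // placeholder

        // Works: use the singleton instance from DataTypes.
        // Passing a freshly constructed StringType instance instead is what
        // triggered the "cannot cast int to string" AnalysisException here.
        df = df.withColumn("partitionColumn1",
                df.col("partitionColumn1").cast(DataTypes.StringType));

        df.printSchema();
        session.stop();
    }
}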

> Am 17.02.2022 um 14:13 schrieb Gourav Sengupta :
> 
> 
> Hi,
> 
> can you please post a screen shot of the exact CAST statement that you are 
> using? Did you use the SQL method mentioned by me earlier?
> 
> Regards,
> Gourav Sengupta
> 
>> On Thu, Feb 17, 2022 at 12:17 PM Rico Bergmann  wrote:
>> hi!
>> 
>> Casting another int column that is not a partition column fails with the 
>> same error. 
>> 
>> The Schema before the cast (column names are anonymized):
>> 
>> root
>> |-- valueObject: struct (nullable = true)
>> ||-- value1: string (nullable = true)
>> ||-- value2: string (nullable = true)
>> ||-- value3: timestamp (nullable = true)
>> ||-- value4: string (nullable = true)
>> |-- partitionColumn2: string (nullable = true)
>> |-- partitionColumn3: timestamp (nullable = true)
>> |-- partitionColumn1: integer (nullable = true)
>> 
>> I wanted to cast partitionColumn1 to String which gives me the described 
>> error. 
>> 
>> Best,
>> Rico
>> 
>> 
>>>> Am 17.02.2022 um 09:56 schrieb ayan guha :
>>>> 
>>> 
>>> Can you try to cast any other Int field which is NOT a partition column? 
>>> 
>>>> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
>>>>  wrote:
>>>> Hi,
>>>> 
>>>> This appears interesting, casting INT to STRING has never been an issue 
>>>> for me.
>>>> 
>>>> Can you just help us with the output of : df.printSchema()  ?
>>>> 
>>>> I prefer to use SQL, and the method I use for casting is: CAST(<>>> name>> AS STRING) <>.
>>>> 
>>>> Regards,
>>>> Gourav
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  
>>>>> wrote:
>>>>> Here is the code snippet:
>>>>> 
>>>>> var df = session.read().parquet(basepath);
>>>>> for(Column partition : partitionColumnsList){
>>>>>   df = df.withColumn(partition.getName(), 
>>>>> df.col(partition.getName()).cast(partition.getType()));
>>>>> }
>>>>> 
>>>>> Column is a class containing Schema Information, like for example the 
>>>>> name of the column and the data type of the column. 
>>>>> 
>>>>> Best, Rico.
>>>>> 
>>>>> > Am 17.02.2022 um 03:17 schrieb Morven Huang :
>>>>> > 
>>>>> > Hi Rico, you have any code snippet? I have no problem casting int to 
>>>>> > string.
>>>>> > 
>>>>> >> 2022年2月17日 上午12:26,Rico Bergmann  写道:
>>>>> >> 
>>>>> >> Hi!
>>>>> >> 
>>>>> >> I am reading a partitioned dataFrame into spark using automatic type 
>>>>> >> inference for the partition columns. For one partition column the data 
>>>>> >> contains an integer, therefor Spark uses IntegerType for this column. 
>>>>> >> In general this is supposed to be a StringType column. So I tried to 
>>>>> >> cast this column to StringType. But this fails with AnalysisException 
>>>>> >> “cannot cast int to string”.
>>>>> >> 
>>>>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>>>>> >> 
>>>>> >> I’m using Spark 3.1.1
>>>>> >> 
>>>>> >> Best regards
>>>>> >> 
>>>>> >> Rico. 
>>>>> >> 
>>>>> >> -
>>>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>> >> 
>>>>> > 
>>>>> > 
>>>>> > -
>>>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>> > 
>>>>> 
>>>>> 
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>> 
>>> -- 
>>> Best Regards,
>>> Ayan Guha


Problem running Kubernetes example v2.2.0-kubernetes-0.5.0

2018-04-11 Thread Rico Bergmann
Hi!

I was trying to get the SparkPi example running using the spark-on-k8s
distro from kubespark. But I get the following error:
+ /sbin/tini -s -- driver
[FATAL tini (11)] exec driver failed: No such file or directory

Did anyone get the example running on a Kubernetes cluster?

Best,
Rico.

invoked cmd:
bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://https://cluster:port \
  --conf spark.executor.instances=2 \
  --conf spark.app.name=spark-pi \
  --conf
spark.kubernetes.container.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
\
  --conf
spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
\
  --conf
spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0
\
 
local:///opt/spark/examples/jars/spark-examples_2.11-v2.2.0-kubernetes-0.5.0.jar

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Strange codegen error for SortMergeJoin in Spark 2.2.1

2018-06-05 Thread Rico Bergmann
Hi!

I get a strange error when executing a complex SQL-query involving 4
tables that are left-outer-joined:

Caused by: org.codehaus.commons.compiler.CompileException: File 
'generated.java', Line 37, Column 18: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 37, 
Column 18: No applicable constructor/method found for actual parameters "int"; 
candidates are: 
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(org.apache.spark.memory.TaskMemoryManager,
 org.apache.spark.storage.BlockManager, 
org.apache.spark.serializer.SerializerManager, org.apache.spark.TaskContext, 
int, long, int, int)", 
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(int, int)"

...

/* 037 */ smj_matches = new 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);

The same query works with Spark 2.2.0.

I checked the Spark source code and saw that in 2.2.1 a second int parameter
was added to the ExternalAppendOnlyUnsafeRowArray constructor.

But looking at the codegeneration part of SortMergeJoinExec:

// A list to hold all matched rows from right side.
val matches = ctx.freshName("matches")
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

ctx.addMutableState(clsName, matches,
  s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")

so the generated code should pass two parameters, not just one.


May be anyone has an idea?


Best,

Rico.



Re: Strange codegen error for SortMergeJoin in Spark 2.2.1

2018-06-08 Thread Rico Bergmann
Hi!


I finally found the problem. I was not aware that the program was run in
client mode. The client used Spark 2.2.0, which caused the problem.

Best,

Rico.
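
For anyone debugging a similar mismatch: the Spark version the driver is
actually running with can be printed from the job itself (a trivial sketch);
in client mode this reflects the client-side installation, which is exactly
what differed from the cluster here:

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("version-check").getOrCreate();
        // In client mode the driver runs in the client-side installation,
        // so this prints the version the submit host is using (e.g. 2.2.0 vs. 2.2.1).
        System.out.println("Driver is running Spark " + spark.version());
        spark.stop();
    }
}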


On 07.06.2018 at 08:49, Kazuaki Ishizaki wrote:
> Thank you for reporting a problem.
> Would it be possible to create a JIRA entry with a small program that
> can reproduce this problem?
>
> Best Regards,
> Kazuaki Ishizaki
>
>
>
> From:        Rico Bergmann 
> To:        "user@spark.apache.org" 
> Date:        2018/06/05 19:58
> Subject:        Strange codegen error for SortMergeJoin in Spark 2.2.1
> 
>
>
>
> Hi!
>
> I get a strange error when executing a complex SQL-query involving 4
> tables that are left-outer-joined:
> Caused by: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 37, Column 18: failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java',
> Line 37, Column 18: No applicable constructor/method found for actual
> parameters "int"; candidates are:
> "org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(org.apache.spark.memory.TaskMemoryManager,org.apache.spark.storage.BlockManager,
> org.apache.spark.serializer.SerializerManager,
> org.apache.spark.TaskContext, int, long, int, int)",
> "org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(int,
> int)"
>
> ...
>
> /* 037 */     smj_matches = new
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);
>
> The same query works with Spark 2.2.0.
>
> I checked the Spark source code and saw that in
> ExternalAppendOnlyUnsafeRowArray a second int was introduced into the
> constructor in 2.2.1
>
> But looking at the codegeneration part of SortMergeJoinExec:
>
> // A list to hold all matched rows from right side.
> *val *matches = ctx.freshName("matches")
> *val *clsName = /classOf/[ExternalAppendOnlyUnsafeRowArray].getName
>
> *val *spillThreshold = getSpillThreshold
> *val *inMemoryThreshold = getInMemoryThreshold
>
> ctx.addMutableState(clsName, matches,
>  s"*$*matches= new *$*clsName(*$*inMemoryThreshold, *$*spillThreshold);")
>
> it should get 2 parameters, not just one.
>
> May be anyone has an idea?
>
> Best,
>
> Rico.
>
>



Re: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-11-16 Thread Rico B.
Did you or anyone else find a solution to this problem? I'm stuck with the
same issue ...



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Structured Streaming Microbatch Semantics

2021-03-05 Thread Dipl.-Inf. Rico Bergmann

Hi all!

I'm using Spark structured streaming for a data ingestion pipeline. 
Basically the pipeline reads events (notifications of new available 
data) from a Kafka topic and then queries a REST endpoint to get the 
real data (within a flatMap).


For a single event the pipeline creates a few thousand records (rows) that
have to be stored, and to write the data I use foreachBatch().


My question is: does Spark guarantee that all output records of one event are
always contained in a single micro-batch, or can the records be split across
multiple batches?



Best,

Rico.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured Streaming Microbatch Semantics

2021-03-05 Thread Dipl.-Inf. Rico Bergmann

Hi!

As abstract code what I do in my streaming program is:

readStream()                            // from Kafka
  .flatMap(readIngestionDatasetViaREST)  // can return thousands of records for a single event
  .writeStream
  .outputMode("append")
  .foreachBatch(upsertIntoDeltaTable)
  .start()


I don't use triggers but I limit the number of events per trigger in the 
Kafka reader.
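
As a slightly more concrete sketch of such a pipeline (Java, placeholder
broker/topic/path, and assuming the spark-sql-kafka package is on the
classpath; the REST-calling flatMap is only indicated as a comment):

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IngestionSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("ingestion-sketch").master("local[*]").getOrCreate();

        Dataset<Row> events = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092") // placeholder
                .option("subscribe", "ingestion-events")          // placeholder topic
                .option("maxOffsetsPerTrigger", 100)               // caps events per micro-batch
                .load();

        // In the real pipeline a flatMap here calls the REST endpoint and fans each
        // event out into thousands of records; that step is omitted in this sketch.

        StreamingQuery query = events.writeStream()
                .outputMode("append")
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
                    // stand-in for the Delta upsert done in the original pipeline
                    batch.write().mode("append").parquet("/tmp/ingestion-sketch");
                })
                .start();

        query.awaitTermination();
    }
}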



What do you mean by keeping the process rate below the batch duration? The
process rate is in records per second (in my current deployment it is approx.
1), while the batch duration is in seconds (around 60 sec.).



Best,

Rico

On 05.03.2021 at 10:58, Mich Talebzadeh wrote:

Hi Ricco,

Just to clarify, your batch interval  may have a variable number of 
rows sent to Kafka topic for each event?


In your writeStream code

 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(SendToBigQuery). \
 trigger(processingTime='2 seconds'). \
                     start()


Have you defined trigger(processingTime)? That is equivalent to your 
sliding interval.


In general, processingTime == bath interval (the event).

In Spark GUI, under Structured streaming, you have Input Rate, Process 
Rate and Batch Duration. Your process Rate has to be below Batch 
Duration. ForeachBatch will process all the data come in before moving 
to the next batch. It is up to the designer to ensure that the 
processing time is below the event so Spark can process it.


HTH


LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw 
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>/




*Disclaimer:* Use it at your own risk.Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.




On Fri, 5 Mar 2021 at 08:06, Dipl.-Inf. Rico Bergmann 
mailto:i...@ricobergmann.de>> wrote:


Hi all!

I'm using Spark structured streaming for a data ingestion pipeline.
Basically the pipeline reads events (notifications of new available
data) from a Kafka topic and then queries a REST endpoint to get the
real data (within a flatMap).

For one single event the pipeline creates a few thousand records
(rows)
that have to be stored. And to write the data I use foreachBatch().

My question is now: Is it guaranteed by Spark that all output
records of
one event are always contained in a single batch or can the
records also
be split into multiple batches?


Best,

Rico.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>



Re: Structured Streaming Microbatch Semantics

2021-03-05 Thread Dipl.-Inf. Rico Bergmann

Hi!

Thanks for your reply!

For several reasons we don't want to "pipe" the real data through Kafka.

What problems might arise from this approach?

Best,

Rico.


On 05.03.2021 at 09:18, Roland Johann wrote:

Hi Rico,

there is no way to deferr records from one micro batch to the next 
one. So it‘s guaranteed that the data and trigger event will be 
processed within the dame batch.


I assume that one trigger event lead to an unknown batch size of 
actual events pulled via HTTP. This bypasses throughput properties of 
spark streaming. Depending on the amount of the resulting HTTP 
records, maybe you consider splitting the pipeline into two parts:

- process trigger event, pull data from HTTP, write to kafka
- perform structured streaming ingestion

Kind regards

Dipl.-Inf. Rico Bergmann <mailto:i...@ricobergmann.de>> schrieb am Fr. 5. März 2021 um 09:06:


Hi all!

I'm using Spark structured streaming for a data ingestion pipeline.
Basically the pipeline reads events (notifications of new available
data) from a Kafka topic and then queries a REST endpoint to get the
real data (within a flatMap).

For one single event the pipeline creates a few thousand records
(rows)
that have to be stored. And to write the data I use foreachBatch().

My question is now: Is it guaranteed by Spark that all output
records of
one event are always contained in a single batch or can the
records also
be split into multiple batches?


Best,

Rico.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>

--
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: roland.joh...@phenetic.io <mailto:roland.joh...@phenetic.io>
Web: phenetic.io <http://phenetic.io>

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann


Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Dipl.-Inf. Rico Bergmann

Hi folks!

I'm trying to implement an update of a broadcast var in Spark Streaming. 
The idea is that whenever some configuration value has changed (this is 
periodically checked by the driver) the existing broadcast variable is 
unpersisted and then (re-)broadcasted.


In a local test setup (using a local Spark) it works fine, but on a real
cluster it doesn't: the broadcast variable never gets updated. Am I doing
something wrong? Is this simply not possible? Or is it a bug?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast> broadcastVar;
    private transient Map configMap;

    public void run() {
    Map configMap = getConfigMap();
    if (this.broadcastVar == null || 
!configMap.equals(this.configMap)) {

        this.configMap = configMap;
        if (broadcastVar != null) {
            broadcastVar.unpersist(true);
            broadcastVar.destroy(true);
        }
        this.broadcastVar = 
this.sparkContext.broadcast(this.configMap);

    }
    }

    private Map getConfigMap() {
    //impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;

    protected JavaStreamingContext startStreaming(JavaStreamingContext 
context, ConsumerStrategy consumerStrategy) {

    broadcastUpdater = new BroadcastUpdater(context.sparkContext());
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new 
ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 
3, TimeUnit.SECONDS);


    final JavaInputDStreamChangeDataRecord>> inputStream = KafkaUtils.createDirectStream(context,

            LocationStrategies.PreferConsistent(), consumerStrategy);

    inputStream.foreachRDD(rdd -> {
        Broadcast> broadcastVar = 
broadcastUpdater.getBroadcastVar();

        rdd.foreachPartition(partition -> {
            if (partition.hasNext()) {
                Map configMap = 
broadcastVar.getValue();


                // iterate
                while (partition.hasNext()) {
                    //impl logic using broadcast variable
                }
            }
        }
    }
    }
}

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Updating Broadcast Variable in Spark Streaming 2.4.4

2022-09-28 Thread Dipl.-Inf. Rico Bergmann

Hi folks!


I'm trying to implement an update of a broadcast var in Spark Streaming. 
The idea is that whenever some configuration value has changed (this is 
periodically checked by the driver) the existing broadcast variable is 
unpersisted and then (re-)broadcasted.


In a local test setup (using a local Spark) it works fine, but on a real
cluster it doesn't: the broadcast variable never gets updated. What I can see
after adding some log messages is that the BroadcastUpdater thread is only
called twice and then never again. Does anyone have an idea why this happens?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast> broadcastVar;
    private transient volatile Map configMap;

    public void run() {
    Map configMap = getConfigMap();
    if (this.broadcastVar == null || 
!configMap.equals(this.configMap)) {

        this.configMap = configMap;
        if (broadcastVar != null) {
            broadcastVar.unpersist(true);
            broadcastVar.destroy(true);
        }
        this.broadcastVar = 
this.sparkContext.broadcast(this.configMap);

    }
    }

    private Map getConfigMap() {
    //impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;
    private transient ScheduledThreadPoolExecutor 
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);


    protected JavaStreamingContext startStreaming(JavaStreamingContext 
context, ConsumerStrategy consumerStrategy) {

    broadcastUpdater = new BroadcastUpdater(context.sparkContext());
scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 
3, TimeUnit.SECONDS);


    final JavaInputDStreamChangeDataRecord>> inputStream = KafkaUtils.createDirectStream(context,

            LocationStrategies.PreferConsistent(), consumerStrategy);

    inputStream.foreachRDD(rdd -> {
        Broadcast> broadcastVar = 
broadcastUpdater.getBroadcastVar();

        rdd.foreachPartition(partition -> {
            if (partition.hasNext()) {
                Map configMap = 
broadcastVar.getValue();


                // iterate
                while (partition.hasNext()) {
                    //impl logic using broadcast variable
                }
            }
        }
    }
    }
}



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark DataSets and multiple write(.) calls

2018-11-19 Thread Dipl.-Inf. Rico Bergmann
Hi!

I have a SparkSQL program with one input and six outputs (write). When
executing this program, every call to write(...) executes the plan. My
problem is that I want all these writes to happen in parallel (inside one
execution plan), because all writes share a common and compute-intensive
subpart that could be reused across the plans. Is there a possibility to do
this? (Caching is not a solution because the input dataset is way too
large...)

Hoping for advises ...

Best, Rico B.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark DataSets and multiple write(.) calls

2018-11-19 Thread Dipl.-Inf. Rico Bergmann
Thanks for your advice. But I'm using batch processing. Does anyone have a
solution for the batch-processing case?

Best,

Rico.


On 19.11.2018 at 09:43, Magnus Nilsson wrote:
>
> I had the same requirements. As far as I know the only way is to
> extend the foreachwriter, cache the microbatch result and write to
> each output.
>
> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>
> Unfortunately it seems as if you have to make a new connection "per
> batch" instead of creating one long lasting connections for the
> pipeline as such. Ie you might have to implement some sort of
> connection pooling by yourself depending on sink. 
>
> Regards,
>
> Magnus
>
>
> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann
> mailto:i...@ricobergmann.de>> wrote:
>
> Hi!
>
> I have a SparkSQL programm, having one input and 6 ouputs (write).
> When
> executing this programm every call to write(.) executes the plan. My
> problem is, that I want all these writes to happen in parallel (inside
> one execution plan), because all writes have a common and compute
> intensive subpart, that can be shared by all plans. Is there a
> possibility to do this? (Caching is not a solution because the input
> dataset is way to large...)
>
> Hoping for advises ...
>
> Best, Rico B.
>
>
> ---
> Diese E-Mail wurde von Avast Antivirus-Software auf Viren geprüft.
> https://www.avast.com/antivirus
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> <mailto:user-unsubscr...@spark.apache.org>
>




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark DataSets and multiple write(.) calls

2018-11-20 Thread Dipl.-Inf. Rico Bergmann
Hi!

Thanks, Vadim, for your answer. But this would be like caching the dataset,
right? Or is checkpointing faster than persisting to memory or disk?

I attached a PDF of my dataflow program. If I could compute outputs 1-5 in
parallel, the output of flatMap1 and groupBy could be reused, avoiding the
write to disk (at least up to the grouping).

Any other ideas or proposals?

Best,

Rico.
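
For reference, the checkpointing approach Vadim suggests below would look
roughly like this (a sketch with placeholder paths and column names;
Dataset.checkpoint() is eager by default and materializes the result to the
configured checkpoint directory, truncating the lineage so the subsequent
writes reuse it):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CheckpointSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("checkpoint-sketch").master("local[*]").getOrCreate();
        // On a cluster this would typically be an HDFS path.
        spark.sparkContext().setCheckpointDir("/tmp/spark-checkpoints");

        Dataset<Row> input = spark.read().parquet("/path/to/input"); // placeholder
        Dataset<Row> shared = input
                .filter("value IS NOT NULL")   // stands in for the expensive
                .groupBy("key").count();       // common subpart

        // Materialized once; both writes below start from the checkpointed data.
        Dataset<Row> reusable = shared.checkpoint();

        reusable.write().mode("overwrite").parquet("/path/to/output1"); // placeholder outputs
        reusable.filter("count > 10").write().mode("overwrite").parquet("/path/to/output2");

        spark.stop();
    }
}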


On 19.11.2018 at 19:12, Vadim Semenov wrote:
> You can use checkpointing, in this case Spark will write out an rdd to
> whatever destination you specify, and then the RDD can be reused from
> the checkpointed state avoiding recomputing.
>
> On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann
> mailto:i...@ricobergmann.de>> wrote:
>
> Thanks for your advise. But I'm using Batch processing. Does
> anyone have a solution for the batch processing case?
>
> Best,
>
> Rico.
>
>
> Am 19.11.2018 um 09:43 schrieb Magnus Nilsson:
>>
>> I had the same requirements. As far as I know the only way is to
>> extend the foreachwriter, cache the microbatch result and write
>> to each output.
>>
>> 
>> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>>
>> Unfortunately it seems as if you have to make a new connection
>> "per batch" instead of creating one long lasting connections for
>> the pipeline as such. Ie you might have to implement some sort of
>> connection pooling by yourself depending on sink. 
>>
>> Regards,
>>
>> Magnus
>>
>>
>> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann
>> mailto:i...@ricobergmann.de>> wrote:
>>
>> Hi!
>>
>> I have a SparkSQL programm, having one input and 6 ouputs
>> (write). When
>> executing this programm every call to write(.) executes the
>> plan. My
>> problem is, that I want all these writes to happen in
>> parallel (inside
>> one execution plan), because all writes have a common and compute
>> intensive subpart, that can be shared by all plans. Is there a
>> possibility to do this? (Caching is not a solution because
>> the input
>> dataset is way to large...)
>>
>> Hoping for advises ...
>>
>> Best, Rico B.
>>
>>
>> ---
>> Diese E-Mail wurde von Avast Antivirus-Software auf Viren
>> geprüft.
>> https://www.avast.com/antivirus
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> <mailto:user-unsubscr...@spark.apache.org>
>>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> <mailto:user-unsubscr...@spark.apache.org>
>
>
>
> -- 
> Sent from my iPhone





ExecutionDAG.pdf
Description: Adobe PDF document

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Spark 2.2.1 Dataframes multiple joins bug?

2020-03-23 Thread Dipl.-Inf. Rico Bergmann

Hi all!

Is it possible that, under certain circumstances, Spark creates duplicate
rows when doing multiple joins?


What I did:

buse.count

res0: Long = 20554365

buse.alias("buse").join(bdef.alias("bdef"), $"buse._c4"===$"bdef._c4").count

res1: Long = 20554365

buse.alias("buse").join(bdef.alias("bdef"), 
$"buse._c4"===$"bdef._c4").join(crnb.alias("crnb"), 
$"bdef._c9"===$"crnb._c4").count


res2: Long = 20554365

buse.alias("buse").join(bdef.alias("bdef"), 
$"buse._c4"===$"bdef._c4").join(crnb.alias("crnb"), 
$"bdef._c9"===$"crnb._c4").join(wreg.alias("wreg"), 
$"crnb._c1"===$"wreg._c5").count


res3: Long = 21633023

For explanation: buse and crnb are tables in a 1:1 relationship.

For the last join I again expected 20554365, but suddenly duplicate rows
exist. "wreg._c5" is a unique key, so the join should not create more records:


wreg.groupBy($"_c5").agg(count($"_c2") as "cnt").filter($"cnt">1).show
+---+---+
|_c5|cnt|
+---+---+
+---+---+

When doing a distinct on the 4-way join I get the expected number of 
records:


buse.alias("buse").join(bdef.alias("bdef"), 
$"buse._c4"===$"bdef._c4").join(crnb.alias("crnb"), 
$"bdef._c9"===$"crnb._c4").join(wreg.alias("wreg"), 
$"crnb._c1"===$"wreg._c5").distinct.count

res10: Long = 20554365

This (in my opinion) means that Spark is creating duplicate rows, although it
shouldn't. Or am I missing something?



Best, Rico.