I am writing a sliding-window analytics program using the `functions.window` function ( https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html#window(org.apache.spark.sql.Column,%20java.lang.String,%20java.lang.String) ). The code looks like this:
```java
Column slidingWindow = functions.window(myDF.col("timestamp"), "24 hours", "1 seconds");
Dataset<Row> finalRes = myDF.groupBy(slidingWindow, myDF.col("user"))
        .agg(functions.collect_set("purchase").as("purchases"));
```

As you can see, in this use case I have a small slide step and a large window. Code of the same flavor caused the following error, which as far as I understand is related to Java code generation:

```
Caused by: org.spark_project.guava.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: Java heap space
	at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
	at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
	at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:890)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:357)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 77 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.HashMap.resize(HashMap.java:703)
	at java.util.HashMap.putVal(HashMap.java:628)
	at java.util.HashMap.putMapEntries(HashMap.java:514)
	at java.util.HashMap.putAll(HashMap.java:784)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3073)
	at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:206)
	at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2958)
	at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2926)
	at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2974)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3033)
	at org.codehaus.janino.UnitCompiler.access$4400(UnitCompiler.java:206)
	at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2950)
	at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2926)
	at org.codehaus.janino.Java$SwitchStatement.accept(Java.java:2866)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
	at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
	at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
	at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2926)
	at org.codehaus.janino.Java$Block.accept(Java.java:2471)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2999)
	at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:206)
	at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2946)
	at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2926)
	at org.codehaus.janino.Java$ForStatement.accept(Java.java:2660)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
	at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
	at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
	at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
```

When I run the code with really small data and window = 3 minutes, step = 1 second, I get a different error. The generated code contains tens of thousands of lines like:

```
/* 30761 */ if (!expand_isNull6254) {
/* 30762 */ expand_isNull6253 = false; // resultCode could change nullability.
/* 30763 */ expand_value6253 = expand_value6254 * 1000000L;
```

followed by the error:

```
org.codehaus.janino.JaninoRuntimeException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
```

I also see hundreds of messages like `Code generated in 4.979587 ms`, and the computation takes quite a long time even though the input has only 10 rows of data.

Am I doing something wrong? Is it a bug? What is the right way to use this function? Please relate to Java when answering.
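For context, here is my back-of-the-envelope check of how many overlapping windows each row falls into (my own reasoning, not something stated in the Spark docs): with a sliding window, every row belongs to roughly windowDuration / slideDuration windows, so my parameters produce a huge per-row expansion. The class and method names below are hypothetical, just for illustration:

```java
// Hypothetical sketch: estimate how many sliding windows each row is
// expanded into, assuming one window per slide step that overlaps the row.
public class WindowBlowup {
    // Ceiling division of windowSeconds by slideSeconds.
    public static long windowsPerRow(long windowSeconds, long slideSeconds) {
        return (windowSeconds + slideSeconds - 1) / slideSeconds;
    }

    public static void main(String[] args) {
        // window = 24 hours, slide = 1 second -> 86400 copies of every row
        System.out.println(windowsPerRow(24 * 3600, 1));
        // window = 3 minutes, slide = 1 second -> 180 copies of every row
        System.out.println(windowsPerRow(3 * 60, 1));
    }
}
```

If this estimate is right, even my "really small" 10-row test with a 3-minute window is expanded 180-fold before grouping, which would explain both the code-generation blow-up and the slow runtime.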