Hi Colin,

thanks for reporting the bug. I had a look at it, and it seems that the wrong classloader is used when compiling the generated code (for both batch and streaming queries). I have a fix that I need to verify.
It's not necessary to open a new JIRA for that. We can cover all cases under FLINK-7490.

Thanks,
Fabian

2017-11-15 5:32 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com>:

> From the documentation there is a note which instructs not to include the
> flink-table dependency into the project. However when I put the flink-table
> dependency on the cluster the User-defined Aggregate Function gives an
> Exception.
>
> When I do include the flink-table into the dependencies, the project runs
> just fine. However I'd expect that there will then be garbage collection
> issues.
>
> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
> where I made a comment. I believe the issue is likely related to the
> classloading as suggested, but the related classes are different (Batch vs
> Stream).
>
> Should another bug report be filed?
>
> Also that bug report hasn't really had any activity and it's been a few
> months.
>
> Best Regards,
>
> Colin Williams
>
>
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:91)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>     at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(AggregateAggFunction.scala:33)
>     at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.initFunction(AggregateAggFunction.scala:72)
>     at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:41)
>     at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:33)
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:115)
>     at org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(NestedMapsStateTable.java:298)
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:89)
>     ... 6 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 14: Cannot determine simple type name "com"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
>     at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>     at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
>     at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
>     at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
>     at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
>     at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
>     at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
>     at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
>     at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4048)
>     at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
>     at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
>     at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4109)
>     at org.codehaus.janino.UnitCompiler.access$6600(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4051)
>     at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4048)
>     at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
>     at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
>     at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4667)
>     at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4066)
>     at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4044)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4699)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2536)
>     at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
>     at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
>     ... 13 more
>
>
> *Note:* Due to an issue in Apache Calcite, which prevents the user
> classloaders from being garbage-collected, we do *not* recommend building
> a fat-jar that includes the flink-table dependency. Instead, we recommend
> configuring Flink to include the flink-table dependency in the system
> classloader. This can be done by copying the flink-table.jar file from
> the ./opt folder to the ./lib folder. See these instructions
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html>
> for further details.
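
For readers following the thread, the classloading problem can be illustrated without Flink. The sketch below is not Flink's or Janino's code. It only assumes that the compile error arises because the generated code is compiled against a classloader that cannot see the user's classes. The class name `ClassLoaderVisibilityDemo` and the helper `canResolve` are hypothetical and made up for this illustration: the demo class stands in for a user-defined aggregate function, and the parent of its loader stands in for the classloader that holds flink-table.

```java
public class ClassLoaderVisibilityDemo {

    /**
     * Returns true if the given loader can resolve the class name.
     * A null loader means the JVM bootstrap loader.
     */
    public static boolean canResolve(String className, ClassLoader loader) {
        try {
            // initialize = false: only check visibility, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this class plays the role of the user's class.
        String userClass = ClassLoaderVisibilityDemo.class.getName();

        // The loader that loaded the "user code", and its parent.
        ClassLoader userLoader = ClassLoaderVisibilityDemo.class.getClassLoader();
        ClassLoader parentLoader = (userLoader == null) ? null : userLoader.getParent();

        // A child loader delegates to its parents, so it sees its own classes
        // plus everything the parents see. The reverse does not hold.
        System.out.println("user loader resolves user class:   "
                + canResolve(userClass, userLoader));
        System.out.println("parent loader resolves user class: "
                + canResolve(userClass, parentLoader));
        System.out.println("parent loader resolves String:     "
                + canResolve("java.lang.String", parentLoader));
    }
}
```

When launched normally from the classpath, the second check is expected to fail while the other two succeed. That mirrors the situation in the stack trace: a compiler handed the parent loader cannot resolve a name that begins with the user's package, which would be consistent with the `Cannot determine simple type name "com"` error.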