[ https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186959#comment-17186959 ]
gkgkgk commented on FLINK-19015: -------------------------------- [~any] Maybe you can use "Integer" instead of "Int". > java.lang.RuntimeException: Could not instantiate generated class > 'GroupAggsHandler$15' > --------------------------------------------------------------------------------------- > > Key: FLINK-19015 > URL: https://issues.apache.org/jira/browse/FLINK-19015 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.11.1 > Reporter: jack sun > Priority: Major > > source code : > import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, > WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy} > import org.apache.flink.streaming.api.TimeCharacteristic > import org.apache.flink.table.functions.AggregateFunction > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.table.api._ > import org.apache.flink.table.api.bridge.scala._ > import org.apache.flink.types.Row > object TestAggFunction { > def main(args: Array[String]) { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tenv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()) > env.setParallelism(1) > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) > val socketStream = env.socketTextStream("127.0.0.1",9090) > .map(x=>{ > val c=x.split(" ").toList > Temp3(c(0),c(1).toInt,System.currentTimeMillis()) > }) > .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] { > override def createWatermarkGenerator(context: > WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = { > new WatermarkGenerator[Temp3] { > val delay:Long = 0L//Time.seconds(10).toMilliseconds > var maxTimestamp: Long = 0L > > override def onEvent(t: Temp3, l: Long, > watermarkOutput: WatermarkOutput): Unit = { > maxTimestamp = maxTimestamp.max(t.timestamp) > val wm = new Watermark(maxTimestamp - delay) > watermarkOutput.emitWatermark(wm) > } > > 
override def onPeriodicEmit(watermarkOutput: > WatermarkOutput): Unit = Nil > } > } > }) > val table = > tenv.fromDataStream(socketStream,'role,'value,'pt.proctime) > tenv.createTemporaryView("t1",table) > tenv.registerFunction("testMax",new MaxAgg) > tenv.sqlQuery("select role,testMax(`value`) from t1 group by > role").toRetractStream[Row].print() > // tenv.sqlQuery("select * from t1").toRetractStream[Row].print() > env.execute("test") > } > } > case class Temp3(role:String,value:Int,timestamp:Long) > class MaxAgg extends AggregateFunction[Int,Int]{ > override def getValue(acc: Int): Int = acc > override def createAccumulator(): Int = 0 > def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal) > } > exceptions: > 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils > - Log file environment variable 'log.file' is not set. > 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils > - JobManager log files are unavailable in the web dashboard. Log file > location not found in environment variable 'log.file' or configuration key > 'web.log.path'. > 18:09:50,204 WARN org.apache.flink.metrics.MetricGroup > - The operator name > SourceConversion(table=[default_catalog.default_database.t1], fields=[role, > value, pt]) exceeded the 80 characters length limit and was truncated. 
> /* 1 */ > /* 2 */ public final class GroupAggsHandler$15 implements > org.apache.flink.table.runtime.generated.AggsHandleFunction { > /* 3 */ > /* 4 */ private transient com.youyantech.streamJobs.MaxAgg > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e; > /* 5 */ org.apache.flink.table.data.GenericRowData acc$7 = new > org.apache.flink.table.data.GenericRowData(1); > /* 6 */ org.apache.flink.table.data.GenericRowData acc$8 = new > org.apache.flink.table.data.GenericRowData(1); > /* 7 */ private java.lang.Integer agg0_acc_internal; > /* 8 */ private java.lang.Integer agg0_acc_external; > /* 9 */ org.apache.flink.table.data.GenericRowData aggValue$14 = new > org.apache.flink.table.data.GenericRowData(1); > /* 10 */ > /* 11 */ private > org.apache.flink.table.runtime.dataview.StateDataViewStore store; > /* 12 */ > /* 13 */ public GroupAggsHandler$15(java.lang.Object[] references) > throws Exception { > /* 14 */ > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e = > (((com.youyantech.streamJobs.MaxAgg) references[0])); > /* 15 */ } > /* 16 */ > /* 17 */ private > org.apache.flink.api.common.functions.RuntimeContext getRuntimeContext() { > /* 18 */ return store.getRuntimeContext(); > /* 19 */ } > /* 20 */ > /* 21 */ @Override > /* 22 */ public void > open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws > Exception { > /* 23 */ this.store = store; > /* 24 */ > /* 25 */ > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.open(new > org.apache.flink.table.functions.FunctionContext(store.getRuntimeContext())); > /* 26 */ > /* 27 */ } > /* 28 */ > /* 29 */ @Override > /* 30 */ public void accumulate(org.apache.flink.table.data.RowData > accInput) throws Exception { > /* 31 */ > /* 32 */ int field$10; > /* 33 */ boolean isNull$10; > /* 34 */ isNull$10 = accInput.isNullAt(1); > /* 35 */ field$10 = -1; > /* 36 */ if (!isNull$10) { > /* 37 */ field$10 = accInput.getInt(1); > 
/* 38 */ } > /* 39 */ > /* 40 */ > /* 41 */ > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.accumulate(agg0_acc_external, > field$10); > /* 42 */ > /* 43 */ > /* 44 */ } > /* 45 */ > /* 46 */ @Override > /* 47 */ public void retract(org.apache.flink.table.data.RowData > retractInput) throws Exception { > /* 48 */ > /* 49 */ throw new java.lang.RuntimeException("This function not > require retract method, but the retract method is called."); > /* 50 */ > /* 51 */ } > /* 52 */ > /* 53 */ @Override > /* 54 */ public void merge(org.apache.flink.table.data.RowData > otherAcc) throws Exception { > /* 55 */ > /* 56 */ throw new java.lang.RuntimeException("This function not > require merge method, but the merge method is called."); > /* 57 */ > /* 58 */ } > /* 59 */ > /* 60 */ @Override > /* 61 */ public void > setAccumulators(org.apache.flink.table.data.RowData acc) throws Exception { > /* 62 */ > /* 63 */ int field$9; > /* 64 */ boolean isNull$9; > /* 65 */ isNull$9 = acc.isNullAt(0); > /* 66 */ field$9 = -1; > /* 67 */ if (!isNull$9) { > /* 68 */ field$9 = acc.getInt(0); > /* 69 */ } > /* 70 */ > /* 71 */ agg0_acc_internal = field$9; > /* 72 */ agg0_acc_external = agg0_acc_internal; > /* 73 */ > /* 74 */ > /* 75 */ } > /* 76 */ > /* 77 */ @Override > /* 78 */ public void resetAccumulators() throws Exception { > /* 79 */ > /* 80 */ > /* 81 */ > /* 82 */ agg0_acc_external = (java.lang.Integer) > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.createAccumulator(); > /* 83 */ agg0_acc_internal = agg0_acc_external; > /* 84 */ > /* 85 */ > /* 86 */ } > /* 87 */ > /* 88 */ @Override > /* 89 */ public org.apache.flink.table.data.RowData > getAccumulators() throws Exception { > /* 90 */ > /* 91 */ > /* 92 */ > /* 93 */ acc$8 = new org.apache.flink.table.data.GenericRowData(1); > /* 94 */ > /* 95 */ agg0_acc_internal = agg0_acc_external; > /* 96 */ if (false) { > /* 97 */ acc$8.setField(0, null); > /* 98 */ } else { > /* 99 
*/ acc$8.setField(0, agg0_acc_internal); > /* 100 */ } > /* 101 */ > /* 102 */ > /* 103 */ return acc$8; > /* 104 */ > /* 105 */ } > /* 106 */ > /* 107 */ @Override > /* 108 */ public org.apache.flink.table.data.RowData > createAccumulators() throws Exception { > /* 109 */ > /* 110 */ > /* 111 */ > /* 112 */ acc$7 = new > org.apache.flink.table.data.GenericRowData(1); > /* 113 */ > /* 114 */ java.lang.Integer acc_internal$6 = (java.lang.Integer) > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.createAccumulator(); > /* 115 */ if (false) { > /* 116 */ acc$7.setField(0, null); > /* 117 */ } else { > /* 118 */ acc$7.setField(0, acc_internal$6); > /* 119 */ } > /* 120 */ > /* 121 */ > /* 122 */ return acc$7; > /* 123 */ > /* 124 */ } > /* 125 */ > /* 126 */ @Override > /* 127 */ public org.apache.flink.table.data.RowData getValue() > throws Exception { > /* 128 */ > /* 129 */ > /* 130 */ > /* 131 */ aggValue$14 = new > org.apache.flink.table.data.GenericRowData(1); > /* 132 */ > /* 133 */ > /* 134 */ java.lang.Integer value_external$11 = (java.lang.Integer) > /* 135 */ > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.getValue(agg0_acc_external); > /* 136 */ java.lang.Integer value_internal$12 = > /* 137 */ value_external$11; > /* 138 */ boolean valueIsNull$13 = value_internal$12 == null; > /* 139 */ > /* 140 */ if (valueIsNull$13) { > /* 141 */ aggValue$14.setField(0, null); > /* 142 */ } else { > /* 143 */ aggValue$14.setField(0, value_internal$12); > /* 144 */ } > /* 145 */ > /* 146 */ > /* 147 */ return aggValue$14; > /* 148 */ > /* 149 */ } > /* 150 */ > /* 151 */ @Override > /* 152 */ public void cleanup() throws Exception { > /* 153 */ > /* 154 */ > /* 155 */ } > /* 156 */ > /* 157 */ @Override > /* 158 */ public void close() throws Exception { > /* 159 */ > /* 160 */ > function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e.close(); > /* 161 */ > /* 162 */ } > /* 163 */ } > /* 164 */ > 
18:09:50,240 WARN org.apache.flink.runtime.taskmanager.Task > - GroupAggregate(groupBy=[role], select=[role, testMax(value) AS EXPR$1]) > -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1) > (5a5bc8f589eaa506c80a21f69b771479) switched from RUNNING to FAILED. > java.lang.RuntimeException: Could not instantiate generated class > 'GroupAggsHandler$15' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:57) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:52) > ... 12 more > Caused by: > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) > ... 14 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > ... 
17 more > Caused by: org.codehaus.janino.InternalCompilerException: Compiling > "GroupAggsHandler$15": Incompatible return types > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > ... 23 more > Caused by: org.codehaus.janino.InternalCompilerException: Incompatible return > types > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9393) > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9154) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at 
org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790) > at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732) > at org.codehaus.janino.Java$Assignment.accept(Java.java:4466) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > ... 30 more > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699) > at > com.youyantech.streamJobs.TestAggFunction$.main(TestAggFunction.scala:54) > at com.youyantech.streamJobs.TestAggFunction.main(TestAggFunction.scala) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at > org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892) > at akka.dispatch.OnComplete.internal(Future.scala:264) > at akka.dispatch.OnComplete.internal(Future.scala:261) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) 
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > at > 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > ... 
4 more > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'GroupAggsHandler$15' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:57) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:52) > ... 
12 more > Caused by: > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) > ... 14 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > ... 
17 more > Caused by: org.codehaus.janino.InternalCompilerException: Compiling > "GroupAggsHandler$15": Incompatible return types > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > ... 23 more > Caused by: org.codehaus.janino.InternalCompilerException: Incompatible return > types > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9393) > at > org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9154) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at 
org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790) > at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732) > at org.codehaus.janino.Java$Assignment.accept(Java.java:4466) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > ... 30 more > Process finished with exit code 1 > Process finished with exit code 1 -- This message was sent by Atlassian Jira (v8.3.4#803005)