[ https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462909#comment-17462909 ]
Wenlong Lyu commented on FLINK-25137: ------------------------------------- [~liuhb86] you could close this issue, since it is not a bug and your case is solved > Cannot do window aggregation after a regular join: Window can only be defined > on a time attribute column > -------------------------------------------------------------------------------------------------------- > > Key: FLINK-25137 > URL: https://issues.apache.org/jira/browse/FLINK-25137 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.14.0 > Reporter: Hongbo > Priority: Major > > I have a stream joining with a bounded lookup table. Then I want to do a > window aggregation on this stream. > The code looks like this: > > {code:java} > var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" + > " join lookup on test.v = lookup.a"); > tableEnv.createTemporaryView("joined_table", joined); > var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" + > " FROM TABLE(\n" + > " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' > minutes))\n" + > " GROUP BY window_start, window_end, c"); > {code} > where "test" is a stream and "lookup" is a bounded table. Full code is here: > [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java] > > > However, it failed with an exception: > {code:java} > Window can only be defined on a time attribute column, but is type of > TIMESTAMP(3) > org.apache.flink.table.api.ValidationException: Window can only be defined on > a time attribute column, but is type of TIMESTAMP(3) > at > org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193) > at > GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown > Source) > at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown > Source) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75) > at > GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown > Source) > at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown > Source) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167) > at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:606) > at > com.microstrategy.realtime.FlinkTest.testAggregationAfterJoin(FlinkTest.java:67) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.stop(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) > at > org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) > at > org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56) > at java.base/java.lang.Thread.run(Thread.java:829 {code} > Is there any limitation to do this? -- This message was sent by Atlassian Jira (v8.20.1#820001)