[ 
https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462909#comment-17462909
 ] 

Wenlong Lyu commented on FLINK-25137:
-------------------------------------

[~liuhb86] you could close this issue, since it is not a bug and your case is 
solved

> Cannot do window aggregation after a regular join: Window can only be defined 
> on a time attribute column
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25137
>                 URL: https://issues.apache.org/jira/browse/FLINK-25137
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.0
>            Reporter: Hongbo
>            Priority: Major
>
> I have a stream joining with a bounded lookup table. Then I want to do a 
> window aggregation on this stream.
> The code looks like this:
>  
> {code:java}
> var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
> " join lookup on test.v = lookup.a");
> tableEnv.createTemporaryView("joined_table", joined);
> var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
> " FROM TABLE(\n" +
> " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' 
> minutes))\n" +
> " GROUP BY window_start, window_end, c");
> {code}
> where "test" is a stream and "lookup" is a bounded table. Full code is here: 
> [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java]
>  
>  
> However, it failed with an exception:
> {code:java}
> Window can only be defined on a time attribute column, but is type of 
> TIMESTAMP(3)
> org.apache.flink.table.api.ValidationException: Window can only be defined on 
> a time attribute column, but is type of TIMESTAMP(3)
>     at 
> org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
>     at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:606)
>     at 
> com.microstrategy.realtime.FlinkTest.testAggregationAfterJoin(FlinkTest.java:67)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
>     at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>     at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>     at com.sun.proxy.$Proxy2.stop(Unknown Source)
>     at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
>     at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>     at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
>     at java.base/java.lang.Thread.run(Thread.java:829 {code}
> Is there any limitation to do this?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to