[ https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452538#comment-17452538 ]
Hongbo commented on FLINK-25137: -------------------------------- Hi [~wenlong.lwl] [~jingzhang] Thanks for the response! [~wenlong.lwl] Is this limitation documented? I also found that when joining an unbounded stream with a bounded table, both sides will be kept in the state. Thus the unbound stream will cause an unbounded state. If we use table.exec.state.ttl to purge the state, the bounded will also be purged which is not expected. Does it mean we should never do a regular join between a stream and bounded table (unless the distinct row in the stream is limited)? [~jingzhang] Lookup join is the first thing we tried before using regular join. We have some concerns: 1. One problem is that when the QPS is high (~10000), the database is overloaded. We can turn on caching, it's not optimal. Our key is evenly distributed over a large range, so we need to cache nearly the whole table to make it effective. But that will consume a large amount of memory. As we will run multiple queries at the same time against the same lookup table, it will need to cache the large table multiple times. 2. Ideally we hoped to keep the lookup in Flink, otherwise need to maintain an extra high-performance DB for the lookup. We wish there could be something like the cache/persist in Spark that is sharable among the queries, but it seems not there yet in Flink. > Cannot do window aggregation after a regular join: Window can only be defined > on a time attribute column > -------------------------------------------------------------------------------------------------------- > > Key: FLINK-25137 > URL: https://issues.apache.org/jira/browse/FLINK-25137 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.14.0 > Reporter: Hongbo > Priority: Major > > I have a stream joining with a bounded lookup table. Then I want to do a > window aggregation on this stream. > The code looks like this: > > {code:java} > var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" + > " join lookup on test.v = lookup.a"); > tableEnv.createTemporaryView("joined_table", joined); > var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" + > " FROM TABLE(\n" + > " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' > minutes))\n" + > " GROUP BY window_start, window_end, c"); > {code} > where "test" is a stream and "lookup" is a bounded table. Full code is here: > [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java] > > > However, it failed with an exception: > {code:java} > Window can only be defined on a time attribute column, but is type of > TIMESTAMP(3) > org.apache.flink.table.api.ValidationException: Window can only be defined on > a time attribute column, but is type of TIMESTAMP(3) > at > org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193) > at > GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown > Source) > at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown > Source) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75) > at > GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown > Source) > at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown > Source) > at > org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167) > at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217) > at > org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:606) > at > com.microstrategy.realtime.FlinkTest.testAggregationAfterJoin(FlinkTest.java:67) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.stop(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) > at > org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) > at > org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56) > at java.base/java.lang.Thread.run(Thread.java:829 {code} > Is there any limitation to do this? -- This message was sent by Atlassian Jira (v8.20.1#820001)