[ 
https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452538#comment-17452538
 ] 

Hongbo commented on FLINK-25137:
--------------------------------

Hi [~wenlong.lwl]  [~jingzhang] Thanks for the response!

 

[~wenlong.lwl]  Is this limitation documented? I also found that when joining 
an unbounded stream with a bounded table, both sides will be kept in the state. 
Thus the unbound stream will cause an unbounded state. If we use 
table.exec.state.ttl to purge the state, the bounded will also be purged which 
is not expected. Does it mean we should never do a regular join between a 
stream and bounded table (unless the distinct row in the stream is limited)?  

[~jingzhang]  Lookup join is the first thing we tried before using regular 
join. We have some concerns:
1. One problem is that when the QPS is high (~10000), the database is 
overloaded. We can turn on caching, it's not optimal. Our key is evenly 
distributed over a large range, so we need to cache nearly the whole table to 
make it effective. But that will consume a large amount of memory. As we will 
run multiple queries at the same time against the same lookup table, it will 
need to cache the large table multiple times.
2. Ideally we hoped to keep the lookup in Flink, otherwise need to maintain an 
extra high-performance DB for the lookup. 

We wish there could be something like the cache/persist in Spark that is 
sharable among the queries, but it seems not there yet in Flink.

> Cannot do window aggregation after a regular join: Window can only be defined 
> on a time attribute column
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25137
>                 URL: https://issues.apache.org/jira/browse/FLINK-25137
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.0
>            Reporter: Hongbo
>            Priority: Major
>
> I have a stream joining with a bounded lookup table. Then I want to do a 
> window aggregation on this stream.
> The code looks like this:
>  
> {code:java}
> var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
> " join lookup on test.v = lookup.a");
> tableEnv.createTemporaryView("joined_table", joined);
> var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
> " FROM TABLE(\n" +
> " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' 
> minutes))\n" +
> " GROUP BY window_start, window_end, c");
> {code}
> where "test" is a stream and "lookup" is a bounded table. Full code is here: 
> [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java]
>  
>  
> However, it failed with an exception:
> {code:java}
> Window can only be defined on a time attribute column, but is type of 
> TIMESTAMP(3)
> org.apache.flink.table.api.ValidationException: Window can only be defined on 
> a time attribute column, but is type of TIMESTAMP(3)
>     at 
> org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
>     at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:606)
>     at 
> com.microstrategy.realtime.FlinkTest.testAggregationAfterJoin(FlinkTest.java:67)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
>     at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
>     at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>     at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>     at com.sun.proxy.$Proxy2.stop(Unknown Source)
>     at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
>     at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
>     at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>     at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
>     at java.base/java.lang.Thread.run(Thread.java:829 {code}
> Is there any limitation to do this?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to