Hi Yuval,

From the exception stack, it seems the original SQL does a UNION ALL over two LookupTableSource tables and writes the result to a sink, right?

I think it is reasonable to throw an exception here, although we could make the error message easier to understand. `LookupTableSource` is used to look up data from external storage by one or more keys at runtime; it does not read the entire table. So the original SQL, which unions two LookupTableSource tables and writes to a sink, cannot work, because there is no left (probe) stream to trigger the lookups into the dimension tables.
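To illustrate what I mean by a left stream: a lookup table is normally used on the build side of a lookup join, where each record of a driving stream triggers a point lookup by key. A minimal sketch (the Orders stream table, the Customers JDBC dimension table, and their columns are only hypothetical names for illustration):

  SELECT o.order_id, o.amount, c.country
  FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Here Orders provides the driving stream (with a processing-time attribute proc_time), and Customers is only queried key by key. In a plain UNION ALL of two lookup tables there is nothing comparable to drive the lookups.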
BTW, some connectors implement both `LookupTableSource` and `ScanTableSource`. Those work fine in the case above, because the optimizer automatically converts the plan to a `StreamPhysicalTableSourceScan`, which scans the data.

Best,
JING ZHANG

Yuval Itzchakov <yuva...@gmail.com> wrote on Mon, Aug 16, 2021, at 4:15 AM:
> Hi,
>
> I'm trying to run a UNION ALL query on two LookupTableSource tables
> defined with JDBC. When attempting this Flink complains that this is an
> unsupported feature:
>
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
> There are not enough rules to produce a node with desired properties:
> convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any,
> MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE],
> UpdateKindTraitDef=[NONE].
> Missing conversions are FlinkLogicalTableSourceScan[convention: LOGICAL ->
> STREAM_PHYSICAL] (2 cases)
> There are 2 empty subsets:
> Empty subset 0: rel#191:RelSubset#4.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 168:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
> table]], fields=[...])
>
> Empty subset 1: rel#192:RelSubset#5.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 170:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
> other_table]], fields=[...])
>
> Root: rel#189:RelSubset#7.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
> Original rel:
> FlinkLogicalSink(subset=[rel#166:RelSubset#3.LOGICAL.any.None:
> 0.[NONE].[NONE]], table=[default_catalog.default_database.table_output],
> fields=[...]): rowcount = 2.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu,
> 0.0 io, 0.0 network, 0.0 memory}, id = 174
> FlinkLogicalUnion(subset=[rel#173:RelSubset#2.LOGICAL.any.None:
> 0.[NONE].[NONE]], all=[true]): rowcount = 2.0E8, cumulative cost = {2.0E8
> rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 172
>
> FlinkLogicalTableSourceScan(subset=[rel#169:RelSubset#0.LOGICAL.any.None:
> 0.[NONE].[NONE]], table=[[default_catalog, default_database, table]],
> fields=[...]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu,
> 1.32E10 io, 0.0 network, 0.0 memory}, id = 168
>
> FlinkLogicalTableSourceScan(subset=[rel#171:RelSubset#1.LOGICAL.any.None:
> 0.[NONE].[NONE]], table=[[default_catalog, default_database, other_table]],
> fields=[...]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu,
> 1.32E10 io, 0.0 network, 0.0 memory}, id = 170
>
> I do understand that the semantics of unioning two lookups may be a bit
> complicated, but was wondering if this is planned to be supported in the
> future?
>
> --
> Best Regards,
> Yuval Itzchakov.