Thanks for checking in quickly. Below is what I got from printSchema on the two tables (I'm left joining the second one to the first one on uc_pk = i_uc_pk). The rowtime in both is extracted from the string field uc_update_ts.
root
 |-- uc_pk: String
 |-- uc_update_ts: String
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)
 |-- uc_version: String
 |-- uc_type: String
 |-- data_parsed: Map<String, String>

root
 |-- i_uc_pk: String
 |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
 |-- image_count: Long
 |-- i_data: Multiset<Map<String, String>>

On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Karl,
>
> It seems that some field types of your inputs were not properly extracted.
> Could you share the result of `printSchema()` for your input tables?
>
> Best,
> Xingcan
>
> > On Feb 25, 2019, at 4:35 PM, Karl Jin <karl....@gmail.com> wrote:
> >
> > Hello,
> >
> > First time posting, so please let me know if the formatting isn't correct, etc.
> >
> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but getting the below exception. Looks like some sort of query optimization process but I'm not sure where to start investigating/debugging. I see things are marked as NONE in the object so that's a bit of a flag to me, although I don't know for sure.
> > Any pointer would be much appreciated:
> >
> > Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalJoinConverter, args [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0, $6),joinType=left)]
> >     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
> >     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
> >     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
> >     at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
> >     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
> >     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> >     at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> >     at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
> >     at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
> >     ...
> > Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalJoinConverter
> >     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
> >     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
> >     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
> >     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> >     ... 11 more
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
> >     at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >     at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
> >     at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >     at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
> >     at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
> >     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> >     at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> >     at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
> >     at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
> >     at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
> >     at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
> >     at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
> >     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
> >     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
> >     at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
> >     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
> >     at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
> >     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
> >     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)