Hi, I also found a similar issue here [1].
Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-11433

Tony Wei <tony19920...@gmail.com> wrote on Fri, Jul 19, 2019 at 5:38 PM:

> Hi,
>
> Is there any update on this issue? I have had the same problem as Karl.
> After I removed a query like "select collect(data) ..." from one of the
> joined tables, the SQL executed correctly without throwing any NPE.
>
> Best regards,
> Tony Wei
>
> Xingcan Cui <xingc...@gmail.com> wrote on Wed, Feb 27, 2019 at 12:53 PM:
>
>> Hi Karl,
>>
>> I think this is a bug and created FLINK-11769
>> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>>
>> Best,
>> Xingcan
>>
>> On Feb 26, 2019, at 2:02 PM, Karl Jin <karl....@gmail.com> wrote:
>>
>> I removed the multiset<map<string,string>> field and the join worked
>> fine. The field was created from a Kafka source through a query that
>> looks like "select collect(data) as i_data from ... group by pk".
>>
>> Do you think this is a bug, or is this something I can work around
>> with some configuration?
>>
>> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xingc...@gmail.com> wrote:
>>
>>> Yes. Please check that. If it's the nested type's problem, this
>>> might be a bug.
>>>
>>> On Mon, Feb 25, 2019, 21:50 Karl Jin <karl....@gmail.com> wrote:
>>>
>>>> Do you think something funky might be happening with the
>>>> Map/Multiset types? If so, how do I deal with it? (I think I can
>>>> verify by removing those columns and retrying.)
>>>>
>>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <karl....@gmail.com> wrote:
>>>>
>>>>> Thanks for checking in quickly.
>>>>>
>>>>> Below is what I got from printSchema() on the two tables (left
>>>>> joining the second one to the first one on uc_pk = i_uc_pk).
>>>>> The rowtime in both is extracted from the string field uc_update_ts.
>>>>>
>>>>> root
>>>>>  |-- uc_pk: String
>>>>>  |-- uc_update_ts: String
>>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- uc_version: String
>>>>>  |-- uc_type: String
>>>>>  |-- data_parsed: Map<String, String>
>>>>>
>>>>> root
>>>>>  |-- i_uc_pk: String
>>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- image_count: Long
>>>>>  |-- i_data: Multiset<Map<String, String>>
>>>>>
>>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xingc...@gmail.com> wrote:
>>>>>
>>>>>> Hi Karl,
>>>>>>
>>>>>> It seems that some field types of your inputs were not properly
>>>>>> extracted. Could you share the result of `printSchema()` for your
>>>>>> input tables?
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <karl....@gmail.com> wrote:
>>>>>> >
>>>>>> > Hello,
>>>>>> >
>>>>>> > First time posting, so please let me know if the formatting isn't
>>>>>> > correct, etc.
>>>>>> >
>>>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally,
>>>>>> > but I am getting the exception below. It looks like it happens in
>>>>>> > some sort of query optimization process, but I'm not sure where to
>>>>>> > start investigating/debugging. I see things marked as NONE in the
>>>>>> > object, so that's a bit of a flag to me, although I don't know for
>>>>>> > sure.
>>>>>> > Any pointer would be much appreciated:
>>>>>> >
>>>>>> > Exception in thread "main" java.lang.RuntimeException: Error while
>>>>>> > applying rule FlinkLogicalJoinConverter, args
>>>>>> > [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
>>>>>> > $6),joinType=left)]
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>>>>>> >   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>>>>>> >   at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>>>>>> >   at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>>>>>> >   at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>>>>>> >   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>>>>>> >   at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
>>>>>> >   at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
>>>>>> >   at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
>>>>>> >   ...
>>>>>> > Caused by: java.lang.RuntimeException: Error occurred while
>>>>>> > applying rule FlinkLogicalJoinConverter
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
>>>>>> >   at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>>>>>> >   at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>>>>>> >   ... 11 more
>>>>>> > Caused by: java.lang.NullPointerException
>>>>>> >   at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
>>>>>> >   at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>>>>>> >   at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
>>>>>> >   at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>>>>>> >   at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
>>>>>> >   at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
>>>>>> >   at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>>>>>> >   at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>>>>>> >   at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
>>>>>> >   at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
>>>>>> >   at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
>>>>>> >   at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
>>>>>> >   at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
>>>>>> >   at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>>>>>> >   at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>>>>>> >   at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
>>>>>> >   at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
>>>>>> >   at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>>>>>> >   at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
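For readers landing on this thread: the failing pattern can be sketched in Flink SQL as below. This is a hypothetical reconstruction, not code from the thread; the table names `images` and `updates` are assumed (the original FROM clause was elided as "..."), while the column names come from Karl's printSchema() output.

```sql
-- Hypothetical sketch of the pattern reported in this thread.
-- `images` and `updates` are assumed names for the Kafka-backed source
-- tables; the real names are not given in the thread.

-- COLLECT(data) over a Map<String, String> column produces a
-- Multiset<Map<String, String>> column (i_data in Karl's second schema).
-- Assume this result is registered as `images_agg`.
SELECT pk        AS i_uc_pk,
       COUNT(*)  AS image_count,
       COLLECT(data) AS i_data
FROM images
GROUP BY pk;

-- Left-joining the aggregated table is what triggered the NPE in
-- FlinkLogicalJoinConverter on Flink 1.7.2 (FLINK-11769 / FLINK-11433);
-- per the thread, dropping i_data from the join made the query work.
SELECT u.uc_pk, a.image_count, a.i_data
FROM updates u
LEFT JOIN images_agg a ON u.uc_pk = a.i_uc_pk;
```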