Hi Karl,

I think this is a bug and created FLINK-11769 
<https://issues.apache.org/jira/browse/FLINK-11769> to track it.
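
A note on the trace quoted below: the innermost frames show estimateDataTypeSize calling itself and then failing with a NullPointerException. One plausible way for a recursive size estimator to fail like this is to treat a multiset as a map and ask it for key and value types it does not have. The sketch below only illustrates that failure mode. It is not Flink's code, and every class, method, and constant in it is hypothetical.

```java
// Hypothetical sketch, NOT Flink's actual implementation. All names and
// size constants below are invented for illustration.
final class SizeEstimateSketch {
    enum Kind { STRING, LONG, MAP, MULTISET }

    /** Minimal type descriptor: a MAP has key/value types, a MULTISET only a component type. */
    static final class Type {
        final Kind kind;
        final Type key;
        final Type value;
        final Type component;

        Type(Kind kind, Type key, Type value, Type component) {
            this.kind = kind;
            this.key = key;
            this.value = value;
            this.component = component;
        }

        static Type scalar(Kind kind) { return new Type(kind, null, null, null); }
        static Type map(Type key, Type value) { return new Type(Kind.MAP, key, value, null); }
        static Type multiset(Type component) { return new Type(Kind.MULTISET, null, null, component); }
    }

    /** Assumed failure mode: MULTISET shares the MAP branch, so it recurses into a null key type. */
    static double estimateBuggy(Type t) {
        switch (t.kind) { // throws NullPointerException when t is null
            case STRING: return 12;
            case LONG: return 8;
            case MAP:
            case MULTISET:
                return (estimateBuggy(t.key) + estimateBuggy(t.value)) * 16;
            default: throw new IllegalArgumentException("unsupported type: " + t.kind);
        }
    }

    /** Possible fix: give MULTISET its own branch that recurses into the component type. */
    static double estimateFixed(Type t) {
        switch (t.kind) {
            case STRING: return 12;
            case LONG: return 8;
            case MAP:
                return (estimateFixed(t.key) + estimateFixed(t.value)) * 16;
            case MULTISET:
                return estimateFixed(t.component) * 16;
            default: throw new IllegalArgumentException("unsupported type: " + t.kind);
        }
    }

    public static void main(String[] args) {
        Type string = Type.scalar(Kind.STRING);
        // Same shape as the i_data column: Multiset<Map<String, String>>
        Type iData = Type.multiset(Type.map(string, string));
        try {
            estimateBuggy(iData);
            System.out.println("buggy estimator returned normally");
        } catch (NullPointerException e) {
            System.out.println("buggy estimator threw NullPointerException");
        }
        System.out.println("fixed estimate: " + estimateFixed(iData));
    }
}
```

If the real cause is of this kind, no configuration setting would avoid it. Until a fix lands, the practical workaround is the one already found in this thread: keep the multiset column out of the join input.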

Best,
Xingcan

> On Feb 26, 2019, at 2:02 PM, Karl Jin <karl....@gmail.com> wrote:
> 
> I removed the multiset<map<string,string>> field and the join worked fine. 
> The field was created from a Kafka source through a query that looks like 
> "select collect(data) as i_data from ... group by pk"
> 
> Do you think this is a bug or is this something I can get around using some 
> configuration?
> 
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xingc...@gmail.com> wrote:
> Yes, please check that. If the nested type turns out to be the problem, this 
> might be a bug.
> 
> On Mon, Feb 25, 2019, 21:50 Karl Jin <karl....@gmail.com> wrote:
> Do you think something funky might be happening with the Map/Multiset types? If 
> so, how do I deal with it? I think I can verify by removing those columns and 
> retrying.
> 
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <karl....@gmail.com> wrote:
> Thanks for checking in quickly,
> 
> Below is what printSchema gives for the two tables (I'm left joining the second 
> one to the first on uc_pk = i_uc_pk). In both tables, rowtime is extracted from 
> the string field uc_update_ts.
> 
> root
>  |-- uc_pk: String
>  |-- uc_update_ts: String
>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>  |-- uc_version: String
>  |-- uc_type: String
>  |-- data_parsed: Map<String, String>
> 
> root
>  |-- i_uc_pk: String
>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>  |-- image_count: Long
>  |-- i_data: Multiset<Map<String, String>>
> 
> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xingc...@gmail.com> wrote:
> Hi Karl,
> 
> It seems that some field types of your inputs were not properly extracted. 
> Could you share the result of `printSchema()` for your input tables?
> 
> Best,
> Xingcan
> 
> > On Feb 25, 2019, at 4:35 PM, Karl Jin <karl....@gmail.com> wrote:
> > 
> > Hello,
> > 
> > First time posting, so please let me know if the formatting isn't correct, 
> > etc.
> > 
> > I'm trying to left join two Kafka sources, running Flink 1.7.2 locally, but 
> > I'm getting the exception below. It looks like it fails during some sort of 
> > query optimization, but I'm not sure where to start investigating or 
> > debugging. I see things marked as NONE in the object, which is a bit of a 
> > red flag to me, although I don't know for sure. Any pointers would be much 
> > appreciated:
> > 
> > Exception in thread "main" java.lang.RuntimeException: Error while applying 
> > rule FlinkLogicalJoinConverter, args 
> > [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
> >  $6),joinType=left)]
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
> >       at 
> > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
> >       at 
> > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
> >       at 
> > org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
> >       at 
> > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
> >       at 
> > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> >       at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> >       at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
> >       at 
> > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
> >     ...
> > Caused by: java.lang.RuntimeException: Error occurred while applying rule 
> > FlinkLogicalJoinConverter
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
> >       at 
> > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
> >       at 
> > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> >       ... 11 more
> > Caused by: java.lang.NullPointerException
> >       at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
> >       at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >       at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
> >       at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >       at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
> >       at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
> >       at 
> > scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> >       at 
> > scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> >       at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
> >       at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
> >       at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
> >       at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
> >       at 
> > org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
> >       at 
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
> > Source)
> >       at 
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
> > Source)
> >       at 
> > org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
> >       at 
> > org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
> >       at 
> > org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
> >       at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
> 
