Could you try the blink planner? I guess this works in the blink planner. Besides, it is suggested to use a plain String with null values instead of Option[String]: Flink SQL/Table doesn't know Option and will recognize it as a RAW/generic type, which is rather slow. There should be no NPE; if there is, it might be a bug in Flink SQL.
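As a rough sketch of that suggestion (the field names mirror the case classes from the original message below; the `urlOrDefault` helper is hypothetical, just to show the null-handling pattern), the types could be declared with plain nullable String fields and wrapped into Option only where Scala code consumes them:

```scala
// Sketch: plain String fields that may be null, instead of Option[String].
// A String field maps to a proper SQL type, so the join validation no
// longer sees a RAW/generic scala.Option type.
case class MyEvent(
  _id: Long = 0L,
  _cId: Long = 0L,
  _url: String = null // null marks a missing value, as NULL does in SQL
)

case class MyCategory(
  _id: Long = 0L,
  _name: String = null
)

// Hypothetical helper: confine null handling to the application boundary
// by wrapping the nullable field into an Option only when reading it.
def urlOrDefault(event: MyEvent, default: String): String =
  Option(event._url).getOrElse(default)
```

`Option(x)` yields `None` when `x` is null, so application code can stay null-safe even though the Flink-facing fields are plain Strings.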
Best,
Jark

On Fri, 19 Jun 2020 at 11:08, YI <uuu...@protonmail.com> wrote:
> Hi, all
>
> I am trying to join two datastreams whose element types are, respectively,
>
> ```
> case class MyEvent(
>   _id: Long = 0L,
>   _cId: Long = 0L,
>   _url: Option[String] = None,
> )
> ```
>
> and
>
> ```
> case class MyCategory(
>   _id: Long = 0L,
>   _name: Option[String] = None,
> )
> ```
>
> When I tried to join those two tables with
>
> ```
> SELECT * FROM rawCategory INNER JOIN rawEvent ON rawEvent._cId = rawCategory._id
> ```
>
> the following exception was thrown:
>
> ```
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type 'scala.Option' cannot be used in a join operation because it does not
> implement a proper hashCode() method.
>     at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:176)
>     at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
>     at org.apache.flink.table.typeutils.TypeCheckUtils$.$anonfun$validateEqualsHashCode$1(TypeCheckUtils.scala:149)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>     at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
>     at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:57)
>     at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:50)
>     at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:118)
>     at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.getJoinOperator(DataStreamJoinToCoProcessTranslator.scala:102)
>     at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:119)
>     at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
>     at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:412)
>     at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
>     at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
>     at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
>     at io.redacted.sub.package$.testJoin(package.scala:143)
>     at io.redacted.sub.package$.process(package.scala:128)
>     at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
>     at io.redacted.DataAggregator.main(DataAggregator.scala)
>
> Process finished with exit code 1
> ```
>
> I tried using a vanilla String with null, but I encountered several NPEs.
> My intention is to use Option to indicate that a value is missing, just like
> null in a database, and hopefully without NPEs.
>
> How should I define my data types? And which configuration should I take
> special care of?
>
> Bests,
> Yi