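So, the first thing is a "feature" of the Java API that removes duplicate fields from keys. An equi-join of key fields (0,0) with (0,1) therefore throws an error: one of the 0s is removed from the first key, which leaves one key field on the left and two on the right. This is what happens with "nodeID = src && nodeID = target", where the left key (nodeID, nodeID) collapses to (nodeID).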
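The mechanism can be illustrated outside Flink. The sketch below uses plain Scala collections, not the Flink DataSet or Table API. `normalizeKeys`, `keysCompatible`, `joinAsWritten`, `joinRewritten` and the sample data are hypothetical, and `normalizeKeys` is only a simplified model of the key deduplication described above, not Flink's actual code. It also shows one possible workaround, assuming the `&&` is really what was meant: since `nodeID = src && nodeID = target` implies `src = target`, the edges can be filtered to self-loops first and then joined on the single key `nodeID = src`.

```scala
object ConjunctiveJoinSketch {
  // Case classes from the original question.
  case class WeightedEdge(src: Int, target: Int, weight: Double)
  case class Community(communityID: Int, nodeID: Int)

  // Hypothetical, simplified model of the key handling described above:
  // duplicate field positions are dropped from a key.
  def normalizeKeys(fields: Seq[Int]): Seq[Int] = fields.distinct

  // Simplified stand-in for the check that fails with
  // "The number of specified keys is different."
  def keysCompatible(left: Seq[Int], right: Seq[Int]): Boolean =
    normalizeKeys(left).size == normalizeKeys(right).size

  // Made-up sample data; (12, 12) and (10, 10) are self-loops.
  val sampleCommunities: Seq[Community] = Seq(Community(1, 10), Community(2, 12))
  val sampleEdges: Seq[WeightedEdge] =
    Seq(WeightedEdge(10, 11, 1.0), WeightedEdge(12, 12, 4.0), WeightedEdge(10, 10, 2.0))

  // The predicate as written: nodeID = src && nodeID = target (nested loops).
  def joinAsWritten(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src && c.nodeID == e.target
    } yield (c, e)

  // Rewrite: nodeID = src && nodeID = target implies src = target, so keep
  // only self-loops and equi-join on the single key nodeID = src.
  def joinRewritten(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] = {
    val selfLoopsBySrc: Map[Int, Seq[WeightedEdge]] =
      es.filter(e => e.src == e.target).groupBy(_.src)
    for {
      c <- cs
      e <- selfLoopsBySrc.getOrElse(c.nodeID, Seq.empty[WeightedEdge])
    } yield (c, e)
  }

  def main(args: Array[String]): Unit = {
    // Left key (nodeID, nodeID) = fields (1, 1); right key (src, target) = (0, 1).
    println(keysCompatible(Seq(1, 1), Seq(0, 1)))
    println(joinRewritten(sampleCommunities, sampleEdges))
  }
}
```

The rewrite only ever matches self-loop edges, which follows directly from the `&&` predicate; it is a single-key join, so the duplicate-field problem does not arise.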
The second thing is a feature of the Table API, and the error message hints at the problem:

    Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target

The problem is that this join would have to be executed as a cross product followed by a filter: because of the OR, neither equality is guaranteed to hold for every result row, so neither can be used as an equi-join key. I don't want to allow this, because a cross can be very expensive. I will open a JIRA ticket for adding a manual cross operation to the Table API.

On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> Hi,
>
> I want to join two tables in the following way:
>
> case class WeightedEdge(src: Int, target: Int, weight: Double)
> case class Community(communityID: Int, nodeID: Int)
>
> case class CommunitySumTotal(communityID: Int, sumTotal: Double)
>
> val communities: DataSet[Community]
> val weightedEdges: DataSet[WeightedEdge]
>
> val communitiesTable = communities.toTable
> val weightedEdgesTable = weightedEdges.toTable
>
> val sumTotal = communitiesTable.join(weightedEdgesTable)
>   .where("nodeID = src && nodeID = target")
>   .groupBy('communityID)
>   .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
>
>
> but I get this exception:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The types of the key
> fields do not match: The number of specified keys is different.
>   at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
>   at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>   at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>   at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>
> Moreover when I use the following where clause:
>
> .where("nodeID = src || nodeID = target")
>
> I get another error:
>
> Exception in thread "main"
> org.apache.flink.api.table.ExpressionException: Could not derive
> equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
> 'target.
>
>   at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>   at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>   at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>   at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>
>
> Apart from that the TableApi seems really promising. It's a really great tool.
>
> Thank you for your help,
>
> Felix
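To make the cost argument about the `||` predicate concrete, the following sketch (plain Scala collections again, not the Flink API) contrasts the cross-then-filter plan with one possible cheaper rewrite that was not proposed in the thread: a union of two single-key equi-joins, one on `nodeID = src` and one on `nodeID = target`. The second join excludes self-loop edges (`src == target`) so that such an edge is not counted twice for the same node. The case classes come from the question; `crossThenFilter`, `equiJoin`, `unionOfEquiJoins`, `sumTotals` and the sample data are hypothetical.

```scala
object DisjunctiveJoinSketch {
  // Case classes from the original question.
  case class WeightedEdge(src: Int, target: Int, weight: Double)
  case class Community(communityID: Int, nodeID: Int)
  case class CommunitySumTotal(communityID: Int, sumTotal: Double)

  type Pair = (Community, WeightedEdge)

  // Made-up sample data; the edge (12, 12) is a self-loop.
  val sampleCommunities: Seq[Community] =
    Seq(Community(1, 10), Community(1, 11), Community(2, 12))
  val sampleEdges: Seq[WeightedEdge] = Seq(
    WeightedEdge(10, 11, 1.0), WeightedEdge(11, 12, 2.0),
    WeightedEdge(12, 12, 4.0), WeightedEdge(13, 10, 8.0))

  // The plan the reply refuses to generate implicitly: every community is
  // paired with every edge (a cross product), then the OR predicate filters.
  def crossThenFilter(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[Pair] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c, e)

  // A single-key hash equi-join: index the edges by `key`, probe with nodeID.
  private def equiJoin(cs: Seq[Community], es: Seq[WeightedEdge])(
      key: WeightedEdge => Int): Seq[Pair] = {
    val index: Map[Int, Seq[WeightedEdge]] = es.groupBy(key)
    for {
      c <- cs
      e <- index.getOrElse(c.nodeID, Seq.empty[WeightedEdge])
    } yield (c, e)
  }

  // OR rewritten as (nodeID = src) UNION ALL (nodeID = target AND src != target).
  // The guard keeps a self-loop from matching once in each branch.
  def unionOfEquiJoins(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[Pair] =
    equiJoin(cs, es)(_.src) ++
      equiJoin(cs, es.filter(e => e.src != e.target))(_.target)

  // Plain-collections counterpart of the groupBy/select step in the question.
  def sumTotals(pairs: Seq[Pair]): List[CommunitySumTotal] =
    pairs
      .groupBy { case (c, _) => c.communityID }
      .map { case (id, ps) => CommunitySumTotal(id, ps.map(_._2.weight).sum) }
      .toList
      .sortBy(_.communityID)

  def main(args: Array[String]): Unit = {
    val viaCross = crossThenFilter(sampleCommunities, sampleEdges)
    val viaUnion = unionOfEquiJoins(sampleCommunities, sampleEdges)
    // Both plans should yield the same multiset of (community, edge) pairs.
    println(viaCross.diff(viaUnion).isEmpty && viaUnion.diff(viaCross).isEmpty)
    sumTotals(viaUnion).foreach(println)
  }
}
```

With the sample data, both plans give 12.0 for community 1 and 6.0 for community 2. A naive union without the `src != target` guard would report 10.0 for community 2, because the self-loop (12, 12) would match in both branches.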