Yes, that is the idea, but I think in this case the user must be
protected from an operation that can get ridiculously expensive.
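
To make the cost concern concrete, here is a minimal sketch of the rewrite suggested further down the thread: two equi-joins, a union, and a distinct. It uses plain Scala collections, not the Flink API. The object and method names (OrJoinSketch, crossThenFilter, twoJoinsUnionDistinct) are made up for illustration, and it assumes the input rows are unique, since distinct would otherwise also collapse genuine duplicates.

```scala
// Plain Scala collections only; this is NOT the Flink Table API.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

object OrJoinSketch {
  // Cross followed by a filter: inspects every (community, edge) pair.
  def crossThenFilter(cs: Seq[Community],
                      es: Seq[WeightedEdge]): Set[(Community, WeightedEdge)] =
    (for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c, e)).toSet

  // A single equi-join on one key, done as a hash lookup instead of a cross.
  private def equiJoin(cs: Seq[Community],
                       es: Seq[WeightedEdge],
                       key: WeightedEdge => Int): Seq[(Community, WeightedEdge)] = {
    val byKey = es.groupBy(key)
    for {
      c <- cs
      e <- byKey.getOrElse(c.nodeID, Seq.empty)
    } yield (c, e)
  }

  // Two equi-joins, union, then distinct on the combined pair.
  // An edge with src == target matches both joins; toSet drops the duplicate.
  def twoJoinsUnionDistinct(cs: Seq[Community],
                            es: Seq[WeightedEdge]): Set[(Community, WeightedEdge)] =
    (equiJoin(cs, es, _.src) ++ equiJoin(cs, es, _.target)).toSet
}
```

Both functions should return the same pairs. The second one does one hash lookup per community and key, so it never compares every community with every edge.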

On Fri, Apr 17, 2015 at 10:20 AM, Felix Neutatz <neut...@googlemail.com> wrote:
> I am also against the manual cross method. Isn't it the idea of the table
> API to hide the actual implementation from the user?
>
> Best regards,
> Felix
> On 17.04.2015 at 10:09 AM, "Till Rohrmann" <till.rohrm...@gmail.com> wrote:
>
>> Why not do two separate joins, union the results, and then apply a distinct
>> operation on the combined key?
>>
>> On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> > So, the first thing is a "feature" of the Java API that removes
>> > duplicate fields in keys, so an equi-join on (0,0) with (0,1) would
>> > throw an error because one 0 is removed from the first key.
>> >
>> > The second thing is a feature of the Table API where the error message
>> > is hinting at the problem:
>> > Could not derive equi-join predicates for predicate 'nodeID === 'src
>> > || 'nodeID === 'target
>> >
>> > The problem is that this would have to be executed as a cross
>> > followed by a filter, because neither predicate is an equi-join
>> > predicate that must always be true (they are combined with OR). I
>> > don't want to allow this, because a cross can be very expensive. I
>> > will add a JIRA ticket for adding a manual cross operation to the
>> > Table API.
>> >
>> > On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neut...@googlemail.com>
>> > wrote:
>> > > Hi,
>> > >
>> > > I want to join two tables in the following way:
>> > >
>> > > case class WeightedEdge(src: Int, target: Int, weight: Double)
>> > > case class Community(communityID: Int, nodeID: Int)
>> > >
>> > > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
>> > >
>> > > val communities: DataSet[Community]
>> > > val weightedEdges: DataSet[WeightedEdge]
>> > >
>> > > val communitiesTable = communities.toTable
>> > > val weightedEdgesTable = weightedEdges.toTable
>> > >
>> > > val sumTotal = communitiesTable.join(weightedEdgesTable)
>> > >  .where("nodeID = src && nodeID = target")
>> > >  .groupBy('communityID)
>> > >  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
>> > >
>> > >
>> > > but I get this exception:
>> > >
>> > > Exception in thread "main"
>> > > org.apache.flink.api.common.InvalidProgramException: The types of the
>> > > key fields do not match: The number of specified keys is different.
>> > > at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
>> > > at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>> > > at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>> > > at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>> > >
>> > > Moreover when I use the following where clause:
>> > >
>> > > .where("nodeID = src || nodeID = target")
>> > >
>> > > I get another error:
>> > >
>> > > Exception in thread "main"
>> > > org.apache.flink.api.table.ExpressionException: Could not derive
>> > > equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
>> > > 'target.
>> > >
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
>> > > at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
>> > > at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
>> > > at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
>> > >
>> > >
>> > > Apart from that the TableApi seems really promising. It's a really great
>> > > tool.
>> > >
>> > > Thank you for your help,
>> > >
>> > > Felix
>> >
>>
