Hi Timur,

You can use a composite key [1] to compare keys consisting of multiple fields. For example:
```
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where("f1", "f2")     // Flink compares the values of f1 first, then the values of f2 if the f1 values are equal.
  .equalTo("f1", "f2") { // Note that you must specify the same number of keys on both sides.
    (left, right) => 1
  }
```

Composite keys can also be applied to Scala tuples:

```
val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
a.coGroup(b)
  .where(0, 1)     // Note that field numbers start from 0.
  .equalTo(0, 1) {
    (left, right) => 1
  }
```

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples

Regards,
Chiwan Park

> On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>
> Hello,
>
> Another issue I have encountered is incorrect implicit resolution (I'm using
> Scala 2.11.7). Here's the example (with a workaround):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => e.f1)
>   //.equalTo(e => e) { //this fails to compile because equalTo expects an implicit
>   .equalTo("f1") {
>     (left, right) => 1
>   }
> However, the workaround does not quite work when key is a tuple (I suspect
> this applies to other generic classes as well):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)]))
>   { (left, right) => 1} // throws InvalidProgramException
> Here, I try to provide the implicit TypeInformation explicitly, but
> apparently it's not compatible with the way implicit inference is done.
> (TypeInformation I generate is GenericType<scala.Tuple2>, while
> scala.Tuple2<String, String> is expected).
>
> Now, I can split this in 2 operations like below:
> val tmp = a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))
>
> tmp { (left, right) => 1}
> but, I would like to avoid adding clutter to my processing logic, and it's
> not entirely clear to me how this would be scheduled.
>
> As an option, I can hash the hell out of my keys like that:
> a.coGroup(b)
>   .where(e => (e.f1, e.f2).hashCode)
>   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){
>     (left, right) => 1}
> but that, again, adds some indirection and clutter, not mentioning the hassle
> of dealing with collisions (which can be alleviated by using fancy hashes,
> but I'd like to avoid that).
>
> Any insights on what is the way to go here are highly appreciated.
>
> Thanks,
> Timur
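
On the hashCode idea quoted above: the collision concern is easy to reproduce without Flink. The following is only a plain-Scala sketch on in-memory collections, not Flink code. `Thing` is an assumed definition (the thread does not show it) and `coGroupBy` is a hypothetical helper that merely imitates grouping two inputs by a key. It relies on the fact that "Aa" and "BB" have the same String hash code, so tuples built from them hash identically too.

```scala
// Assumed stand-in for the Thing type used in the thread (its real definition is not shown).
case class Thing(f1: String, f2: String)

object CompositeKeyDemo {
  // Hypothetical in-memory imitation of a co-group: for every key that occurs
  // on either side, collect the elements of both inputs that share that key.
  def coGroupBy[K](a: Seq[Thing], b: Seq[Thing])(key: Thing => K): Map[K, (Seq[Thing], Seq[Thing])] = {
    val left  = a.groupBy(key)
    val right = b.groupBy(key)
    (left.keySet ++ right.keySet).map { k =>
      val l = left.getOrElse(k, Seq.empty[Thing])
      val r = right.getOrElse(k, Seq.empty[Thing])
      (k, (l, r))
    }.toMap
  }

  val a = Seq(Thing("Aa", "x"), Thing("c", "d"))
  val b = Seq(Thing("BB", "x"), Thing("c", "d"))

  // Keyed by the fields themselves: ("Aa", "x") and ("BB", "x") stay in separate groups.
  val byFields = coGroupBy(a, b)(t => (t.f1, t.f2))

  // Keyed by the tuple's hash code: the two distinct keys collide and end up in one group.
  val byHash = coGroupBy(a, b)(t => (t.f1, t.f2).hashCode)

  def main(args: Array[String]): Unit = {
    println(s"groups by fields: ${byFields.size}")
    println(s"groups by hash:   ${byHash.size}")
  }
}
```

With a hashed key, the function passed to the co-group would have to re-check the real fields to separate colliding elements, which is the extra work a composite key avoids.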