Actually, there is an even easier solution (which I saw in your reply to my other question):

```
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2)).apply { (left, right) => 1 }
  .print()
```

This pretty much does what I want. The explicit `apply` gives the compiler the hint it was missing before. Without it, the block after `equalTo(...)` is taken as the implicit `TypeInformation` argument. Nevertheless, `createTypeInformation` works too; thanks for sharing!
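Here is a minimal plain-Scala sketch of the parameter-list behaviour described above. It has no Flink dependency. It assumes `equalTo` declares its key type with a context bound, which the compiler turns into a second, implicit parameter list. `TypeInfo`, `HalfKeyedSketch`, and `CoGroupSketch` are hypothetical stand-ins, not Flink classes.

```scala
// Hypothetical stand-in for a type-information type class (not Flink's TypeInformation).
trait TypeInfo[T] { def name: String }

object TypeInfo {
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def name = "String" }
  // Derives info for a pair from the info of its two element types.
  implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
    new TypeInfo[(A, B)] { def name = s"Tuple2<${a.name}, ${b.name}>" }
}

final case class Thing(f1: String, f2: String)

// Hypothetical result of equalTo: keeps the key info and can be applied to a function.
final class CoGroupSketch[K](val keyInfo: TypeInfo[K]) {
  def apply[O](fun: (Iterator[Thing], Iterator[Thing]) => O): O =
    fun(Iterator.empty, Iterator.empty)
}

final class HalfKeyedSketch {
  // Same shape as `equalTo[K: TypeInfo](fun: Thing => K)`:
  // the context bound becomes this second, implicit parameter list.
  def equalTo[K](fun: Thing => K)(implicit info: TypeInfo[K]): CoGroupSketch[K] =
    new CoGroupSketch(info)
}

object ImplicitBlockDemo {
  val half = new HalfKeyedSketch

  // Does not compile: the block is passed as the implicit TypeInfo argument.
  // half.equalTo(e => (e.f1, e.f2)) { (left, right) => 1 }

  // Compiles: the explicit .apply closes the equalTo call first.
  val viaApply: Int = half.equalTo(e => (e.f1, e.f2)).apply { (left, right) => 1 }

  // Also compiles: supply the implicit explicitly, so the block goes to apply.
  val viaExplicit: Int =
    half.equalTo(e => (e.f1, e.f2))(implicitly[TypeInfo[(String, String)]]) { (left, right) => 1 }

  val keyName: String = half.equalTo(e => (e.f1, e.f2)).keyInfo.name
}
```

Both workarounds in the thread take this shape: one uses `.apply { ... }`, the other passes `createTypeInformation[(String, String)]` explicitly.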
Thanks,
Timur

On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Timur,
>
> You have to use the `createTypeInformation` method in the `org.apache.flink.api.scala` package to create TypeInformation objects for Scala-specific types such as case classes, tuples, `Either`, and `Option`. For example:
>
> ```
> import org.apache.flink.api.scala._ // to import package object
>
> val a: DataSet[Thing] = …
> val b: DataSet[Thing] = …
>
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>     (left, right) => 1
>   }.print()
> ```
>
> Note that Flink internally creates copied 2-tuples consisting of (key extracted by the KeySelector, original value), so using a KeySelector comes with some performance cost.
>
> Regards,
> Chiwan Park
>
>
> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >
> > Thank you Chiwan! Yes, I understand that there are workarounds that don't use a function argument (and thus do not require implicit arguments). I try to avoid positional and string-based keys because there are no compiler guarantees when you refactor or accidentally change the underlying case classes. Providing a function is the cleanest solution (and arguably the most readable), so it'd be great to make it work.
> >
> > BTW, TypeInformation.of has an overload that takes a TypeHint (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java). According to the documentation, it is meant for generic classes, but using it still leads to the same exception.
> >
> > Thanks,
> > Timur
> >
> >
> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Timur,
> >
> > You can use a composite key [1] to compare keys consisting of multiple fields.
> > For example:
> >
> > ```
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where("f1", "f2") // Flink compares the values of f1 first, and compares the values of f2 if the values of f1 are equal.
> >   .equalTo("f1", "f2") { // Note that you must specify the same number of keys
> >     (left, right) => 1
> >   }
> > ```
> >
> > Composite keys can also be applied to Scala tuples:
> >
> > ```
> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> > a.coGroup(b)
> >   .where(0, 1) // Note that field numbers start from 0.
> >   .equalTo(0, 1) {
> >     (left, right) => 1
> >   }
> > ```
> >
> > I hope this helps.
> >
> > [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> >
> > Regards,
> > Chiwan Park
> >
> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> > >
> > > Hello,
> > >
> > > Another issue I have encountered is incorrect implicit resolution (I'm using Scala 2.11.7).
> > > Here's the example (with a workaround):
> > >
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > > a.coGroup(b)
> > >   .where(e => e.f1)
> > >   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
> > >   .equalTo("f1") {
> > >     (left, right) => 1
> > >   }
> > >
> > > However, the workaround does not quite work when the key is a tuple (I suspect this applies to other generic classes as well):
> > >
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > > a.coGroup(b)
> > >   .where(e => (e.f1, e.f2))
> > >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1 } // throws InvalidProgramException
> > >
> > > Here, I try to provide the implicit TypeInformation explicitly, but apparently it's not compatible with the way implicit inference is done. (The TypeInformation I generate is GenericType<scala.Tuple2>, while scala.Tuple2<String, String> is expected.)
> > >
> > > Now, I can split this into 2 operations like below:
> > >
> > > val tmp = a.coGroup(b)
> > >   .where(e => (e.f1, e.f2))
> > >   .equalTo(e => (e.f1, e.f2))
> > >
> > > tmp { (left, right) => 1 }
> > >
> > > However, I would like to avoid adding clutter to my processing logic, and it's not entirely clear to me how this would be scheduled.
> > >
> > > As an option, I can hash the hell out of my keys like this:
> > >
> > > a.coGroup(b)
> > >   .where(e => (e.f1, e.f2).hashCode)
> > >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
> > >
> > > But that, again, adds some indirection and clutter, not to mention the hassle of dealing with collisions (which can be alleviated by using fancy hashes, but I'd like to avoid that).
> > >
> > > Any insights on the right way to go here are highly appreciated.
> > >
> > > Thanks,
> > > Timur
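The `GenericType<scala.Tuple2>` result reported in the original question is consistent with JVM type erasure. `classOf[(String, String)]` carries only the raw runtime class `scala.Tuple2`. An implicit lookup, in contrast, resolves against the full static type at compile time. The minimal plain-Scala sketch below illustrates that difference. It involves no Flink code, and `Describe` is a hypothetical type class. Flink's own `createTypeInformation` is a compile-time macro rather than this kind of implicit derivation.

```scala
// Hypothetical type class standing in for compile-time type information.
trait Describe[T] { def describe: String }

object Describe {
  implicit val string: Describe[String] =
    new Describe[String] { def describe = "String" }
  implicit def option[A](implicit a: Describe[A]): Describe[Option[A]] =
    new Describe[Option[A]] { def describe = s"Option[${a.describe}]" }
  implicit def tuple2[A, B](implicit a: Describe[A], b: Describe[B]): Describe[(A, B)] =
    new Describe[(A, B)] { def describe = s"(${a.describe}, ${b.describe})" }
}

object ErasureDemo {
  // Runtime Class objects: type arguments are erased, so both are scala.Tuple2.
  val stringPairClass: Class[_] = classOf[(String, String)]
  val mixedPairClass: Class[_] = classOf[(String, Option[String])]
  val sameClass: Boolean = stringPairClass == mixedPairClass

  // Implicit resolution happens at compile time and sees the full static type.
  def describe[T](implicit d: Describe[T]): String = d.describe
  val stringPair: String = describe[(String, String)]
  val mixedPair: String = describe[(String, Option[String])]
}
```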