I'm afraid there is no way around that extra `.apply`: without it, the Scala compiler takes the function block for the additional implicit parameter list of `equalTo` and gets confused. It's a bit ugly, though ...
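
To illustrate, here is a minimal plain-Scala sketch of the situation, with no Flink involved. `TypeInfo`, `HalfBuilt` and this `equalTo` are hypothetical stand-ins that only mimic the shape of the real API (a method with an implicit parameter list that returns an object with an `apply` method); they are not Flink's actual classes or signatures.

```scala
// Hypothetical stand-in for a type-information type class (not Flink's TypeInformation).
trait TypeInfo[K] { def name: String }

object TypeInfo {
  // Found through the implicit scope of TypeInfo's companion object.
  implicit val stringPair: TypeInfo[(String, String)] =
    new TypeInfo[(String, String)] { val name = "Tuple2[String, String]" }
}

// Hypothetical stand-in for the unfinished operation returned by equalTo.
final class HalfBuilt[K](val keyType: TypeInfo[K]) {
  def apply[R](fun: (String, String) => R): R = fun("left", "right")
}

object ImplicitApplyDemo {
  // Same shape as the call in this thread: key selector first, implicit evidence second.
  def equalTo[K](key: String => K)(implicit ti: TypeInfo[K]): HalfBuilt[K] =
    new HalfBuilt(ti)

  // Does not compile: the block is taken as the argument for the implicit parameter list.
  // val broken = equalTo(s => (s, s)) { (l, r) => 1 }

  // Explicit apply: the implicit is resolved first, then the block goes to apply.
  val viaApply: Int = equalTo(s => (s, s)).apply { (l, r) => 1 }

  // Passing the implicit argument by hand also leaves the block for apply.
  val viaExplicit: Int = equalTo(s => (s, s))(TypeInfo.stringPair) { (l, r) => 1 }

  def main(args: Array[String]): Unit = {
    println(viaApply)
    println(viaExplicit)
  }
}
```

The two working variants correspond to the `.apply { ... }` and `createTypeInformation` forms quoted below.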

On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairu...@gmail.com> wrote:

> Actually, there is an even easier solution (which I saw in your reply to
> my other question):
> ```
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2)).apply {
>     (left, right) => 1
>   }.print()
> ```
> pretty much does what I want. The explicit `apply` gives the compiler the
> hint it was missing before. Nevertheless, `createTypeInformation` works
> too, thanks for sharing!
>
> Thanks,
> Timur
>
> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
>
>> Hi Timur,
>>
>> You have to use the `createTypeInformation` method in the
>> `org.apache.flink.api.scala` package to create a TypeInformation object
>> for Scala-specific types such as case classes, tuples, eithers, and
>> options. For example:
>>
>> ```
>> import org.apache.flink.api.scala._ // to import package object
>>
>> val a: DataSet[Thing] = …
>> val b: DataSet[Thing] = …
>>
>> a.coGroup(b)
>>   .where(e => (e.f1, e.f2))
>>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>>     (left, right) => 1
>>   }.print()
>> ```
>>
>> Note that Flink internally creates copied 2-tuples consisting of
>> (key extracted by the KeySelector, original value), so there is some
>> performance decrease when you use a KeySelector.
>>
>> Regards,
>> Chiwan Park
>>
>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>> >
>> > Thank you Chiwan! Yes, I understand that there are workarounds that
>> > don't use a function argument (and thus do not require implicit
>> > arguments). I try to avoid positional and string-based keys because
>> > there are no compiler guarantees when you refactor or accidentally
>> > change the underlying case classes. Providing a function is the
>> > cleanest solution (and arguably the most readable), so it'd be great
>> > to make it work.
>> >
>> > BTW, TypeInformation.of has an implementation that takes a TypeHint
>> > (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>> > which, according to the documentation, is supposed to be used for
>> > generic classes, but using it still leads to the same exception.
>> >
>> > Thanks,
>> > Timur
>> >
>> >
>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>> > Hi Timur,
>> >
>> > You can use a composite key [1] to compare keys consisting of multiple
>> > fields. For example:
>> >
>> > ```
>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > a.coGroup(b)
>> >   .where("f1", "f2") // Flink compares the values of f1 first, then the values of f2 if the f1 values are equal.
>> >   .equalTo("f1", "f2") { // Note that you must specify the same number of keys.
>> >     (left, right) => 1
>> >   }
>> > ```
>> >
>> > A composite key can also be applied to Scala tuples:
>> >
>> > ```
>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
>> > a.coGroup(b)
>> >   .where(0, 1) // Note that field numbers start from 0.
>> >   .equalTo(0, 1) {
>> >     (left, right) => 1
>> >   }
>> > ```
>> >
>> > I hope this helps.
>> >
>> > [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>> > >
>> > > Hello,
>> > >
>> > > Another issue I have encountered is incorrect implicit resolution
>> > > (I'm using Scala 2.11.7).
>> > > Here's the example (with a workaround):
>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > > a.coGroup(b)
>> > >   .where(e => e.f1)
>> > >   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
>> > >   .equalTo("f1") {
>> > >     (left, right) => 1
>> > >   }
>> > > However, the workaround does not quite work when the key is a tuple
>> > > (I suspect this applies to other generic classes as well):
>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > > a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2))
>> > >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1 } // throws InvalidProgramException
>> > > Here, I try to provide the implicit TypeInformation explicitly, but
>> > > apparently it's not compatible with the way implicit inference is
>> > > done. (The TypeInformation I generate is GenericType<scala.Tuple2>,
>> > > while scala.Tuple2<String, String> is expected.)
>> > >
>> > > Now, I can split this into 2 operations like below:
>> > > val tmp = a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2))
>> > >   .equalTo(e => (e.f1, e.f2))
>> > >
>> > > tmp { (left, right) => 1 }
>> > > but I would like to avoid adding clutter to my processing logic, and
>> > > it's not entirely clear to me how this would be scheduled.
>> > >
>> > > As an option, I can hash the hell out of my keys like that:
>> > > a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2).hashCode)
>> > >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
>> > > but that, again, adds some indirection and clutter, not to mention
>> > > the hassle of dealing with collisions (which can be alleviated by
>> > > using fancy hashes, but I'd like to avoid that).
>> > >
>> > > Any insights on what is the way to go here are highly appreciated.
>> > >
>> > > Thanks,
>> > > Timur
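
As a footnote to the `TypeInformation.of(classOf[(String, String)])` attempt in the original message: the plain-Scala sketch below (no Flink involved, `ErasureDemo` is just an illustrative name) shows that `classOf` drops the tuple's type arguments. That is one plausible reading of why the generated type information was reported as `GenericType<scala.Tuple2>` rather than a tuple of two strings; I have not traced it through Flink's code.

```scala
object ErasureDemo {
  // classOf only names the erased runtime class, so the type arguments are gone.
  val pairOfStrings: Class[_] = classOf[(String, String)]
  val pairOfOther: Class[_] = classOf[(String, List[Int])]

  // Both refer to the same Class object for scala.Tuple2.
  val sameClass: Boolean = pairOfStrings == pairOfOther

  def main(args: Array[String]): Unit = {
    println(sameClass)             // true
    println(pairOfStrings.getName) // scala.Tuple2
  }
}
```

A `Class` object therefore cannot tell a consumer what the tuple's fields are, which fits with the compile-time `createTypeInformation[(String, String)]` working where the `classOf` variant did not.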