I'm quite happy with the extra `apply`; it's much cleaner than any of my initial solutions.
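
For reference, here is a minimal sketch of why the explicit `apply` helps. It uses only the Scala standard library; `KeyInfo`, `Keyed`, and this `equalTo` are hypothetical stand-ins that mimic the shape of a method with a trailing implicit parameter list, not Flink's actual classes or signatures.

```scala
// Hypothetical stand-ins, NOT Flink classes: they only mimic the shape
// equalTo(keySelector)(implicit typeInfo) followed by apply(function).
object ImplicitApplySketch {
  // Stand-in for an implicitly supplied type descriptor for the key type K.
  final case class KeyInfo[K](name: String)
  implicit val pairInfo: KeyInfo[(String, String)] = KeyInfo("(String, String)")

  // Stand-in for the unfinished operation returned by equalTo.
  final class Keyed[K](val info: KeyInfo[K]) {
    def apply[R](fun: (String, String) => R): R = fun("left", "right")
  }

  // The second parameter list is implicit.
  def equalTo[K](key: String => K)(implicit info: KeyInfo[K]): Keyed[K] =
    new Keyed[K](info)

  // Does not compile: the braces are taken as the second (implicit)
  // parameter list, so a function is passed where a KeyInfo[K] is expected.
  // val broken = equalTo(s => (s, s)) { (l, r) => 1 }

  // Compiles: .apply ends the equalTo call, so the implicit is resolved first.
  val viaApply: Int = equalTo(s => (s, s)).apply { (l, r) => 1 }

  // Also compiles: supply the implicit explicitly, then the function.
  val viaExplicit: Int = equalTo(s => (s, s))(pairInfo) { (l, r) => 1 }

  def main(args: Array[String]): Unit = {
    println(viaApply)
    println(viaExplicit)
  }
}
```

The two working forms correspond to the `.apply { ... }` and `createTypeInformation[...]` variants discussed in this thread.
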

On Thu, Mar 31, 2016 at 2:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> I'm afraid there is no way around having that extra ".apply" because the
> Scala compiler will get confused with the additional implicit parameter.
> It's a bit ugly, though ...
>
> On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Actually, there is an even easier solution (which I saw in your reply to
>> my other question):
>> ```
>> a.coGroup(b)
>>   .where(e => (e.f1, e.f2))
>>   .equalTo(e => (e.f1, e.f2)).apply {
>>     (left, right) => 1
>>   }.print()
>> ```
>> pretty much does what I want. The explicit `apply` gives the compiler the
>> hint it was missing before. Nevertheless, `createTypeInformation` works
>> too, thanks for sharing!
>>
>> Thanks,
>> Timur
>>
>> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org>
>> wrote:
>>
>>> Hi Timur,
>>>
>>> You have to use the `createTypeInformation` method in the
>>> `org.apache.flink.api.scala` package to create a TypeInformation object
>>> for Scala-specific types such as case classes, tuples, eithers, and
>>> options. For example:
>>>
>>> ```
>>> import org.apache.flink.api.scala._ // to import package object
>>>
>>> val a: DataSet[Thing] = …
>>> val b: DataSet[Thing] = …
>>>
>>> a.coGroup(b)
>>>   .where(e => (e.f1, e.f2))
>>>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>>>     (left, right) => 1
>>>   }.print()
>>> ```
>>>
>>> Note that Flink internally creates 2-tuples consisting of (key extracted
>>> by the KeySelector, original value), so there is some performance cost
>>> when you use a KeySelector.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>> >
>>> > Thank you Chiwan! Yes, I understand that there are workarounds that
>>> > don't use a function argument (and thus do not require implicit
>>> > arguments). I try to avoid positional and string-based keys because
>>> > there are no compiler guarantees when you refactor or accidentally
>>> > change the underlying case classes. Providing a function is the
>>> > cleanest solution (and arguably the most readable), so it'd be great
>>> > to make it work.
>>> >
>>> > BTW, TypeInformation.of has an implementation that takes TypeHint (
>>> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>>> > which, according to the documentation, is supposed to be used for
>>> > generic classes, but using it still leads to the same exception.
>>> >
>>> > Thanks,
>>> > Timur
>>> >
>>> >
>>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>>> > Hi Timur,
>>> >
>>> > You can use a composite key [1] to compare keys consisting of multiple
>>> > fields. For example:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > a.coGroup(b)
>>> >   // Flink compares the values of f1 first, and compares the values
>>> >   // of f2 if the values of f1 are the same.
>>> >   .where("f1", "f2")
>>> >   // Note that you must specify the same number of keys.
>>> >   .equalTo("f1", "f2") {
>>> >     (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > Composite keys can also be applied to Scala tuples:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
>>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
>>> > a.coGroup(b)
>>> >   .where(0, 1) // Note that field numbers start from 0.
>>> >   .equalTo(0, 1) {
>>> >     (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > I hope this helps.
>>> >
>>> > [1]:
>>> > https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>> > >
>>> > > Hello,
>>> > >
>>> > > Another issue I have encountered is incorrect implicit resolution
>>> > > (I'm using Scala 2.11.7). Here's the example (with a workaround):
>>> > >
>>> > >     val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > >     val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > >     a.coGroup(b)
>>> > >       .where(e => e.f1)
>>> > >       // this fails to compile because equalTo expects an implicit:
>>> > >       // .equalTo(e => e) {
>>> > >       .equalTo("f1") {
>>> > >         (left, right) => 1
>>> > >       }
>>> > >
>>> > > However, the workaround does not quite work when the key is a tuple
>>> > > (I suspect this applies to other generic classes as well):
>>> > >
>>> > >     val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > >     val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > >     a.coGroup(b)
>>> > >       .where(e => (e.f1, e.f2))
>>> > >       .equalTo(e => (e.f1, e.f2))(
>>> > >         TypeInformation.of(classOf[(String, String)])) {
>>> > >         (left, right) => 1
>>> > >       } // throws InvalidProgramException
>>> > >
>>> > > Here, I try to provide the implicit TypeInformation explicitly, but
>>> > > apparently it's not compatible with the way implicit inference is
>>> > > done. (The TypeInformation I generate is GenericType<scala.Tuple2>,
>>> > > while scala.Tuple2<String, String> is expected.)
>>> > >
>>> > > Now, I can split this into 2 operations like below:
>>> > >
>>> > >     val tmp = a.coGroup(b)
>>> > >       .where(e => (e.f1, e.f2))
>>> > >       .equalTo(e => (e.f1, e.f2))
>>> > >
>>> > >     tmp { (left, right) => 1 }
>>> > >
>>> > > but I would like to avoid adding clutter to my processing logic,
>>> > > and it's not entirely clear to me how this would be scheduled.
>>> > >
>>> > > As an option, I can hash the hell out of my keys like this:
>>> > >
>>> > >     a.coGroup(b)
>>> > >       .where(e => (e.f1, e.f2).hashCode)
>>> > >       .equalTo(e => (e.f1, e.f2).hashCode)(
>>> > >         TypeInformation.of(classOf[Int])) { (left, right) => 1 }
>>> > >
>>> > > but that, again, adds some indirection and clutter, not to mention
>>> > > the hassle of dealing with collisions (which can be alleviated by
>>> > > using fancy hashes, but I'd like to avoid that).
>>> > >
>>> > > Any insights on what is the way to go here are highly appreciated.
>>> > >
>>> > > Thanks,
>>> > > Timur