Thank you, Chiwan! Yes, I understand that there are workarounds that don't
use a function argument (and thus do not require implicit arguments). I try
to avoid positional and string-based keys because there are no compiler
guarantees when you refactor or accidentally change the underlying case
classes. Providing a function is the cleanest solution (and arguably the
most readable), so it'd be great to make it work.

BTW, TypeInformation.of has an overload that takes a TypeHint (
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
which, according to the documentation, is supposed to be used for generic
classes, but using it still leads to the same exception.

Thanks,
Timur


On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Timur,
>
> You can use a composite key [1] to compare keys consisting of multiple
> fields. For example:
>
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   // Flink compares the values of f1 first, and compares the values of f2
>   // if the values of f1 are the same.
>   .where("f1", "f2")
>   // Note that you must specify the same number of keys on both sides.
>   .equalTo("f1", "f2") {
>     (left, right) => 1
>   }
> ```
>
> Composite keys can also be applied to Scala tuples:
>
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
>
> I hope this helps.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>
> Regards,
> Chiwan Park
>
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm
> > using Scala 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
> >   .equalTo("f1") {
> >     (left, right) => 1
> >   }
> > However, the workaround does not quite work when the key is a tuple (I
> > suspect this applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1 } // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but
> > apparently it's not compatible with the way implicit inference is done.
> > (The TypeInformation I generate is GenericType<scala.Tuple2>, while
> > scala.Tuple2<String, String> is expected.)
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but I would like to avoid adding clutter to my processing logic, and
> > it's not entirely clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like this:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
> > but that, again, adds some indirection and clutter, not to mention the
> > hassle of dealing with collisions (which can be alleviated by using fancy
> > hashes, but I'd like to avoid that).
> >
> > Any insights on the right way to go here would be highly appreciated.
> >
> > Thanks,
> > Timur
>
>
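
On the hash-based keys mentioned in the quoted message: the collision concern can be illustrated without Flink at all. The following is a minimal plain-Scala sketch, not code from the thread. `Thing` mirrors the case class used above, while `HashKeyCollision`, `tupleKey`, and `hashKey` are hypothetical names introduced only for this illustration. It relies on the fact that the Java strings "Aa" and "BB" share a hashCode, and that a Scala tuple's hashCode is computed from the hashes of its elements.

```scala
// Plain Scala, no Flink dependency. Thing mirrors the case class from the thread.
case class Thing(f1: String, f2: String)

object HashKeyCollision {
  // "Aa" and "BB" are distinct strings whose String.hashCode is identical (2112).
  val left: Thing  = Thing("Aa", "x")
  val right: Thing = Thing("BB", "x")

  // The tuple key, as in `.where(e => (e.f1, e.f2))`.
  def tupleKey(t: Thing): (String, String) = (t.f1, t.f2)

  // The surrogate Int key, as in `.where(e => (e.f1, e.f2).hashCode)`.
  def hashKey(t: Thing): Int = tupleKey(t).hashCode

  def main(args: Array[String]): Unit = {
    // The tuple keys differ, so these records should not be grouped together.
    println(s"tuple keys equal: ${tupleKey(left) == tupleKey(right)}") // false
    // The hashed keys are equal, so a grouping on hashKey would merge them.
    println(s"hash keys equal:  ${hashKey(left) == hashKey(right)}")   // true
  }
}
```

With keys like these, a grouping on the hashed key would put both records in the same group, so the grouping function would have to compare the original fields again to separate them.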
