Actually, there is an even easier solution (which I saw in your reply to my
other question):
```
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2)).apply {
    (left, right) => 1
  }.print()
```
pretty much does what I want. The explicit `apply` gives the compiler the hint
it was missing before. Nevertheless, `createTypeInformation` works too, thanks
for sharing!
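
For the archive, here is a minimal sketch of why I think the explicit `apply` matters. It uses plain Scala and no Flink: `TypeInfo`, `HalfFinished` and `Grouped` are hypothetical stand-ins I made up for illustration, not Flink's actual classes. The assumption is that `equalTo` takes its type information as an implicit parameter list, so a block written directly after `equalTo(...)` is parsed as that implicit argument rather than as the co-group function.

```scala
// Hypothetical stand-in for a type-information type class (not Flink's class).
trait TypeInfo[K]

object TypeInfo {
  // Implicit instance found through the companion object.
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] {}
}

// Hypothetical stand-in for what equalTo returns; apply takes the user function.
class Grouped {
  def apply[O](f: (Int, Int) => O): O = f(1, 2)
}

// Hypothetical stand-in for what where returns.
class HalfFinished {
  // The second parameter list is implicit, as with a context bound [K: TypeInfo].
  def equalTo[K](keyFn: Int => K)(implicit info: TypeInfo[K]): Grouped = new Grouped
}

object ImplicitApplyDemo {
  // Compiles: the implicit TypeInfo[Int] is resolved first, then apply is called.
  val result: Int =
    new HalfFinished().equalTo(e => e).apply { (left, right) => left + right }

  // Does not compile if uncommented: the block is taken as the explicit value
  // for the implicit `info` parameter, and a function is not a TypeInfo[Int].
  // val broken = new HalfFinished().equalTo(e => e) { (left, right) => left + right }
}
```

Writing `.apply` ends the `equalTo` call, so the compiler fills in the implicit argument itself and the block can only be the function.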

Thanks,
Timur

On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Timur,
>
> You have to use the `createTypeInformation` method in the
> `org.apache.flink.api.scala` package to create a TypeInformation object for
> Scala-specific types such as case classes, tuples, Eithers, and Options. For
> example:
>
> ```
> import org.apache.flink.api.scala._ // to import package object
>
> val a: DataSet[Thing] = …
> val b: DataSet[Thing] = …
>
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>     (left, right) => 1
>   }.print()
> ```
>
> Note that Flink internally creates copied 2-tuples consisting of (key
> extracted by the KeySelector, original value), so there is some performance
> decrease when you use a KeySelector.
>
> Regards,
> Chiwan Park
>
> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >
> > Thank you Chiwan! Yes, I understand that there are workarounds that
> > don't use a function argument (and thus do not require implicit arguments).
> > I try to avoid positional and string-based keys because there are no
> > compiler guarantees when you refactor or accidentally change the underlying
> > case classes. Providing a function is the cleanest solution (and arguably
> > the most readable), so it'd be great to make it work.
> >
> > BTW, TypeInformation.of has an overload that takes a TypeHint
> > (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
> > which, according to the documentation, is supposed to be used for generic
> > classes, but using it still leads to the same exception.
> >
> > Thanks,
> > Timur
> >
> >
> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Timur,
> >
> > You can use a composite key [1] to compare keys consisting of multiple
> > fields. For example:
> >
> > ```
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   // Flink compares the values of f1 first, and compares the values of f2
> >   // if the values of f1 are the same.
> >   .where("f1", "f2")
> >   // Note that you must specify the same number of keys.
> >   .equalTo("f1", "f2") {
> >     (left, right) => 1
> >   }
> > ```
> >
> > Composite keys can also be applied to Scala tuples:
> >
> > ```
> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> > a.coGroup(b)
> >   .where(0, 1) // Note that field numbers start from 0.
> >   .equalTo(0, 1) {
> >     (left, right) => 1
> >   }
> > ```
> >
> > I hope this helps.
> >
> > [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> >
> > Regards,
> > Chiwan Park
> >
> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> > >
> > > Hello,
> > >
> > > Another issue I have encountered is incorrect implicit resolution (I'm
> > > using Scala 2.11.7). Here's the example (with a workaround):
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > > a.coGroup(b)
> > >   .where(e => e.f1)
> > >   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
> > >   .equalTo("f1") {
> > >     (left, right) => 1
> > >   }
> > > However, the workaround does not quite work when the key is a tuple (I
> > > suspect this applies to other generic classes as well):
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > > a.coGroup(b)
> > >   .where(e => (e.f1, e.f2))
> > >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) {
> > >     (left, right) => 1
> > >   } // throws InvalidProgramException
> > > Here, I try to provide the implicit TypeInformation explicitly, but
> > > apparently it's not compatible with the way implicit inference is done
> > > (the TypeInformation I generate is GenericType<scala.Tuple2>, while
> > > scala.Tuple2<String, String> is expected).
> > >
> > > Now, I can split this into two operations like below:
> > > val tmp = a.coGroup(b)
> > >   .where(e => (e.f1, e.f2))
> > >   .equalTo(e => (e.f1, e.f2))
> > >
> > > tmp { (left, right) => 1}
> > > but I would like to avoid adding clutter to my processing logic, and
> > > it's not entirely clear to me how this would be scheduled.
> > >
> > > As an option, I can hash the hell out of my keys like this:
> > > a.coGroup(b)
> > >   .where(e => (e.f1, e.f2).hashCode)
> > >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
> > > but that, again, adds some indirection and clutter, not to mention the
> > > hassle of dealing with collisions (which can be alleviated by using fancy
> > > hashes, but I'd like to avoid that).
> > >
> > > Any insights on what is the way to go here are highly appreciated.
> > >
> > > Thanks,
> > > Timur
> >
> >
>
>
