Hi Timur, Because Option[T] is not comparable type generally (if T is a POJO type), you cannot use Option[T] as a key type. I think you have to implement KeyExtractor to compare objects including Option[T]s.
``` case class MyKey(k1: Option[String], k2: Option[String]) val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), Some("c"))) val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), Some("c"))) data1.join(data2) .where(_.hashCode()) .equalTo(_.hashCode()).apply { (left: MyKey, right: MyKey) => (left, right) }.print() ``` Note that the approach in example (using hashCode()) cannot be applied to sort task. Regards, Chiwan Park > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote: > > There is some more detail to this question that I missed initially. It turns > out that my key is a case class of a form MyKey(k1: Option[String], k2: > Option[String]). Keys.SelectorFunctionKeys is performing a recursive check > whether every element of the MyKey class can be a key and fails when > encountering an Option. > > Is it possible to work around this situation without giving up Options? > Inability to use Options in Domain objects could be really frustrating. > > Thanks, > Timur > > On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > Hello, > > I'm evaluating Flink and one thing I noticed is Option[A] can't be used as a > key for coGroup (looking specifically here: > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). > I'm not clear about the reason of this and appreciate if someone can explain. > > Thanks, > Timur >