The `get` method on the Scala Map returns an Option, which is not
(currently) a valid key type in Flink (there is ongoing work to support
it [1]). Flink must know how to use a particular type as a key before you
can group by a value of that type. See the advanced DataSet concepts in
the official Flink training for more details [2].

If you're just playing around, the easiest way to make it work is to apply
the map directly to the key (i.e. use the apply method), which returns the
raw value instead of an Option. Beware that this way you're prone to
exceptions whenever the key is missing. A cleaner solution would be to
write your own KeySelector for the Option type.

val k = j.groupBy(_("ga_date"))
// or
val k = j.groupBy(ga => ga("ga_date"))
// or
val k = j.groupBy(_.apply("ga_date"))
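
If you don't want to deal with exceptions, a quick sketch (assuming a
missing "ga_date" can safely be mapped to an empty-string placeholder) is
to unwrap the Option in the key selector and fall back to a default, so
the key type becomes a plain String:

val k = j.groupBy(_.get("ga_date").getOrElse(""))  // String is a valid key type, no exception if the key is missing

Note that all records without a "ga_date" entry would end up in the same
group, so pick a fallback that makes sense for your data.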

As a side note, I believe the user mailing list may be more appropriate for
this kind of issue.

[1]: https://issues.apache.org/jira/browse/FLINK-2673
[2]: http://dataartisans.github.io/flink-training/dataSetAdvanced/slides.html

On Fri, Apr 29, 2016 at 10:13 AM, Punit Naik <naik.puni...@gmail.com> wrote:

> Below is my code:
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val data = env.readTextFile("file:///home/punit/test")
>   .flatMap(line => JSON.parseFull(line))
> val j = data.flatMap { _ match {
>   case map: Map[String, Any] =>
>     List(Map("ga_date" ->
>       map.get("ga_dateHour").get.toString().substring(0,
>         map.get("ga_dateHour").get.toString().length() - 2)))
> }}
>
> val k=j.groupBy(_.get("ga_date"))
>
> But when I execute this, it throws an exception saying:
>
> org.apache.flink.api.common.InvalidProgramException: Return type
> Option[String] of KeySelector class
> org.apache.flink.api.scala.DataSet$$anon$12 is not a valid key type
>
> Where am I going wrong?
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
