As far as I can see from the example, you are trying to filter by key and 
„flatten“ nested maps for each record. Both, data set and data stream API (from 
the question it is unclear which one you would like to use, but it works with 
both) provide transformations that can do this for you. For an overview, please 
take a look at 
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html
 
<https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html>
 . I think a combination of Filter and Map (if one record is transformed to one 
element) or FlatMap (if one record is transformed to an arbitrary number of 
elements) is probably what you are looking for. Filter can do the same as val 
x1 = m.filterKeys { s1.contains(_) } and Map can implement the unnesting and 
concatenation of the maps.

Best,
Stefan 

> Am 23.07.2016 um 08:14 schrieb Pauline Yeung (yeungp) <yeu...@cisco.com>:
> 
> I have a file, which each line is one json record
> 
> I run the following
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val data = env.readTextFile("file:///somefile" <file:///somefile%22>)
>                .map(line => JSON.parseFull(line))
> 
> and get the following for one json record.  For simplicity, the keys and 
> values are modified, and not all keys and values are shown.
> 
> Some(Map(a->1, b->2, c->3, d->Map(e->5, f->6, g->7))
> 
> How do I create a new data set from selected keys values from this data set?  
>  E.g. Map(a -> 1, b -> 2, e -> 5, f -> 6)
> 
> To do the equivalent of the following.
> 
> scala> val m = Map('a'->1, 'b'->2, 'c'->3, 'd'->Map('e'->5, 'f'->6, 'g'->7))
> m: scala.collection.immutable.Map[Char,Any] = Map(a -> 1, b -> 2, c -> 3, d 
> -> Map(e -> 5, f -> 6, g -> 7))
> scala> val s1 = Set('a', 'b')
> s1: scala.collection.immutable.Set[Char] = Set(a, b)
> scala> val x1 = m.filterKeys { s1.contains(_) }
> x1: scala.collection.immutable.Map[Char,Any] = Map(a -> 1, b -> 2)
> 
> scala> val n = m('d').asInstanceOf[Map[Char,Any]]
> p: Map[Char,Any] = Map(e -> 5, f -> 6, g -> 7)
> scala> val s2 = Set('e', 'f')
> s2: scala.collection.immutable.Set[Char] = Set(e, f)
> scala> val x2 = n.filterKeys { s2.contains(_) }
> x2: scala.collection.immutable.Map[Char,Any] = Map(e -> 5, f -> 6)
> 
> scala> x1 ++ x2
> res28: scala.collection.immutable.Map[Char,Any] = Map(a -> 1, b -> 2, e -> 5, 
> f -> 6)
> 
> Thanks
> Pauline

Reply via email to