Hi Punit,

What you want to do is something like this:

    import org.apache.flink.api.scala._
    import scala.util.parsing.json._

    val env = ExecutionEnvironment.getExecutionEnvironment
    // read the file line by line, parse each line, and drop the lines
    // that fail to parse
    env.
      readTextFile("path/to/test.json").
      flatMap(line => JSON.parseFull(line)).
      print

The JSON.parseFull function in the Scala standard library takes a string
(here, a line coming from the text file you read) and returns an
Option[Any]: Option represents the possibility of a missing result
(parsing can fail), while Any has been (somewhat confusingly) chosen to
represent the actual parsed value, if present. If you just mapped over
the input you would have ended up with a DataSet[Option[Any]], whereas
your objective is to extract the inner value. flatMap does exactly that:
it treats each Option as a collection of zero or one elements, so the
Nones are dropped and the Somes are unwrapped.
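
To see why flatMap is the right tool here, compare the two on a plain
Scala collection, outside of Flink (just a quick sketch):

    import scala.util.parsing.json._

    val lines = Seq("""{"a": 1}""", "not json")
    // map keeps the wrapper: List(Some(Map(a -> 1.0)), None)
    println(lines.map(line => JSON.parseFull(line)))
    // flatMap unwraps the Somes and drops the None: List(Map(a -> 1.0))
    println(lines.flatMap(line => JSON.parseFull(line)))

Flink's flatMap behaves the same way on a DataSet, only distributed.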

If you run the code above (with the correct path in place) you'll see it
print the same JSON you fed in, but in its Scala Map representation. As
for how to access the data parsed by the Scala standard library JSON
parser, I can't help you much as I'm not very familiar with it, but it
is reasonably well documented.
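
That said, pattern matching usually does the trick; here is a minimal
sketch (the "name" and "hits" fields are made up, adjust them to your
actual schema):

    import scala.util.parsing.json._

    JSON.parseFull("""{"name": "flink", "hits": 42}""") match {
      case Some(obj: Map[String, Any] @unchecked) =>
        // all values come out as Any; numbers are parsed as Double
        println(obj.get("name")) // Some(flink)
        println(obj.get("hits")) // Some(42.0)
      case _ =>
        println("not a JSON object")
    }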

Hacking around with Flink is a lot of fun, but before you move further
I'd like to point you to the excellent programming guide in the official
documentation [1]. I'm sure you'll find it an interesting and worthwhile
read.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html

On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.puni...@gmail.com> wrote:

> I had one more request though. I have been struggling with JSONs and Flink
> for the past two days since I started using it. I have a JSON file which
> has one JSON object per line and I want to read it and store it as maps in
> another Flink DataSet. In my JSON the values might be anything, e.g.
> int, double, map, array, etc. I have attached a small two line input file
> and I request you to please implement the logic that I have explained above
> using flink. It would be a great help.
>
> On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com>
> wrote:
>
>> I managed to fix this error. I basically had to do val j=data.map { x =>
>> (x.replaceAll("\"","\\\"")) } instead of val j=data.map { x => ("\"\"\""+
>> x+"\"\"\"") }
>>
>> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com>
>> wrote:
>>
>>> I have my Apache Flink program:
>>>
>>> import org.apache.flink.api.scala._
>>> import scala.util.parsing.json._
>>> object numHits extends App {
>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>     val data=env.readTextFile("file:///path/to/json/file")
>>>     val j=data.map { x => ("\"\"\""+x+"\"\"\"") }
>>>     /*1*/ println( ((j.first(1).collect())(0)).getClass() )
>>>
>>>     /*2*/ println( ((j.first(1).collect())(0)) )
>>>
>>>     /*3*/ println( JSON.parseFull((j.first(1).collect())(0)) )
>>>     }
>>>
>>> I want to parse the input JSON file into a normal Scala Map, and for
>>> that I am using the default scala.util.parsing.json._ library.
>>>
>>> The output of the first println statement is class java.lang.String
>>> which is required by the JSON parsing function.
>>>
>>> Output of the second println function is the actual JSON string
>>> appended and prepended by "\"\"\"" which is also required by the JSON
>>> parser.
>>>
>>> Now at this point if I copy the output of the second println command
>>> printed in the console and pass it to the JSON.parseFull() function, it
>>> properly parses it.
>>>
>>> Therefore the third println statement should properly parse the same
>>> string passed to it, but it does not: it outputs "None", which means
>>> parsing failed.
>>>
>>> Why does this happen and how can I make it work?
>>>
>>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com>
>>> wrote:
>>>
>>>> I just tried it and it still cannot parse it. It still takes the input
>>>> as a DataSet object rather than a String.
>>>>
>>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com>
>>>> wrote:
>>>>
>>>>> Okay Thanks a lot Fabian!
>>>>>
>>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You should do the parsing in a Map operator. Map applies the
>>>>>> MapFunction to
>>>>>> each element in the DataSet.
>>>>>> So you can either implement another MapFunction or extend the one you
>>>>>> have
>>>>>> to call the JSON parser.
>>>>>>
>>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>
>>>>>> > Hi
>>>>>> >
>>>>>> > So I managed to do the map part. I stuck with the "import
>>>>>> > scala.util.parsing.json._" library for parsing.
>>>>>> >
>>>>>> > First I read my JSON:
>>>>>> >
>>>>>> > val data=env.readTextFile("file:///home/punit/vik-in")
>>>>>> >
>>>>>> > Then I transformed it so that it can be parsed to a map:
>>>>>> >
>>>>>> > val j=data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>>>>>> >
>>>>>> >
>>>>>> > I checked it by printing "j"'s first value and it looks proper.
>>>>>> >
>>>>>> > But when I tried to parse "j" like this:
>>>>>> >
>>>>>> > JSON.parseFull(j.first(1)) ; it did not parse because the object
>>>>>> > "j.first(1)" is still a Dataset object and not a String object.
>>>>>> >
>>>>>> > So how can I get the underlying java object from the dataset object?
>>>>>> >
>>>>>> > On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> > > Hi,
>>>>>> > >
>>>>>> > > you need to implement the MapFunction interface [1].
>>>>>> > > Inside the MapFunction you can use any JSON parser library such as
>>>>>> > Jackson
>>>>>> > > to parse the String.
>>>>>> > > The exact logic depends on your use case.
>>>>>> > >
>>>>>> > > However, you should be careful not to initialize a new parser in
>>>>>> > > each map() call, because that would be quite expensive.
>>>>>> > > I recommend extending the RichMapFunction and instantiating the
>>>>>> > > parser in the open() method.
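>>>>>> > >
>>>>>> > > A rough sketch of this in Scala, using Jackson (the class name
>>>>>> > > ParseJson is made up for illustration):
>>>>>> > >
>>>>>> > > import org.apache.flink.api.common.functions.RichMapFunction
>>>>>> > > import org.apache.flink.configuration.Configuration
>>>>>> > > import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
>>>>>> > >
>>>>>> > > class ParseJson extends RichMapFunction[String, JsonNode] {
>>>>>> > >   // one mapper per parallel task, created in open() rather
>>>>>> > >   // than once per record
>>>>>> > >   @transient private var mapper: ObjectMapper = _
>>>>>> > >
>>>>>> > >   // called once per task, before any map() call
>>>>>> > >   override def open(parameters: Configuration): Unit = {
>>>>>> > >     mapper = new ObjectMapper()
>>>>>> > >   }
>>>>>> > >
>>>>>> > >   // called per element, reusing the single mapper
>>>>>> > >   override def map(line: String): JsonNode = mapper.readTree(line)
>>>>>> > > }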
>>>>>> > >
>>>>>> > > Best, Fabian
>>>>>> > >
>>>>>> > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>>>>>> > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
>>>>>> > >
>>>>>> > > 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>> > >
>>>>>> > > > Hi Fabian
>>>>>> > > >
>>>>>> > > > Thanks for the reply. Yes, my JSON is separated by new lines.
>>>>>> > > > It would have been great if you had explained the function that
>>>>>> > > > goes inside the map. I tried to use the
>>>>>> > > > 'scala.util.parsing.json._' library but had no luck.
>>>>>> > > >
>>>>>> > > > On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> > > > wrote:
>>>>>> > > >
>>>>>> > > > > Hi Punit,
>>>>>> > > > >
>>>>>> > > > > JSON can be hard to parse in parallel due to its nested
>>>>>> > > > > structure. It depends on the schema and (textual)
>>>>>> > > > > representation of the JSON whether and how it can be done. The
>>>>>> > > > > problem is that a parallel input format needs to be able to
>>>>>> > > > > identify record boundaries without context information. This
>>>>>> > > > > can be very easy, if your JSON data is a list of JSON objects
>>>>>> > > > > which are separated by a new line character. However, this is
>>>>>> > > > > hard to generalize. That's why Flink does not offer tooling
>>>>>> > > > > for it (yet).
>>>>>> > > > >
>>>>>> > > > > If your JSON objects are separated by new line characters, the
>>>>>> > > > > easiest way is to read the file as a text file, where each
>>>>>> > > > > line results in a String, and parse each object using a
>>>>>> > > > > standard JSON parser. This would look like:
>>>>>> > > > >
>>>>>> > > > > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>> > > > >
>>>>>> > > > > DataSet<String> text = env.readTextFile("/path/to/jsonfile");
>>>>>> > > > > DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());
>>>>>> > > > >
>>>>>> > > > > Best, Fabian
>>>>>> > > > >
>>>>>> > > > > 2016-04-26 8:06 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>> > > > >
>>>>>> > > > > > Hi
>>>>>> > > > > >
>>>>>> > > > > > I am new to Flink. I was experimenting with the DataSet API
>>>>>> > > > > > and found out that there is no explicit method for loading
>>>>>> > > > > > a JSON file as input. Can anyone please suggest a
>>>>>> > > > > > workaround?
>>>>>> > > > > >
>>>>>> > > > > > --
>>>>>> > > > > > Thank You
>>>>>> > > > > >
>>>>>> > > > > > Regards
>>>>>> > > > > >
>>>>>> > > > > > Punit Naik
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > --
>>>>>> > > > Thank You
>>>>>> > > >
>>>>>> > > > Regards
>>>>>> > > >
>>>>>> > > > Punit Naik
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > Thank You
>>>>>> >
>>>>>> > Regards
>>>>>> >
>>>>>> > Punit Naik
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thank You
>>>>>
>>>>> Regards
>>>>>
>>>>> Punit Naik
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thank You
>>>>
>>>> Regards
>>>>
>>>> Punit Naik
>>>>
>>>
>>>
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
