I had one more request though. I have been struggling with JSON and Flink
for the past two days, since I started using it. I have a JSON file with
one JSON object per line, and I want to read it and store the objects as
maps in another Flink DataSet. The values in my JSON can be of any type,
e.g. int, double, map, array, etc. I have attached a small two-line input
file, and I would be grateful if you could implement the logic I have
explained above (see the sketch below) using Flink. It would be a great help.
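
For reference, the logic I am after would look roughly like this (only a
rough sketch with placeholder paths, using the scala.util.parsing.json
library from earlier in this thread; I could not get it working myself):

import org.apache.flink.api.scala._
import scala.util.parsing.json.JSON

object JsonToMaps extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // One JSON object per line, so every String element is a complete object.
  val lines = env.readTextFile("file:///path/to/test.json")

  // Parse each line; values can be numbers, strings, nested maps
  // (JSON objects) or lists (JSON arrays), hence Map[String, Any].
  val maps: DataSet[Map[String, Any]] = lines.flatMap { line =>
    JSON.parseFull(line) match {
      case Some(m: Map[String, Any] @unchecked) => Some(m)
      case _                                    => None // drop unparseable lines
    }
  }

  maps.print()
}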

On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com> wrote:

> I managed to fix this error. I basically had to do
>
> val j = data.map { x => x.replaceAll("\"", "\\\"") }
>
> instead of
>
> val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>
> The triple quotes are only meaningful when pasting a string literal into
> the REPL; appended at runtime they become literal quote characters inside
> the string, which is why JSON.parseFull returned None.
>
> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com>
> wrote:
>
>> I have my Apache Flink program:
>>
>> import org.apache.flink.api.scala._
>> import scala.util.parsing.json._
>>
>> object numHits extends App {
>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>   val data = env.readTextFile("file:///path/to/json/file")
>>   val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>>
>>   /*1*/ println(j.first(1).collect()(0).getClass())
>>
>>   /*2*/ println(j.first(1).collect()(0))
>>
>>   /*3*/ println(JSON.parseFull(j.first(1).collect()(0)))
>> }
>>
>> I want to parse the input JSON file into a plain Scala Map, and for that
>> I am using the standard scala.util.parsing.json._ library.
>>
>> The first println statement prints class java.lang.String, which is the
>> type the JSON parsing function requires.
>>
>> The second println prints the actual JSON string, prepended and appended
>> with triple quotes ("\"\"\""), which I thought the JSON parser required
>> as well.
>>
>> Now, if I copy the output of the second println from the console and pass
>> it to JSON.parseFull() by hand, it parses properly.
>>
>> Therefore the third println should parse the same string, but it does
>> not: it outputs None, which means parsing failed.
>>
>> Why does this happen and how can I make it work?
>>
>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com>
>> wrote:
>>
>>> I just tried it, and it still cannot parse it. It still takes the input
>>> as a DataSet object rather than a String.
>>>
>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com>
>>> wrote:
>>>
>>>> Okay Thanks a lot Fabian!
>>>>
>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> You should do the parsing in a Map operator. Map applies the
>>>>> MapFunction to each element in the DataSet. So you can either
>>>>> implement another MapFunction or extend the one you have to call the
>>>>> JSON parser.
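>>>>>
>>>>> For example, a rough sketch with the parser you are already using
>>>>> (defaulting to an empty map when a line does not parse):
>>>>>
>>>>> val maps = data.map { line =>
>>>>>   // the parse now happens once per element, inside the MapFunction
>>>>>   JSON.parseFull(line) match {
>>>>>     case Some(m: Map[String, Any] @unchecked) => m
>>>>>     case _ => Map.empty[String, Any]
>>>>>   }
>>>>> }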
>>>>>
>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>
>>>>> > Hi
>>>>> >
>>>>> > So I managed to do the map part. I stuck with the
>>>>> > scala.util.parsing.json._ library for parsing.
>>>>> >
>>>>> > First I read my JSON:
>>>>> >
>>>>> > val data=env.readTextFile("file:///home/punit/vik-in")
>>>>> >
>>>>> > Then I transformed it so that it could be parsed into a map:
>>>>> >
>>>>> > val j=data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>>>>> >
>>>>> >
>>>>> > I checked it by printing "j"'s first value, and it looks correct.
>>>>> >
>>>>> > But when I tried to parse "j" like this:
>>>>> >
>>>>> > JSON.parseFull(j.first(1))
>>>>> >
>>>>> > it did not parse, because the object "j.first(1)" is still a DataSet
>>>>> > and not a String.
>>>>> >
>>>>> > So how can I get the underlying Java object out of the DataSet object?
>>>>> >
>>>>> > On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> > > Hi,
>>>>> > >
>>>>> > > you need to implement the MapFunction interface [1].
>>>>> > > Inside the MapFunction you can use any JSON parser library, such as
>>>>> > > Jackson, to parse the String. The exact logic depends on your use
>>>>> > > case.
>>>>> > >
>>>>> > > However, you should be careful not to initialize a new parser in
>>>>> > > each map() call, because that would be quite expensive. I recommend
>>>>> > > extending the RichMapFunction [2] and instantiating the parser in
>>>>> > > the open() method.
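>>>>> > >
>>>>> > > Sketched in Scala (assuming Jackson databind on the classpath; the
>>>>> > > java.util.Map target type is just one option):
>>>>> > >
>>>>> > > import com.fasterxml.jackson.databind.ObjectMapper
>>>>> > > import org.apache.flink.api.common.functions.RichMapFunction
>>>>> > > import org.apache.flink.configuration.Configuration
>>>>> > > import scala.collection.JavaConverters._
>>>>> > >
>>>>> > > class JsonParser extends RichMapFunction[String, Map[String, AnyRef]] {
>>>>> > >   @transient private var mapper: ObjectMapper = _
>>>>> > >
>>>>> > >   // open() runs once per task, so the parser is created only once
>>>>> > >   override def open(parameters: Configuration): Unit = {
>>>>> > >     mapper = new ObjectMapper()
>>>>> > >   }
>>>>> > >
>>>>> > >   override def map(line: String): Map[String, AnyRef] =
>>>>> > >     mapper
>>>>> > >       .readValue(line, classOf[java.util.Map[String, AnyRef]])
>>>>> > >       .asScala
>>>>> > >       .toMap
>>>>> > > }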
>>>>> > >
>>>>> > > Best, Fabian
>>>>> > >
>>>>> > > [1]
>>>>> > >
>>>>> > >
>>>>> >
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>>>>> > > [2]
>>>>> > >
>>>>> > >
>>>>> >
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
>>>>> > >
>>>>> > > 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>> > >
>>>>> > > > Hi Fabian
>>>>> > > >
>>>>> > > > Thanks for the reply. Yes, my JSON objects are separated by new
>>>>> > > > lines. It would have been great if you had explained the function
>>>>> > > > that goes inside the map. I tried to use the
>>>>> > > > 'scala.util.parsing.json._' library but had no luck.
>>>>> > > >
>>>>> > > > On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <
>>>>> fhue...@gmail.com>
>>>>> > > wrote:
>>>>> > > >
>>>>> > > > > Hi Punit,
>>>>> > > > >
>>>>> > > > > JSON can be hard to parse in parallel due to its nested
>>>>> > > > > structure. It depends on the schema and (textual) representation
>>>>> > > > > of the JSON whether and how it can be done. The problem is that
>>>>> > > > > a parallel input format needs to be able to identify record
>>>>> > > > > boundaries without context information. This can be very easy if
>>>>> > > > > your JSON data is a list of JSON objects which are separated by
>>>>> > > > > a newline character. However, this is hard to generalize. That's
>>>>> > > > > why Flink does not offer tooling for it (yet).
>>>>> > > > >
>>>>> > > > > If your JSON objects are separated by newline characters, the
>>>>> > > > > easiest way is to read the data as a text file, where each line
>>>>> > > > > results in a String, and to parse each object using a standard
>>>>> > > > > JSON parser. This would look like:
>>>>> > > > >
>>>>> > > > > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>> > > > >
>>>>> > > > > DataSet<String> text = env.readTextFile("/path/to/jsonfile");
>>>>> > > > > DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());
>>>>> > > > >
>>>>> > > > > Best, Fabian
>>>>> > > > >
>>>>> > > > > 2016-04-26 8:06 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>> > > > >
>>>>> > > > > > Hi
>>>>> > > > > >
>>>>> > > > > > I am new to Flink. I was experimenting with the DataSet API
>>>>> > > > > > and found out that there is no explicit method for loading a
>>>>> > > > > > JSON file as input. Can anyone please suggest a workaround?
>>>>> > > > > >
>>>>> > > > > > --
>>>>> > > > > > Thank You
>>>>> > > > > >
>>>>> > > > > > Regards
>>>>> > > > > >
>>>>> > > > > > Punit Naik
>>>>> > > > > >
>>>>> > > > >
>>>>> > > >
>>>>> > > >
>>>>> > > >
>>>>> > > > --
>>>>> > > > Thank You
>>>>> > > >
>>>>> > > > Regards
>>>>> > > >
>>>>> > > > Punit Naik
>>>>> > > >
>>>>> > >
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Thank You
>>>>> >
>>>>> > Regards
>>>>> >
>>>>> > Punit Naik
>>>>> >
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thank You
>>>>
>>>> Regards
>>>>
>>>> Punit Naik
>>>>
>>>
>>>
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
Thank You

Regards

Punit Naik

Attachment: test.json (application/json)
