One more request, though. I have been struggling with JSON and Flink for the past two days, since I started using it. I have a JSON file with one JSON object per line, and I want to read it and store the objects as maps in another Flink DataSet. In my JSON the values might be anything, e.g. int, double, map, array, etc. I have attached a small two-line input file, and I would be grateful if you could implement the logic I explained above using Flink.
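For reference, a minimal sketch of that logic, using the scala.util.parsing.json library discussed later in this thread; the object name JsonLinesToMaps and the file path are placeholders:

    import org.apache.flink.api.scala._
    import scala.util.parsing.json.JSON

    // Hypothetical sketch: read a file with one JSON object per line and turn
    // each line into a Scala Map[String, Any]. JSON.parseFull accepts the raw
    // line directly (no triple-quote wrapping needed) and returns Option[Any];
    // for a JSON object that Any is a Map[String, Any] whose values can be
    // Double (parseFull parses all numbers as Double), String, Boolean,
    // List[Any], or a nested Map[String, Any].
    object JsonLinesToMaps extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment

      val lines = env.readTextFile("file:///path/to/test.json")

      val maps: DataSet[Map[String, Any]] = lines.flatMap { line =>
        JSON.parseFull(line) match {
          case Some(m: Map[String, Any] @unchecked) => List(m)
          case _ => Nil // drop lines that do not parse as JSON objects
        }
      }

      maps.print()
    }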
On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com> wrote:

> I managed to fix this error. I basically had to do
>
>     val j = data.map { x => x.replaceAll("\"", "\\\"") }
>
> instead of
>
>     val j = data.map { x => ("\"\"\"" + x + "\"\"\"") }
>
> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>
>> Here is my Apache Flink program:
>>
>>     import org.apache.flink.api.scala._
>>     import scala.util.parsing.json._
>>
>>     object numHits extends App {
>>       val env = ExecutionEnvironment.getExecutionEnvironment
>>       val data = env.readTextFile("file:///path/to/json/file")
>>       val j = data.map { x => ("\"\"\"" + x + "\"\"\"") }
>>       /*1*/ println(((j.first(1).collect())(0)).getClass())
>>       /*2*/ println((j.first(1).collect())(0))
>>       /*3*/ println(JSON.parseFull((j.first(1).collect())(0)))
>>     }
>>
>> I want to parse the input JSON file into a normal Scala Map, and for
>> that I am using the default scala.util.parsing.json._ library.
>>
>> The output of the first println statement is "class java.lang.String",
>> which is what the JSON parsing function requires.
>>
>> The output of the second println is the actual JSON string prepended
>> and appended with "\"\"\"", which the JSON parser also requires.
>>
>> Now, if I copy the output of the second println from the console and
>> pass it to JSON.parseFull(), it parses properly.
>>
>> Therefore the third println should parse the same string, but it does
>> not: it outputs "None", which means the parse failed.
>>
>> Why does this happen and how can I make it work?
>>
>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>
>>> I just tried it and it still cannot parse it. It still takes the
>>> input as a DataSet object rather than a String.
>>>
>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>
>>>> Okay, thanks a lot Fabian!
>>>>
>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> You should do the parsing in a Map operator. Map applies the
>>>>> MapFunction to each element in the DataSet. So you can either
>>>>> implement another MapFunction or extend the one you have to call
>>>>> the JSON parser.
>>>>>
>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> So I managed to do the map part. I stuck with the
>>>>>> scala.util.parsing.json._ library for parsing.
>>>>>>
>>>>>> First I read my JSON:
>>>>>>
>>>>>>     val data = env.readTextFile("file:///home/punit/vik-in")
>>>>>>
>>>>>> Then I transformed it so that it can be parsed to a map:
>>>>>>
>>>>>>     val j = data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>>>>>>
>>>>>> I checked it by printing j's first value, and it is proper.
>>>>>>
>>>>>> But when I tried to parse j like this: JSON.parseFull(j.first(1)),
>>>>>> it did not parse, because the object j.first(1) is still a DataSet
>>>>>> object and not a String object.
>>>>>>
>>>>>> So how can I get the underlying Java object from the DataSet object?
>>>>>>
>>>>>> On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> you need to implement the MapFunction interface [1]. Inside the
>>>>>>> MapFunction you can use any JSON parser library, such as Jackson,
>>>>>>> to parse the String. The exact logic depends on your use case.
>>>>>>>
>>>>>>> However, you should be careful not to initialize a new parser in
>>>>>>> each map() call, because that would be quite expensive. I
>>>>>>> recommend extending the RichMapFunction [2] and instantiating the
>>>>>>> parser in the open() method.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
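For reference, a minimal sketch of the pattern Fabian describes: a RichMapFunction that creates a single Jackson ObjectMapper per parallel task in open() rather than one per record. It assumes Jackson is on the classpath, and the class name ParseJsonLine is made up for illustration:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

    // Hypothetical parse function: the ObjectMapper is instantiated once per
    // parallel task instance in open(), not once per map() call.
    class ParseJsonLine extends RichMapFunction[String, JsonNode] {
      @transient private var mapper: ObjectMapper = _

      override def open(parameters: Configuration): Unit = {
        mapper = new ObjectMapper()
      }

      // Parse one line (one JSON object) into a Jackson tree.
      override def map(line: String): JsonNode = mapper.readTree(line)
    }

It would then be plugged in as text.map(new ParseJsonLine()), keeping one parser per parallel task instead of one per record.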
>>>>>>>
>>>>>>> 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian
>>>>>>>>
>>>>>>>> Thanks for the reply. Yes, my JSON is separated by new lines. It
>>>>>>>> would have been great if you had explained the function that goes
>>>>>>>> inside the map. I tried to use the scala.util.parsing.json._
>>>>>>>> library but had no luck.
>>>>>>>>
>>>>>>>> On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Punit,
>>>>>>>>>
>>>>>>>>> JSON can be hard to parse in parallel due to its nested
>>>>>>>>> structure. It depends on the schema and (textual) representation
>>>>>>>>> of the JSON whether and how it can be done. The problem is that
>>>>>>>>> a parallel input format needs to be able to identify record
>>>>>>>>> boundaries without context information. This can be very easy if
>>>>>>>>> your JSON data is a list of JSON objects separated by a new line
>>>>>>>>> character. However, this is hard to generalize. That's why Flink
>>>>>>>>> does not offer tooling for it (yet).
>>>>>>>>>
>>>>>>>>> If your JSON objects are separated by new line characters, the
>>>>>>>>> easiest way is to read the input as a text file, where each line
>>>>>>>>> results in a String, and parse each object using a standard JSON
>>>>>>>>> parser. This would look like:
>>>>>>>>>
>>>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>
>>>>>>>>>     DataSet<String> text = env.readTextFile("/path/to/jsonfile");
>>>>>>>>>     DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());
>>>>>>>>>
>>>>>>>>> Best, Fabian
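If one goes the Jackson route, the parsed tree can then be converted into plain Scala collections. A hypothetical recursive helper (the name jsonNodeToScala is invented) that turns a top-level JSON object into a Map[String, Any]:

    import scala.collection.JavaConverters._
    import com.fasterxml.jackson.databind.JsonNode

    // Hypothetical converter: recursively turns a Jackson tree into plain
    // Scala values, so a top-level JSON object becomes a Map[String, Any]
    // whose values may be Long, Double, Boolean, String, null, List[Any],
    // or a nested Map[String, Any].
    def jsonNodeToScala(node: JsonNode): Any =
      if (node.isObject) {
        node.fields().asScala.map(e => e.getKey -> jsonNodeToScala(e.getValue)).toMap
      } else if (node.isArray) {
        node.elements().asScala.map(jsonNodeToScala).toList
      } else if (node.isIntegralNumber) {
        node.asLong()
      } else if (node.isNumber) {
        node.asDouble()
      } else if (node.isBoolean) {
        node.asBoolean()
      } else if (node.isNull) {
        null
      } else {
        node.asText()
      }

Combined with a parser held in open() as suggested above, the map() body could return jsonNodeToScala(mapper.readTree(line)) (with the function's output type adjusted accordingly) to produce the maps asked for at the top of this thread.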
>>>>>>>>>
>>>>>>>>> 2016-04-26 8:06 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> I am new to Flink. I was experimenting with the DataSet API and
>>>>>>>>>> found out that there is no explicit method for loading a JSON
>>>>>>>>>> file as input. Can anyone please suggest a workaround?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thank You
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>>
>>>>>>>>>> Punit Naik

--
Thank You

Regards

Punit Naik
test.json
Description: application/json