I tried exactly what you told me. But when I execute this code, it first gives me a warning in Eclipse saying "Type Any has no fields that are visible from Scala Type analysis. Falling back to Java Type Analysis (TypeExtractor)." and when I run it, the code just hangs and does not print a thing.
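For what it's worth, the warning appears because the flatMap over JSON.parseFull yields elements typed as a bare Any, which Flink's Scala type analysis cannot inspect, so it falls back to generic (Kryo) serialization. A minimal sketch of one way to narrow the element type before it reaches the DataSet (the object name and the pattern match are assumptions, not code from this thread):

    import org.apache.flink.api.scala._
    import scala.util.parsing.json._

    object JsonToMaps extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment
      env.readTextFile("path/to/test.json")
        // Keep only lines that parse to a JSON object, typed as
        // Map[String, Any] instead of a bare Any.
        .flatMap { line =>
          JSON.parseFull(line) match {
            case Some(m: Map[String, Any] @unchecked) => Some(m)
            case _ => None
          }
        }
        .print()
    }

Note that print() on a DataSet triggers execution by itself, so no separate env.execute() call is needed; if the job still hangs, the cause is likely in the environment rather than in this code.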
On Thu, Apr 28, 2016 at 7:11 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:

> Hi Punit,
>
> what you want to do is something like this:
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.
>   readTextFile("path/to/test.json").
>   flatMap(line => JSON.parseFull(line)).
>   print
>
> The JSON.parseFull function in the Scala standard library takes a string (a line coming from the text file you read) and outputs an Option[Any], meaning it will output an object that represents the possibility of a missing output (Option) wrapping Any, which has been (somewhat confusingly) chosen to represent the actual parsed value (if present). If you "just" mapped over the input you would have ended up with a DataSet[Option[Any]], whereas your objective is to extract that inner type. flatMap does just that for you.
>
> If you execute the code I've shown (with the correct path in the right place) you'll see it print the same JSON as the input, but in its Scala Map representation. As for how to access data parsed by the Scala standard library JSON parser, unfortunately I can't help you, as I'm not very familiar with it, but I'm pretty sure it's well documented.
>
> Hacking around with Flink is very fun, but before you move further I'd like to point you to the excellent programming guide in the official documentation [1]. I'm sure you'll find the reading very interesting and worthwhile.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html
>
> On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>
>> I had one more request though. I have been struggling with JSONs and Flink for the past two days since I started using it. I have a JSON file which has one JSON object per line, and I want to read it and store it as maps in another Flink DataSet. In my JSON the values might be anything, e.g. int, double, map, array etc. I have attached a small two-line input file and I request you to please implement the logic that I have explained above using Flink. It would be a great help.
>>
>> On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>
>>> I managed to fix this error. I basically had to do
>>>
>>> val j = data.map { x => x.replaceAll("\"", "\\\"") }
>>>
>>> instead of
>>>
>>> val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>>>
>>> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>
>>>> I have my Apache Flink program:
>>>>
>>>> import org.apache.flink.api.scala._
>>>> import scala.util.parsing.json._
>>>>
>>>> object numHits extends App {
>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>   val data = env.readTextFile("file:///path/to/json/file")
>>>>   val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>>>>   /*1*/ println(((j.first(1).collect())(0)).getClass())
>>>>   /*2*/ println((j.first(1).collect())(0))
>>>>   /*3*/ println(JSON.parseFull((j.first(1).collect())(0)))
>>>> }
>>>>
>>>> I want to parse the input JSON file into a normal Scala Map, and for that I am using the default scala.util.parsing.json._ library.
>>>>
>>>> The output of the first println statement is "class java.lang.String", which is what the JSON parsing function requires.
>>>>
>>>> The output of the second println statement is the actual JSON string appended and prepended by "\"\"\"", which is also what the JSON parser requires.
>>>>
>>>> Now at this point, if I copy the output of the second println command printed in the console and pass it to the JSON.parseFull() function, it parses it properly.
>>>>
>>>> Therefore the third println statement should properly parse the same string passed to it, but it does not: it outputs "None", which means it failed.
>>>>
>>>> Why does this happen and how can I make it work?
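A likely explanation for the None in step 3: triple quotes are Scala source syntax, not characters that belong inside the string at runtime, so prepending and appending literal quote characters produces text that is no longer valid JSON. The copied console output parses in the REPL only because there the surrounding """...""" is read as syntax again. A small sketch of the difference (the sample JSON is made up):

    import scala.util.parsing.json._

    // The triple quotes here are syntax; the string's content is just the JSON text.
    val line = """{"a": 1, "b": [1, 2]}"""
    println(JSON.parseFull(line))                       // Some(Map(a -> 1.0, b -> List(1.0, 2.0)))
    println(JSON.parseFull("\"\"\"" + line + "\"\"\"")) // None: the extra quote characters break the JSON

So the map step that wraps each line in "\"\"\"" can simply be dropped and the raw line passed to JSON.parseFull.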
>>>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>>
>>>>> I just tried it and it still cannot parse it. It still takes the input as a DataSet object rather than a String.
>>>>>
>>>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>>>
>>>>>> Okay, thanks a lot Fabian!
>>>>>>
>>>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> You should do the parsing in a Map operator. Map applies the MapFunction to each element in the DataSet. So you can either implement another MapFunction or extend the one you have to call the JSON parser.
>>>>>>>
>>>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> So I managed to do the map part. I stuck with the "import scala.util.parsing.json._" library for parsing.
>>>>>>>>
>>>>>>>> First I read my JSON:
>>>>>>>>
>>>>>>>> val data = env.readTextFile("file:///home/punit/vik-in")
>>>>>>>>
>>>>>>>> Then I transformed it so that it can be parsed to a map:
>>>>>>>>
>>>>>>>> val j = data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>>>>>>>>
>>>>>>>> I checked it by printing "j"'s first value, and it is proper.
>>>>>>>>
>>>>>>>> But when I tried to parse "j" like this:
>>>>>>>>
>>>>>>>> JSON.parseFull(j.first(1))
>>>>>>>>
>>>>>>>> it did not parse, because the object "j.first(1)" is still a DataSet object and not a String object.
>>>>>>>>
>>>>>>>> So how can I get the underlying Java object from the DataSet object?
>>>>>>>>
>>>>>>>> On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> you need to implement the MapFunction interface [1]. Inside the MapFunction you can use any JSON parser library, such as Jackson, to parse the String. The exact logic depends on your use case.
>>>>>>>>>
>>>>>>>>> However, you should be careful not to initialize a new parser in each map() call, because that would be quite expensive. I recommend extending the RichMapFunction and instantiating the parser in the open() method.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
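A minimal sketch of what Fabian describes here, assuming Jackson is on the classpath; the class name ParseJson is hypothetical:

    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class ParseJson extends RichMapFunction[String, JsonNode] {
      // The parser is created once per task in open(), not once per record in map().
      @transient private var mapper: ObjectMapper = _

      override def open(parameters: Configuration): Unit = {
        mapper = new ObjectMapper()
      }

      override def map(line: String): JsonNode = mapper.readTree(line)
    }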
> >>>>>> > I > >>>>>> > > > tried to use the 'scala.util.parsing.json._' library but got > no > >>>>>> luck. > >>>>>> > > > > >>>>>> > > > On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske < > >>>>>> fhue...@gmail.com> > >>>>>> > > wrote: > >>>>>> > > > > >>>>>> > > > > Hi Punit, > >>>>>> > > > > > >>>>>> > > > > JSON can be hard to parse in parallel due to its nested > >>>>>> structure. It > >>>>>> > > > > depends on the schema and (textual) representation of the > JSON > >>>>>> > whether > >>>>>> > > > and > >>>>>> > > > > how it can be done. The problem is that a parallel input > >>>>>> format needs > >>>>>> > > to > >>>>>> > > > be > >>>>>> > > > > able to identify record boundaries without context > >>>>>> information. This > >>>>>> > > can > >>>>>> > > > be > >>>>>> > > > > very easy, if your JSON data is a list of JSON objects which > >>>>>> are > >>>>>> > > > separated > >>>>>> > > > > by a new line character. However, this is hard to > generalize. > >>>>>> That's > >>>>>> > > why > >>>>>> > > > > Flink does not offer tooling for it (yet). > >>>>>> > > > > > >>>>>> > > > > If your JSON objects are separated by new line characters, > the > >>>>>> > easiest > >>>>>> > > > way > >>>>>> > > > > is to read it as text file, where each line results in a > >>>>>> String and > >>>>>> > > parse > >>>>>> > > > > each object using a standard JSON parser. This would look > >>>>>> like: > >>>>>> > > > > > >>>>>> > > > > ExecutionEnvironment env = > >>>>>> > > > ExecutionEnvironment.getExecutionEnvironment(); > >>>>>> > > > > > >>>>>> > > > > DataSet<String> text = > env.readTextFile("/path/to/jsonfile"); > >>>>>> > > > > DataSet<YourObject> json = text.map(new > >>>>>> > > > YourMapFunctionWhichParsesJSON()); > >>>>>> > > > > > >>>>>> > > > > Best, Fabian > >>>>>> > > > > > >>>>>> > > > > 2016-04-26 8:06 GMT+02:00 Punit Naik < > naik.puni...@gmail.com > >>>>>> >: > >>>>>> > > > > > >>>>>> > > > > > Hi > >>>>>> > > > > > > >>>>>> > > > > > I am new to Flink. I was experimenting with the Dataset > API > >>>>>> and > >>>>>> > found > >>>>>> > > > out > >>>>>> > > > > > that there is no explicit method for loading a JSON file > as > >>>>>> input. > >>>>>> > > Can > >>>>>> > > > > > anyone please suggest me a workaround? > >>>>>> > > > > > > >>>>>> > > > > > -- > >>>>>> > > > > > Thank You > >>>>>> > > > > > > >>>>>> > > > > > Regards > >>>>>> > > > > > > >>>>>> > > > > > Punit Naik > >>>>>> > > > > > > >>>>>> > > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > -- > >>>>>> > > > Thank You > >>>>>> > > > > >>>>>> > > > Regards > >>>>>> > > > > >>>>>> > > > Punit Naik > >>>>>> > > > > >>>>>> > > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> > -- > >>>>>> > Thank You > >>>>>> > > >>>>>> > Regards > >>>>>> > > >>>>>> > Punit Naik > >>>>>> > > >>>>>> > >>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> Thank You > >>>>> > >>>>> Regards > >>>>> > >>>>> Punit Naik > >>>>> > >>>> > >>>> > >>>> > >>>> -- > >>>> Thank You > >>>> > >>>> Regards > >>>> > >>>> Punit Naik > >>>> > >>> > >>> > >>> > >>> -- > >>> Thank You > >>> > >>> Regards > >>> > >>> Punit Naik > >>> > >> > >> > >> > >> -- > >> Thank You > >> > >> Regards > >> > >> Punit Naik > >> > > > > > > > > -- > > Thank You > > > > Regards > > > > Punit Naik > > > > > > -- > BR, > Stefano Baghino > > Software Engineer @ Radicalbit > -- Thank You Regards Punit Naik