I am so sorry. Please ignore my previous reply. Actually, my input was too big, so it hung. So stupid of me. Thanks a lot! Your example worked!
On Fri, Apr 29, 2016 at 12:35 AM, Punit Naik <naik.puni...@gmail.com> wrote:

> I tried exactly what you told me. But when I execute this code, first of
> all it gives me a warning in Eclipse saying "Type Any has no fields that
> are visible from Scala Type analysis. Falling back to Java Type Analysis
> (TypeExtractor).", and when I run it, the code just hangs and does not
> print a thing.
>
> On Thu, Apr 28, 2016 at 7:11 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:
>
>> Hi Punit,
>>
>> what you want to do is something like this:
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> env.
>>   readTextFile("path/to/test.json").
>>   flatMap(line => JSON.parseFull(line)).
>>   print
>>
>> The JSON.parseFull function in the Scala standard library takes a string
>> (a line coming from the text file you read) and outputs an Option[Any]:
>> an object representing the possibility of a missing output (Option),
>> wrapping Any, which has been (somewhat confusingly) chosen to represent
>> the actual parsed value (if present). If you just mapped over the input
>> you would end up with a DataSet[Option[Any]], whereas your objective is
>> to extract that inner type. flatMap does just that for you.
>>
>> If you execute the code I've shown (with the correct path in the right
>> place) you'll see it prints the same JSON as the input, but in its Scala
>> Map representation. As for how to access the data parsed by the Scala
>> standard library JSON parser, unfortunately I can't help you, as I'm not
>> very familiar with it, but I'm pretty sure it's well documented.
>>
>> Hacking around with Flink is a lot of fun, but before you move further
>> I'd like to point you to the excellent programming guide in the official
>> documentation [1]. I'm sure you'll find it an interesting and worthwhile
>> read.
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html
>>
>> On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>
>>> I had one more request though. I have been struggling with JSON and
>>> Flink for the past two days, since I started using it. I have a JSON
>>> file which has one JSON object per line, and I want to read it and
>>> store the objects as maps in another Flink DataSet. In my JSON the
>>> values might be anything, e.g. int, double, map, array etc. I have
>>> attached a small two-line input file, and I request you to please
>>> implement the logic I have explained above using Flink. It would be a
>>> great help.
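Stefano notes above that he can't help with accessing the parsed data, so here is a minimal sketch of pulling typed values out of the Option[Any] that JSON.parseFull returns (the "name" and "count" fields are made up for illustration):

    import scala.util.parsing.json.JSON

    val parsed: Option[Any] = JSON.parseFull("""{"name": "flink", "count": 1}""")

    parsed match {
      // top-level JSON objects come back as Map[String, Any]
      case Some(obj: Map[String, Any] @unchecked) =>
        val name  = obj.get("name").collect { case s: String => s }
        val count = obj.get("count").collect { case d: Double => d.toInt }
        println(s"name=$name, count=$count")
      case _ =>
        println("input was not a JSON object")
    }

Note that this parser represents every JSON number as a Double, so integer fields need an explicit conversion.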
>>> On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>
>>>> I managed to fix this error. I basically had to do
>>>>
>>>> val j = data.map { x => x.replaceAll("\"", "\\\"") }
>>>>
>>>> instead of
>>>>
>>>> val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>>>>
>>>> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>>
>>>>> I have my Apache Flink program:
>>>>>
>>>>> import org.apache.flink.api.scala._
>>>>> import scala.util.parsing.json._
>>>>>
>>>>> object numHits extends App {
>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>   val data = env.readTextFile("file:///path/to/json/file")
>>>>>   val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
>>>>>   /*1*/ println(((j.first(1).collect())(0)).getClass())
>>>>>   /*2*/ println((j.first(1).collect())(0))
>>>>>   /*3*/ println(JSON.parseFull((j.first(1).collect())(0)))
>>>>> }
>>>>>
>>>>> I want to parse the input JSON file into a normal Scala Map, and for
>>>>> that I am using the default scala.util.parsing.json._ library.
>>>>>
>>>>> The output of the first println statement is "class java.lang.String",
>>>>> which is what the JSON parsing function requires.
>>>>>
>>>>> The output of the second println is the actual JSON string prepended
>>>>> and appended with "\"\"\"", which is also required by the JSON parser.
>>>>>
>>>>> Now, if I copy the output of the second println from the console and
>>>>> pass it to the JSON.parseFull() function, it parses it properly.
>>>>>
>>>>> Therefore the third println should parse the very same string passed
>>>>> to it, but it does not: it outputs "None", which means it failed.
>>>>>
>>>>> Why does this happen, and how can I make it work?
>>>>>
>>>>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>>>
>>>>>> I just tried it and it still cannot parse it. It still takes the
>>>>>> input as a DataSet object rather than a String.
>>>>>>
>>>>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com> wrote:
>>>>>>
>>>>>>> Okay, thanks a lot Fabian!
>>>>>>>
>>>>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You should do the parsing in a Map operator. Map applies the
>>>>>>>> MapFunction to each element in the DataSet. So you can either
>>>>>>>> implement another MapFunction or extend the one you have to call
>>>>>>>> the JSON parser.
>>>>>>>>
>>>>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> So I managed to do the map part. I stuck with the
>>>>>>>>> scala.util.parsing.json._ library for parsing.
>>>>>>>>>
>>>>>>>>> First I read my JSON:
>>>>>>>>>
>>>>>>>>> val data = env.readTextFile("file:///home/punit/vik-in")
>>>>>>>>>
>>>>>>>>> Then I transformed it so that it can be parsed to a map:
>>>>>>>>>
>>>>>>>>> val j = data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>>>>>>>>>
>>>>>>>>> I checked it by printing "j"'s first value, and it is proper.
>>>>>>>>>
>>>>>>>>> But when I tried to parse "j" like this:
>>>>>>>>>
>>>>>>>>> JSON.parseFull(j.first(1))
>>>>>>>>>
>>>>>>>>> it did not parse, because the object "j.first(1)" is still a
>>>>>>>>> DataSet object and not a String object.
>>>>>>>>>
>>>>>>>>> So how can I get the underlying Java object from the DataSet
>>>>>>>>> object?
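As Punit's follow-up program above shows, the answer is DataSet.collect(), which triggers execution and ships the elements back to the client program as a local Seq. A minimal self-contained sketch (the path is a placeholder):

    import org.apache.flink.api.scala._
    import scala.util.parsing.json.JSON

    val env  = ExecutionEnvironment.getExecutionEnvironment
    val data = env.readTextFile("file:///path/to/json/file")

    // collect() brings the DataSet's elements into local memory,
    // so it is only sensible for small data sets
    val firstLine: String = data.first(1).collect().head
    println(JSON.parseFull(firstLine))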
>>>>>>>>> On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> you need to implement the MapFunction interface [1]. Inside the
>>>>>>>>>> MapFunction you can use any JSON parser library, such as
>>>>>>>>>> Jackson, to parse the String. The exact logic depends on your
>>>>>>>>>> use case.
>>>>>>>>>>
>>>>>>>>>> However, you should be careful not to initialize a new parser in
>>>>>>>>>> each map() call, because that would be quite expensive. I
>>>>>>>>>> recommend extending the RichMapFunction [2] and instantiating
>>>>>>>>>> the parser in the open() method.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
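Fabian's suggestion is not spelled out in code anywhere in the thread, so here is a minimal sketch of a RichMapFunction that instantiates a Jackson parser in open() (the class name JsonLineParser is made up; any JSON parser would do):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

    class JsonLineParser extends RichMapFunction[String, JsonNode] {
      // not serialized with the function; created on the worker in open()
      @transient private var mapper: ObjectMapper = _

      // open() runs once per parallel task instance, so the parser is
      // built once rather than once per record
      override def open(parameters: Configuration): Unit = {
        mapper = new ObjectMapper()
      }

      override def map(line: String): JsonNode = mapper.readTree(line)
    }

It would be used as text.map(new JsonLineParser), mirroring the placeholder YourMapFunctionWhichParsesJSON in the snippet quoted below. Note that Flink would fall back to generic serialization for JsonNode, so in practice one might rather map to a concrete POJO or tuple.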
>>>>>>>>>> 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Fabian
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the reply. Yes, my JSON objects are separated by new
>>>>>>>>>>> lines. It would have been great if you had explained the
>>>>>>>>>>> function that goes inside the map. I tried to use the
>>>>>>>>>>> scala.util.parsing.json._ library but had no luck.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Punit,
>>>>>>>>>>>>
>>>>>>>>>>>> JSON can be hard to parse in parallel due to its nested
>>>>>>>>>>>> structure. It depends on the schema and the (textual)
>>>>>>>>>>>> representation of the JSON whether and how it can be done. The
>>>>>>>>>>>> problem is that a parallel input format needs to be able to
>>>>>>>>>>>> identify record boundaries without context information. This
>>>>>>>>>>>> can be very easy if your JSON data is a list of JSON objects
>>>>>>>>>>>> separated by newline characters, but it is hard to generalize.
>>>>>>>>>>>> That's why Flink does not offer tooling for it (yet).
>>>>>>>>>>>>
>>>>>>>>>>>> If your JSON objects are separated by newline characters, the
>>>>>>>>>>>> easiest way is to read the file as a text file, where each
>>>>>>>>>>>> line results in a String, and to parse each object using a
>>>>>>>>>>>> standard JSON parser. This would look like:
>>>>>>>>>>>>
>>>>>>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>
>>>>>>>>>>>> DataSet<String> text = env.readTextFile("/path/to/jsonfile");
>>>>>>>>>>>> DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-04-26 8:06 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am new to Flink. I was experimenting with the DataSet API
>>>>>>>>>>>>> and found out that there is no explicit method for loading a
>>>>>>>>>>>>> JSON file as input. Can anyone please suggest a workaround?
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thank You
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Punit Naik

>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit

--
Thank You

Regards

Punit Naik
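For reference, the complete minimal program corresponding to Stefano's suggestion (which Punit confirms works at the top of the thread) might look like this, with a placeholder path:

    import org.apache.flink.api.scala._
    import scala.util.parsing.json.JSON

    object JsonLines extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment

      env.readTextFile("file:///path/to/json/file")
        // flatMap unwraps the Option returned by parseFull,
        // silently dropping lines that fail to parse
        .flatMap(line => JSON.parseFull(line))
        .print()
    }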