Great, Punit, I'm glad I've been of some help. If you have similar issues, feel free to write to the user mailing list; I believe you'll find help more easily there as you approach Flink. Happy hacking! :)
On Thu, Apr 28, 2016 at 9:10 PM, Punit Naik <naik.puni...@gmail.com> wrote:

I am so sorry, please ignore my previous reply. My input was too big, so it hung. So stupid of me. Thanks a lot! Your example worked!

On Fri, Apr 29, 2016 at 12:35 AM, Punit Naik <naik.puni...@gmail.com> wrote:

I tried exactly what you told me. But when I execute this code, Eclipse first gives me a warning saying "Type Any has no fields that are visible from Scala Type analysis. Falling back to Java Type Analysis (TypeExtractor).", and when I run it, the code just hangs and does not print a thing.

On Thu, Apr 28, 2016 at 7:11 PM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote:

Hi Punit,

what you want to do is something like this:

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.
      readTextFile("path/to/test.json").
      flatMap(line => JSON.parseFull(line)).
      print

The JSON.parseFull function in the Scala standard library takes a string (a line coming from the text file you read) and outputs an Option[Any]: an object that represents the possibility of a missing result (Option) wrapping Any, which has been (somewhat confusingly) chosen to represent the actual parsed value, if present. If you "just" mapped over the input, you would end up with a DataSet[Option[Any]], whereas your objective is to extract that inner type; flatMap does exactly that for you.

If you execute the code I've shown (with the correct path in the right place), you'll see it print the same JSON as the input, but in its Scala Map representation. As for how to access the data parsed by the Scala standard library's JSON parser, unfortunately I can't help you, as I'm not very familiar with it, but I'm pretty sure it's well documented.

Hacking around with Flink is a lot of fun, but before you go further I'd like to point you to the excellent programming guide in the official documentation [1]. I'm sure you'll find it an interesting and worthwhile read.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html
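For reference, accessing the data parsed by scala.util.parsing.json usually means pattern-matching on the Option[Any]. A minimal, self-contained sketch; the JSON line and its "name" and "version" fields are made up for illustration:

    import scala.util.parsing.json.JSON

    // A JSON object parses to a Map[String, Any]; numbers parse as Double.
    // @unchecked silences the erasure warning on the type pattern.
    JSON.parseFull("""{"name": "flink", "version": 1.0}""") match {
      case Some(obj: Map[String, Any] @unchecked) =>
        println(obj.get("name"))    // Some(flink)
        println(obj.get("version")) // Some(1.0)
      case _ =>
        println("line was not a JSON object")
    }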
On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.puni...@gmail.com> wrote:

I had one more request though. I have been struggling with JSONs and Flink for the past two days, since I started using it. I have a JSON file which has one JSON object per line, and I want to read it and store it as maps in another Flink DataSet. In my JSON the values might be anything, e.g. int, double, map, array etc. I have attached a small two-line input file, and I request you to please implement the logic I have explained above using Flink. It would be a great help.

On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com> wrote:

I managed to fix this error. I basically had to do

    val j = data.map { x => x.replaceAll("\"", "\\\"") }

instead of

    val j = data.map { x => "\"\"\"" + x + "\"\"\"" }

On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com> wrote:

I have my Apache Flink program:

    import org.apache.flink.api.scala._
    import scala.util.parsing.json._

    object numHits extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val data = env.readTextFile("file:///path/to/json/file")
      val j = data.map { x => "\"\"\"" + x + "\"\"\"" }

      /*1*/ println(((j.first(1).collect())(0)).getClass())

      /*2*/ println((j.first(1).collect())(0))

      /*3*/ println(JSON.parseFull((j.first(1).collect())(0)))
    }

I want to parse the input JSON file into a normal Scala Map, and for that I am using the default scala.util.parsing.json._ library.

The output of the first println statement is "class java.lang.String", which is what the JSON parsing function requires.

The output of the second println statement is the actual JSON string, prepended and appended with "\"\"\"", which the JSON parser also requires.

Now, if I copy the output of the second println command from the console and pass it to the JSON.parseFull() function, it parses properly.

Therefore the third println statement should properly parse the same string passed to it, but it does not: it outputs "None", which means it failed.

Why does this happen, and how can I make it work?

On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com> wrote:

I just tried it, and it still cannot parse it. It still takes the input as a DataSet object rather than a String.

On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com> wrote:

Okay, thanks a lot Fabian!

On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:

You should do the parsing in a Map operator. Map applies the MapFunction to each element in the DataSet, so you can either implement another MapFunction or extend the one you have to call the JSON parser.
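A minimal sketch of that advice, using the standard library parser (the input path is hypothetical):

    import org.apache.flink.api.scala._
    import scala.util.parsing.json.JSON

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Parsing happens inside the map operator, once per line,
    // so each element is a plain String by the time it is parsed.
    val parsed = env.readTextFile("file:///path/to/input")
      .map(line => JSON.parseFull(line)) // DataSet[Option[Any]]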
2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:

Hi

So I managed to do the map part. I stuck with the "import scala.util.parsing.json._" library for parsing.

First I read my JSON:

    val data = env.readTextFile("file:///home/punit/vik-in")

Then I transformed it so that it can be parsed to a map:

    val j = data.map { x => ("\"\"\"").+(x).+("\"\"\"") }

I checked it by printing "j"'s first value, and it is proper.

But when I tried to parse "j" like this:

    JSON.parseFull(j.first(1))

it did not parse, because the object "j.first(1)" is still a DataSet object and not a String object.

So how can I get the underlying Java object from the DataSet object?

On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

you need to implement the MapFunction interface [1]. Inside the MapFunction you can use any JSON parser library, such as Jackson, to parse the String. The exact logic depends on your use case.

However, you should be careful not to initialize a new parser in each map() call, because that would be quite expensive. I recommend extending the RichMapFunction [2] and instantiating the parser in the open() method.

Best, Fabian

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
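A minimal sketch of that recommendation, assuming Jackson is on the classpath (the class name JsonLineParser and the choice of JsonNode as output type are illustrative, not from the thread):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

    // The ObjectMapper is created once per parallel task in open(),
    // not once per record in map(). It is @transient because the
    // function instance itself is serialized and shipped to the workers.
    class JsonLineParser extends RichMapFunction[String, JsonNode] {
      @transient private var mapper: ObjectMapper = _

      override def open(parameters: Configuration): Unit = {
        mapper = new ObjectMapper()
      }

      override def map(line: String): JsonNode = mapper.readTree(line)
    }

It would then be applied as text.map(new JsonLineParser()).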
2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:

Hi Fabian

Thanks for the reply. Yes, my JSON is separated by new lines. It would have been great if you had explained the function that goes inside the map; I tried to use the 'scala.util.parsing.json._' library but had no luck.

On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Punit,

JSON can be hard to parse in parallel due to its nested structure. Whether and how it can be done depends on the schema and the (textual) representation of the JSON. The problem is that a parallel input format needs to be able to identify record boundaries without context information. This can be very easy if your JSON data is a list of JSON objects separated by new line characters; however, it is hard to generalize. That's why Flink does not offer tooling for it (yet).

If your JSON objects are separated by new line characters, the easiest way is to read the file as a text file, where each line results in a String, and to parse each object using a standard JSON parser. This would look like:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.readTextFile("/path/to/jsonfile");
    DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());

Best, Fabian

2016-04-26 8:06 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:

Hi

I am new to Flink. I was experimenting with the DataSet API and found out that there is no explicit method for loading a JSON file as input. Can anyone please suggest a workaround?

--
Thank You

Regards

Punit Naik

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit