I am so sorry. Please ignore my previous reply. Actually my input was too
big so it hung. So stupid of me. Thanks a lot! Your example worked!

On Fri, Apr 29, 2016 at 12:35 AM, Punit Naik <naik.puni...@gmail.com> wrote:

> I tried exactly what you told me. But when I execute this code, first of
> all it gives me a warning saying "Type Any has no fields that are visible
> from Scala Type analysis. Falling back to Java Type Analysis
> (TypeExtractor)." in eclipse, and when I run it, the code just hangs and
> does not print a thing.
>
> On Thu, Apr 28, 2016 at 7:11 PM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hi Punit,
>>
>> what you want to do is something like this:
>>
>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>     env.
>>       readTextFile("path/to/test.json").
>>       flatMap(line => JSON.parseFull(line)).
>>       print
>>
>> The JSON.parseFull function in the Scala standard library takes a string
>> (a
>> line coming from the text file you read) and outputs an Option[Any],
>> meaning it will output an object that represents the possibility of a
>> missing output (Option) wrapping Any, which has been (somewhat
>> confusingly)
>> chosen to represent the actual parsed value (if present). If you "just"
>> mapped over the input you would've ended up with a DataSet[Option[Any]],
>> whereas your objective is to extract that inner type. FlatMap does just
>> that for you.
>>
>> If you execute the code I've shown (with the correct path in the right
>> place) you'll see it'll print the same JSON in input, but in its Scala Map
>> representation. For more information on how to access data parsed by the
>> Scala standard library JSON parser, unfortunately I can't help you as I'm
>> not very familiar with it, but I'm pretty sure it's pretty well
>> documented.
>>
>> Hacking around with Flink is very fun, but before you move further I'd
>> like
>> to point you to the excellent programming guide in the official
>> documentation [1]. I'm sure you'll find the reading very interesting and
>> worthwhile.
>>
>> [1]:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html
>>
>> On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.puni...@gmail.com>
>> wrote:
>>
>> > I had one more request though. I have been struggling with JSONs and
>> Flink
>> > for the past two days since I started using it. I have a JSON file which
>> > has one JSON object per line and I want to read it and store it as maps
>> in
>> > another flink Dataset. In my JSON the values might be anything, for e.g.
>> > int, double, map, array etc. I have attached a small two line input file
>> > and I request you to please implement the logic that I have explained
>> above
>> > using flink. It would be a great help.
>> >
>> > On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.puni...@gmail.com>
>> > wrote:
>> >
>> >> I managed to fix this error. I basically had to do val j=data.map { x
>> =>
>> >> (x.replaceAll("\"","\\\"")) } instead of val j=data.map { x =>
>> ("\"\"\""+
>> >> x+"\"\"\"") }
>> >>
>> >> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.puni...@gmail.com>
>> >> wrote:
>> >>
>> >>> I have my Apache Flink program:
>> >>>
>> >>> import org.apache.flink.api.scala._import scala.util.parsing.json._
>> >>> object numHits extends App {
>> >>>     val env = ExecutionEnvironment.getExecutionEnvironment
>> >>>     val data=env.readTextFile("file:///path/to/json/file")
>> >>>     val j=data.map { x => ("\"\"\""+x+"\"\"\"") }
>> >>>     /*1*/ println( ((j.first(1).collect())(0)).getClass() )
>> >>>
>> >>>     /*2*/ println( ((j.first(1).collect())(0)) )
>> >>>
>> >>>     /*3*/ println( JSON.parseFull((j.first(1).collect())(0)) )
>> >>>     }
>> >>>
>> >>> I want to parse the input JSON file into normal scala Map and for
>> that I
>> >>> am using the default scala.util.parsing.json._ library.
>> >>>
>> >>> The output of the first println statement is class java.lang.String
>> >>> which is required by the JSON parsing function.
>> >>>
>> >>> Output of the second println function is the actual JSON string
>> >>> appended and prepended by "\"\"\"" which is also required by the JSON
>> >>> parser.
>> >>>
>> >>> Now at this point if I copy the output of the second println command
>> >>> printed in the console and pass it to the JSON.parseFull() function,
>> it
>> >>> properly parses it.
>> >>>
>> >>> Therefore the third println function should properly parse the same
>> >>> string passed to it but it does not as it outputs a "None" string
>> which
>> >>> means it failed.
>> >>>
>> >>> Why does this happen and how can I make it work?
>> >>>
>> >>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.puni...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> I just tried it and it still cannot parse it. It still takes the
>> input
>> >>>> as a dataset object rather than a string.
>> >>>>
>> >>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.puni...@gmail.com
>> >
>> >>>> wrote:
>> >>>>
>> >>>>> Okay Thanks a lot Fabian!
>> >>>>>
>> >>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhue...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> You should do the parsing in a Map operator. Map applies the
>> >>>>>> MapFunction to
>> >>>>>> each element in the DataSet.
>> >>>>>> So you can either implement another MapFunction or extend the one
>> you
>> >>>>>> have
>> >>>>>> to call the JSON parser.
>> >>>>>>
>> >>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.puni...@gmail.com>:
>> >>>>>>
>> >>>>>> > Hi
>> >>>>>> >
>> >>>>>> > So I managed to do the map part. I stuc with the "import
>> >>>>>> > scala.util.parsing.json._" library for parsing.
>> >>>>>> >
>> >>>>>> > First I read my JSON:
>> >>>>>> >
>> >>>>>> > val data=env.readTextFile("file:///home/punit/vik-in")
>> >>>>>> >
>> >>>>>> > Then I transformed it so that it can be parsed to a map:
>> >>>>>> >
>> >>>>>> > val j=data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
>> >>>>>> >
>> >>>>>> >
>> >>>>>> > I check it by printing "j"s 1st value and its proper.
>> >>>>>> >
>> >>>>>> > But when I tried to parse "j" like this:
>> >>>>>> >
>> >>>>>> > JSON.parseFull(j.first(1)) ; it did not parse because the object
>> >>>>>> > "j.first(1)" is still a Dataset object and not a String object.
>> >>>>>> >
>> >>>>>> > So how can I get the underlying java object from the dataset
>> object?
>> >>>>>> >
>> >>>>>> > On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <
>> fhue...@gmail.com>
>> >>>>>> wrote:
>> >>>>>> >
>> >>>>>> > > Hi,
>> >>>>>> > >
>> >>>>>> > > you need to implement the MapFunction interface [1].
>> >>>>>> > > Inside the MapFunction you can use any JSON parser library
>> such as
>> >>>>>> > Jackson
>> >>>>>> > > to parse the String.
>> >>>>>> > > The exact logic depends on your use case.
>> >>>>>> > >
>> >>>>>> > > However, you should be careful to not initialize a new parser
>> in
>> >>>>>> each
>> >>>>>> > map()
>> >>>>>> > > call, because that would be quite expensive.
>> >>>>>> > > I recommend to extend the RichMapFunction and instantiate a
>> >>>>>> parser in the
>> >>>>>> > > open() method.
>> >>>>>> > >
>> >>>>>> > > Best, Fabian
>> >>>>>> > >
>> >>>>>> > > [1]
>> >>>>>> > >
>> >>>>>> > >
>> >>>>>> >
>> >>>>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
>> >>>>>> > > [2]
>> >>>>>> > >
>> >>>>>> > >
>> >>>>>> >
>> >>>>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
>> >>>>>> > >
>> >>>>>> > > 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.puni...@gmail.com
>> >:
>> >>>>>> > >
>> >>>>>> > > > Hi Fabian
>> >>>>>> > > >
>> >>>>>> > > > Thanks for the reply. Yes my json is separated by new lines.
>> It
>> >>>>>> would
>> >>>>>> > > have
>> >>>>>> > > > been great if you had explained the function that goes inside
>> >>>>>> the map.
>> >>>>>> > I
>> >>>>>> > > > tried to use the 'scala.util.parsing.json._' library but got
>> no
>> >>>>>> luck.
>> >>>>>> > > >
>> >>>>>> > > > On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <
>> >>>>>> fhue...@gmail.com>
>> >>>>>> > > wrote:
>> >>>>>> > > >
>> >>>>>> > > > > Hi Punit,
>> >>>>>> > > > >
>> >>>>>> > > > > JSON can be hard to parse in parallel due to its nested
>> >>>>>> structure. It
>> >>>>>> > > > > depends on the schema and (textual) representation of the
>> JSON
>> >>>>>> > whether
>> >>>>>> > > > and
>> >>>>>> > > > > how it can be done. The problem is that a parallel input
>> >>>>>> format needs
>> >>>>>> > > to
>> >>>>>> > > > be
>> >>>>>> > > > > able to identify record boundaries without context
>> >>>>>> information. This
>> >>>>>> > > can
>> >>>>>> > > > be
>> >>>>>> > > > > very easy, if your JSON data is a list of JSON objects
>> which
>> >>>>>> are
>> >>>>>> > > > separated
>> >>>>>> > > > > by a new line character. However, this is hard to
>> generalize.
>> >>>>>> That's
>> >>>>>> > > why
>> >>>>>> > > > > Flink does not offer tooling for it (yet).
>> >>>>>> > > > >
>> >>>>>> > > > > If your JSON objects are separated by new line characters,
>> the
>> >>>>>> > easiest
>> >>>>>> > > > way
>> >>>>>> > > > > is to read it as text file, where each line results in a
>> >>>>>> String and
>> >>>>>> > > parse
>> >>>>>> > > > > each object using a standard JSON parser. This would look
>> >>>>>> like:
>> >>>>>> > > > >
>> >>>>>> > > > > ExecutionEnvironment env =
>> >>>>>> > > > ExecutionEnvironment.getExecutionEnvironment();
>> >>>>>> > > > >
>> >>>>>> > > > > DataSet<String> text =
>> env.readTextFile("/path/to/jsonfile");
>> >>>>>> > > > > DataSet<YourObject> json = text.map(new
>> >>>>>> > > > YourMapFunctionWhichParsesJSON());
>> >>>>>> > > > >
>> >>>>>> > > > > Best, Fabian
>> >>>>>> > > > >
>> >>>>>> > > > > 2016-04-26 8:06 GMT+02:00 Punit Naik <
>> naik.puni...@gmail.com
>> >>>>>> >:
>> >>>>>> > > > >
>> >>>>>> > > > > > Hi
>> >>>>>> > > > > >
>> >>>>>> > > > > > I am new to Flink. I was experimenting with the Dataset
>> API
>> >>>>>> and
>> >>>>>> > found
>> >>>>>> > > > out
>> >>>>>> > > > > > that there is no explicit method for loading a JSON file
>> as
>> >>>>>> input.
>> >>>>>> > > Can
>> >>>>>> > > > > > anyone please suggest me a workaround?
>> >>>>>> > > > > >
>> >>>>>> > > > > > --
>> >>>>>> > > > > > Thank You
>> >>>>>> > > > > >
>> >>>>>> > > > > > Regards
>> >>>>>> > > > > >
>> >>>>>> > > > > > Punit Naik
>> >>>>>> > > > > >
>> >>>>>> > > > >
>> >>>>>> > > >
>> >>>>>> > > >
>> >>>>>> > > >
>> >>>>>> > > > --
>> >>>>>> > > > Thank You
>> >>>>>> > > >
>> >>>>>> > > > Regards
>> >>>>>> > > >
>> >>>>>> > > > Punit Naik
>> >>>>>> > > >
>> >>>>>> > >
>> >>>>>> >
>> >>>>>> >
>> >>>>>> >
>> >>>>>> > --
>> >>>>>> > Thank You
>> >>>>>> >
>> >>>>>> > Regards
>> >>>>>> >
>> >>>>>> > Punit Naik
>> >>>>>> >
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> Thank You
>> >>>>>
>> >>>>> Regards
>> >>>>>
>> >>>>> Punit Naik
>> >>>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> Thank You
>> >>>>
>> >>>> Regards
>> >>>>
>> >>>> Punit Naik
>> >>>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thank You
>> >>>
>> >>> Regards
>> >>>
>> >>> Punit Naik
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Thank You
>> >>
>> >> Regards
>> >>
>> >> Punit Naik
>> >>
>> >
>> >
>> >
>> > --
>> > Thank You
>> >
>> > Regards
>> >
>> > Punit Naik
>> >
>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>



-- 
Thank You

Regards

Punit Naik

Reply via email to