Suneel, Flink comes with a built-in AvroOutputFormat. Is that good enough
for you?

On Sun, Oct 4, 2015 at 6:01 AM, Suneel Marthi <suneel.mar...@gmail.com>
wrote:

> While on that, Marton, would it make sense to have a
> dataStream.writeAsJson() method?
>
> On Sat, Oct 3, 2015 at 11:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Hi Jay,
>>
>> As for the NPE: the file monitoring function throws it when the location
>> is empty. Try running the data generator first! :) This behaviour is
>> unwanted though, so I am adding a JIRA ticket for it.
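Until that ticket is resolved, one way to avoid the NPE described above is to check the monitored location before building the stream. This is only a minimal sketch: it assumes "empty" means the directory is missing or has no entries, and MonitoredLocation and hasEntries are hypothetical names, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink): checks a location before monitoring it. */
final class MonitoredLocation {
    private MonitoredLocation() {}

    /** Returns true only if dir is an existing directory with at least one entry. */
    static boolean hasEntries(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return false; // missing path, or a plain file
        }
        // Files.list returns a lazy stream that must be closed after use.
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.findAny().isPresent();
        }
    }

    public static void main(String[] args) throws IOException {
        // "/tmp/a" is the path used in the example from this thread.
        Path location = Paths.get(args.length > 0 ? args[0] : "/tmp/a");
        if (hasEntries(location)) {
            System.out.println("Location has entries: " + location);
        } else {
            System.out.println("Location is missing or empty, run the data generator first: " + location);
        }
    }
}
```

Calling MonitoredLocation.hasEntries(Paths.get("/tmp/a")) before env.readFileStream(...) would let the job fail with a readable message instead of the NPE.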
>>
>> Best,
>>
>> Marton
>>
>> On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com>
>> wrote:
>>
>>> Hi Jay,
>>>
>>> Creating a batch and a streaming environment in a single Java source
>>> file is fine; they just run separately. (If you run them locally from an
>>> IDE they might conflict, as the second one would try to launch a local
>>> executor on a port that is most likely already taken by the first one.)
>>> For exactly that reason, I would suggest keeping these jobs in separate
>>> files for now.
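The port conflict mentioned above is the ordinary "address already in use" failure. The sketch below reproduces it with plain sockets only; it does not use Flink or its local executor, and PortConflictDemo and secondBindFails are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

/** Illustration only: two listeners cannot share one port on the same host. */
final class PortConflictDemo {
    private PortConflictDemo() {}

    /** Returns true if opening a second listener on an occupied port fails. */
    static boolean secondBindFails() throws IOException {
        // Port 0 asks the OS for any free port; the first listener then holds it.
        try (ServerSocket first = new ServerSocket(0)) {
            int port = first.getLocalPort();
            try (ServerSocket second = new ServerSocket(port)) {
                return false; // the second bind unexpectedly succeeded
            } catch (BindException e) {
                return true; // the port is already taken by the first listener
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second bind failed: " + secondBindFails());
    }
}
```

Keeping the batch and streaming jobs in separate programs, as suggested above, avoids two local executors competing for one port in the same run.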
>>>
>>> Looking at your code, the call
>>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>> new FlinkBPSGenerator.ProductSerializer());
>>> does not do much good for you. You need to register your serializers
>>> with the environment that you are actually using. Currently that means
>>> registering them on the streaming env variable. If you would also like
>>> to assemble a batch job, you need to register them on the batch
>>> environment too.
>>>
>>> As for the streaming job, I assume that you are using Flink version 0.9.1.
>>> I will look into the problem shortly.
>>>
>>> Best,
>>>
>>> Marton
>>>
>>> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
>>> wrote:
>>>
>>>> Here is a distilled example of the issue, should be easier to debug for
>>>> folks interested... :)
>>>>
>>>> public static void main(String[] args) {
>>>>
>>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>>         com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>>>         new FlinkBPSGenerator.ProductSerializer());
>>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>>         com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>>>         new FlinkBPSGenerator.TransactionSerializer());
>>>>
>>>>     final StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>     //when running "env.execute" this stream should start consuming...
>>>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>         FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>>>         public String map(String value) throws Exception {
>>>>             System.out.println(value);
>>>>             return ">>> > > > > " + value + " < < < <  <<<";
>>>>         }
>>>>     });
>>>>     try {
>>>>         env.execute();
>>>>     }
>>>>     catch(Exception e){
>>>>         e.printStackTrace();
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi flink !
>>>>>
>>>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>>>> Thread which runs a streaming job.
>>>>>
>>>>> I'm trying to do this by creating a dataStream from
>>>>> env.readFileStream, and then later starting a job which writes files out
>>>>> ...
>>>>>
>>>>> However, I get
>>>>>
>>>>> Exception in thread "main" java.lang.NullPointerException
>>>>>     at
>>>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>>>
>>>>> What is the right way to create a stream and batch job all in one
>>>>> environment?
>>>>>
>>>>> For reference, here is a gist of the code
>>>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>>>> offending line is the
>>>>>
>>>>> DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>>     FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>>
>>>>> line.
>>>>>
>>>>> Thanks again !
>>>>>
>>>>> --
>>>>> jay vyas
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> jay vyas
>>>>
>>>
>>>
>>
>
