While we are on that, Marton: would it make sense to have a
dataStream.writeAsJson() method?
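Something like the following is what I have in mind; it is just a thin convenience over the existing DataStream API. To be clear, the `writeAsJson` name, the `JsonSink` helper class, and the use of Jackson are all my own assumptions for illustration, not anything that exists in Flink today:

```java
// Hypothetical sketch of a writeAsJson() convenience, built only from the
// existing DataStream API (map + writeAsText). Assumes Jackson is on the
// classpath; the helper name and JSON mapping are illustrative.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSink {

    public static <T> void writeAsJson(DataStream<T> stream, String path) {
        stream.map(new MapFunction<T, String>() {
            // ObjectMapper is not Serializable, so create it lazily on the
            // worker rather than capturing it in the serialized closure.
            private transient ObjectMapper mapper;

            public String map(T value) throws Exception {
                if (mapper == null) {
                    mapper = new ObjectMapper();
                }
                // One JSON document per record, one record per output line.
                return mapper.writeValueAsString(value);
            }
        }).writeAsText(path);
    }
}
```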

On Sat, Oct 3, 2015 at 11:54 PM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hi Jay,
>
> As for the NPE: the file monitoring function throws it when the location
> is empty. Try running the data generator first! :) This behaviour is
> unwanted, though; I am filing a JIRA ticket for it.
>
> Best,
>
> Marton
>
> On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Hi Jay,
>>
>> Creating a batch and a streaming environment in a single Java source file
>> is fine; they just run separately. (If you run them from an IDE locally
>> they might conflict, as the second one would try to launch a local
>> executor on a port that is most likely already taken by the first one.)
>> For exactly that reason I would suggest keeping these jobs in separate
>> files for now.
>>
>> Looking at your code,
>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>> new FlinkBPSGenerator.ProductSerializer()); does not do much good for you:
>> you need to register your serializers with the environment you are
>> actually using. Currently that means registering them on the streaming
>> env variable; if you would also like to assemble a batch job, you need
>> to register them there too.
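>> Concretely, the registration would look something like the sketch below.
>> It keeps the serializer classes from your gist as-is and assumes the
>> 0.9.x StreamExecutionEnvironment API; the point is only that the
>> registrations go on the env instance that later calls execute():

```java
// Register the Kryo serializers on the *streaming* environment, i.e. the
// same env object that builds the topology and runs execute(). Registering
// on a freshly obtained ExecutionEnvironment has no effect on this job.
// (Serializer classes are taken from the original gist.)
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

env.registerTypeWithKryoSerializer(
        com.github.rnowling.bps.datagenerator.datamodels.Product.class,
        new FlinkBPSGenerator.ProductSerializer());
env.registerTypeWithKryoSerializer(
        com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
        new FlinkBPSGenerator.TransactionSerializer());
```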
>>
>> As for the streaming job: I assume you are using Flink version 0.9.1,
>> and I am checking out the problem shortly.
>>
>> Best,
>>
>> Marton
>>
>> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
>> wrote:
>>
>>> Here is a distilled example of the issue, should be easier to debug for
>>> folks interested... :)
>>>
>>> public static void main(String[] args) {
>>>
>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>             com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>>             new FlinkBPSGenerator.ProductSerializer());
>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>             com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>>             new FlinkBPSGenerator.TransactionSerializer());
>>>
>>>     final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>     // when running "env.execute" this stream should start consuming...
>>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>             FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>>         public String map(String value) throws Exception {
>>>             System.out.println(value);
>>>             return ">>> > > > > " + value + " < < < <  <<<";
>>>         }
>>>     });
>>>     try {
>>>         env.execute();
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>> }
>>>
>>>
>>>
>>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>>> wrote:
>>>
>>>> Hi flink !
>>>>
>>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>>> thread which runs a streaming job.
>>>>
>>>> I'm trying to do this by creating a dataStream from env.readFileStream,
>>>> and then later starting a job which writes files out ...
>>>>
>>>> However, I get
>>>>
>>>> Exception in thread "main" java.lang.NullPointerException
>>>>     at
>>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>>
>>>> What is the right way to create a stream and batch job all in one
>>>> environment?
>>>>
>>>> For reference, here is a gist of the code
>>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>>> offending line is the
>>>>
>>>> DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>         FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>
>>>> line.
>>>>
>>>> Thanks again !
>>>>
>>>> --
>>>> jay vyas
>>>>
>>>
>>>
>>>
>>> --
>>> jay vyas
>>>
>>
>>
>
