Hi Jay,

As for the NPE: the file monitoring function throws it when the location is
empty. Try running the datagenerator first! :) This behaviour is unwanted
though, I am adding a JIRA ticket for it.

Best,

Marton

On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hi Jay,
>
> Creating a batch and a streaming environment in a single Java source file
> is fine, they just run separately. (If you run it from an IDE locally they
> might conflict as the second one would try to launch a local executor on a
> port that is most likely already taken by the first one.) I would suggest
> to have these jobs in separate files currently, exactly for the previous
> reason.
>
> Looking at your
> code 
> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
> new FlinkBPSGenerator.ProductSerializer()); does not do much good for you.
> You need to register your serializers to the environment to which you are
> using. Currently you would need to register it to the streaming env
> variable. If you would like to also assemble a batch job you need to add
> them there too.
>
> As for the streaming job I assume that you are using Flink version 0.9.1
> and checking out the problem shortly.
>
> Best,
>
> Marton
>
> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
> wrote:
>
>> Here is a distilled example of the issue, should be easier to debug for
>> folks interested... :)
>>
>> public static void main(String[] args) {
>>
>>
>>     
>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>  new FlinkBPSGenerator.ProductSerializer());
>>     
>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>  new FlinkBPSGenerator.TransactionSerializer());
>>
>>     final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     //when running "env.execute" this stream should start consuming...
>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, 
>> FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>         public String map(String value) throws Exception {
>>             System.out.println(value);
>>             return ">>> > > > > " + value + " < < < <  <<<";
>>         }
>>     });
>>     try {
>>         env.execute();
>>     }
>>     catch(Exception e){
>>         e.printStackTrace();
>>     }
>> }
>>
>>
>>
>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>> wrote:
>>
>>> Hi flink !
>>>
>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>> Thread  which runs a streaming job.
>>>
>>> I'm trying to do this by creating a dataStream from env.readFileStream,
>>> and then later starting a job which writes files out ...
>>>
>>> However, I get
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>>     at
>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>
>>> What is the right way to create a stream and batch job all in one
>>> environment?
>>>
>>> For reference, here is a gist of the code
>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>> offending line is the
>>>
>>> DataStream<String> dataStream = env.readFileStream("/tmp/a",1000, 
>>> FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>
>>> line.
>>>
>>> Thanks again !
>>>
>>> --
>>> jay vyas
>>>
>>
>>
>>
>> --
>> jay vyas
>>
>
>

Reply via email to