Hi Jay,

As for the NPE: the file-monitoring function throws it when the monitored location is empty. Try running the data generator first! :) This behaviour is not ideal, though; I am filing a JIRA ticket for it.
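In the meantime, a workaround is to make sure the monitored directory exists and already contains at least one file before the streaming job starts. A minimal sketch of what I mean (plain JDK file handling; the /tmp/a path is from your gist, and the seed file name is just an illustration — running the data generator into that directory achieves the same thing):

    import java.io.File;
    import java.io.FileWriter;

    public class SeedMonitoredDirectory {
        public static void main(String[] args) throws Exception {
            File dir = new File("/tmp/a");
            if (!dir.exists() && !dir.mkdirs()) {
                throw new IllegalStateException("Could not create " + dir);
            }
            File[] files = dir.listFiles();
            if (files == null || files.length == 0) {
                // Drop a placeholder file so the file-monitoring source has
                // something to pick up before the streaming job starts.
                try (FileWriter writer = new FileWriter(new File(dir, "seed.txt"))) {
                    writer.write("seed\n");
                }
            }
        }
    }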
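To make the serializer point from my previous mail (quoted below) concrete: register the Kryo serializers on the environment you actually execute, in your case the streaming one. A rough sketch — the Product/Transaction and FlinkBPSGenerator serializer classes are taken from your gist, and I have not compiled this against your project:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.github.rnowling.bps.datagenerator.datamodels.Product;
    import com.github.rnowling.bps.datagenerator.datamodels.Transaction;

    public class RegisterSerializersOnStreamingEnv {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // These registrations only affect `env`. Calling
            // ExecutionEnvironment.getExecutionEnvironment() separately configures
            // a different (batch) environment that the streaming job never sees.
            env.registerTypeWithKryoSerializer(Product.class,
                    new FlinkBPSGenerator.ProductSerializer());
            env.registerTypeWithKryoSerializer(Transaction.class,
                    new FlinkBPSGenerator.TransactionSerializer());

            // ... build the streaming job on `env`, then call env.execute().
        }
    }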
Best,

Marton

On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Hi Jay,
>
> Creating a batch and a streaming environment in a single Java source file
> is fine; they just run separately. (If you run it from an IDE locally they
> might conflict, as the second one would try to launch a local executor on
> a port that is most likely already taken by the first one.) I would
> suggest keeping these jobs in separate files for now, exactly for that
> reason.
>
> Looking at your code,
> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
> new FlinkBPSGenerator.ProductSerializer()); does not do much good for you.
> You need to register your serializers with the environment that you are
> actually using. Currently you would need to register them on the streaming
> env variable. If you would also like to assemble a batch job, you need to
> add them there too.
>
> As for the streaming job, I assume that you are using Flink version 0.9.1;
> I am checking out the problem shortly.
>
> Best,
>
> Marton
>
> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
> wrote:
>
>> Here is a distilled example of the issue, should be easier to debug for
>> folks interested... :)
>>
>> public static void main(String[] args) {
>>
>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>         new FlinkBPSGenerator.ProductSerializer());
>>
>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>         new FlinkBPSGenerator.TransactionSerializer());
>>
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     // when running "env.execute" this stream should start consuming...
>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>         FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>         public String map(String value) throws Exception {
>>             System.out.println(value);
>>             return ">>> > > > > " + value + " < < < < <<<";
>>         }
>>     });
>>     try {
>>         env.execute();
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>> }
>>
>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>> wrote:
>>
>>> Hi flink !
>>>
>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>> Thread which runs a streaming job.
>>>
>>> I'm trying to do this by creating a dataStream from env.readFileStream,
>>> and then later starting a job which writes files out ...
>>>
>>> However, I get
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>
>>> What is the right way to create a stream and batch job all in one
>>> environment?
>>>
>>> For reference, here is a gist of the code:
>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>> offending line is
>>>
>>> DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>     FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>
>>> Thanks again !
>>>
>>> --
>>> jay vyas
>>
>> --
>> jay vyas