While we are on the topic, Marton: would it make sense to have a dataStream.writeAsJson() method?
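[Editor's note: a writeAsJson() could plausibly be a thin wrapper over a map to JSON strings followed by writeAsText(). Below is a minimal pure-Java sketch of the serialization step such a helper would need; the field names ("name", "price") are made up for illustration and are not part of any Flink API.]

```java
// Sketch of the per-record serialization a hypothetical writeAsJson() could
// delegate to: map each record to a JSON string, then reuse writeAsText().
// Pure Java illustration; the record fields here are hypothetical.
public class JsonSketch {

    // Minimal JSON string escaping (backslashes and quotes only, for brevity).
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // What a MapFunction<Product, String> inside writeAsJson() might emit.
    static String toJson(String name, double price) {
        return "{\"name\":\"" + escape(name) + "\",\"price\":" + price + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("widget", 9.99));
    }
}
```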
On Sat, Oct 3, 2015 at 11:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Hi Jay,
>
> As for the NPE: the file monitoring function throws it when the location
> is empty. Try running the data generator first! :) This behaviour is
> unwanted, though; I am adding a JIRA ticket for it.
>
> Best,
>
> Marton
>
> On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Hi Jay,
>>
>> Creating a batch and a streaming environment in a single Java source file
>> is fine; they just run separately. (If you run it from an IDE locally, they
>> might conflict, as the second one would try to launch a local executor on a
>> port that is most likely already taken by the first one.) I would suggest
>> keeping these jobs in separate files for now, for exactly that reason.
>>
>> Looking at your code, calling
>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>> new FlinkBPSGenerator.ProductSerializer()) does not do much good for you.
>> You need to register your serializers with the environment that you are
>> actually using. Currently you would need to register them with the streaming
>> env variable. If you would also like to assemble a batch job, you need to
>> register them there too.
>>
>> As for the streaming job, I assume that you are using Flink version 0.9.1;
>> I am checking out the problem shortly.
>>
>> Best,
>>
>> Marton
>>
>> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
>> wrote:
>>
>>> Here is a distilled example of the issue; it should be easier to debug
>>> for folks interested...
>>> :)
>>>
>>> public static void main(String[] args) {
>>>
>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>             com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>>             new FlinkBPSGenerator.ProductSerializer());
>>>
>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>             com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>>             new FlinkBPSGenerator.TransactionSerializer());
>>>
>>>     final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>     // when running "env.execute" this stream should start consuming...
>>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>             FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>>         public String map(String value) throws Exception {
>>>             System.out.println(value);
>>>             return ">>> > > > > " + value + " < < < < <<<";
>>>         }
>>>     });
>>>     try {
>>>         env.execute();
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>> }
>>>
>>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>>> wrote:
>>>
>>>> Hi flink!
>>>>
>>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>>> Thread which runs a streaming job.
>>>>
>>>> I'm trying to do this by creating a dataStream from env.readFileStream,
>>>> and then later starting a job which writes files out...
>>>>
>>>> However, I get
>>>>
>>>> Exception in thread "main" java.lang.NullPointerException
>>>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>>
>>>> What is the right way to create a stream and batch job all in one
>>>> environment?
>>>>
>>>> For reference, here is a gist of the code:
>>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>>> offending line is the
>>>>
>>>> DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>         FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>
>>>> line.
>>>>
>>>> Thanks again!
>>>>
>>>> --
>>>> jay vyas
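[Editor's note: Marton's explanation above says the NPE comes from monitoring an empty location. A simple guard before wiring up the monitored path avoids the trap until the JIRA fix lands. The sketch below is pure Java, not Flink API; the "/tmp/a" path just mirrors the thread.]

```java
import java.io.File;

// Guard sketch for the NPE described above: before handing a location to
// env.readFileStream(...), check that the monitored directory exists and
// already contains at least one file (i.e. the data generator has run).
public class MonitorGuard {

    static boolean hasFiles(String dir) {
        File[] entries = new File(dir).listFiles(); // null if dir is missing
        return entries != null && entries.length > 0;
    }

    public static void main(String[] args) {
        String location = "/tmp/a";
        if (!hasFiles(location)) {
            System.err.println("Run the data generator first: "
                    + location + " is empty or missing.");
        }
    }
}
```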