Suneel, Flink comes with a built-in AvroOutputFormat. Is that good enough for you?

On Sun, Oct 4, 2015 at 6:01 AM, Suneel Marthi <suneel.mar...@gmail.com> wrote:

> While on that Marton, would it make sense to have a
> dataStream.writeAsJson() method?
>
> On Sat, Oct 3, 2015 at 11:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Hi Jay,
>>
>> As for the NPE: the file monitoring function throws it when the location
>> is empty. Try running the datagenerator first! :) This behaviour is
>> unwanted though, so I am adding a JIRA ticket for it.
>>
>> Best,
>>
>> Marton
>>
>> On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <balassi.mar...@gmail.com>
>> wrote:
>>
>>> Hi Jay,
>>>
>>> Creating a batch and a streaming environment in a single Java source
>>> file is fine, they just run separately. (If you run it from an IDE locally
>>> they might conflict, as the second one would try to launch a local executor
>>> on a port that is most likely already taken by the first one.) For that
>>> reason I would suggest keeping these jobs in separate files for now.
>>>
>>> Looking at your code, the call
>>>
>>> ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>     com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>>     new FlinkBPSGenerator.ProductSerializer());
>>>
>>> does not do much good for you. You need to register your serializers with
>>> the environment that you are actually using. Currently you would need to
>>> register them on the streaming env variable. If you would like to also
>>> assemble a batch job you need to add them there too.
>>>
>>> As for the streaming job, I assume that you are using Flink version 0.9.1;
>>> I am checking out the problem shortly.
>>>
>>> Best,
>>>
>>> Marton
>>>
>>> On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <jayunit100.apa...@gmail.com>
>>> wrote:
>>>
>>>> Here is a distilled example of the issue, should be easier to debug for
>>>> folks interested... :)
>>>>
>>>> public static void main(String[] args) {
>>>>
>>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>>         com.github.rnowling.bps.datagenerator.datamodels.Product.class,
>>>>         new FlinkBPSGenerator.ProductSerializer());
>>>>
>>>>     ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
>>>>         com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
>>>>         new FlinkBPSGenerator.TransactionSerializer());
>>>>
>>>>     final StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>     //when running "env.execute" this stream should start consuming...
>>>>     DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>         FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>     dataStream.iterate().map(new MapFunction<String, String>() {
>>>>         public String map(String value) throws Exception {
>>>>             System.out.println(value);
>>>>             return ">>> > > > > " + value + " < < < < <<<";
>>>>         }
>>>>     });
>>>>     try {
>>>>         env.execute();
>>>>     }
>>>>     catch(Exception e){
>>>>         e.printStackTrace();
>>>>     }
>>>> }
>>>>
>>>> On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <jayunit100.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi flink !
>>>>>
>>>>> Looks like "setSlotSharing" is throwing an NPE when I try to start a
>>>>> Thread which runs a streaming job.
>>>>>
>>>>> I'm trying to do this by creating a dataStream from
>>>>> env.readFileStream, and then later starting a job which writes files out
>>>>> ...
>>>>>
>>>>> However, I get
>>>>>
>>>>> Exception in thread "main" java.lang.NullPointerException
>>>>>     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)
>>>>>
>>>>> What is the right way to create a stream and batch job all in one
>>>>> environment?
>>>>>
>>>>> For reference, here is a gist of the code
>>>>> https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the
>>>>> offending line is the
>>>>>
>>>>> DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
>>>>>     FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
>>>>>
>>>>> line.
>>>>>
>>>>> Thanks again !
>>>>>
>>>>> --
>>>>> jay vyas
>>>>
>>>> --
>>>> jay vyas
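
On the NPE with an empty location mentioned in the thread: until that is fixed, one possible workaround is to check the monitored directory before calling env.execute(). The following is only a sketch in plain Java, with no Flink calls. The class name MonitoredDirCheck and the helper isReadyForMonitoring are hypothetical names made up for illustration, and the default path /tmp/a is simply the one from the example in the thread.

```java
import java.io.File;

// Hypothetical helper, not part of Flink: a pre-flight check for the
// directory that the streaming job is going to monitor.
class MonitoredDirCheck {

    /**
     * Returns true only if dir exists, is a directory, and contains at
     * least one entry. File.list() returns null when the path is missing
     * or is not a directory, so that case is handled explicitly.
     */
    static boolean isReadyForMonitoring(File dir) {
        String[] entries = dir.list();
        return entries != null && entries.length > 0;
    }

    public static void main(String[] args) {
        // "/tmp/a" is the location used in the example from the thread.
        File dir = new File(args.length > 0 ? args[0] : "/tmp/a");
        if (!isReadyForMonitoring(dir)) {
            System.err.println(dir + " is missing or empty; run the data generator first.");
            return;
        }
        System.out.println(dir + " has input; the streaming job can be started.");
        // The real job would build its DataStream and call env.execute() here.
    }
}
```

This only avoids starting the job against a missing or empty directory; it does not change how the file monitoring function itself behaves.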