Hi Teena, which Flink version are you using? Have you tried whether this happens with the latest release 1.6.2 as well?
Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <teena.kap...@bprise.com> wrote:

> Hi all,
>
> I have implemented RichInputFormat for reading the results of aggregation
> queries in Elasticsearch. There are around 100000 buckets, which are of
> type JSON array. Note: this is a one-time response.
>
> My idea here is to iterate over these arrays in parallel. Here is the
> pseudocode.
>
> public void configure(Configuration parameters) {
>     System.out.println("configure");
> }
>
> public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
> }
>
> public ResponseInputSplit[] createInputSplits(int minNumSplits) {
>     System.out.println("createInputSplits");
>
>     // read from elastic
>     // add buckets to array
> }
>
> public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
>     // this is default
>     System.out.println("getInputSplitAssigner");
>     return new DefaultInputSplitAssigner(inputSplits);
> }
>
> public void open(ResponseInputSplit split) {
>     // read buckets
> }
>
> public boolean reachedEnd() {
>     System.out.println("reachedEnd");
> }
>
> public Bounce nextRecord(Bounce reuse) {
> }
>
> public void close() {
> }
>
> // my main method
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());
>
> When running in Eclipse, it executes createInputSplits and the results
> look fine. Logs are given below.
>
> Output is:
>
> configure
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
> configure
> createInputSplits
>
> When submitting the job to a Flink cluster, it doesn’t execute the
> ‘configure’ and ‘createInputSplits’ methods. Instead it goes directly to
> the nextRecord function. Logs are given below.
>
> Output is:
>
> Starting execution of program
> configure
> Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx/user/jobmanager#1219973491] with leader session id...
> 10/26/2018 15:05:57 Job execution switched to status RUNNING.
> 10/26/2018 15:05:57 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
> 10/26/2018 15:05:57 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
> 10/26/2018 15:06:00 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
> 10/26/2018 15:06:00 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
> java.lang.NullPointerException
>     at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
>
> Regards,
> Teena
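
A possible explanation for the quoted stack trace, offered as an assumption because line 143 of MyInputDataSetInputFormat is not shown: on a cluster, createInputSplits runs on the JobManager, while open, reachedEnd and nextRecord run on a TaskManager against a separately deserialized copy of the format. Its println output would then end up in the JobManager log rather than on the client console, and any field filled in createInputSplits would still be null when nextRecord runs. The sketch below uses no Flink classes at all. BucketSplit, BucketFormat and ship are hypothetical stand-ins, and a Java serialization round trip plays the role of shipping objects between JVMs. It only illustrates the pattern of carrying the buckets inside the split and building the reading state in open.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SplitStateSketch {

    /** Hypothetical stand-in for ResponseInputSplit: the split carries its own buckets. */
    static final class BucketSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        final List<String> buckets;

        BucketSplit(int splitNumber, List<String> buckets) {
            this.splitNumber = splitNumber;
            this.buckets = new ArrayList<>(buckets); // copy, so the split is serializable
        }
    }

    /** Hypothetical stand-in for the input format (not a Flink class). */
    static final class BucketFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        // Reading state is created in open(), never in createInputSplits().
        private transient Iterator<String> current;

        BucketSplit[] createInputSplits(List<String> allBuckets, int numSplits) {
            BucketSplit[] splits = new BucketSplit[numSplits];
            int chunk = (allBuckets.size() + numSplits - 1) / numSplits; // ceiling division
            for (int i = 0; i < numSplits; i++) {
                int from = Math.min(i * chunk, allBuckets.size());
                int to = Math.min(from + chunk, allBuckets.size());
                splits[i] = new BucketSplit(i, allBuckets.subList(from, to));
            }
            return splits;
        }

        void open(BucketSplit split) {
            current = split.buckets.iterator();
        }

        boolean reachedEnd() {
            return !current.hasNext();
        }

        String nextRecord() {
            return current.next();
        }
    }

    /** Serialization round trip, standing in for sending an object to another JVM. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    /** Splits are created on one copy of the format and read on other, fresh copies. */
    static List<String> readAll(List<String> buckets, int numSplits) {
        BucketFormat template = new BucketFormat();
        BucketFormat masterCopy = ship(template);
        BucketSplit[] splits = masterCopy.createInputSplits(buckets, numSplits);

        List<String> result = new ArrayList<>();
        for (BucketSplit split : splits) {
            BucketFormat workerCopy = ship(template); // never saw createInputSplits()
            workerCopy.open(ship(split));
            while (!workerCopy.reachedEnd()) {
                result.add(workerCopy.nextRecord());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList("b0", "b1", "b2", "b3", "b4"), 2));
    }
}
```

If this assumption matches the real code, the corresponding change would be to store each split's buckets in ResponseInputSplit and to initialize whatever nextRecord reads from inside open(ResponseInputSplit split).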