Hi Teena,

which Flink version are you using? Have you checked whether this also happens
with the latest release, 1.6.2?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <teena.kap...@bprise.com> wrote:

> Hi all,
>
> I have implemented a RichInputFormat for reading the results of aggregation
> queries in Elasticsearch. There are around 100000 buckets, which are of
> type JSON array. Note: this is a one-time response.
>
> My idea here is to iterate over these arrays in parallel. Here is the
> pseudocode:
>
> public void configure(Configuration parameters) {
>     System.out.println("configure");
> }
>
> public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
> }
>
> public ResponseInputSplit[] createInputSplits(int minNumSplits) {
>     System.out.println("createInputSplits");
>     // read from elastic
>     // add buckets to array
> }
>
> public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
>     // this is default
>     System.out.println("getInputSplitAssigner");
>     return new DefaultInputSplitAssigner(inputSplits);
> }
>
> public void open(ResponseInputSplit split) {
>     // read buckets
> }
>
> public boolean reachedEnd() {
>     System.out.println("reachedEnd");
> }
>
> public Bounce nextRecord(Bounce reuse) {
> }
>
> public void close() {
> }
>
> // my main method
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());
>
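A note on the createInputSplits step, which above only says "add buckets to array": in cluster mode Flink invokes configure and createInputSplits on the JobManager, so their System.out output ends up in the JobManager's stdout rather than on the client console, and any field the format sets there is not visible in the separately deserialized format instances that run open and nextRecord on the TaskManagers. One way to cope with that is to let each split carry its own share of the buckets. The sketch below shows only the partitioning step, in plain Java so it runs without Flink; it assumes the buckets were already read from Elasticsearch into a List<String> of JSON strings, and BucketPartitioner and partition are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): distributes bucket JSON strings
 * round-robin over a fixed number of groups, one group per input split.
 */
public class BucketPartitioner {

    /**
     * Splits {@code buckets} into {@code numSplits} lists. Bucket i goes to
     * group i % numSplits, so group sizes differ by at most one.
     */
    public static List<List<String>> partition(List<String> buckets, int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be >= 1");
        }
        List<List<String>> groups = new ArrayList<>(numSplits);
        for (int i = 0; i < numSplits; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < buckets.size(); i++) {
            groups.get(i % numSplits).add(buckets.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        // Ten fake buckets standing in for the Elasticsearch aggregation result.
        List<String> buckets = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            buckets.add("{\"key\":\"b" + i + "\",\"doc_count\":" + i + "}");
        }
        // createInputSplits(minNumSplits) would create one split per group.
        List<List<String>> groups = partition(buckets, 4);
        for (int i = 0; i < groups.size(); i++) {
            // prints split 0: 3, split 1: 3, split 2: 2, split 3: 2 (one per line)
            System.out.println("split " + i + ": " + groups.get(i).size() + " buckets");
        }
    }
}
```

Round-robin keeps the group sizes within one of each other; a contiguous range per split would work equally well. In a real format each group would become the payload of one ResponseInputSplit; since Flink's InputSplit interface extends Serializable, that payload must be serializable too (an ArrayList<String> is).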
> When running in Eclipse, it executes createInputSplits and the results
> look fine. The logs are below.
>
> Output:
>
> configure
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
> configure
> createInputSplits
>
> When submitting the job to the Flink cluster, it doesn’t execute the
> ‘configure’ and ‘createInputSplits’ methods. Instead, it goes directly to
> the nextRecord function. The logs are below.
>
> Output:
>
> Starting execution of program
> configure
> Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx/user/jobmanager#1219973491] with leader session id...
> 10/26/2018 15:05:57     Job execution switched to status RUNNING.
> 10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
> 10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
> 10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
> 10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
> java.lang.NullPointerException
>     at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
>
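On the NullPointerException itself: the trace alone does not show what is null at line 143 of MyInputDataSetInputFormat. For each split, Flink calls open(split) and then alternates reachedEnd() and nextRecord() until reachedEnd() returns true, so everything nextRecord dereferences should be initialized in open(split) from the split itself rather than in createInputSplits. A minimal plain-Java sketch of that cursor pattern follows; the names are hypothetical (BucketCursor, with a List<String> standing in for the split's buckets) and no Flink classes are used.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, Flink-independent illustration of the open / reachedEnd /
 * nextRecord cursor contract used by an input format.
 */
public class BucketCursor {

    private List<String> buckets; // set in open(), null before that
    private int position;         // index of the next bucket to emit

    /** Called once per split; initializes all state that nextRecord reads. */
    public void open(List<String> splitBuckets) {
        this.buckets = splitBuckets;
        this.position = 0;
    }

    /** True once every bucket of the current split has been emitted. */
    public boolean reachedEnd() {
        return buckets == null || position >= buckets.size();
    }

    /** Returns the next bucket; only called after reachedEnd() returned false. */
    public String nextRecord() {
        return buckets.get(position++);
    }

    /** Releases the split's state. */
    public void close() {
        buckets = null;
    }

    public static void main(String[] args) {
        BucketCursor cursor = new BucketCursor();
        cursor.open(Arrays.asList("{\"key\":\"a\"}", "{\"key\":\"b\"}"));
        while (!cursor.reachedEnd()) {
            // prints {"key":"a"} then {"key":"b"}
            System.out.println(cursor.nextRecord());
        }
        cursor.close();
    }
}
```

Here reachedEnd() also reports true before open() has been called, which is purely defensive; the point is that the state nextRecord() uses is local to the split and set in open().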
> Regards,
> Teena
