Hello,

I am reading Results from an HBase table and process them with Batch API. 
Everything works fine until I receive a ScannerTimeoutException from HBase.
Maybe my transformations get stuck or a GC pause happen - hard to tell. The 
HBase Client restarts the scan and the processing continues.
Except one problem - every time I receive this Exception I observe a duplicate 
Result processing - the Result which was processed just before 
ScannerTimeoutException is thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it?
And how should I handle it? Keeping track of all processed Results is not 
feasible in my case.

Here is a simple job demonstrating an issue (HBase scan and RPC timeouts are 
set to 60 sec)

Thank you!

Best regards,
Mark

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.createInput(new Src())
        .map(new Mapper())
        .print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {

    private int cnt = 0;

    @Override
    public String map(Tuple1<String> value) throws Exception {
      if (cnt++ % 2 == 0) {
        Thread.sleep(120000);
      }
      return value.f0;
    }

  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

    @Override
    protected Scan getScanner() {
      Scan scan = new Scan();
      scan.setStartRow(getStartRow());
      scan.setStopRow(getEndRow());
      scan.setCaching(1);
      scan.setCacheBlocks(false);
      return scan;
    }

    @Override
    protected String getTableName() {
      return getTable();
    }

    @Override
    protected Tuple1<String> mapResultToOutType(Result r) {
      return new Tuple1<String>(Bytes.toString(r.getRow()));
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration 
parameters) {
      scan = getScanner();
      try {
        table = new HTable(getHadoopConf(), getTableName());
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

  }

Reply via email to