Hello,

I am reading Results from an HBase table and processing them with the Batch API. Everything works fine until I receive a ScannerTimeoutException from HBase. Maybe my transformations get stuck or a GC pause happens - hard to tell. The HBase client restarts the scan and the processing continues, except for one problem: every time I receive this exception I observe duplicate Result processing - the Result that was processed just before the ScannerTimeoutException was thrown is processed twice.
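Looking at AbstractTableInputFormat.nextRecord(), the retry path seems to explain this: on an exception the scanner is re-created starting from the last successfully processed row, and since Scan.setStartRow() is inclusive, that row is emitted a second time. Roughly paraphrased from the Flink sources I am running (details may differ between versions):

// Paraphrase of AbstractTableInputFormat.nextRecord(), not the exact source.
public T nextRecord(T reuse) throws IOException {
    try {
        Result res = resultScanner.next();
        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }
    } catch (Exception e) {
        // Recovery path for scanner timeouts: restart the scan from currentRow.
        resultScanner.close();
        scan.setStartRow(currentRow); // inclusive - currentRow was already emitted
        resultScanner = table.getScanner(scan);
        Result res = resultScanner.next();
        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res); // duplicate of the previous record
        }
    }
    endReached = true;
    return null;
}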
Is this expected behavior? Should I be prepared to handle it, and if so, how? Keeping track of all processed Results is not feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are set to 60 sec): the Mapper sleeps for 120 sec on every other record, so the scanner lease expires while the record is being processed.

Thank you!

Best regards,
Mark

import java.io.IOException;

import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Enclosing class name is arbitrary; getStartRow(), getEndRow(), getTable()
// and getHadoopConf() are small helpers of mine, left out for brevity.
public class ScannerTimeoutDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.createInput(new Src())
            .map(new Mapper())
            .print();
    }

    // Sleeps longer than the 60 sec scanner timeout on every other record.
    private static class Mapper implements MapFunction<Tuple1<String>, String> {

        private int cnt = 0;

        @Override
        public String map(Tuple1<String> value) throws Exception {
            if (cnt++ % 2 == 0) {
                Thread.sleep(120000);
            }
            return value.f0;
        }
    }

    private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

        @Override
        protected Scan getScanner() {
            Scan scan = new Scan();
            scan.setStartRow(getStartRow());
            scan.setStopRow(getEndRow());
            scan.setCaching(1); // fetch one row per RPC so every record hits the scanner
            scan.setCacheBlocks(false);
            return scan;
        }

        @Override
        protected String getTableName() {
            return getTable();
        }

        @Override
        protected Tuple1<String> mapResultToOutType(Result r) {
            return new Tuple1<String>(Bytes.toString(r.getRow()));
        }

        @Override
        public void configure(org.apache.flink.configuration.Configuration parameters) {
            scan = getScanner();
            try {
                table = new HTable(getHadoopConf(), getTableName());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
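The only workaround I can come up with is to remember just the last emitted row key (rather than all processed Results) and skip a record that repeats it after a scanner restart. A minimal sketch, assuming the duplicate is always the row emitted immediately before the restart - would overriding nextRecord() in my input format like this be a reasonable approach, or is there a proper fix?

private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

    // Row key of the last record handed to the pipeline.
    private byte[] lastRow = null;

    @Override
    public Tuple1<String> nextRecord(Tuple1<String> reuse) throws IOException {
        Tuple1<String> record = super.nextRecord(reuse);
        // After a restart the first record may repeat the last emitted row; skip it.
        while (record != null && lastRow != null
                && Bytes.equals(lastRow, Bytes.toBytes(record.f0))) {
            record = super.nextRecord(reuse);
        }
        if (record != null) {
            lastRow = Bytes.toBytes(record.f0);
        }
        return record;
    }

    // getScanner(), getTableName(), mapResultToOutType() and configure() as above.
}

This only works here because the row key itself is the emitted value; in general the comparison would have to happen on Result.getRow() before mapping.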