What I can tell you is how the HBase input format works. If you look at AbstractTableInputFormat [1], this is the nextRecord() function:
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            scan.setStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }
        endReached = true;
        return null;
    }

When the resultScanner dies because of a timeout (this happens a lot when you have backpressure and the time between two consecutive reads exceeds the scanner timeout), the code creates a new scanner and restarts from where it was (startRow = currentRow). So there should not be any duplicates (in theory), but this could be the root of the problem (see the sketches below the quoted message).

Best,
Flavio

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java

On Sat, Nov 23, 2019 at 11:07 PM Mark Davis <moda...@protonmail.com> wrote:

> Hello,
>
> I am reading Results from an HBase table and processing them with the Batch API.
> Everything works fine until I receive a ScannerTimeoutException from HBase.
> Maybe my transformations get stuck or a GC pause happens - hard to tell.
> The HBase client restarts the scan and the processing continues.
> Except for one problem - every time I receive this exception I observe
> duplicate Result processing - the Result which was processed just before
> the ScannerTimeoutException was thrown is processed twice.
>
> Is this expected behavior? Should I be prepared to handle it?
> And how should I handle it? Keeping track of all processed Results is not
> feasible in my case.
>
> Here is a simple job demonstrating the issue (the HBase scan and RPC
> timeouts are set to 60 sec):
>
> Thank you!
>
> Best regards,
>   Mark
>
>
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     env.createInput(new Src())
>         .map(new Mapper())
>         .print();
> }
>
> private static class Mapper implements MapFunction<Tuple1<String>, String> {
>
>     private int cnt = 0;
>
>     @Override
>     public String map(Tuple1<String> value) throws Exception {
>         if (cnt++ % 2 == 0) {
>             Thread.sleep(120000);
>         }
>         return value.f0;
>     }
> }
>
> private static class Src extends AbstractTableInputFormat<Tuple1<String>> {
>
>     @Override
>     protected Scan getScanner() {
>         Scan scan = new Scan();
>         scan.setStartRow(getStartRow());
>         scan.setStopRow(getEndRow());
>         scan.setCaching(1);
>         scan.setCacheBlocks(false);
>         return scan;
>     }
>
>     @Override
>     protected String getTableName() {
>         return getTable();
>     }
>
>     @Override
>     protected Tuple1<String> mapResultToOutType(Result r) {
>         return new Tuple1<String>(Bytes.toString(r.getRow()));
>     }
>
>     @Override
>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>         scan = getScanner();
>         try {
>             table = new HTable(getHadoopConf(), getTableName());
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>     }
> }
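P.S. Looking at that retry path again: Scan.setStartRow() is inclusive, so a scanner restarted at currentRow re-reads the row that nextRecord() had already returned before the timeout - which would explain exactly the duplicate you see. A minimal sketch of a skip-one-row workaround (hypothetical, not what the connector currently does; it assumes the scan, table, resultScanner and currentRow fields of AbstractTableInputFormat, plus java.util.Arrays) could look like this:

    // Hypothetical helper for the catch block of nextRecord():
    // restart the scan at currentRow, but drop the first result when it
    // is the row that was already emitted before the timeout.
    private Result restartScannerSkippingCurrentRow() throws IOException {
        resultScanner.close();
        // setStartRow() is inclusive, so the restarted scanner returns
        // currentRow again as its first result.
        scan.setStartRow(currentRow);
        resultScanner = table.getScanner(scan);
        Result res = resultScanner.next();
        if (res != null && Arrays.equals(res.getRow(), currentRow)) {
            res = resultScanner.next(); // skip the already-emitted row
        }
        return res;
    }

The catch block would then call this helper instead of rebuilding the scanner inline, leaving the rest of nextRecord() unchanged.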
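And regarding the question of how to handle it downstream: if the duplicate is always the record emitted immediately before the restart (as the retry logic above suggests), there is no need to keep track of all processed Results - remembering just the previous row key is enough, at least with parallelism 1 as in your example. A rough sketch (DedupFlatMap is a made-up name) that would replace the Mapper in your job:

    // Hypothetical guard that drops an immediate repeat of the previous
    // row key - the only kind of duplicate the scanner restart can
    // produce. Plain per-instance state is enough here because each
    // subtask consumes its input splits sequentially.
    private static class DedupFlatMap implements FlatMapFunction<Tuple1<String>, String> {

        private String lastRowKey;

        @Override
        public void flatMap(Tuple1<String> value, Collector<String> out) {
            if (value.f0.equals(lastRowKey)) {
                return; // duplicate re-read after a scanner restart
            }
            lastRowKey = value.f0;
            out.collect(value.f0);
        }
    }

i.e. env.createInput(new Src()).flatMap(new DedupFlatMap()).print(); - this needs org.apache.flink.api.common.functions.FlatMapFunction and org.apache.flink.util.Collector on top of the imports you already have.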