I just did that: public T nextRecord(final T reuse) throws IOException { if (this.rs == null){ // throw new IOException("No table result scanner provided!"); return null; } ...
because in the class FileSourceFunction we have: @Override public void run(SourceContext<OUT> ctx) throws Exception { while (isRunning) { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); if (nextElement == null && splitIterator.hasNext()) { format.open(splitIterator.next()); continue; } else if (nextElement == null) { break; } ctx.collect(nextElement); } } (I had to copy TableInputSplit as its constructor is not visible...) 2016-06-06 16:07 GMT+02:00 Ufuk Celebi <u...@apache.org>: > From the code it looks like the open method of the TableInputFormat is > never called. What are you doing differently in the > StreamingTableInputFormat? > > – Ufuk > > > On Mon, Jun 6, 2016 at 1:49 PM, Christophe Salperwyck > <christophe.salperw...@gmail.com> wrote: > > Hi all, > > > > I am trying to read data from HBase and use the windows functions of > Flink > > streaming. I can read my data using the ExecutionEnvironment but not from > > the StreamExecutionEnvironment. > > > > Is that a known issue? > > > > Are the inputsplits used in the streaming environment? > > > > Here a sample of my code: > > > > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > > @SuppressWarnings("serial") > > final DataStreamSource<ANA> anaDS = env.createInput(new > > TableInputFormat<ANA>() { > > ... > > } > > > > final WindowedStream<ANA, Tuple, TimeWindow> ws = anaDS. > > assignTimestampsAndWatermarks(new > xxxxAssignerWithPunctuatedWatermarks()). > > keyBy(0). > > timeWindow(Time.days(30), Time.days(30)); > > > > ws.sum(2).printToErr(); > > env.execute(); > > > > The error I get is: > > Caused by: java.io.IOException: No table result scanner provided! > > at > > > org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:103) > > > > It seems the "Result" is not read for a first time before calling this > > function. > > > > I built a "StreamingTableInputFormat" as a temporary work around but let > me > > know if there is something I did wrong. > > > > Thanks for everything, Flink is great! > > > > Cheers, > > Christophe >