Hi Paul,

Thanks for the suggestion; this sounds like a nice solution. I will give it a shot.
Best,
Yik San

On Wed, Jun 30, 2021 at 2:26 PM Paul Lam <paullin3...@gmail.com> wrote:
> Hi Yik San,
>
> Maybe you could use a watermark to trigger the last flush. Source operators
> emit MAX_WATERMARK when they terminate, which triggers all the timers (see [1]).
>
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
>
> Best,
> Paul Lam
>
> On Jun 30, 2021, at 10:38, Yik San Chan <evan.chanyik...@gmail.com> wrote:
> > Hi community,
> >
> > I have a batch job that consumes records from a bounded source (e.g.,
> > Hive) and walks them through a BufferingSink as described in the [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction).
> > In the BufferingSink, I want to flush records out to the sink in
> > 1000-record batches.
> >
> > Given that the source is bounded, I need to flush out all remaining records
> > when it reaches the end; otherwise the records still buffered (in the
> > variable `bufferedElements`) will be lost.
> >
> > An obvious way of doing so is to flush out all records in the `close`
> > method. That should work fine.
> >
> > However, I wonder whether it's possible to tell if a record is the last
> > record in the `invoke` method. In other words, how would I implement the
> > `isLastRecord` method below?
> >
> > ```java
> > @Override
> > public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
> >     bufferedElements.add(value);
> >     if (bufferedElements.size() == threshold || isLastRecord()) {
> >         for (Tuple2<String, Integer> element : bufferedElements) {
> >             // send it to the sink
> >         }
> >         bufferedElements.clear();
> >     }
> > }
> > ```
> >
> > Thanks!
> >
> > Best,
> > Yik San
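The sketch below shows the idea discussed in this thread without depending on Flink. Instead of trying to detect the last record inside `invoke`, it flushes full batches as records arrive and flushes whatever is left when the end-of-input watermark (timestamp `Long.MAX_VALUE`) shows up. The class name `BufferingSinkSketch`, the `onWatermark(long)` hook, and the `downstream` consumer are all hypothetical. `onWatermark` stands in for whatever mechanism actually delivers the final watermark in a real job, for example an event-time timer registered in a keyed function, as Paul suggests. The `close()` flush from the original question is kept as a fallback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of the BufferingSink discussed in the thread.
class BufferingSinkSketch<T> {
    private final int threshold;
    private final Consumer<List<T>> downstream; // stands in for the external sink
    private final List<T> bufferedElements = new ArrayList<>();

    BufferingSinkSketch(int threshold, Consumer<List<T>> downstream) {
        this.threshold = threshold;
        this.downstream = downstream;
    }

    // Called once per record, like SinkFunction#invoke: emit only full batches.
    void invoke(T value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    // Hypothetical watermark hook. A watermark with timestamp Long.MAX_VALUE
    // means the bounded input is exhausted, so the buffer holds the last batch.
    void onWatermark(long timestamp) {
        if (timestamp == Long.MAX_VALUE) {
            flush();
        }
    }

    // Fallback from the original question: flush any leftovers on close.
    void close() {
        flush();
    }

    private void flush() {
        if (bufferedElements.isEmpty()) {
            return; // nothing buffered, so emit no empty batch
        }
        downstream.accept(new ArrayList<>(bufferedElements)); // copy before clearing
        bufferedElements.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BufferingSinkSketch<Integer> sink = new BufferingSinkSketch<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            sink.invoke(i);
        }
        sink.onWatermark(Long.MAX_VALUE); // end of bounded input
        sink.close();                     // no-op here: buffer is already empty
        System.out.println(batches);
    }
}
```

Running `main` prints `[[1, 2, 3], [4, 5, 6], [7]]`. The first two lines are full batches of three. The last line is the leftover record, flushed by the final watermark. Keeping the `close()` flush as well is harmless in this model, because `flush()` skips an empty buffer.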