Hi Paul,

Thanks for the suggestion; this sounds like a nice solution. I will give it
a shot.
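
For anyone finding this thread later, here is a minimal sketch of the
`close`-based fallback from my original question. It is plain Java with no
Flink dependency so that it can run on its own. `BatchingBuffer` and its
`writer` callback are hypothetical names used only for illustration, not
Flink APIs; in a real job the same logic would presumably live in the sink's
`invoke` and `close` methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the BufferingSink logic; not a Flink class. */
class BatchingBuffer<T> implements AutoCloseable {
    private final int threshold;
    // Hypothetical callback that sends one batch to the external system.
    private final Consumer<List<T>> writer;
    private final List<T> bufferedElements = new ArrayList<>();

    BatchingBuffer(int threshold, Consumer<List<T>> writer) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.writer = writer;
    }

    /** Mirrors invoke(): buffer the record and flush once a batch is full. */
    void invoke(T value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    /** Mirrors close(): flush the partial batch left at the end of input. */
    @Override
    public void close() {
        flush();
    }

    private void flush() {
        if (bufferedElements.isEmpty()) {
            return; // nothing to send
        }
        // Hand over a copy so that clearing the buffer does not affect the batch.
        writer.accept(new ArrayList<>(bufferedElements));
        bufferedElements.clear();
    }
}
```

With a threshold of 3 and 7 input records, this should produce batches of 3,
3, and 1 records, with the last batch coming from `close()`.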

Best,
Yik San

On Wed, Jun 30, 2021 at 2:26 PM Paul Lam <paullin3...@gmail.com> wrote:

> Hi Yik San,
>
> Maybe you could use a watermark to trigger the last flush. When a source
> operator terminates, it emits MAX_WATERMARK, which fires all pending
> event-time timers (see [1]).
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
>
> Best,
> Paul Lam
>
> On Jun 30, 2021, at 10:38, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> Hi community,
>
> I have a batch job that consumes records from a bounded source (e.g.,
> Hive) and walks them through a BufferingSink as described in the [docs](
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction).
> In the BufferingSink, I want to flush out records to the sink in
> 1000-record batches.
>
> Given that the source is bounded, I need to flush all remaining records
> when the input ends; otherwise, any records still buffered (in the variable
> `bufferedElements`) will be lost.
>
> An obvious way of doing so is to flush out all records in the `close`
> method. That should work fine.
>
> However, I wonder whether it's possible to tell, inside the `invoke`
> method, whether a record is the last one. In other words, how would I
> implement the `isLastRecord` method below?
>
> ```java
> @Override
> public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
>     bufferedElements.add(value);
>     if (bufferedElements.size() == threshold || isLastRecord()) {
>         for (Tuple2<String, Integer> element: bufferedElements) {
>             // send it to the sink
>         }
>         bufferedElements.clear();
>     }
> }
> ```
>
> Thanks!
>
> Best,
> Yik San
>
>
>
