Hi,

I have not actually tested this, but I think with Flink 1.4 you can extend BucketingSink and override the invoke method to access the watermark.

Pseudo code:

invoke(IN value, SinkFunction.Context context) {
    long currentWatermark = context.currentWatermark()
    int taskIndex = getRuntimeContext().getIndexOfThisSubtask()

    // only subtask 0 writes the marker, so it is written once and not by every parallel instance
    if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
        Write _SUCCESS
        lastSuccessWatermark = currentWatermark round down to 1 hour
    }

    // delegate the actual write to BucketingSink
    super.invoke(value)
}
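
To make the bookkeeping in the pseudo code above a bit more concrete, here is a rough sketch of just the "is it time to write _SUCCESS" decision, kept free of Flink classes so it can be tried on its own. SuccessMarkerTracker and its method names are hypothetical, not part of Flink. I am also assuming that the first watermark seen only initializes the state, that Long.MIN_VALUE means "no watermark yet", and that the marker is due once the watermark reaches the next interval boundary (>= rather than the > in the pseudo code).

```java
// Hypothetical helper (not a Flink class): tracks when subtask 0 should write _SUCCESS.
final class SuccessMarkerTracker {
    private final long intervalMillis;
    private long lastSuccessWatermark;
    private boolean initialized = false;

    SuccessMarkerTracker(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.intervalMillis = intervalMillis;
    }

    // Round a timestamp down to the start of its interval (e.g. the start of the hour).
    long roundDown(long timestamp) {
        return timestamp - Math.floorMod(timestamp, intervalMillis);
    }

    // Returns true when the caller should write the _SUCCESS marker now.
    boolean shouldWriteSuccess(int subtaskIndex, long currentWatermark) {
        // Only one parallel instance is responsible for the marker.
        if (subtaskIndex != 0) {
            return false;
        }
        // Assumption: Long.MIN_VALUE means no watermark has been seen yet.
        if (currentWatermark == Long.MIN_VALUE) {
            return false;
        }
        // Assumption: the first real watermark only sets the baseline.
        if (!initialized) {
            lastSuccessWatermark = roundDown(currentWatermark);
            initialized = true;
            return false;
        }
        // Written as a subtraction on the watermark side to avoid overflow
        // when the watermark is Long.MAX_VALUE at the end of the stream.
        if (currentWatermark - intervalMillis >= lastSuccessWatermark) {
            lastSuccessWatermark = roundDown(currentWatermark);
            return true;
        }
        return false;
    }

    // Start of the interval for which the last marker decision was made.
    long lastSuccessWatermark() {
        return lastSuccessWatermark;
    }
}
```

Inside the overridden invoke you would call shouldWriteSuccess(getRuntimeContext().getIndexOfThisSubtask(), context.currentWatermark()) and write the _SUCCESS file when it returns true. Note that subtask 0 only sees its own watermark, so this is still only as reliable as the watermark that instance receives.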

Regards,
Kien

On 1/31/2018 5:54 PM, xiaobin yan wrote:
Hi:

I think so too! But I have a question: where in BucketingSink should I add
this logic? And which instance runs it, so that the logic is executed only
once rather than by every parallel instance of the sink?

Best,
Ben

On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> wrote:

It depends on how you partition your files. In my case I write one file per
hour, so I'm sure that a file is ready after that hour has passed, in
processing time. Here, "ready to read" means the file contains all the data
for that hour.

If the downstream job runs in batch mode, you may want to ensure the file is
ready.
In this case, "ready to read" can mean that all the data before the watermark
has arrived.
You could take the BucketingSink and implement this logic there, maybe wait
until the watermark
reaches

Best,

Sendoh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
