Hi, I have tested it and found a couple of small problems. When the checkpoint completes, the part file is renamed, but the _SUCCESS file has already been written before that checkpoint, so the marker can appear before the data files have their final names.
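The test above suggests the _SUCCESS marker has to wait for the checkpoint that finalizes the files, not only for the watermark. Below is a minimal sketch of that bookkeeping in plain Java, with no Flink dependency so it runs standalone. `DeferredSuccessMarker` and its methods are hypothetical names. The sketch assumes hourly buckets keyed by event time, and assumes every part file of a closed bucket is already pending when the snapshot is taken. In a `BucketingSink` subclass, `onSnapshot` would be called from `snapshotState` and `onCheckpointComplete` from `notifyCheckpointComplete`, after the sink itself has renamed its files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of Flink): defers _SUCCESS markers for hourly
 * buckets until a checkpoint taken after the bucket was closed has completed.
 */
final class DeferredSuccessMarker {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Start (ms) of the earliest hourly bucket the watermark has not closed yet.
    private long nextOpenBucket;
    // Buckets closed by the watermark but not yet included in any snapshot.
    private final List<Long> closedBuckets = new ArrayList<>();
    // Checkpoint id -> buckets that were closed when that snapshot was taken.
    private final TreeMap<Long, List<Long>> bucketsPerCheckpoint = new TreeMap<>();

    DeferredSuccessMarker(long firstBucketStart) {
        this.nextOpenBucket = firstBucketStart;
    }

    /** Call whenever the watermark advances (e.g. from invoke). */
    void onWatermark(long watermark) {
        // Bucket [start, start + 1h) counts as closed once the watermark reaches its end.
        while (watermark >= nextOpenBucket + HOUR_MS) {
            closedBuckets.add(nextOpenBucket);
            nextOpenBucket += HOUR_MS;
        }
    }

    /** Call from snapshotState: ties the currently closed buckets to this checkpoint. */
    void onSnapshot(long checkpointId) {
        if (!closedBuckets.isEmpty()) {
            bucketsPerCheckpoint.put(checkpointId, new ArrayList<>(closedBuckets));
            closedBuckets.clear();
        }
    }

    /** Call from notifyCheckpointComplete: returns bucket starts whose _SUCCESS may be written now. */
    List<Long> onCheckpointComplete(long checkpointId) {
        List<Long> ready = new ArrayList<>();
        // Release this checkpoint and any earlier one, in case an earlier notification was missed.
        Map<Long, List<Long>> done = bucketsPerCheckpoint.headMap(checkpointId, true);
        for (List<Long> buckets : done.values()) {
            ready.addAll(buckets);
        }
        done.clear(); // clearing the view also removes the entries from bucketsPerCheckpoint
        return ready;
    }
}
```

With this ordering, a bucket's marker is only released after the checkpoint that followed the bucket's closing has completed, which is the same point at which the sink renames its files.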
Best,
Ben

> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientru...@gmail.com> wrote:
>
> Hi,
>
> I did not actually test this, but I think with Flink 1.4 you can extend
> BucketingSink and override the invoke method to access the watermark.
> Pseudo code:
>
> invoke(IN value, SinkFunction.Context context) {
>     long currentWatermark = context.watermark()
>     long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>         Write _SUCCESS
>         lastSuccessWatermark = currentWatermark round down to 1 hour
>     }
>     invoke(value)
> }
>
> Regards,
> Kien
>
> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>> Hi:
>>
>> I think so too! But I have a question: where in BucketingSink should I
>> add this logic? And which instance runs it, so that it is executed only
>> once rather than by every parallel instance of the sink?
>>
>> Best,
>> Ben
>>
>>> On 31 Jan 2018, at 5:58 PM, Hung <unicorn.bana...@gmail.com> wrote:
>>>
>>> It depends on how you partition your files. In my case I write one file per
>>> hour, so I'm sure a file is ready after its hour has passed, in processing
>>> time. Here, ready to read means the file contains all the data of that hour.
>>>
>>> If the downstream runs in batch mode, you may want to ensure the file is
>>> ready. In this case, ready to read can mean that all the data before the
>>> watermark has arrived.
>>> You could take the BucketingSink and implement this logic there, maybe wait
>>> until watermark
>>> reaches
>>>
>>> Best,
>>>
>>> Sendoh
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
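Kien's pseudo code boils down to a small decision: only subtask 0 acts, and it writes a marker whenever the watermark crosses a new hour boundary. Below is a standalone sketch of that decision, under assumptions: `HourlySuccessTrigger` and `onInvoke` are hypothetical names, and the actual file write is left to the caller. Instead of subtracting `lastSuccessWatermark` directly, it compares rounded hour boundaries, which avoids overflow when nothing has been written yet.

```java
import java.util.OptionalLong;

/** Hypothetical helper (not part of Flink) mirroring the invoke() pseudo code. */
final class HourlySuccessTrigger {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Hour boundary covered by the last _SUCCESS marker; MIN_VALUE means none yet.
    private long lastSuccessBoundary = Long.MIN_VALUE;

    /**
     * Returns the hour boundary before which all data is complete if a new
     * _SUCCESS marker should be written now, otherwise empty.
     */
    OptionalLong onInvoke(int subtaskIndex, long currentWatermark) {
        // Only one parallel instance decides, so each marker is written once.
        if (subtaskIndex != 0) {
            return OptionalLong.empty();
        }
        // Covers Long.MIN_VALUE (no watermark yet); rounding values this small down would overflow.
        if (currentWatermark < Long.MIN_VALUE + HOUR_MS) {
            return OptionalLong.empty();
        }
        // Round the watermark down to the start of its hour.
        long boundary = Math.floorDiv(currentWatermark, HOUR_MS) * HOUR_MS;
        if (boundary > lastSuccessBoundary) {
            lastSuccessBoundary = boundary;
            return OptionalLong.of(boundary);
        }
        return OptionalLong.empty();
    }
}
```

In a `BucketingSink` subclass, `subtaskIndex` would come from `getRuntimeContext().getIndexOfThisSubtask()` and the watermark from the `SinkFunction.Context` passed to `invoke` (as far as I know, the Flink 1.4 method is `currentWatermark()`). Because the check only runs inside `invoke`, no marker is written while subtask 0 receives no records, and, as Ben's test shows, the write itself still has to wait until the checkpoint completes.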