Hi,

A couple of things:

1. Please create a Jira ticket with this proposal, so we can move the discussion off the user mailing list.

I haven't thought it through, so take my comments with a grain of salt, however:

2. If we were to go with such a callback, I would prefer to have one BucketStateChangeCallback with methods like `onInProgressToPending(…)`, `onPendingToFinal(…)`, `onPendingToCancelled(…)`, etc., as opposed to having one interface passed three or four times for different purposes.

3. Another thing I had in mind is that BucketingSink could be rewritten to extend TwoPhaseCommitSinkFunction. In that case, with

    public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>

users could add their own hooks by overriding the following methods:

    BucketingSink2#beginTransaction
    BucketingSink2#preCommit
    BucketingSink2#commit
    BucketingSink2#abort

For example:

    public class MyBucketingSink extends BucketingSink2 {
        @Override
        protected void commit(??? txn) {
            super.commit(txn);
            // My hook on moving a file from the pending to the committed state
        }
    }

Alternatively, we could implement the aforementioned callback support in TwoPhaseCommitSinkFunction and provide the feature to Kafka/Pravega/BucketingSink at once.

Piotrek

> On 13 Jun 2018, at 22:45, Rinat <r.shari...@cleverdata.ru> wrote:
>
> Hi guys, thx for your reply.
>
> The following code references apply to the release-1.5.0 tag,
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class.
>
> Currently, BucketingSink manages the lifecycle of files as follows.
>
> When moving files from the open to the pending state:
> - on each item (method invoke, line 434), we check that a suitable bucket
>   exists and contains an open file; if no open file exists, we create one
>   and write the item to it
> - on each item (method invoke, line 434), we check that the suitable open
>   file doesn't exceed the limits; if the limits are exceeded, we close it and
>   move it into the pending state using the private method
>   closeCurrentPartFile (line 568)
> - on each timer request (onProcessingTime, line 482), we check whether items
>   haven't been added to the open file for longer than the specified period of
>   time; if so, we close it using the same private method
>   closeCurrentPartFile (line 588)
>
> So the only option we have is to call our hook from closeCurrentPartFile,
> which is private, so we copy-pasted the current implementation and injected
> our logic there.
>
> Files move from the pending state to the final state during the
> checkpointing lifecycle, in notifyCheckpointComplete (line 657), which is
> public and contains a lot of logic, including discovery of files in pending
> states, synchronization of state access and its modification, etc.
>
> So we couldn't override it, or call the super method and add some logic,
> because when the current implementation changes the state of files, it
> removes them from the state, and we have no way to know which files changed
> state.
>
> To solve this problem, we've created the following interface:
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional
>  * operations when {@link BucketingSink} moves a file from one state to
>  * another. For more information about state management in
>  * {@code BucketingSink}, see its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file
>      * into the next state.
>      *
>      * @param fs provides access to the file system
>      * @param path path to the file that was moved into the next state
>      *
>      * @throws IOException if something goes wrong while performing
>      *         operations on the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
>
> And we have added the ability to register these callbacks in our
> BucketingSink implementation in the following manner:
>
> public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
>
> I'm ready to discuss the best way to implement such hooks in the core
> implementation, or any other improvements that would help us add this
> functionality to our extension through the public API instead of
> copy-pasting the source code.
>
> Thx for your help, mates =)
>
>
>> On 11 Jun 2018, at 11:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> I can see that this could be a useful feature. What exactly is currently
>> preventing you from inheriting from BucketingSink? Maybe it would be enough
>> just to make BucketingSink easier to extend.
>>
>> One thing that could collide with such a feature is that Kostas is
>> currently working on a larger BucketingSink rework/refactor.
>>
>> Piotrek
>>
>>> On 8 Jun 2018, at 16:38, Rinat <r.shari...@cleverdata.ru> wrote:
>>>
>>> Hi mates, I have a proposal regarding the functionality of BucketingSink.
>>>
>>> While implementing one of our tasks, we ran into the following need:
>>> create a meta-file containing the path and additional information about a
>>> file created by BucketingSink, once it has been moved to its final location.
>>> Unfortunately, such behaviour is not currently available to us.
>>>
>>> We've implemented our own sink that allows registering notifiers that are
>>> called when a file's state changes, but the current API doesn't allow us
>>> to add such behaviour using inheritance...
>>>
>>> It seems that such functionality could be useful and could become part of
>>> the BucketingSink API. What do you think, should I open a PR?
>>>
>>> Sincerely yours,
>>> Rinat Sharipov
>>> Software Engineer at 1DMP CORE Team
>>>
>>> email: r.shari...@cleverdata.ru
>>> mobile: +7 (925) 416-37-26
>>>
>>> CleverDATA
>>> make your data clever
>>>
>>
>
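To make point 2 of Piotrek's reply more concrete, here is a minimal, self-contained sketch of a single callback type with one method per state transition, wired into a toy stand-in for the sink's part-file bookkeeping. Everything in it is hypothetical: BucketStateChangeCallback, ToyBucket and CallbackSketch are illustrative names, not Flink API. Paths are plain strings rather than Flink's Path/FileSystem, and closeCurrentPartFile/notifyCheckpointComplete only mirror the roles described in the thread, not the real BucketingSink logic. The point it illustrates is that each callback sees a path before that path is dropped from the pending list, which is the information Rinat reports losing when trying to extend notifyCheckpointComplete.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical: one callback type with a method per state transition.
// Default no-op bodies let users override only the transitions they care about.
interface BucketStateChangeCallback extends Serializable {
    default void onInProgressToPending(String path) throws IOException {}
    default void onPendingToFinal(String path) throws IOException {}
    default void onPendingToCancelled(String path) throws IOException {}
}

// Toy stand-in for a sink's part-file bookkeeping (NOT the real BucketingSink).
class ToyBucket {
    private final List<BucketStateChangeCallback> callbacks = new ArrayList<>();
    private final List<String> pendingFiles = new ArrayList<>();
    private String inProgressFile;

    ToyBucket register(BucketStateChangeCallback callback) {
        callbacks.add(callback);
        return this; // fluent, like the registerOn...Callback methods in the thread
    }

    void openPartFile(String path) {
        inProgressFile = path;
    }

    // Plays the role of closeCurrentPartFile: in-progress -> pending.
    void closeCurrentPartFile() throws IOException {
        if (inProgressFile == null) {
            return;
        }
        pendingFiles.add(inProgressFile);
        for (BucketStateChangeCallback cb : callbacks) {
            cb.onInProgressToPending(inProgressFile);
        }
        inProgressFile = null;
    }

    // Plays the role of notifyCheckpointComplete: pending -> final.
    // Each callback sees the path BEFORE it is removed from the pending list.
    void notifyCheckpointComplete() throws IOException {
        for (String path : pendingFiles) {
            for (BucketStateChangeCallback cb : callbacks) {
                cb.onPendingToFinal(path);
            }
        }
        pendingFiles.clear();
    }
}

public class CallbackSketch {
    // Drives two part files through pending -> final and records every callback.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ToyBucket bucket = new ToyBucket().register(new BucketStateChangeCallback() {
            @Override
            public void onInProgressToPending(String path) {
                events.add("pending:" + path);
            }

            @Override
            public void onPendingToFinal(String path) {
                // A real hook could write a meta-file describing `path` here.
                events.add("final:" + path);
            }
        });
        try {
            bucket.openPartFile("part-0-0");
            bucket.closeCurrentPartFile();
            bucket.openPartFile("part-0-1");
            bucket.closeCurrentPartFile();
            bucket.notifyCheckpointComplete();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [pending:part-0-0, pending:part-0-1, final:part-0-0, final:part-0-1]. Because every method has a no-op default, a user who only needs the final-state hook (for example, to write the meta-file described in the original request) overrides just onPendingToFinal, instead of registering the same interface several times for different purposes.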