[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-9592:
----------------------------------
    Labels: auto-deprioritized-major auto-unassigned pull-request-available stale-minor  (was: auto-deprioritized-major auto-unassigned pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development.

I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.

> Notify on moving file into pending/ final state
> -----------------------------------------------
>
>                 Key: FLINK-9592
>                 URL: https://issues.apache.org/jira/browse/FLINK-9592
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>            Reporter: Rinat Sharipov
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned, pull-request-available, stale-minor
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>
> While implementing one of our tasks, we ran into the following need: create a meta-file containing the path of, and additional information about, a file created by BucketingSink once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us.
>
> We’ve implemented our own Sink that lets us register notifiers, which are called when a file’s state changes, but the current API doesn’t allow us to add such behaviour through inheritance...
>
> It seems that such functionality could be useful and could become part of the BucketingSink API.
> What do you think, should I make a PR?
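The meta-file described above could be produced by a small helper along the following lines. This is a hypothetical sketch that uses `java.nio.file` so it runs without Flink or Hadoop on the classpath; the `MetaFileWriter` name, the `.meta` suffix and the `key=value` layout are illustrative assumptions, not part of BucketingSink. Inside a sink it would be invoked once a part file has reached its final location.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not Flink API): writes "<file>.meta" next to a finished
 * part file, containing its path, its size and caller-supplied attributes.
 */
public class MetaFileWriter {

    // Illustrative suffix; any naming scheme would do.
    static final String META_SUFFIX = ".meta";

    /** Writes the meta-file next to finalFile and returns the meta-file's path. */
    static Path writeMetaFile(Path finalFile, Map<String, String> extra) throws IOException {
        StringBuilder sb = new StringBuilder();
        sb.append("path=").append(finalFile.toAbsolutePath()).append('\n');
        sb.append("size=").append(Files.size(finalFile)).append('\n');
        for (Map.Entry<String, String> e : extra.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        Path meta = finalFile.resolveSibling(finalFile.getFileName() + META_SUFFIX);
        Files.write(meta, sb.toString().getBytes(StandardCharsets.UTF_8));
        return meta;
    }

    public static void main(String[] args) throws IOException {
        // Simulate a part file that has just been moved into its final place.
        Path dir = Files.createTempDirectory("bucket");
        Path part = Files.write(dir.resolve("part-0-0"), "a\nb\n".getBytes(StandardCharsets.UTF_8));
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("records", "2");
        Path meta = writeMetaFile(part, extra);
        System.out.print(new String(Files.readAllBytes(meta), StandardCharsets.UTF_8));
    }
}
```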
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>
> ------------------------------------------------------------------------------------------------------------------------
>
> Hi,
> I can see that this could be a useful feature. What exactly is currently preventing you from inheriting from BucketingSink? Maybe it would be enough to make BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently working on a larger BucketingSink rework/refactor.
> Piotrek
> ________________________________________________________________________
>
> Hi guys, thx for your reply.
> The following code references apply to the *release-1.5.0 tag, class org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink*.
>
> Currently, files in BucketingSink go through the following lifecycle.
>
> Moving files from the open to the pending state:
> # on each element (*invoke*, line 434), the sink checks that a suitable bucket exists and contains an open file; if no open file exists, it creates one, and then writes the element to it
> # on each element (*invoke*, line 434), the sink checks whether the open file exceeds its limits; if it does, the sink closes the file and moves it into the pending state using the private method *closeCurrentPartFile* (line 568)
> # on each timer callback (*onProcessingTime*, line 482), the sink checks whether no elements have been written to the open file for longer than the configured period; if so, it closes the file using the same private method *closeCurrentPartFile* (line 588)
>
> So the only option we have is to call our hook from *closeCurrentPartFile*, which is private, so we copy-pasted the current implementation and injected our logic there.
>
> Files are moved from the pending into the final state during the checkpointing lifecycle, in *notifyCheckpointComplete* (line 657), which is public and contains a lot of logic, including discovery of files in the pending state, synchronization of state access and its modification, etc.
>
> So we can’t override it, or call the super method and add some logic, because when the current implementation changes the state of files, it removes them from the state, and we have no way of knowing which files have changed state.
>
> To solve this problem, we've created the following interface:
>
> import java.io.IOException;
> import java.io.Serializable;
>
> // Assumed: the Hadoop FileSystem/Path types that BucketingSink itself uses.
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations when
>  * {@link BucketingSink} moves a file from one state to another. For more information about
>  * state management of {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file into the next state.
>      *
>      * @param fs provides access for working with the file system
>      * @param path path to the file that was moved into the next state
>      * @throws IOException if something went wrong while performing operations on the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
>
> And we've added the ability to register these callbacks in our BucketingSink implementation as follows:
>
> public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) { ... }
> public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) { ... }
>
> I’m ready to discuss how such hooks could best be implemented in the core implementation, or any other improvements that would let us add such functionality to our extension through the public API instead of copy-pasting the source code.
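The lifecycle and the proposed registration methods can be modelled in plain Java to show where the hooks would fire. This is a toy sketch, not BucketingSink code: `CallbackSinkSketch` is hypothetical, its callback takes a plain `String` path instead of `(FileSystem, Path)`, file renames and part-file suffixes are left out, and it assumes pending files are grouped per checkpoint id until that checkpoint completes. The key point is that the final-state callbacks run before the entry is dropped from state, which is exactly the information a subclass of the current sink cannot obtain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical, not Flink code) of open -> pending -> final with callbacks. */
public class CallbackSinkSketch {

    /** Simplified stand-in for the proposed callback; takes a plain String path. */
    @FunctionalInterface
    public interface FileStateChangeCallback {
        void call(String path);
    }

    private final List<FileStateChangeCallback> onPending = new ArrayList<>();
    private final List<FileStateChangeCallback> onFinal = new ArrayList<>();

    // Files closed since the last snapshot.
    private final List<String> pendingFiles = new ArrayList<>();
    // Pending files grouped by the checkpoint that snapshotted them (an assumption of this sketch).
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

    public CallbackSinkSketch registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {
        onPending.addAll(Arrays.asList(callbacks));
        return this; // returning this allows chained registration
    }

    public CallbackSinkSketch registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {
        onFinal.addAll(Arrays.asList(callbacks));
        return this;
    }

    /** Called when an open file hits its size limit or inactivity timeout. */
    void closeCurrentPartFile(String path) {
        pendingFiles.add(path);
        for (FileStateChangeCallback cb : onPending) {
            cb.call(path);
        }
    }

    /** Records which pending files belong to this checkpoint. */
    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    /** Finalizes files of all checkpoints up to checkpointId, firing callbacks before removal. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (String path : it.next().getValue()) {
                // A real sink would rename the file here; the callback still learns its path.
                for (FileStateChangeCallback cb : onFinal) {
                    cb.call(path);
                }
            }
            it.remove(); // removing via the headMap view also removes from the backing map
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CallbackSinkSketch sink = new CallbackSinkSketch()
                .registerOnPendingStateChangeCallback(p -> log.add("pending:" + p))
                .registerOnFinalStateChangeCallback(p -> log.add("final:" + p));
        sink.closeCurrentPartFile("bucket/part-0-0");
        sink.snapshotState(1L);
        sink.closeCurrentPartFile("bucket/part-0-1"); // not covered by checkpoint 1
        sink.notifyCheckpointComplete(1L);
        System.out.println(log); // [pending:bucket/part-0-0, final:bucket/part-0-0, pending:bucket/part-0-1]
    }
}
```

Using varargs plus `return this` mirrors the shape of the two `register...` signatures quoted above, so several callbacks can be attached in one fluent expression.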
>
> Thx for your help, mates =)
>
> Sincerely yours,
> *Rinat Sharipov*
>
> ________________________________________________________________________
>
> Hi,
>
> A couple of things:
>
> 1. Please create a Jira ticket with this proposal, so we can move the discussion off the user mailing list.
>
> I haven’t thought it through, so take my comments with a grain of salt, however:
>
> 2. If we were to go with such a callback, I would prefer to have one BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, `onPendingToFinal(…)`, `onPendingToCancelled(…)`, etc., as opposed to having one interface passed three or four times for different purposes.
>
> 3. Another thing that I had in mind is that BucketingSink could be rewritten to extend TwoPhaseCommitSinkFunction. In that case, with
>
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>
>
> users could add their own hooks by overriding the following methods:
>
> BucketingSink2#beginTransaction, BucketingSink2#preCommit, BucketingSink2#commit, BucketingSink2#abort. For example:
>
> public class MyBucketingSink extends BucketingSink2 {
>     @Override
>     protected void commit(??? txn) {
>         super.commit(txn);
>         // My hook on moving file from pending to committed state
>     }
> }
>
> Alternatively, we could implement support for the aforementioned callbacks in TwoPhaseCommitSinkFunction and provide this feature to Kafka/Pravega/BucketingSink at once.
>
> Piotrek

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
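Points 2 and 3 of the last reply can be contrasted in one small standalone sketch. `BucketStateChangeCallback` reuses the method names from point 2 as default no-op methods, so a user overrides only the transitions they care about; `ToyTwoPhaseSink` and `MyToySink` are hypothetical stand-ins for the `BucketingSink2` idea in point 3 and do not use Flink's `TwoPhaseCommitSinkFunction`. Passing `String` paths, and mapping `preCommit`/`commit`/`abort` onto the three transitions, are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the single-callback and override-a-hook alternatives. */
public class BucketCallbackSketch {

    /** One interface, one default no-op method per transition (names from point 2). */
    public interface BucketStateChangeCallback {
        default void onInProgressToPending(String path) {}
        default void onPendingToFinal(String path) {}
        default void onPendingToCancelled(String path) {}
    }

    /** Toy template sink (not Flink's TwoPhaseCommitSinkFunction); notifies the callback. */
    static class ToyTwoPhaseSink {
        private final BucketStateChangeCallback callback;

        ToyTwoPhaseSink(BucketStateChangeCallback callback) {
            this.callback = callback;
        }

        protected void preCommit(String txnPath) {
            callback.onInProgressToPending(txnPath); // assumed mapping
        }

        protected void commit(String txnPath) {
            callback.onPendingToFinal(txnPath); // assumed mapping
        }

        protected void abort(String txnPath) {
            callback.onPendingToCancelled(txnPath); // assumed mapping
        }
    }

    /** Point 3 style: hook in by overriding commit() and calling super first. */
    static class MyToySink extends ToyTwoPhaseSink {
        final List<String> hooked = new ArrayList<>();

        MyToySink() {
            super(new BucketStateChangeCallback() {}); // all transitions left as no-ops
        }

        @Override
        protected void commit(String txnPath) {
            super.commit(txnPath);
            hooked.add(txnPath); // user hook after the file reached its final state
        }
    }

    public static void main(String[] args) {
        List<String> finals = new ArrayList<>();
        // Only the transition of interest is overridden.
        BucketStateChangeCallback cb = new BucketStateChangeCallback() {
            @Override
            public void onPendingToFinal(String path) {
                finals.add(path);
            }
        };
        ToyTwoPhaseSink sink = new ToyTwoPhaseSink(cb);
        sink.preCommit("part-0-0");
        sink.commit("part-0-0");
        sink.preCommit("part-0-1");
        sink.abort("part-0-1");
        System.out.println(finals); // [part-0-0]

        MyToySink mine = new MyToySink();
        mine.commit("part-1-0");
        System.out.println(mine.hooked); // [part-1-0]
    }
}
```

Default methods keep a single callback interface cheap to implement, which is the practical argument for one `BucketStateChangeCallback` over registering the same functional interface once per transition.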