Hi,

Couple of things:

1. Please create a Jira ticket for this proposal so that we can move the
discussion off the user mailing list.

I haven’t thought it through, so take my comments with a grain of salt. However:

2. If we were to go with such a callback, I would prefer to have one
BucketStateChangeCallback with methods like `onInProgressToPending(…)`,
`onPendingToFinal(…)`, `onPendingToCancelled(…)`, etc., as opposed to having
one interface passed three or four times for different purposes.
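A minimal sketch of what such a single callback interface might look like. The interface and method names follow the suggestion above; the `String` path parameter (instead of Hadoop's `FileSystem`/`Path`) and the default no-op bodies are assumptions made to keep the example self-contained, not an existing Flink API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single callback covering every bucket file transition.
// Default no-op methods let users override only the transitions they care about.
interface BucketStateChangeCallback extends Serializable {
    default void onInProgressToPending(String path) {}
    default void onPendingToFinal(String path) {}
    default void onPendingToCancelled(String path) {}
}

// Example implementation that only reacts to files reaching their final state.
class FinalFileLogger implements BucketStateChangeCallback {
    final List<String> finalized = new ArrayList<>();

    @Override
    public void onPendingToFinal(String path) {
        finalized.add(path);
    }
}
```

One design consequence: because every method has a default body, the interface has no abstract method and can no longer be implemented with a lambda, unlike a single-method callback registered several times.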

3. Another thing I had in mind is that BucketingSink could be rewritten to
extend TwoPhaseCommitSinkFunction. In that case, with

public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>

users could add their own hooks by overriding the following methods:

BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
BucketingSink2#commit, BucketingSink2#abort. For example:

public class MyBucketingSink extends BucketingSink2 {
  @Override
  protected void commit(??? txn) {
    super.commit(txn);
    // My hook on moving the file from the pending to the committed state
  }
}
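To illustrate the override pattern without pulling in Flink, here is a heavily simplified, hypothetical stand-in that only models the order of the lifecycle methods named above (begin, preCommit, then commit or abort). `SimpleTwoPhaseCommitSink`, `runTransaction` and the use of a file path as the transaction type are assumptions; the real TwoPhaseCommitSinkFunction also handles checkpointing, state and recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for a two-phase-commit sink.
abstract class SimpleTwoPhaseCommitSink<TXN> {
    protected abstract TXN beginTransaction();
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);
    protected abstract void abort(TXN txn);

    // Drives one transaction through the protocol; aborts when the flag is set.
    final void runTransaction(boolean failBeforeCommit) {
        TXN txn = beginTransaction();
        preCommit(txn);
        if (failBeforeCommit) {
            abort(txn);   // e.g. the checkpoint did not complete
        } else {
            commit(txn);
        }
    }
}

// Hypothetical BucketingSink2 where a transaction is just the path of a part file.
class BucketingSink2 extends SimpleTwoPhaseCommitSink<String> {
    private int counter = 0;
    final List<String> committed = new ArrayList<>();

    @Override
    protected String beginTransaction() {
        return "/data/part-" + (counter++);
    }

    @Override
    protected void preCommit(String txn) {
        // A real sink would flush and close the part file here (pending state).
    }

    @Override
    protected void commit(String txn) {
        // A real sink would rename the pending file to its final name here.
        committed.add(txn);
    }

    @Override
    protected void abort(String txn) {
        // A real sink would delete or truncate the part file here.
    }
}

// User-defined subclass: the hook runs after the base class has committed the file.
class MyBucketingSink extends BucketingSink2 {
    final List<String> hookCalls = new ArrayList<>();

    @Override
    protected void commit(String txn) {
        super.commit(txn);
        hookCalls.add(txn); // e.g. write a meta-file for the committed part file
    }
}
```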

Alternatively, we could implement support for the aforementioned callbacks in
TwoPhaseCommitSinkFunction and provide this feature to
Kafka/Pravega/BucketingSink at once.
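A rough sketch of the base-class variant, assuming a hypothetical `CallbackAwareTwoPhaseCommitSink` that owns callback registration and invokes the callbacks after the subclass has done its own commit or abort work. The class, method names and the use of `java.util.function.Consumer` are illustrative only; a real Flink version would need serializable callbacks, since sink functions are shipped via Java serialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical base class: every subclass (Kafka, Pravega, BucketingSink, ...)
// inherits callback support without implementing it itself.
abstract class CallbackAwareTwoPhaseCommitSink<TXN> {
    private final List<Consumer<TXN>> onCommit = new ArrayList<>();
    private final List<Consumer<TXN>> onAbort = new ArrayList<>();

    CallbackAwareTwoPhaseCommitSink<TXN> registerOnCommit(Consumer<TXN> callback) {
        onCommit.add(callback);
        return this;
    }

    CallbackAwareTwoPhaseCommitSink<TXN> registerOnAbort(Consumer<TXN> callback) {
        onAbort.add(callback);
        return this;
    }

    // Subclasses implement the actual commit/abort of a transaction.
    protected abstract void doCommit(TXN txn);
    protected abstract void doAbort(TXN txn);

    // Callbacks run only after the subclass has finished its own work.
    final void commit(TXN txn) {
        doCommit(txn);
        onCommit.forEach(cb -> cb.accept(txn));
    }

    final void abort(TXN txn) {
        doAbort(txn);
        onAbort.forEach(cb -> cb.accept(txn));
    }
}

// Minimal subclass where a transaction is identified by a file path.
class FileSink extends CallbackAwareTwoPhaseCommitSink<String> {
    @Override
    protected void doCommit(String txn) { /* rename pending file to final */ }

    @Override
    protected void doAbort(String txn) { /* discard pending file */ }
}
```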

Piotrek

> On 13 Jun 2018, at 22:45, Rinat <r.shari...@cleverdata.ru> wrote:
> 
> Hi guys, thx for your reply.
> 
> The following code references apply to the release-1.5.0 tag, class
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.
> 
> Currently, files in BucketingSink go through the following lifecycle:
> 
> When files move from the open state to the pending state:
> - On each item (method invoke, line 434), we check that a suitable bucket
>   exists and contains an open file; if no open file exists, we create one,
>   and then write the item to it.
> - On each item (method invoke, line 434), we check whether the open file
>   exceeds the limits; if it does, we close it and move it into the pending
>   state using the private method closeCurrentPartFile (line 568).
> - On each timer request (onProcessingTime, line 482), we check whether no
>   items have been added to the open file for longer than the specified
>   period; if so, we close it using the same private method,
>   closeCurrentPartFile (line 588).
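The two roll-over triggers described above (a limit checked on every record, an inactivity timeout checked on timers) can be sketched as a small policy object. `PartFileRollPolicy` and its parameters are hypothetical names for illustration, not BucketingSink's actual fields.

```java
// Hypothetical roll-over policy mirroring the two triggers described above.
class PartFileRollPolicy {
    private final long maxPartFileBytes;
    private final long inactivityThresholdMs;

    PartFileRollPolicy(long maxPartFileBytes, long inactivityThresholdMs) {
        this.maxPartFileBytes = maxPartFileBytes;
        this.inactivityThresholdMs = inactivityThresholdMs;
    }

    // Per-record path (invoke): roll once the open file exceeds the size limit.
    boolean shouldRollOnWrite(long currentPartFileBytes) {
        return currentPartFileBytes > maxPartFileBytes;
    }

    // Timer path (onProcessingTime): roll if nothing was written for too long.
    boolean shouldRollOnTimer(long lastWriteTimeMs, long nowMs) {
        return nowMs - lastWriteTimeMs > inactivityThresholdMs;
    }
}
```

Both checks end in the same "close and move to pending" step, which is why a single hook at that step would cover both triggers.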
> 
> So the only option we have is to call our hook from closeCurrentPartFile,
> which is private, so we copy-pasted the current implementation and injected
> our logic there.
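One way to avoid the copy-paste, sketched under the assumption that the private close method would call a protected no-op hook (here the hypothetical `onPartFileMovedToPending`). `HookableSink`, `rollNow` and the file-naming scheme are illustrative stand-ins, not BucketingSink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification: the close logic stays private, but it calls a
// protected hook, so subclasses can react without copying the method.
class HookableSink {
    private String currentPartFile = "/data/part-0-0.in-progress";
    final List<String> pendingFiles = new ArrayList<>();

    // Stand-in for the private closeCurrentPartFile(): moves the file to pending.
    private void closeCurrentPartFile() {
        String pending = currentPartFile.replace(".in-progress", ".pending");
        pendingFiles.add(pending);
        currentPartFile = null;
        onPartFileMovedToPending(pending); // assumed extension point
    }

    // Protected hook with an empty default body.
    protected void onPartFileMovedToPending(String pendingPath) {}

    // Stand-in for either trigger (size limit on write or inactivity on timer).
    void rollNow() {
        if (currentPartFile != null) {
            closeCurrentPartFile();
        }
    }
}

class MetaFileSink extends HookableSink {
    final List<String> hooked = new ArrayList<>();

    @Override
    protected void onPartFileMovedToPending(String pendingPath) {
        hooked.add(pendingPath);
    }
}
```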
> 
> 
> Files move from the pending state to the final state during the
> checkpointing lifecycle, in notifyCheckpointComplete (line 657), which is
> public and contains a lot of logic, including discovery of files in pending
> states, synchronization of state access and its modification, etc.
> 
> So we can’t override it, or call the super method and add some logic,
> because when the current implementation changes the state of files, it
> removes them from the state, and we have no way of knowing which files
> changed state.
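The problem can be sketched with a hypothetical model of checkpoint-driven finalization: pending files are grouped by the checkpoint that produced them, and a callback fires inside the finalization loop while each file is still known, before it is removed from state. `PendingFileTracker` and the `BiConsumer` callback are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical model: pending files keyed by checkpoint id, finalized on completion.
class PendingFileTracker {
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    private final BiConsumer<Long, String> onPendingToFinal;

    PendingFileTracker(BiConsumer<Long, String> onPendingToFinal) {
        this.onPendingToFinal = onPendingToFinal;
    }

    void addPending(long checkpointId, String path) {
        pendingByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(path);
    }

    // Finalizes files of every checkpoint up to and including the completed one.
    void notifyCheckpointComplete(long completedCheckpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingByCheckpoint.headMap(completedCheckpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            for (String path : entry.getValue()) {
                // A real sink would rename the pending file to its final name here,
                // then notify: the path is still available at this point.
                onPendingToFinal.accept(entry.getKey(), path);
            }
            it.remove(); // removing from the view also removes from the backing map
        }
    }

    int pendingCheckpointCount() {
        return pendingByCheckpoint.size();
    }
}
```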
> 
> To solve this problem, we’ve created the following interface:
> 
> import java.io.IOException;
> import java.io.Serializable;
>
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform additional operations
>  * when {@link BucketingSink} moves a file from one state to another. For more
>  * information about state management in {@code BucketingSink}, see its
>  * official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Performs additional operations related to moving a file into the next
>      * state.
>      *
>      * @param fs provides access to the file system
>      * @param path path to the file that was moved into the next state
>      *
>      * @throws IOException if something goes wrong while performing operations
>      *         on the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
>
> We have also added the ability to register these callbacks in the
> BucketingSink implementation as follows:
> 
> public BucketingSink<T> registerOnFinalStateChangeCallback(
>         FileStateChangeCallback... callbacks) {...}
> public BucketingSink<T> registerOnPendingStateChangeCallback(
>         FileStateChangeCallback... callbacks) {...}
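A sketch of how the proposed varargs registration could be used, assuming a simplified callback that takes a `String` path instead of Hadoop's `FileSystem`/`Path`. `CallbackSink`, `callbackCount` and `serialize` are hypothetical helpers. Because the callback interface extends `Serializable`, lambdas targeting it are serializable, which matters since the sink (and every registered callback) is serialized before being shipped to the task managers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified variant of the proposed callback (String path, no Hadoop types).
interface FileStateChangeCallback extends Serializable {
    void call(String path) throws IOException;
}

// Hypothetical sink skeleton showing only the proposed registration methods.
class CallbackSink<T> implements Serializable {
    private final List<FileStateChangeCallback> onPending = new ArrayList<>();
    private final List<FileStateChangeCallback> onFinal = new ArrayList<>();

    CallbackSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {
        Collections.addAll(onPending, callbacks);
        return this; // returning this allows chained registration
    }

    CallbackSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {
        Collections.addAll(onFinal, callbacks);
        return this;
    }

    int callbackCount() {
        return onPending.size() + onFinal.size();
    }

    // Java serialization of the whole sink, including registered callbacks.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }
}
```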
> 
> I’m ready to discuss the best way to implement such hooks in the core
> implementation, or any other improvements that would let us add this
> functionality to our extension through the public API instead of
> copy-pasting the source code.
> 
> Thx for your help, mates =)
> 
> 
>> On 11 Jun 2018, at 11:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> I can see that this could be a useful feature. What exactly is currently
>> preventing you from extending BucketingSink? Maybe it would be enough to
>> make BucketingSink easier to extend.
>> 
>> One thing that could collide with such a feature is that Kostas is
>> currently working on a larger BucketingSink rework/refactor.
>> 
>> Piotrek
>> 
>>> On 8 Jun 2018, at 16:38, Rinat <r.shari...@cleverdata.ru> wrote:
>>> 
>>> Hi mates, I have a proposal regarding the functionality of BucketingSink.
>>> 
>>> While implementing one of our tasks, we needed to create a meta-file
>>> containing the path and additional information about a file created by
>>> BucketingSink once it has been moved to its final location.
>>> Unfortunately, such behaviour is currently not available to us.
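For the use case above, a callback body might write a sidecar meta-file next to each finalized part file. This sketch uses `java.nio` on the local file system instead of Hadoop's `FileSystem`; the `MetaFileWriter` name, the `.meta` suffix and the `key=value` content (path and size) are illustrative assumptions.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes "<part file>.meta" next to a finalized part file.
class MetaFileWriter {
    static Path writeMetaFile(Path finalFile) throws IOException {
        Path metaFile = finalFile.resolveSibling(finalFile.getFileName() + ".meta");
        List<String> lines = Arrays.asList(
                "path=" + finalFile.toAbsolutePath(),
                "size=" + Files.size(finalFile));
        Files.write(metaFile, lines, StandardCharsets.UTF_8);
        return metaFile;
    }
}
```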
>>> 
>>> We’ve implemented our own Sink that lets us register notifiers, which are
>>> called when a file’s state changes, but the current API doesn’t allow us
>>> to add such behaviour through inheritance...
>>> 
>>> It seems that such functionality could be useful and could be part of the
>>> BucketingSink API. What do you think, should I make a PR?
>>> 
>>> Sincerely yours,
>>> Rinat Sharipov
>>> Software Engineer at 1DMP CORE Team
>>> 
>>> email: r.shari...@cleverdata.ru
>>> mobile: +7 (925) 416-37-26
>>> 
>>> CleverDATA
>>> make your data clever
>>> 
>> 
> 
> Sincerely yours,
> Rinat Sharipov
> 
