Hi,
Couple of things:
1. Please create a Jira ticket with this proposal, so we can move the
discussion off the user mailing list.
I haven't thought it through, so take my comments with a grain of salt. However:
2. If we were to go with such a callback, I would prefer to have one
BucketStateChangeCallback with methods like `onInProgressToPending(…)`,
`onPendingToFinal(…)`, `onPendingToCancelled(…)`, etc., as opposed to having one
interface passed three or four times for different purposes.
3. Another thing I had in mind is that BucketingSink could be rewritten to
extend TwoPhaseCommitSinkFunction. In that case, with

public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>

users could add their own hooks by overriding the following methods:
BucketingSink2#beginTransaction, BucketingSink2#preCommit,
BucketingSink2#commit and BucketingSink2#abort. For example:
public class MyBucketingSink extends BucketingSink2 {
    @Override
    protected void commit(??? txn) {
        super.commit(txn);
        // My hook on moving a file from the pending to the committed state
    }
}
Alternatively, we could implement support for the aforementioned callbacks in
TwoPhaseCommitSinkFunction and thereby provide this feature to
Kafka/Pravega/BucketingSink at once.
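A minimal, self-contained sketch of points 2 and 3 combined, assuming that a transaction is simply the path of one part file. It does not use Flink: `BucketStateChangeCallback` takes the method names proposed above (with an assumed `String` path parameter), while `TwoPhaseCommitBase`, `registerCallback`, `prepare`, `complete`, `cancel` and `SimpleFileSink` are hypothetical names that only mirror the hook names of TwoPhaseCommitSinkFunction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single callback; the String path parameter is an assumption. */
interface BucketStateChangeCallback {
    // No-op defaults: users override only the transitions they care about.
    default void onInProgressToPending(String path) {}
    default void onPendingToFinal(String path) {}
    default void onPendingToCancelled(String path) {}
}

/**
 * Simplified, hypothetical model of a two-phase-commit base class. The hook names
 * mirror Flink's TwoPhaseCommitSinkFunction, but this is NOT Flink's API.
 * A transaction is modelled as the path of one part file.
 */
abstract class TwoPhaseCommitBase {
    private final List<BucketStateChangeCallback> callbacks = new ArrayList<>();

    public TwoPhaseCommitBase registerCallback(BucketStateChangeCallback callback) {
        callbacks.add(callback);
        return this;
    }

    // Hooks a concrete sink overrides.
    protected abstract String beginTransaction();
    protected abstract void preCommit(String txn);
    protected abstract void commit(String txn);
    protected abstract void abort(String txn);

    // Framework side: run the hook, then notify every registered callback.
    protected final void prepare(String txn) {
        preCommit(txn);
        for (BucketStateChangeCallback cb : callbacks) cb.onInProgressToPending(txn);
    }

    protected final void complete(String txn) {
        commit(txn);
        for (BucketStateChangeCallback cb : callbacks) cb.onPendingToFinal(txn);
    }

    protected final void cancel(String txn) {
        abort(txn);
        for (BucketStateChangeCallback cb : callbacks) cb.onPendingToCancelled(txn);
    }
}

/** Hypothetical concrete sink: it only overrides the hooks; callbacks come for free. */
class SimpleFileSink extends TwoPhaseCommitBase {
    private int counter = 0;
    final List<String> finalFiles = new ArrayList<>();

    @Override protected String beginTransaction() { return "part-" + counter++; }
    @Override protected void preCommit(String txn) { /* flush and close the in-progress file */ }
    @Override protected void commit(String txn) { finalFiles.add(txn); /* stands in for the rename to final */ }
    @Override protected void abort(String txn) { /* delete the pending file */ }
}
```

Because the base class fires the callbacks, a sink such as Kafka's or the file sink would get them without any code of its own, which is the appeal of putting the support into TwoPhaseCommitSinkFunction.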
Piotrek
> On 13 Jun 2018, at 22:45, Rinat <[email protected]> wrote:
>
> Hi guys, thx for your reply.
>
> The following code references are valid for the release-1.5.0 tag,
> class org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.
>
> Currently, BucketingSink manages the file lifecycle as follows.
>
> Files move from the open state to the pending state as follows:
> - on each element (method invoke, line 434), we check that a suitable bucket
>   exists and contains an open file; if there is no open file, we create one.
>   Then we write the element to it.
> - on each element (method invoke, line 434), we check that the open file
>   doesn't exceed the limits; if the limits are exceeded, we close the file and
>   move it into the pending state using the private method closeCurrentPartFile
>   (line 568).
> - on each timer callback (method onProcessingTime, line 482), we check
>   whether no elements have been added to the open file for longer than the
>   specified period; if so, we close it using the same private method
>   closeCurrentPartFile (line 588).
>
> So the only option we have is to call our hook from closeCurrentPartFile,
> which is private, so we copy-pasted the current implementation and injected
> our logic there.
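To make the two paths concrete, here is a simplified, self-contained model of the open to pending transitions. It is not BucketingSink's actual code: `PartFileLifecycle`, `onMovedToPending` and the size/inactivity thresholds are hypothetical, and sizes and timestamps are passed in explicitly. It shows that both the size check and the inactivity timer end in the same close method, so one protected, overridable hook there would likely cover this use case without copy-pasting.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of the open -> pending transitions.
 * Not BucketingSink's code; only a size limit and an inactivity timeout are modelled.
 */
class PartFileLifecycle {
    private final long maxPartSizeBytes;       // roll once the open file exceeds this size
    private final long inactivityThresholdMs;  // close a file not written to for this long

    private String openFile;                   // null while no part file is open
    private long openFileSize;
    private long lastWriteTime;
    private int partCounter;
    final List<String> pendingFiles = new ArrayList<>();

    PartFileLifecycle(long maxPartSizeBytes, long inactivityThresholdMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.inactivityThresholdMs = inactivityThresholdMs;
    }

    /** Mirrors invoke(): roll if the open file is over the limit, open one if needed, write. */
    void write(byte[] element, long now) {
        if (openFile != null && openFileSize > maxPartSizeBytes) {
            closeCurrentPartFile();
        }
        if (openFile == null) {
            openFile = "part-" + partCounter++;
            openFileSize = 0;
        }
        openFileSize += element.length; // stands in for actually writing the element
        lastWriteTime = now;
    }

    /** Mirrors onProcessingTime(): close the open file if it has been inactive too long. */
    void onProcessingTime(long now) {
        if (openFile != null && now - lastWriteTime > inactivityThresholdMs) {
            closeCurrentPartFile();
        }
    }

    // Both paths end here, which makes it the natural place for a hook.
    private void closeCurrentPartFile() {
        pendingFiles.add(openFile);
        onMovedToPending(openFile);
        openFile = null;
    }

    /** Hypothetical extension point: a protected no-op that subclasses may override. */
    protected void onMovedToPending(String path) {}
}
```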
>
>
> Files move from the pending state to the final state during the checkpointing
> lifecycle, in notifyCheckpointComplete (line 657). That method is public, but
> it contains a lot of logic, including discovery of files in the pending
> state, synchronization of state access and its modification, etc.
>
> So we can't simply override it, or call the super method and add some logic,
> because when the current implementation changes the state of files, it
> removes them from the state, and we have no way to find out which files have
> changed state.
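The problem can be seen in a deliberately simplified model of the pending to final step. It is not BucketingSink's actual code; `PendingFileCommitter`, `snapshot` and the per-file hook are hypothetical. Pending files are grouped by checkpoint id, and all groups up to the completed checkpoint are finalized and then dropped from the state. A subclass that calls super first would find those entries already gone, so the per-file notification has to run inside the loop, before the entry is removed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Simplified, hypothetical model of finalizing pending files on checkpoint completion. */
class PendingFileCommitter {
    // TreeMap keeps checkpoint ids in ascending order.
    private final Map<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();
    private final Consumer<String> onPendingToFinal; // hypothetical per-file hook
    final List<String> finalFiles = new ArrayList<>();

    PendingFileCommitter(Consumer<String> onPendingToFinal) {
        this.onPendingToFinal = onPendingToFinal;
    }

    /** Records which files were pending when the given checkpoint was taken. */
    void snapshot(long checkpointId, List<String> pendingFiles) {
        pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
    }

    /** Finalizes every group whose checkpoint id is <= the completed one. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pendingFilesPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // later checkpoints are not yet confirmed
            }
            for (String file : entry.getValue()) {
                finalFiles.add(file);          // stands in for renaming pending -> final
                onPendingToFinal.accept(file); // notify while we still know the file
            }
            it.remove(); // after this, the file is no longer visible in the state
        }
    }
}
```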
>
> To solve this problem, we've created the following interface:
>
> import java.io.IOException;
> import java.io.Serializable;
>
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations
>  * when {@link BucketingSink} moves a file from one state to another. For more
>  * information about state management of {@code BucketingSink}, see its official
>  * documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file into
>      * the next state.
>      *
>      * @param fs provides access to the file system
>      * @param path path to the file that was moved into the next state
>      *
>      * @throws IOException if an operation on the file system fails
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
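As an illustration of the original use case (a meta-file written next to each finalized file), here is a minimal sketch that runs against the local file system. It assumes java.nio.file in place of the FileSystem/Path pair used by the interface above, so `LocalFileStateChangeCallback`, `MetaFileWriter` and the `.meta` naming scheme are hypothetical.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local-filesystem analog of FileStateChangeCallback (java.nio.file is an assumption). */
interface LocalFileStateChangeCallback extends Serializable {
    void call(Path path) throws IOException;
}

/** Writes "<file>.meta" next to each finalized file, containing its path and size. */
class MetaFileWriter implements LocalFileStateChangeCallback {
    @Override
    public void call(Path path) throws IOException {
        Path metaFile = path.resolveSibling(path.getFileName() + ".meta");
        String content = "path=" + path.toAbsolutePath() + "\n"
                + "size=" + Files.size(path) + "\n";
        Files.write(metaFile, content.getBytes(StandardCharsets.UTF_8));
    }
}
```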
> We've also added the ability to register these callbacks in our
> BucketingSink implementation as follows:
>
> public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
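A minimal sketch of how such registration methods might be wired up, assuming a simplified callback that receives only the path. `StateChangeCallback`, `CallbackRegistry`, `firePending` and `fireFinal` are hypothetical; returning `this` keeps the methods chainable, in the same style as BucketingSink's existing setters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for FileStateChangeCallback (path only, for brevity). */
interface StateChangeCallback {
    void call(String path);
}

/** Varargs registration methods that return this, so calls can be chained. */
class CallbackRegistry {
    private final List<StateChangeCallback> onPending = new ArrayList<>();
    private final List<StateChangeCallback> onFinal = new ArrayList<>();

    CallbackRegistry registerOnPendingStateChangeCallback(StateChangeCallback... callbacks) {
        onPending.addAll(Arrays.asList(callbacks));
        return this;
    }

    CallbackRegistry registerOnFinalStateChangeCallback(StateChangeCallback... callbacks) {
        onFinal.addAll(Arrays.asList(callbacks));
        return this;
    }

    /** Would be called by the sink right after a file is moved to the pending state. */
    void firePending(String path) {
        for (StateChangeCallback cb : onPending) cb.call(path);
    }

    /** Would be called by the sink right after a file is moved to its final state. */
    void fireFinal(String path) {
        for (StateChangeCallback cb : onFinal) cb.call(path);
    }
}
```

Callbacks run in registration order, so a user can rely on, say, a metrics callback running before a meta-file writer if registered that way.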
>
> I'm ready to discuss the best way to implement such hooks in the core
> implementation, or any other improvements that would let us add this
> functionality to our extension through the public API instead of
> copy-pasting the source code.
>
> Thx for your help, mates =)
>
>
>> On 11 Jun 2018, at 11:37, Piotr Nowojski <[email protected]> wrote:
>>
>> Hi,
>>
>> I can see that this could be a useful feature. What exactly is currently
>> preventing you from extending BucketingSink? Maybe it would be enough to
>> make BucketingSink easier to extend.
>>
>> One thing that could collide with such a feature is that Kostas is
>> currently working on a larger BucketingSink rework/refactor.
>>
>> Piotrek
>>
>>> On 8 Jun 2018, at 16:38, Rinat <[email protected]> wrote:
>>>
>>> Hi mates, I have a proposal regarding the functionality of BucketingSink.
>>>
>>> While implementing one of our tasks, we needed to create a meta-file
>>> containing the path and additional information about a file created by
>>> BucketingSink once that file has been moved to its final location.
>>> Unfortunately, such behaviour is currently not available to us.
>>>
>>> We've implemented our own sink that lets us register notifiers, which are
>>> called when a file's state changes, but the current API doesn't allow us
>>> to add such behaviour through inheritance...
>>>
>>> It seems that such functionality could be useful and could become part of
>>> the BucketingSink API. What do you think, should I make a PR?
>>>
>>> Sincerely yours,
>>> Rinat Sharipov
>>> Software Engineer at 1DMP CORE Team
>>>
>>> email: [email protected]
>>> mobile: +7 (925) 416-37-26
>>>
>>> CleverDATA
>>> make your data clever
>>>
>>
>
> Sincerely yours,
> Rinat Sharipov
>