Hi Thias,

Thanks, that's really detailed and inspiring! I think we can use the same pattern for states too.
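
To make "the same pattern for states" concrete, here is a minimal sketch of the state-setup idea from the quoted mail: one setup trait per operator creates all state in open(), and both the operator and the savepoint reader mix it in. It is an illustration under assumptions, not the flink-commons code. FakeContext and ValueCell are hypothetical stand-ins for Flink's RuntimeContext and ValueState so that the sketch runs without Flink on the classpath, and SetupDualMode, SetupCounterState, CountingOperator and CounterStateReader are made-up names.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Flink state primitive such as ValueState.
final class ValueCell[T](var value: Option[T] = None)

// Hypothetical stand-in for Flink's RuntimeContext.
final class FakeContext(val bounded: Boolean) {
  private val cells = mutable.Map.empty[String, ValueCell[_]]

  // Returns the same cell for the same name, similar to a state descriptor lookup.
  def getState[T](name: String): ValueCell[T] =
    cells.getOrElseUpdate(name, new ValueCell[T]()).asInstanceOf[ValueCell[T]]
}

// Simplified analogue of SetupDualUnboundedBoundedState from the quoted mail.
trait SetupDualMode extends Serializable {
  @transient var boundedMode: Boolean = false

  def open(ctx: FakeContext): Unit = {
    boundedMode = ctx.bounded
  }
}

// One setup trait per operator: every state primitive is created in open().
trait SetupCounterState extends SetupDualMode {
  @transient var count: ValueCell[Long] = null

  override def open(ctx: FakeContext): Unit = {
    super.open(ctx)
    count = ctx.getState[Long]("count")
  }
}

// The operator mixes in the state setup ...
class CountingOperator extends SetupCounterState {
  def process(element: String): Long = {
    val next = count.value.getOrElse(0L) + 1L
    count.value = Some(next)
    next
  }
}

// ... and so does the savepoint reader, so both agree on state names and types.
class CounterStateReader extends SetupCounterState {
  def read(): Option[Long] = count.value
}

object StateSetupSketch {
  // Runs the operator on two elements, then reads the state back through the reader.
  def run(): (Long, Option[Long], Boolean) = {
    val ctx = new FakeContext(bounded = true)
    val op = new CountingOperator
    op.open(ctx)
    op.process("a")
    val last = op.process("b")
    val reader = new CounterStateReader
    reader.open(ctx)
    (last, reader.read(), reader.boundedMode)
  }
}
```

The point of the design is that the state name and type are written down once, in the setup trait, so the streaming operator and the savepoint tooling cannot drift apart.
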
On Wed, Mar 27, 2024 at 6:40 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Ruibin,
>
> Our code [1] targets a very old version of Flink (1.8); my employer has not (yet?) decided to contribute our current development to the public.
>
> That old code does not yet contain the abstractions for the setup of state primitives, so let me sketch them here:
>
> - Derive a specific implementation per operator from SetupDualUnboundedBoundedState
> - All state primitive setup is then implemented in the respective open() function
> - Derive the operator and the other functions (savepoint reader/writer) from this state setup class/trait
> - For convenience there is a boundedMode field that tells the operator whether it runs in bounded or streaming mode (as the time semantics are similar yet different)
> - This is one example where we 'patched' the non-public runtime implementation (mentioned in that other mail), therefore it needs to be maintained Flink version by Flink version 😊
>
> Feel free to query details …
>
> Sincere greetings
>
> Thias
>
> [1] https://github.com/VisecaCard/flink-commons
>
> [2] common savepoint setup:
>
>     import org.apache.flink.api.common.RuntimeExecutionMode
>     import org.apache.flink.api.common.functions.RuntimeContext
>     import org.apache.flink.configuration.ExecutionOptions
>     import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
>
>     /** Marker trait for flink functions/operators that can run in both Bounded (BATCH) and Unbounded (PIPELINED) mode,
>       * and for auxiliary functions for savepoint priming and reading.
>       *
>       * @note Derive a specific trait/mixin for each respective flink streaming function/operator that initializes
>       *       state primitives. Mixin that trait into auxiliary functions for savepoint priming and reading, to have a
>       *       common state initialization.
>       * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
>       *       in order to initialize this field.
>       */
>     trait SetupDualUnboundedBoundedState extends Serializable {
>
>       /** Determines at runtime, if the job DAG is running in Bounded (BATCH) or Unbounded (PIPELINED) mode.
>         *
>         * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
>         *       in order to initialize this field.
>         */
>       @transient var boundedMode = false
>
>       /** Opens the respective function/operator for initialization of state primitives */
>       def open(rtc: RuntimeContext): Unit = {
>         boundedMode =
>           rtc match {
>             // Only the streaming runtime context exposes the job configuration.
>             case src: StreamingRuntimeContext =>
>               src.getTaskManagerRuntimeInfo.getConfiguration
>                 .get[RuntimeExecutionMode](ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
>             // Any other context is treated as unbounded (streaming).
>             case _ => false
>           }
>       }
>     }
>
> From: Ruibin Xing <xingro...@gmail.com>
> Sent: Wednesday, March 27, 2024 10:41 AM
> To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
> Cc: Marco Villalobos <mvillalo...@kineteque.com>; Ganesh Walse <ganesh.wa...@gmail.com>; user@flink.apache.org
> Subject: Re: need flink support framework for dependency injection<EOM>
>
> Hi Thias,
>
> Could you share your approach to job setup using Spring, if that's possible? We also use Spring Boot for DI in jobs, primarily relying on profiles. I'm particularly interested in how you use the same job structures for different scenarios, such as reading savepoints. Thank you very much.
>
> On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Hi Ganesh,
>
> I tend to agree with Marco. However, your 'feature request' is very loose and leaves much room for misunderstanding.
>
> There are at least two scenarios for DI integration:
>
> - DI for job setup:
>     - we use Spring for job setup, which
>         - lets us use the same job structure for (at least) 4 scenarios: streaming job, batch job for savepoint priming, savepoint reading, transformation for complex schema changes -> savepoint writing
>     - we also appreciate a very convenient integration of a layered configuration by means of Spring profiles
>     - we can easily replace e.g. sources and sinks for test/local develop/debug scenarios
>     - however this can also easily be done without DI
>     - our approach is public (Apache 2.0 license), if interested
> - DI for Flink would probably be counterproductive for a number of reasons (some guesswork here 😊)
>     - from what I see, the Flink code base is separated into two clearly distinct parts: the public API, and the non-public implementation
>     - the Flink community takes great efforts to guarantee backwards compatibility of the public API, which also allows for replacing the underlying implementation
>     - the private API mostly uses the Service-Locator pattern (sort of), also to make it harder to introduce arbitrary changes to the architecture, which would be hard to include in the backwards-compatibility guarantees
>     - if need be, in most cases you can change the non-public implementation
>         - by implementing a set of replacement classes (quite tedious) and wiring them in, but
>         - that forces you to re-integrate for every new version of Flink (even more tedious 😊)
>     - we've done so in select cases that were not of interest to the general public
> - alternatively, if your extension use case is of public interest, it is better to make a proposal for a change and negotiate agreement with the community on whether and how to implement it
>     - we've also done so (recent example: [1])
>
> WDYT? What is your case for DI? ...
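
The remark that sources and sinks can be swapped per scenario "without DI" can be sketched as plain constructor wiring. This is only an illustration under assumptions, not the Spring-based setup described in the mail: Source, Sink, FixedSource, CollectingSink, Wiring and the scenario objects are hypothetical stand-ins, and the "job" is a trivial uppercase transformation so that the sketch runs without Flink.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Flink sources and sinks.
trait Source { def read(): Seq[String] }
trait Sink { def write(records: Seq[String]): Unit }

final class FixedSource(data: Seq[String]) extends Source {
  def read(): Seq[String] = data
}

final class CollectingSink extends Sink {
  val collected = mutable.ArrayBuffer.empty[String]
  def write(records: Seq[String]): Unit = collected ++= records
}

// A subset of the scenarios listed in the mail, as hypothetical case objects.
sealed trait Scenario
case object Streaming extends Scenario
case object SavepointPriming extends Scenario
case object LocalDebug extends Scenario

// The wiring that a DI container or Spring profile would otherwise provide.
final case class Wiring(source: Source, sink: Sink)

object JobSetupSketch {
  // Scenario-specific part: only the choice of source differs here.
  def wire(scenario: Scenario, sink: Sink): Wiring = scenario match {
    case Streaming        => Wiring(new FixedSource(Seq("live-1", "live-2")), sink)
    case SavepointPriming => Wiring(new FixedSource(Seq("history-1")), sink)
    case LocalDebug       => Wiring(new FixedSource(Seq("test-1")), sink)
  }

  // Scenario-independent part: the job structure is written once.
  def runJob(w: Wiring): Unit =
    w.sink.write(w.source.read().map(_.toUpperCase))
}
```

A test or local debug run would call JobSetupSketch.runJob(JobSetupSketch.wire(LocalDebug, new CollectingSink)) and inspect the sink, while the job structure in runJob stays untouched.
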
>
> Sincere greetings
>
> Thias
>
> [1] https://issues.apache.org/jira/browse/FLINK-26585
>
> -----Original Message-----
> From: Marco Villalobos <mvillalo...@kineteque.com>
> Sent: Tuesday, March 26, 2024 11:40 PM
> To: Ganesh Walse <ganesh.wa...@gmail.com>
> Cc: user@flink.apache.org
> Subject: Re: need flink support framework for dependency injection<EOM>
>
> Hi Ganesh,
>
> I disagree. I don’t think Flink needs a dependency injection framework. I have implemented many complex jobs without one. Can you please articulate why you think it needs a dependency injection framework, along with some use cases that will show its benefit?
>
> I would rather see more features related to stream programming, data-governance, file based table formats, or ML.
>
> > On Mar 26, 2024, at 2:27 PM, Ganesh Walse <ganesh.wa...@gmail.com> wrote:
> >
>
> This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.