Hi Ruibin,

Our code [1] targets a very old version of Flink (1.8); for our current development my employer hasn't decided (yet?) to contribute it to the public. That old code does not yet contain the abstractions for state primitive setup, so let me sketch them here:
* Derive a specific implementation per operator from SetupDualUnboundedBoundedState
* All state primitive setup is then implemented in the respective open() function
* Derive the operator and the auxiliary functions (savepoint reader/writer) from this state setup class/trait
* For convenience there is a boundedMode field that tells the operator whether it runs in bounded or streaming mode (as the time semantics are similar yet different)
* This is one example where we 'patched' the non-public runtime implementation (mentioned in that other mail), therefore it needs to be maintained Flink version by Flink version 😊

(An illustrative sketch of such a derived trait is appended at the very bottom of this thread.)

Feel free to query details …

Sincere greetings

Thias

[1] https://github.com/VisecaCard/flink-commons
[2] common savepoint setup:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

/** Marker trait for Flink functions/operators that can run in both Bounded (BATCH) and Unbounded (PIPELINED) mode,
  * and for auxiliary functions for savepoint priming and reading.
  *
  * @note Derive a specific trait/mixin for each respective Flink streaming function/operator that initializes
  *       state primitives. Mix that trait into auxiliary functions for savepoint priming and reading, to have a
  *       common state initialization.
  * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
  *       in order to initialize this field.
  */
trait SetupDualUnboundedBoundedState extends Serializable {

  /** Determines at runtime whether the job DAG is running in Bounded (BATCH) or Unbounded (PIPELINED) mode.
    *
    * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
    *       in order to initialize this field.
    */
  @transient var boundedMode = false

  /** Opens the respective function/operator for initialization of state primitives. */
  def open(rtc: RuntimeContext): Unit = {
    boundedMode = rtc match {
      case src: StreamingRuntimeContext =>
        src.getTaskManagerRuntimeInfo.getConfiguration
          .get[RuntimeExecutionMode](ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
      case _ => false
    }
  }
}

From: Ruibin Xing <xingro...@gmail.com>
Sent: Wednesday, March 27, 2024 10:41 AM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: Marco Villalobos <mvillalo...@kineteque.com>; Ganesh Walse <ganesh.wa...@gmail.com>; user@flink.apache.org
Subject: Re: need flink support framework for dependency injection<EOM>

Hi Thias,

Could you share your approach to job setup using Spring, if that's possible? We also use Spring Boot for DI in jobs, primarily relying on profiles. I'm particularly interested in how you use the same job structures for different scenarios, such as reading savepoints.

Thank you very much.

On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

Hi Ganesh,

I tend to agree with Marco. However, your 'feature request' is very loose and leaves much room for misunderstanding. There are at least two scenarios for DI integration:

- DI for job setup:
  - we use Spring for job setup, which
  - lets us use the same job structure for (at least) 4 scenarios: streaming job, batch job for savepoint priming, savepoint reading, transformation for complex schema changes -> savepoint writing
  - we also appreciate a very convenient integration of a layered configuration by means of Spring profiles
  - we can easily replace e.g. sources and sinks for test/local develop/debug scenarios - however this can also easily be done without DI (an illustrative sketch of such a source/sink swap is appended at the very bottom of this thread)
  - our approach is public (Apache 2.0 license), if interested
- DI for Flink itself would probably be counterproductive for a number of reasons (some guesswork here 😊):
  - from what I see, the Flink code base is separated into two clearly distinct parts: the public API and the non-public implementation
  - the Flink community takes great efforts to guarantee backwards compatibility of the public API, which also allows for replacing the underlying implementation
  - the private API mostly uses the Service Locator pattern (sort of), also to make it harder to introduce arbitrary changes to the architecture, which would be hard to include in the backwards-compatibility guarantees
  - if need be, in most cases you can change the non-public implementation by implementing a set of replacement classes (quite tedious) and wiring them in, but
  - that forces you to re-integrate for every new version of Flink (even more tedious 😊)
  - we've done so in select cases that were not of interest for the general public
  - alternatively, if your extension use case is of public interest, it is better to make a proposal for a change and negotiate agreement with the community on whether and how to implement it
  - we've also done so (recent example: [1])

WDYT? What is your case for DI? ...

Sincere greetings

Thias

[1] https://issues.apache.org/jira/browse/FLINK-26585

-----Original Message-----
From: Marco Villalobos <mvillalo...@kineteque.com>
Sent: Tuesday, March 26, 2024 11:40 PM
To: Ganesh Walse <ganesh.wa...@gmail.com>
Cc: user@flink.apache.org
Subject: Re: need flink support framework for dependency injection<EOM>

Hi Ganesh,

I disagree. I don't think Flink needs a dependency injection framework. I have implemented many complex jobs without one.

Can you please articulate why you think it needs a dependency injection framework, along with some use cases that will show its benefit?

I would rather see more features related to stream programming, data governance, file-based table formats, or ML.

> On Mar 26, 2024, at 2:27 PM, Ganesh Walse <ganesh.wa...@gmail.com> wrote:
>
>
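Appended illustrative sketch (derived state-setup trait): a minimal, hypothetical example of the derivation pattern from the first mail above, assuming the SetupDualUnboundedBoundedState trait quoted there and the State Processor API's KeyedStateBootstrapFunction. All other names (SetupDeduplicationState, DeduplicationFunction, DeduplicationBootstrapFunction, the "seen" state) are invented for illustration and are not taken from flink-commons. One trait per operator declares the state primitives in open(); the streaming operator and a savepoint-priming function both mix it in, so state initialization lives in exactly one place.

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/** Per-operator state setup: the single place where this operator's state primitives are declared. */
trait SetupDeduplicationState extends SetupDualUnboundedBoundedState {
  @transient protected var seen: ValueState[java.lang.Boolean] = _

  override def open(rtc: RuntimeContext): Unit = {
    super.open(rtc) // initializes boundedMode
    seen = rtc.getState(new ValueStateDescriptor("seen", classOf[java.lang.Boolean]))
  }
}

/** The streaming/batch operator mixes in the common state setup. */
class DeduplicationFunction extends KeyedProcessFunction[String, String, String]
    with SetupDeduplicationState {

  override def open(parameters: Configuration): Unit = open(getRuntimeContext)

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    if (seen.value() == null) {
      seen.update(true)
      out.collect(value)
      // boundedMode could be consulted here where bounded/unbounded time semantics differ.
    }
  }
}

/** The savepoint-priming function reuses exactly the same state setup. */
class DeduplicationBootstrapFunction extends KeyedStateBootstrapFunction[String, String]
    with SetupDeduplicationState {

  override def open(parameters: Configuration): Unit = open(getRuntimeContext)

  override def processElement(value: String,
                              ctx: KeyedStateBootstrapFunction[String, String]#Context): Unit =
    seen.update(true)
}

Because both functions go through the same open(rtc) path, the state descriptors (names, types, serializers) cannot drift apart between the streaming job and the savepoint writer.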
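Appended illustrative sketch (source/sink swap via DI): a minimal, hypothetical example of the point in the quoted mail about replacing sources and sinks for test/local develop/debug scenarios. The names (SourceProvider, SinkProvider, PassThroughJob, JobWiring, the "local-debug" profile) are invented and not taken from flink-commons; the job structure is built once against small provider traits, and Spring profiles select which concrete providers get wired in.

import org.apache.flink.streaming.api.scala._
import org.springframework.context.annotation.{Bean, Configuration, Profile}

/** Small seams the job depends on; the DI container decides the concrete wiring. */
trait SourceProvider { def source(env: StreamExecutionEnvironment): DataStream[String] }
trait SinkProvider   { def sink(stream: DataStream[String]): Unit }

/** The job structure itself stays identical for all scenarios. */
class PassThroughJob(sources: SourceProvider, sinks: SinkProvider) {
  def build(env: StreamExecutionEnvironment): Unit =
    sinks.sink(sources.source(env).map(_.toUpperCase))
}

@Configuration
class JobWiring {

  /** Local/debug profile: bounded in-memory source and a print sink. */
  @Bean @Profile(Array("local-debug"))
  def debugSource: SourceProvider = new SourceProvider {
    override def source(env: StreamExecutionEnvironment): DataStream[String] =
      env.fromElements("alpha", "beta", "gamma")
  }

  @Bean @Profile(Array("local-debug"))
  def debugSink: SinkProvider = new SinkProvider {
    override def sink(stream: DataStream[String]): Unit = stream.print()
  }

  // A production profile would contribute e.g. Kafka-backed providers here instead.

  /** The job bean stays the same regardless of which providers are active. */
  @Bean
  def job(sources: SourceProvider, sinks: SinkProvider): PassThroughJob =
    new PassThroughJob(sources, sinks)
}

A driver would then activate a profile, obtain the PassThroughJob bean from the Spring context, call build(env), and execute the environment; as noted in the mail, the same swap can also be done without DI by passing the providers explicitly.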