Hi Ruibin,

Our public code [1] targets a very old version of Flink (1.8); my employer has
not (yet?) decided to contribute our current development to the public.
That old code does not yet contain the abstractions for setting up state
primitives, so let me sketch them here (see [2]):

  *   Derive a specific implementation per operator from 
SetupDualUnboundedBoundedState
  *   All state primitive setup is then implemented in the respective open() 
function
  *   Derive the operator and the other functions (savepoint reader/writer) 
from this state setup class/trait
  *   For convenience there is a boundedMode field that tells the operator 
whether it runs in bounded or streaming mode (as the time semantics are similar 
yet different)
  *   This is one example where we ‘patched’ the non-public runtime 
implementation (mentioned in that other mail), therefore it needs to be 
maintained Flink version by Flink version 😊

Feel free to ask for details …

Sincere greetings

Thias


[1] https://github.com/VisecaCard/flink-commons
[2] common savepoint setup:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

/** Marker trait for Flink functions/operators that can run in both Bounded (BATCH)
  * and Unbounded (PIPELINED) mode, and for auxiliary functions for savepoint
  * priming and reading.
  *
  * @note Derive a specific trait/mixin for each respective Flink streaming
  *       function/operator that initializes state primitives. Mix that trait into
  *       the auxiliary functions for savepoint priming and reading, to have a
  *       common state initialization.
  * @note Call
  *       [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
  *       in order to initialize the boundedMode field.
  */
trait SetupDualUnboundedBoundedState extends Serializable {

  /** Determines at runtime whether the job DAG is running in Bounded (BATCH) or
    * Unbounded (PIPELINED) mode.
    *
    * @note Call
    *       [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
    *       in order to initialize this field.
    */
  @transient var boundedMode = false

  /** Opens the respective function/operator for initialization of state primitives */
  def open(rtc: RuntimeContext): Unit = {
    boundedMode = rtc match {
      // Only the streaming runtime context exposes the TaskManager configuration
      case src: StreamingRuntimeContext =>
        src.getTaskManagerRuntimeInfo.getConfiguration
          .get[RuntimeExecutionMode](ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
      // Any other runtime context is treated as unbounded mode
      case _ => false
    }
  }
}
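
To make the bullet points above more concrete, here is a minimal sketch of how 
the trait from [2] might be used. It is not taken from our code base: the names 
SetupCounterState, CountingFunction, CountReader and the "count" state are 
hypothetical, and it assumes a Flink 1.x API where open(Configuration) is still 
available, with flink-streaming-java and the State Processor API on the 
classpath.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical per-operator setup trait: all state primitives of one operator
// are declared and initialized here, and nowhere else.
trait SetupCounterState extends SetupDualUnboundedBoundedState {

  @transient var countState: ValueState[java.lang.Long] = _

  override def open(rtc: RuntimeContext): Unit = {
    super.open(rtc) // initializes boundedMode
    countState = rtc.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }
}

// The streaming/batch operator mixes in the setup trait ...
class CountingFunction
  extends KeyedProcessFunction[String, String, String]
  with SetupCounterState {

  // Delegate the Flink lifecycle call to the shared state setup
  override def open(parameters: Configuration): Unit = open(getRuntimeContext)

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    // value() returns null for a key that has no state yet
    val next = Option(countState.value()).map(_.longValue).getOrElse(0L) + 1L
    countState.update(java.lang.Long.valueOf(next))
    out.collect(s"${ctx.getCurrentKey}:$next")
  }
}

// ... and so does the savepoint reader, so both see the same state layout.
class CountReader
  extends KeyedStateReaderFunction[String, String]
  with SetupCounterState {

  override def open(parameters: Configuration): Unit = open(getRuntimeContext)

  override def readKey(
      key: String,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[String]): Unit =
    out.collect(s"$key:${countState.value()}")
}
```

The point of the design is that the state descriptor exists in exactly one 
place, so the operator and the savepoint reader/writer cannot drift apart. In 
the reader the runtime context is presumably not a StreamingRuntimeContext, in 
which case boundedMode simply stays false.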




From: Ruibin Xing <xingro...@gmail.com>
Sent: Wednesday, March 27, 2024 10:41 AM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: Marco Villalobos <mvillalo...@kineteque.com>; Ganesh Walse 
<ganesh.wa...@gmail.com>; user@flink.apache.org
Subject: Re: need flink support framework for dependency injection<EOM>

Hi Thias,

Could you share your approach to job setup using Spring, if that's possible? We 
also use Spring Boot for DI in jobs, primarily relying on profiles. I'm 
particularly interested in how you use the same job structures for different 
scenarios, such as reading savepoints. Thank you very much.

On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias 
<matthias.schwa...@viseca.ch<mailto:matthias.schwa...@viseca.ch>> wrote:
Hi Ganesh,

I tend to agree with Marco. However, your 'feature request' is very loose and 
leaves much room for misunderstanding.

There are at least two scenarios for DI integration:
- DI for job setup:
  - we use Spring for job setup, which
        - lets us use the same job structure for (at least) 4 scenarios: 
streaming job, batch job for savepoint priming, savepoint reading, 
transformation for complex schema changes -> savepoint writing
        - we also appreciate the very convenient integration of a layered 
configuration by means of Spring profiles
        - we can easily replace e.g. sources and sinks for test/local 
development/debugging scenarios
        - however, this can also easily be done without DI
        - our approach is public (Apache 2.0 license), if interested
- DI for Flink would probably be counterproductive for a number of reasons 
(some guesswork here 😊 )
        - from what I see, the Flink code base is separated into two clearly 
distinct parts: the public API, and the non-public implementation
        - the Flink community makes great efforts to guarantee backwards 
compatibility of the public API, which also allows for replacing the underlying 
implementation
        - the private API mostly uses the Service-Locator pattern (sort of), 
also to make it harder to introduce arbitrary changes to the architecture, 
which would be hard to cover with the backwards-compatibility guarantees
        - if need be, in most cases you can change the non-public implementation
                - by implementing a set of replacement classes (quite tedious) 
and wiring them in, but
                - that forces you to re-integrate for every new version of 
Flink (even more tedious 😊)
        - we've done so in select cases that were not of interest to the 
general public
- alternatively, if your extension use case is of public interest, it is better 
to make a proposal for a change and negotiate agreement with the community on 
whether and how to implement it
        - we've also done so (recent example: [1])
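
The remark that swapping sources and sinks "can also easily be done without DI" 
can be illustrated with plain constructor wiring. The following is only a 
sketch in plain Scala, without Flink or Spring types; every name in it 
(RecordSource, RecordSink, JobStructure, InMemorySource, CollectingSink, 
JobWiringSketch) is hypothetical and not part of our code or of Flink.

```scala
import scala.collection.mutable.ArrayBuffer

// Abstractions the job structure is written against (hypothetical names).
trait RecordSource { def read(): Seq[String] }
trait RecordSink { def write(records: Seq[String]): Unit }

// The job structure is defined once and does not know which scenario it runs in.
final class JobStructure(source: RecordSource, sink: RecordSink) {
  def run(): Unit = sink.write(source.read().map(_.trim.toUpperCase))
}

// Test/local replacements for the real connectors.
final class InMemorySource(data: Seq[String]) extends RecordSource {
  override def read(): Seq[String] = data
}

final class CollectingSink extends RecordSink {
  val collected: ArrayBuffer[String] = ArrayBuffer.empty[String]
  override def write(records: Seq[String]): Unit = collected ++= records
}

object JobWiringSketch {
  // One wiring function per scenario; a DI container with profiles would
  // select equivalent components declaratively instead.
  def runLocalTest(input: Seq[String]): Seq[String] = {
    val sink = new CollectingSink
    new JobStructure(new InMemorySource(input), sink).run()
    sink.collected.toList
  }
}
```

A further wiring function per scenario (streaming, savepoint priming, savepoint 
reading, ...) would reuse JobStructure unchanged and only exchange the 
constructor arguments.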

WDYT? What is your case for DI? ...

Sincere greetings

Thias

[1] https://issues.apache.org/jira/browse/FLINK-26585




-----Original Message-----
From: Marco Villalobos 
<mvillalo...@kineteque.com<mailto:mvillalo...@kineteque.com>>
Sent: Tuesday, March 26, 2024 11:40 PM
To: Ganesh Walse <ganesh.wa...@gmail.com<mailto:ganesh.wa...@gmail.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: need flink support framework for dependency injection<EOM>

Hi Ganesh,

I disagree. I don’t think Flink needs a dependency injection framework. I have 
implemented many complex jobs without one. Can you please articulate why you 
think it needs a dependency injection framework, along with some use cases that 
will show its benefit?

I would rather see more features related to stream programming, 
data-governance, file based table formats, or ML.


> On Mar 26, 2024, at 2:27 PM, Ganesh Walse 
> <ganesh.wa...@gmail.com<mailto:ganesh.wa...@gmail.com>> wrote:
>
> 
>

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
