Hi Thais,

Thanks, that's really detailed and inspiring! I think we can use the same
pattern for states too.

On Wed, Mar 27, 2024 at 6:40 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ruibin,
>
>
>
> Our code [1] targets a very old version of Flink (1.8); for current
> development, my employer hasn't decided (yet?) to contribute it to the public.
>
> That old code does not yet contain the abstractions for setting up state
> primitives, so let me sketch it here:
>
>    - Derive a specific implementation per operator from
>    SetupDualUnboundedBoundedState (see [2] below, and the derived-trait
>    sketch after it)
>    - All state-primitive setup is then implemented in the respective
>    open() function
>    - Derive the operator and the other functions (savepoint reader/writer)
>    from this state-setup class/trait
>    - For convenience, there is a boundedMode field that tells the operator
>    whether it runs in bounded or streaming mode (as the time semantics are
>    similar yet different)
>    - This is one example where we ‘patched’ the non-public runtime
>    implementation (mentioned in that other mail), so it needs to be
>    maintained from Flink version to Flink version 😊
>
>
>
> Feel free to ask for details …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> [1] https://github.com/VisecaCard/flink-commons
>
> [2] common savepoint setup:
>
>
>
>
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.configuration.ExecutionOptions
> import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
>
> /** Marker trait for flink functions/operators that can run in both
>   * Bounded (BATCH) and Unbounded (PIPELINED) mode, and for auxiliary
>   * functions for savepoint priming and reading.
>   *
>   * @note Derive a specific trait/mixin for each respective flink streaming
>   *       function/operator that initializes state primitives. Mixin that
>   *       trait into auxiliary functions for savepoint priming and reading,
>   *       to have a common state initialization.
>   * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
>   *       in order to initialize this field.
>   */
> trait SetupDualUnboundedBoundedState extends Serializable {
>
>   /** Determines at runtime, if the job DAG is running in Bounded (BATCH)
>     * or Unbounded (PIPELINED) mode.
>     *
>     * @note Call [[ch.viseca.flink.operators.state.SetupDualUnboundedBoundedState#open(org.apache.flink.api.common.functions.RuntimeContext)]]
>     *       in order to initialize this field.
>     */
>   @transient var boundedMode = false
>
>   /** Opens the respective function/operator for initialization of state primitives. */
>   def open(rtc: RuntimeContext): Unit = {
>     boundedMode =
>       rtc match {
>         case src: StreamingRuntimeContext =>
>           src.getTaskManagerRuntimeInfo.getConfiguration
>             .get[RuntimeExecutionMode](ExecutionOptions.RUNTIME_MODE) ==
>             RuntimeExecutionMode.BATCH
>         case _ => false
>       }
>   }
> }
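>
> To make the pattern above more concrete, here is a minimal sketch of a
> derived state-setup trait and an operator that mixes it in. The names
> (Event, DedupStateSetup, seenIds) are made up for illustration only and
> are not taken from our code:
>
> import org.apache.flink.api.common.functions.{RichFlatMapFunction, RuntimeContext}
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.util.Collector
>
> case class Event(id: String)
>
> /** Illustrative derived trait: declares the state primitives exactly once. */
> trait DedupStateSetup extends SetupDualUnboundedBoundedState {
>   @transient var seenIds: ValueState[java.lang.Boolean] = _
>
>   override def open(rtc: RuntimeContext): Unit = {
>     super.open(rtc) // initializes boundedMode
>     seenIds = rtc.getState(
>       new ValueStateDescriptor[java.lang.Boolean]("seenIds", classOf[java.lang.Boolean]))
>   }
> }
>
> /** The streaming operator mixes the trait in; a savepoint reader/writer
>   * for the same state would mix in the very same trait. Assumes a keyed stream. */
> class DedupFunction extends RichFlatMapFunction[Event, Event] with DedupStateSetup {
>   override def open(parameters: Configuration): Unit = open(getRuntimeContext)
>
>   override def flatMap(e: Event, out: Collector[Event]): Unit = {
>     if (seenIds.value() == null) {
>       seenIds.update(true)
>       out.collect(e)
>     }
>     // boundedMode can be consulted here wherever BATCH and PIPELINED
>     // time semantics need to diverge.
>   }
> }
>
> The savepoint priming/reading jobs then reuse the same setup trait, so
> their state declarations cannot drift from the operator's.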
>
>
>
>
>
>
>
>
>
> From: Ruibin Xing <xingro...@gmail.com>
> Sent: Wednesday, March 27, 2024 10:41 AM
> To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
> Cc: Marco Villalobos <mvillalo...@kineteque.com>; Ganesh Walse <
> ganesh.wa...@gmail.com>; user@flink.apache.org
> Subject: Re: need flink support framework for dependency injection<EOM>
>
>
>
>
>
>
> Hi Thias,
>
> Could you share your approach to job setup using Spring, if that's
> possible? We also use Spring Boot for DI in jobs, primarily relying on
> profiles. I'm particularly interested in how you use the same job
> structures for different scenarios, such as reading savepoints. Thank you
> very much.
>
>
>
> On Wed, Mar 27, 2024 at 3:12 PM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Ganesh,
>
> I tend to agree with Marco. However, your 'feature request' is very loose
> and leaves much room for misunderstanding.
>
> There are at least two scenarios for DI integration:
> - DI for job setup:
>   - we use Spring for job setup, which
>         - lets us use the same job structure for (at least) 4 scenarios:
> streaming job, batch job for savepoint priming, savepoint reading,
> transformation for complex schema changes -> savepoint writing
>         - we also appreciate a very convenient integration of layered
> configuration by means of Spring profiles
>         - we can easily replace e.g. sources and sinks for test/local
> development/debug scenarios (see the sketch after this list)
>         - however, this can also easily be done without DI
>         - our approach is public (Apache 2.0 license), if interested
> - DI for Flink would probably be counterproductive for a number of reasons
> (some guesswork here 😊 )
>         - from what I see, the Flink code base is separated into two
> clearly distinct parts: the public API, and the non-public implementation
>         - the Flink community takes great effort to guarantee backwards
> compatibility of the public API, which also allows for replacing the
> underlying implementation
>         - the private API mostly uses the Service-Locator pattern (sort
> of), in part to make it harder to introduce arbitrary changes to the
> architecture, which would be hard to cover with the
> backwards-compatibility guarantees
>         - if need be, in most cases you can change the non-public
> implementation
>                 - by implementing a set of replacement classes (quite
> tedious) and wiring them in, but
>                 - that forces you to re-integrate for every new version of
> Flink (even more tedious 😊)
>         - we've done so in select cases that were not of interest for the
> general public,
> - alternatively, if your extension use case is of public interest, it is
> better to make a proposal for a change and negotiate agreement with the
> community on whether and how to implement it
>         - we've also done so (recent example: [1])
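>
> For the source/sink replacement mentioned above, here is a minimal, purely
> illustrative sketch (bean names, hosts and profiles are invented, not
> taken from our repo) of how Spring profiles can swap the input:
>
> import org.apache.flink.streaming.api.scala._
> import org.springframework.context.annotation.{Bean, Configuration, Profile}
>
> @Configuration
> class SourceConfig {
>
>   // Real deployment: read from the actual input system (placeholder here).
>   @Bean
>   @Profile(Array("prod"))
>   def input: StreamExecutionEnvironment => DataStream[String] =
>     env => env.socketTextStream("some-input-host", 9999)
>
>   // Local development/debugging: a small fixed collection instead.
>   @Bean
>   @Profile(Array("local"))
>   def localInput: StreamExecutionEnvironment => DataStream[String] =
>     env => env.fromElements("event-1", "event-2", "event-3")
> }
>
> // The job assembly only sees the injected factory and stays identical
> // across streaming, batch/savepoint-priming, and debug runs.
> class JobBuilder(input: StreamExecutionEnvironment => DataStream[String]) {
>   def build(env: StreamExecutionEnvironment): Unit =
>     input(env).map(_.toUpperCase).print()
> }
>
> The active profile is then selected via spring.profiles.active, so the
> same job code runs unchanged in all scenarios.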
>
> WDYT? What is your case for DI? ...
>
> Sincere greetings
>
> Thias
>
> [1] https://issues.apache.org/jira/browse/FLINK-26585
>
>
>
>
> -----Original Message-----
> From: Marco Villalobos <mvillalo...@kineteque.com>
> Sent: Tuesday, March 26, 2024 11:40 PM
> To: Ganesh Walse <ganesh.wa...@gmail.com>
> Cc: user@flink.apache.org
> Subject: Re: need flink support framework for dependency injection<EOM>
>
> Hi Ganesh,
>
> I disagree. I don’t think Flink needs a dependency injection framework. I
> have implemented many complex jobs without one. Can you please articulate
> why you think it needs a dependency injection framework, along with some
> use cases that will show its benefit?
>
> I would rather see more features related to stream programming,
> data governance, file-based table formats, or ML.
>
>
> > On Mar 26, 2024, at 2:27 PM, Ganesh Walse <ganesh.wa...@gmail.com>
> wrote:
> >
> > 
> >