Hi Santhosh,

Flink does not support automatic dependency injection (DI) at the task
level, and there is no immediate plan to support it out of the box. In
general, automatic DI has quite a few implications in a distributed
setting. For example, how is a singleton supposed to work across
processes? Nevertheless, Flink's job startup was overhauled in the last
release and will be again in the upcoming one, so it might become easier
to support DI frameworks in the near future.

What I usually recommend is to use automatic DI while creating the
DataStream application and then switch to manual DI at the task manager
level. (Most folks conflate DI with automatic DI, but DI is a general
pattern that is independent of any framework.)

Here is an example. Suppose you want to use ServiceA in some async I/O call.

DataStream<Integer> inputStream = env.addSource(...);
AsyncFunction<Integer, String> function = new ExternalLookupFunction();
AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();

class ExternalLookupFunction implements AsyncFunction<Integer, String> {
        @Autowired
        ServiceA service; // <-- injected wherever the DataStream graph is created

        @Override
        public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
                service.call(input, resultFuture::complete); // <-- called only on the task manager
        }
}


Now the question is how ServiceA is transferred from the client/job
manager to the task managers. One solution is to make ServiceA
Serializable and let Java serialization handle everything automatically.
Alternatively, you can serialize only the configuration information and
create the service in RichAsyncFunction#open.
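
Here is a minimal sketch of the second approach. ServiceA, its endpoint
parameter, and its call signature are placeholders for whatever your
service actually needs:

class ExternalLookupFunction extends RichAsyncFunction<Integer, String> {
        private final String endpoint;      // serializable configuration only
        private transient ServiceA service; // created per task, never shipped

        ExternalLookupFunction(String endpoint) {
                this.endpoint = endpoint;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
                // runs on the task manager before the first record is processed
                service = new ServiceA(endpoint);
        }

        @Override
        public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
                service.call(input, resultFuture::complete);
        }
}

This also keeps non-serializable service internals (connections, thread
pools, and the like) out of the job graph entirely.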

Let's see if someone else has made progress on providing the
initialization hooks described in your linked thread. Note that the
community is busy getting Flink 1.12 done, so more answers might take a
while.

Best,

Arvid

On Tue, Nov 3, 2020 at 12:03 AM santhosh venkat <
santhoshvenkat1...@gmail.com> wrote:

>
> Hi,
>
> I'm trying to integrate a dependency injection framework with Flink within
> my company. When I searched the user mailing list, I found the following
> thread that discussed this in the past:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html
>
> Since that thread is ~2 years old, I'm creating this request.
>
> 1. How do we expect users to integrate Flink with a dependency injection
> framework? Are there any hooks/entry points that we can use to seamlessly
> integrate a DI framework with Flink? How does the community recommend
> integrating dependency injection?
>
> 2. Would it be possible to create the objects (say, Spring objects) at
> flink-task scope? Or are all such objects from a dependency injection
> framework expected to be created at the entire-process (JM/TM) level?
>
> Can someone please help answer the above questions so that I understand
> Flink's guarantees better? Any help would be greatly appreciated.
>
> Thanks.
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
