Thanks, Arvid, for the reply.

Could you please elaborate a little on option 2, if possible?

Thanks
Jessy

On Mon, Mar 22, 2021, 7:27 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Jessy,
>
> Can I add a new sink into the execution graph at runtime, for example : a
>> new Kafka producer , without restarting the current application  or using
>> option1 ?
>>
>
> No, there is currently no way to add a sink without a restart. Could you
> elaborate on why a restart is not an option for you?
>
> You can use Option 2, which means that you implement one source and one
> sink that dynamically read from or write to different topics, possibly by
> wrapping the existing source and sink. This is a rather complex task that I
> would not recommend to a new Flink user.
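>
> For the sink half, a rough sketch of what I mean (the Event type with its
> getTargetTopic()/getPayload() accessors is a made-up placeholder; the
> Kafka classes are from the universal connector):
>
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> // Pick the target topic per record instead of fixing it at job submission.
> KafkaSerializationSchema<Event> dynamicSchema = (event, timestamp) ->
>     new ProducerRecord<>(
>         event.getTargetTopic(),   // topic decided at runtime, per record
>         event.getPayload().getBytes(StandardCharsets.UTF_8));
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "localhost:9092");
>
> outputStream.addSink(new FlinkKafkaProducer<>(
>     "fallback-topic",             // default topic for unrouted records
>     dynamicSchema,
>     props,
>     FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
>
> Note this only covers routing to new topics on the same cluster; a sink
> that must target a completely new cluster at runtime is where wrapping the
> existing sink gets genuinely complex.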
>
> If you have a known set of possible sink topics, another option would be
> to add all sinks from the get-go and only route messages dynamically with
> side outputs; a rough sketch follows below. However, I'm not aware of such
> a pattern for sources, although with the new source interface it should be
> possible.
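>
> A minimal sketch of that side-output routing (the tag name, the routing
> condition, and the pre-built producers are all made up for illustration):
>
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> // One tag per possible extra sink; the set of sinks is fixed at submission.
> final OutputTag<Event> auditTag = new OutputTag<Event>("audit") {};
>
> SingleOutputStreamOperator<Event> routed = outputStream
>     .process(new ProcessFunction<Event, Event>() {
>         @Override
>         public void processElement(Event e, Context ctx, Collector<Event> out) {
>             if (e.isAuditRelevant()) {    // hypothetical routing condition
>                 ctx.output(auditTag, e);  // goes to the side output
>             }
>             out.collect(e);               // main stream always gets the event
>         }
>     });
>
> routed.addSink(defaultProducer);                         // always attached
> routed.getSideOutput(auditTag).addSink(auditProducer);   // pre-wired, used on demand
>
> The trade-off is that every possible producer is created at submission
> time, even if some of them are rarely used.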
>
> On Wed, Mar 17, 2021 at 7:12 AM Jessy Ping <tech.user.str...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Could you share your thoughts on this? It would be helpful.
>>
>> Thanks
>> Jessy
>>
>> On Tue, 16 Mar 2021 at 21:29, Jessy Ping <tech.user.str...@gmail.com>
>> wrote:
>>
>>> Hi Timo/Team,
>>> Thanks for the reply.
>>>
>>> Just take the example in the following pseudocode; suppose this is the
>>> current application logic:
>>>
>>> firstInputStream = env.addSource(...)   // Kafka consumer C1
>>> secondInputStream = env.addSource(...)  // Kafka consumer C2
>>>
>>> outputStream = firstInputStream.keyBy(a -> a.key)
>>>     .connect(secondInputStream.keyBy(b -> b.key))
>>>     .process(coProcessFunction)
>>>
>>> // The logic determines whether a new sink should be added to the
>>> // application. If not, the event is produced to the existing sink(s).
>>> // If a new sink is required, the events are produced to the existing
>>> // sinks plus the new one.
>>> sink1 = outputStream.addSink(...)   // Kafka producer P1
>>> .
>>> .
>>> .
>>> sinkN = outputStream.addSink(...)   // Kafka producer PN
>>>
>>> *Questions*
>>> --> Can I add a new sink to the execution graph at runtime, for
>>> example a new Kafka producer, without restarting the current
>>> application or using option 1?
>>>
>>> --> (Option 2) What do you mean by adding a custom sink in the
>>> CoProcessFunction, and how will it change the execution graph?
>>>
>>> Thanks
>>> Jessy
>>>
>>>
>>>
>>> On Tue, 16 Mar 2021 at 17:45, Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Jessy,
>>>>
>>>> to be precise, the JobGraph is not used at runtime. It is translated
>>>> into an ExecutionGraph.
>>>>
>>>> Nevertheless, such patterns are possible, but they require a bit of
>>>> manual implementation.
>>>>
>>>> Option 1) You stop the job with a savepoint and restart the application
>>>> with slightly different parameters. If the pipeline has not changed
>>>> much, the old state can be remapped to the slightly modified job graph.
>>>> This is the easiest solution, but with the downside of maybe a couple
>>>> of seconds of downtime.
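>>>>
>>>> In CLI terms this is roughly the following (the job id, savepoint path,
>>>> and jar name are placeholders):
>>>>
>>>> flink stop --savepointPath /path/to/savepoints <jobId>
>>>> flink run --fromSavepoint <savepointPath> modified-application.jar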
>>>>
>>>> Option 2) You introduce a dedicated control stream (e.g. by using the
>>>> connect() DataStream API [1]). Either you implement a custom sink in
>>>> the main stream of the CoProcessFunction, or you enrich every record in
>>>> the main stream with sink parameters that are read by your custom sink
>>>> implementation.
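>>>>
>>>> A minimal sketch of the second variant, assuming made-up ControlEvent
>>>> and RoutedEvent types (the point is just to attach sink parameters to
>>>> each record; the routing state here is per parallel instance and not
>>>> checkpointed):
>>>>
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> DataStream<RoutedEvent> routed = mainStream
>>>>     .connect(controlStream.broadcast())  // every instance sees control msgs
>>>>     .process(new CoProcessFunction<Event, ControlEvent, RoutedEvent>() {
>>>>         private String currentTopic = "default-topic";
>>>>
>>>>         @Override
>>>>         public void processElement1(Event e, Context ctx,
>>>>                                     Collector<RoutedEvent> out) {
>>>>             // Tag each record with the currently configured target.
>>>>             out.collect(new RoutedEvent(currentTopic, e));
>>>>         }
>>>>
>>>>         @Override
>>>>         public void processElement2(ControlEvent c, Context ctx,
>>>>                                     Collector<RoutedEvent> out) {
>>>>             currentTopic = c.getTopic();  // control stream switches routing
>>>>         }
>>>>     });
>>>>
>>>> // A custom sink then reads the attached parameters from each
>>>> // RoutedEvent, e.g. via a dynamic-topic Kafka serialization schema.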
>>>>
>>>> I hope this helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> [1]
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>>>
>>>> On 16.03.21 12:37, Jessy Ping wrote:
>>>> > Hi Team,
>>>> > Is it possible to edit the job graph at runtime? Suppose I want to
>>>> > add a new sink to the Flink application at runtime that depends on
>>>> > specific parameters in the incoming events. Can I edit the job graph
>>>> > of a running Flink application?
>>>> >
>>>> > Thanks
>>>> > Jessy
>>>>
>>>>
