Hi Jessy,

> Can I add a new sink into the execution graph at runtime, for example, a
> new Kafka producer, without restarting the current application or using
> Option 1?

No, there is currently no way to add a sink without a restart. Could you
elaborate on why a restart is not an option for you?

You can use Option 2, which means that you implement one source and one sink
that dynamically read from or write to different topics, possibly by wrapping
the existing source and sink. This is a rather complex task that I would not
recommend to a new Flink user.

If you have a known set of possible sink topics, another option would be to
add all sinks from the get-go and only route messages dynamically with
side outputs. However, I'm not aware that such a pattern exists for sources,
although it should be possible to do that with the new source interface.

On Wed, Mar 17, 2021 at 7:12 AM Jessy Ping <tech.user.str...@gmail.com> wrote:

> Hi Team,
>
> Can you provide your thoughts on this? It would be helpful.
>
> Thanks
> Jessy
>
> On Tue, 16 Mar 2021 at 21:29, Jessy Ping <tech.user.str...@gmail.com>
> wrote:
>
>> Hi Timo/Team,
>> Thanks for the reply.
>>
>> Just take the example from the following pseudocode.
>> Suppose this is the current application logic:
>>
>> firstInputStream = addSource(...)   // Kafka consumer C1
>> secondInputStream = addSource(...)  // Kafka consumer C2
>>
>> outputStream = firstInputStream.keyBy(a -> a.key)
>>     .connect(secondInputStream.keyBy(b -> b.key))
>>     .coProcessFunction(....)
>>     // logic determines whether a new sink should be added to the
>>     // application or not. If not, the event will be produced to the
>>     // existing sink(s). If a new sink is required, produce the events to
>>     // the existing sinks + the new one.
>> sink1 = addSink(outputStream)  // Kafka producer P1
>> .
>> .
>> .
>> sinkN = addSink(outputStream)  // Kafka producer PN
>>
>> *Questions*
>> --> Can I add a new sink into the execution graph at runtime, for example
>> a new Kafka producer, without restarting the current application or
>> using Option 1?
>>
>> --> (Option 2) What do you mean by adding a custom sink at
>> coProcessFunction? How will it change the execution graph?
>>
>> Thanks
>> Jessy
>>
>> On Tue, 16 Mar 2021 at 17:45, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Jessy,
>>>
>>> to be precise, the JobGraph is not used at runtime. It is translated
>>> into an ExecutionGraph.
>>>
>>> Nevertheless, such patterns are possible but require a bit of manual
>>> implementation.
>>>
>>> Option 1) You stop the job with a savepoint and restart the application
>>> with slightly different parameters. If the pipeline has not changed
>>> much, the old state can be remapped to the slightly modified job graph.
>>> This is the easiest solution but with the downside of maybe a couple of
>>> seconds of downtime.
>>>
>>> Option 2) You introduce a dedicated control stream (i.e. by using the
>>> connect() DataStream API [1]). Either you implement a custom sink in the
>>> main stream of the CoProcessFunction, or you enrich every record in the
>>> main stream with sink parameters that are read by your custom sink
>>> implementation.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>>
>>> On 16.03.21 12:37, Jessy Ping wrote:
>>> > Hi Team,
>>> > Is it possible to edit the job graph at runtime? Suppose I want to
>>> > add a new sink to the Flink application at runtime that depends upon
>>> > specific parameters in the incoming events. Can I edit the JobGraph
>>> > of a running Flink application?
>>> >
>>> > Thanks
>>> > Jessy
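
The second variant of Option 2 in the thread above (enrich every record with
sink parameters that a single custom sink reads) can be illustrated with a
minimal plain-Java sketch. It deliberately uses no Flink or Kafka classes:
RoutedEvent, TopicWriter and DynamicTopicSink are hypothetical names invented
for this illustration, and the writer is only a stand-in for a real producer.
In an actual job the same lookup would presumably live inside a sink
implementation, and flushing, closing writers and checkpoint integration would
still have to be handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DynamicTopicSinkSketch {

    /** A record enriched with its destination topic (hypothetical type). */
    static final class RoutedEvent {
        final String targetTopic;
        final String payload;

        RoutedEvent(String targetTopic, String payload) {
            this.targetTopic = targetTopic;
            this.payload = payload;
        }
    }

    /** Stand-in for a per-topic producer (hypothetical interface). */
    interface TopicWriter {
        void write(String payload);
    }

    /** One sink that lazily creates a writer the first time a topic is seen. */
    static final class DynamicTopicSink {
        private final Function<String, TopicWriter> writerFactory;
        private final Map<String, TopicWriter> writers = new HashMap<>();

        DynamicTopicSink(Function<String, TopicWriter> writerFactory) {
            this.writerFactory = writerFactory;
        }

        /** Routes the event to the writer for its topic, creating it if needed. */
        void invoke(RoutedEvent event) {
            writers.computeIfAbsent(event.targetTopic, writerFactory)
                   .write(event.payload);
        }

        int openWriters() {
            return writers.size();
        }
    }

    public static void main(String[] args) {
        // In-memory "topics" so the sketch runs without any broker.
        Map<String, List<String>> topics = new HashMap<>();
        DynamicTopicSink sink = new DynamicTopicSink(
                topic -> payload ->
                        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(payload));

        sink.invoke(new RoutedEvent("existing-topic", "event-1"));
        sink.invoke(new RoutedEvent("new-topic", "event-2")); // writer created on demand
        sink.invoke(new RoutedEvent("existing-topic", "event-3"));

        System.out.println("open writers: " + sink.openWriters());
        System.out.println("existing-topic: " + topics.get("existing-topic"));
        System.out.println("new-topic: " + topics.get("new-topic"));
    }
}
```

The execution graph stays unchanged in this pattern: there is still exactly one
sink operator, and only the set of writers held inside it grows at runtime.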