Alexander,

Thanks for the reply. I will implement this and come back in case of any questions.
Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be
> generic enough to express your requirements. A side output produces a stream.
> Create all of the side output tags, associate each of them with one sink, and
> add conditional logic around `ctx.output(outputTag, ...);` to decide where to
> dispatch the messages (see [1]); collect to none or many side outputs,
> depending on your logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
>
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
>> Piotr,
>>
>> There is an event and subscriber registry as a JSON file which has the
>> table-event mapping and the event-subscriber mapping, as mentioned below.
>>
>> Based on this JSON, we need the job to go through the table updates,
>> create events, and for each event the JSON sets how to sink it.
>>
>> The sink streams have to be added based on this JSON. That is what I
>> meant earlier by there being no predefined sinks in the code.
>>
>> You can see that each event has a different set of sinks.
>>
>> I am just checking how generic side-output streams can be.
>>
>> Source -> generate events -> (find out sinks dynamically in code) ->
>> write to the respective sinks.
>>
>> {
>>   "tablename": "source.table1",
>>   "events": [
>>     {
>>       "operation": "update",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERUPDATE",
>>           "Columnoperation": "and",
>>           "ColumnChanges": [
>>             { "columnname": "name" },
>>             { "columnname": "loginenabled", "value": "Y" }
>>           ],
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "USERTOPIC" }
>>             },
>>             {
>>               "customername": "c2",
>>               "method": "S3",
>>               "methodparams": { "folder": "aws://folderC2" }
>>             }
>>           ]
>>         }
>>       ]
>>     },
>>     {
>>       "operation": "insert",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERINSERT",
>>           "operation": "insert",
>>           "Subscribers": [
>>             {
>>               "teamname": "General",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "new_users" }
>>             },
>>             {
>>               "teamname": "General",
>>               "method": "kinesis",
>>               "methodparams": { "URL": "new_users", "username": "uname", "password": "pwd" }
>>             }
>>           ]
>>         }
>>       ]
>>     },
>>     {
>>       "operation": "delete",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERDELETE",
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "USERTOPIC" }
>>             },
>>             {
>>               "customername": "c4",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "deleterecords" }
>>             }
>>           ]
>>         }
>>       ]
>>     }
>>   ]
>> }
>>
>> Please let me know your thoughts on this.
>>
>> Thanks,
>> Prasanna.
>>
>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> I'm not sure if I fully understand what you mean by
>>>
>>> > The point is the sinks are not predefined.
>>>
>>> You must know, before submitting the job, what sinks are going to be used
>>> in the job. You can have some custom logic that filters out records
>>> before writing them to the sinks, as I proposed before, or you could use
>>> side outputs [1], which might be better suited to your use case.
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
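To make Alexander's suggestion concrete, here is a minimal, self-contained sketch of the side-output routing he describes. It is not code from the thread: the Event POJO, the hard-coded sink ids, the example elements and the print sinks are placeholders, and a real job would derive the routing decisions from the JSON registry and wire each side output to its Kafka/S3/Kinesis sink instead.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRoutingJob {

    /** Minimal stand-in for a generated business event. */
    public static class Event {
        public String name;
        public Set<String> sinks; // sink ids resolved from the JSON registry

        public Event() {}

        public Event(String name, String... sinks) {
            this.name = name;
            this.sinks = new HashSet<>(Arrays.asList(sinks));
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // One OutputTag per known sink; anonymous subclasses keep the generic type information.
    static final OutputTag<Event> KAFKA_USERTOPIC = new OutputTag<Event>("kafka-USERTOPIC") {};
    static final OutputTag<Event> S3_FOLDER_C2 = new OutputTag<Event>("s3-folderC2") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the CDC-driven event stream.
        DataStream<Event> events = env.fromElements(
                new Event("USERUPDATE", "kafka-USERTOPIC", "s3-folderC2"),
                new Event("USERDELETE", "kafka-USERTOPIC"));

        SingleOutputStreamOperator<Event> routed = events.process(
                new ProcessFunction<Event, Event>() {
                    @Override
                    public void processElement(Event e, Context ctx, Collector<Event> out) {
                        // Conditional dispatch: an event may go to none, one or many side outputs.
                        if (e.sinks.contains("kafka-USERTOPIC")) {
                            ctx.output(KAFKA_USERTOPIC, e);
                        }
                        if (e.sinks.contains("s3-folderC2")) {
                            ctx.output(S3_FOLDER_C2, e);
                        }
                    }
                });

        // Each side output is wired to exactly one sink declared up front
        // (print sinks here; a real job would use Kafka/S3/Kinesis sinks instead).
        routed.getSideOutput(KAFKA_USERTOPIC).addSink(new PrintSinkFunction<>("to USERTOPIC", false));
        routed.getSideOutput(S3_FOLDER_C2).addSink(new PrintSinkFunction<>("to folderC2", false));

        env.execute("side-output-routing-sketch");
    }
}

The important point in this sketch is that all sinks and tags are declared when the job is built, while the per-event decision of which side outputs receive a record stays entirely inside the process function.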
>>>
>>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>
>>> Thanks Piotr for the reply.
>>>
>>> I will explain my requirement in detail.
>>>
>>> Table Updates -> Generate Business Events -> Subscribers
>>>
>>> *Source Side*
>>> There are CDC feeds from 100 tables which the framework needs to listen to.
>>>
>>> *Event Table Mapping*
>>>
>>> Events would be associated with tables in an *m:n* fashion.
>>>
>>> Say there are tables TA, TB, TC.
>>>
>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>> EB is generated from TB (based on conditions)
>>> EC is generated from TC (no conditions)
>>>
>>> So there are events EA, EB, EC generated from the tables TA, TB, TC.
>>>
>>> *Event Sink Mapping*
>>>
>>> EA has the following sinks: Kafka topics SA, SA2, SAC.
>>> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
>>> EC has only the REST endpoint RC.
>>>
>>> The point is the sinks are not predefined. [But I only see examples
>>> online where the Flink code has an explicit myStream.addSink(sink2);]
>>>
>>> We expect around 500 types of events in our platform in another 2 years' time.
>>>
>>> We are looking at writing a generic job for this, rather than writing a
>>> new one for each case.
>>>
>>> Let me know your thoughts on Flink's suitability for this requirement.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>>
>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You could easily filter/map/process the streams differently before
>>>> writing them to the sinks. Building on top of my previous example, this
>>>> should also work fine:
>>>>
>>>> DataStream myStream = env.addSource(…).foo().bar(); // addSource for a custom source, but any source works
>>>>
>>>> myStream.baz().addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.qux().quuz().corge().addSink(sink3);
>>>>
>>>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions
>>>> that you wish. `foo` and `bar` would be applied once to the stream, before
>>>> it is split to the different sinks, while `baz`, `qux`, `quuz` and
>>>> `corge` would each be applied to only one of the sinks AFTER splitting.
>>>>
>>>> In your case, it could be:
>>>>
>>>> myStream.filter(...).addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>>
>>>> So sink2 and sink3 would get all of the records, while sink1 only a
>>>> portion of them.
>>>>
>>>> Piotrek
>>>>
>>>>
>>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>>
>>>> Piotr,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> There is one other case, where some events have to be written to
>>>> multiple sinks while others have to be written to just one sink.
>>>>
>>>> How could I have a common code flow/DAG for this?
>>>>
>>>> I do not want multiple jobs to do this; I want to accomplish it in a
>>>> single job.
>>>>
>>>> Could I add the stream code "myStream.addSink(sink1)" under a conditional
>>>> operator such as 'if' to decide this?
>>>>
>>>> But I suppose the stream here works differently compared to normal code
>>>> processing.
>>>>
>>>> Prasanna.
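As an illustration of the filter-before-sink pattern Piotr describes, here is a small runnable sketch of the EA/EB/EC mapping above. The string-typed events, the startsWith check and the print sinks are stand-ins; a real job would use the actual event type and the Kafka, S3 and REST sinks.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class PerEventTypeFanOut {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the CDC-driven event stream; the payload is just "type:detail".
        DataStream<String> events = env.fromElements("EA:u1", "EB:u2", "EC:u3");

        // EA -> Kafka topics SA, SA2, SAC (print sinks stand in for the real ones)
        DataStream<String> ea = events.filter(e -> e.startsWith("EA"));
        ea.addSink(new PrintSinkFunction<>("SA", false));
        ea.addSink(new PrintSinkFunction<>("SA2", false));
        ea.addSink(new PrintSinkFunction<>("SAC", false));

        // EB -> Kafka topic SB, an S3 sink and a REST endpoint RB
        DataStream<String> eb = events.filter(e -> e.startsWith("EB"));
        eb.addSink(new PrintSinkFunction<>("SB", false));
        eb.addSink(new PrintSinkFunction<>("S3-eb", false));
        eb.addSink(new PrintSinkFunction<>("RB", false));

        // EC -> REST endpoint RC only
        events.filter(e -> e.startsWith("EC"))
              .addSink(new PrintSinkFunction<>("RC", false));

        env.execute("per-event-type-fan-out-sketch");
    }
}

Each filtered stream is just a view on the same source stream, so this stays a single job and a single DAG even though different event types end up in different sets of sinks.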
>>>>
>>>>
>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> To the best of my knowledge the following pattern should work just fine:
>>>>>
>>>>> DataStream myStream = env.addSource(…).foo().bar(); // addSource for a custom source, but any source works
>>>>> myStream.addSink(sink1);
>>>>> myStream.addSink(sink2);
>>>>> myStream.addSink(sink3);
>>>>>
>>>>> All of the records from `myStream` would be passed to each of the sinks.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > There is a single source of events for me in my system.
>>>>> >
>>>>> > I need to process and send the events to multiple destinations/sinks
>>>>> > at the same time [a Kafka topic, S3, Kinesis and a POST to an HTTP endpoint].
>>>>> >
>>>>> > I am able to send to one sink.
>>>>> >
>>>>> > Could we achieve this by adding more sink streams to the source
>>>>> > stream? Are there any shortcomings?
>>>>> >
>>>>> > Please let me know if anyone here has successfully implemented this.
>>>>> >
>>>>> > Thanks,
>>>>> > Prasanna.
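For the original question, a sketch of one stream fanned out to several concrete sink types follows, under stated assumptions: it expects the Kafka and filesystem connector dependencies on the classpath, the bootstrap servers, topic name, bucket path and endpoint URL are illustrative placeholders, and the HTTP sink is a deliberately simplistic custom SinkFunction (a Kinesis sink from flink-connector-kinesis could be added to the same stream in the same way).

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MultiSinkFanOut {

    /** Very simple HTTP POST sink; production code would add batching, retries and timeouts. */
    public static class HttpPostSink extends RichSinkFunction<String> {
        private final String endpoint;

        public HttpPostSink(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(value.getBytes(StandardCharsets.UTF_8));
            }
            conn.getResponseCode(); // forces the request to be sent
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("e1", "e2", "e3"); // stand-in source

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Every record goes to each of the three sinks, exactly as in the pattern above.
        events.addSink(new FlinkKafkaProducer<>("events-topic", new SimpleStringSchema(), kafkaProps));
        events.addSink(StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/events/"), new SimpleStringEncoder<String>("UTF-8"))
                .build());
        events.addSink(new HttpPostSink("https://example.com/events")); // custom REST sink

        env.execute("multi-sink-fan-out-sketch");
    }
}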