Hi Prasanna,

If the set of all possible sinks is known in advance, side outputs will be
generic enough to express your requirements. Each side output produces a
stream. Create all of the side output tags, associate each of them with one
sink, and add conditional logic around `ctx.output(outputTag, ...);` to decide
where to dispatch the messages (see [1]), collecting to none, one, or many
side outputs depending on your logic.
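For example, a minimal sketch of that pattern (purely illustrative; Event, the
"events" input stream, the goesToKafka/goesToS3 predicates and the
kafkaSink/s3Sink instances are placeholders for your own types, routing logic
and sinks):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// one tag per possible sink (names are placeholders)
final OutputTag<Event> kafkaTag = new OutputTag<Event>("kafka-sink") {};
final OutputTag<Event> s3Tag = new OutputTag<Event>("s3-sink") {};

// "events" is your DataStream<Event> of generated events
SingleOutputStreamOperator<Event> routed = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            // conditional dispatch: collect to none, one, or many side outputs;
            // goesToKafka / goesToS3 stand in for your own routing logic
            if (goesToKafka(event)) {
                ctx.output(kafkaTag, event);
            }
            if (goesToS3(event)) {
                ctx.output(s3Tag, event);
            }
        }
    });

// each side output is a regular stream that you wire to exactly one sink
routed.getSideOutput(kafkaTag).addSink(kafkaSink);
routed.getSideOutput(s3Tag).addSink(s3Sink);

A record that matches several conditions simply ends up in several sinks, and
getSideOutput(...) gives you a normal stream, so per-sink transformations are
possible as well.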
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

--

Alexander Fedulov | Solutions Architect
<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:

> Piotr,
>
> There is an event and subscriber registry as a JSON file, which has the
> table-event mapping and the event-subscriber mapping as mentioned below.
>
> Based on this JSON, we need the job to go through the table updates and
> create events, and for each event there is a way to set how to sink it.
>
> The sink streams have to be added based on this JSON. That's what I meant
> earlier by "no predefined sinks in code".
>
> You can see that each event has a different set of sinks.
>
> Just checking how generic side-output streams could be.
>
> Source -> generate events -> (find out sinks dynamically in code) ->
> write to the respective sinks.
>
> {
>   "tablename": "source.table1",
>   "events": [
>     {
>       "operation": "update",
>       "eventstobecreated": [
>         {
>           "eventname": "USERUPDATE",
>           "Columnoperation": "and",
>           "ColumnChanges": [
>             { "columnname": "name" },
>             { "columnname": "loginenabled", "value": "Y" }
>           ],
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c2",
>               "method": "S3",
>               "methodparams": { "folder": "aws://folderC2" }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "insert",
>       "eventstobecreated": [
>         {
>           "eventname": "USERINSERT",
>           "operation": "insert",
>           "Subscribers": [
>             {
>               "teamname": "General",
>               "method": "Kafka",
>               "methodparams": { "topicname": "new_users" }
>             },
>             {
>               "teamname": "General",
>               "method": "kinesis",
>               "methodparams": { "URL": "new_users", "username": "uname", "password": "pwd" }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "delete",
>       "eventstobecreated": [
>         {
>           "eventname": "USERDELETE",
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c4",
>               "method": "Kafka",
>               "methodparams": { "topicname": "deleterecords" }
>             }
>           ]
>         }
>       ]
>     }
>   ]
> }
>
> Please let me know your thoughts on this.
>
> Thanks,
> Prasanna.
>
> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> I'm not sure if I fully understand what you mean by
>>
>> > The point is the sinks are not predefined.
>>
>> You must know, before submitting the job, what sinks are going to be used
>> in the job. You can have some custom logic that filters out records
>> before writing them to the sinks, as I proposed before, or you could use
>> side outputs [1], which might be better suited to your use case.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>
>> Thanks Piotr for the reply.
>>
>> I will explain my requirement in detail.
>>
>> Table Updates -> Generate Business Events -> Subscribers
>>
>> *Source Side*
>> There are CDC streams of 100 tables which the framework needs to listen to.
>>
>> *Event Table Mapping*
>>
>> Events would be associated with tables in an *m:n* fashion.
>>
>> Say there are tables TA, TB, TC.
>>
>> EA, EA2 and EA3 are generated from TA (based on conditions)
>> EB is generated from TB (based on conditions)
>> EC is generated from TC (no conditions)
>>
>> Say there are events EA, EB, EC generated from the tables TA, TB, TC.
>>
>> *Event Sink Mapping*
>>
>> EA has the following sinks: Kafka topics SA, SA2, SAC.
>> EB has the following sinks: Kafka topic SB, an S3 sink, and a REST endpoint RB.
>> EC has only the REST endpoint RC.
>>
>> The point is the sinks are not predefined. [But I only see examples
>> online where the Flink code has an explicit myStream.addSink(sink2);]
>>
>> We expect around 500 types of events in our platform in another 2 years'
>> time.
>>
>> We are looking at writing a generic job for this, rather than writing a
>> new one for each case.
>>
>> Let me know your thoughts on Flink's suitability for this requirement.
>>
>> Thanks,
>> Prasanna.
>>
>>
>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> You could easily filter/map/process the streams differently before
>>> writing them to the sinks. Building on top of my previous example, this
>>> should also work fine:
>>>
>>> DataStream myStream = env.addSource(…).foo().bar(); // a custom source, but could be any
>>>
>>> myStream.baz().addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.qux().quuz().corge().addSink(sink3);
>>>
>>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions
>>> that you wish. `foo` and `bar` would be applied once to the stream, before
>>> it is split to the different sinks, while `baz`, `qux`, `quuz` and
>>> `corge` would be applied to only some of the sinks AFTER the splitting.
>>>
>>> In your case, it could be:
>>>
>>> myStream.filter(...).addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> So sink2 and sink3 would get all of the records, while sink1 only a
>>> portion of them.
>>>
>>> Piotrek
>>>
>>>
>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>
>>> Piotr,
>>>
>>> Thanks for the reply.
>>>
>>> There is one other case, where some events have to be written to
>>> multiple sinks while others have to be written to just one sink.
>>>
>>> How could I have a common code flow/DAG for this?
>>>
>>> I do not want multiple jobs to do this; I want to accomplish it in a
>>> single job.
>>>
>>> Could I add the stream code "myStream.addSink(sink1)" under a conditional
>>> operator such as 'if' to decide?
>>>
>>> But I suppose the stream here works differently compared to normal code
>>> processing.
>>>
>>> Prasanna.
>>>
>>>
>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> To the best of my knowledge the following pattern should work just fine:
>>>>
>>>> DataStream myStream = env.addSource(…).foo().bar(); // a custom source, but could be any
>>>> myStream.addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>>
>>>> All of the records from `myStream` would be passed to each of the sinks.
>>>>
>>>> Piotrek
>>>>
>>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > There is a single source of events for me in my system.
>>>> >
>>>> > I need to process and send the events to multiple destinations/sinks at
>>>> > the same time [Kafka topic, S3, Kinesis, and POST to an HTTP endpoint].
>>>> >
>>>> > I am able to send to one sink.
>>>> >
>>>> > Could we achieve this by adding more sink streams to the source stream?
>>>> > Are there any shortcomings?
>>>> >
>>>> > Please let me know if anyone here has successfully implemented this.
>>>> >
>>>> > Thanks,
>>>> > Prasanna.