Piotr,

There is an event and subscriber registry (a JSON file) which holds the table-to-event mapping and the event-to-subscriber mapping, as shown below.
Based on this JSON, we need the job to go through the table updates and create events, and for each event the JSON sets how to sink it. The sink streams have to be added based on this JSON; that's what I meant earlier by "no predefined sinks in code". You can see that each event has a different set of sinks. I am just checking how generic side-output streams can be here: Source -> generate events -> (find out sinks dynamically in code) -> write to the respective sinks. A rough sketch of what I have in mind is at the very bottom of this mail, below the quoted thread.

{
  "tablename": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            { "columnname": "name" },
            { "columnname": "loginenabled", "value": "Y" }
          ],
          "Subscribers": [
            { "customername": "c1", "method": "Kafka", "methodparams": { "topicname": "USERTOPIC" } },
            { "customername": "c2", "method": "S3", "methodparams": { "folder": "aws://folderC2" } }
          ]
        }
      ]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
        {
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            { "teamname": "General", "method": "Kafka", "methodparams": { "topicname": "new_users" } },
            { "teamname": "General", "method": "kinesis", "methodparams": { "URL": "new_users", "username": "uname", "password": "pwd" } }
          ]
        }
      ]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            { "customername": "c1", "method": "Kafka", "methodparams": { "topicname": "USERTOPIC" } },
            { "customername": "c4", "method": "Kafka", "methodparams": { "topicname": "deleterecords" } }
          ]
        }
      ]
    }
  ]
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I'm not sure if I fully understand what you mean by
>
> > The point is that the sinks are not predefined.
>
> You must know, before submitting the job, what sinks are going to be used
> in the job. You can have some custom logic that would filter out records
> before writing them to the sinks, as I proposed before, or maybe side
> outputs [1] would be better suited to your use case?
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
> Thanks Piotr for the reply.
>
> I will explain my requirement in detail.
>
> Table Updates -> Generate Business Events -> Subscribers
>
> *Source Side*
> There are CDC streams of 100 tables which the framework needs to listen to.
>
> *Event Table Mapping*
>
> Events are associated with tables in an *m:n* fashion.
>
> Say there are tables TA, TB, TC.
>
> EA, EA2 and EA3 are generated from TA (based on conditions)
> EB is generated from TB (based on conditions)
> EC is generated from TC (no conditions)
>
> So we have events EA, EB, EC generated from the tables TA, TB, TC.
>
> *Event Sink Mapping*
>
> EA has the following sinks: Kafka topics SA, SA2, SAC.
> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
> EC has only the REST endpoint RC.
>
> The point is that the sinks are not predefined. [But I only see examples
> online where the Flink code has an explicit myStream.addSink(sink2);]
>
> We expect around 500 types of events in our platform in another 2 years' time.
>
> We are looking at writing one generic job for this, rather than writing a
> new job for each new case.
>
> Let me know your thoughts and Flink's suitability for this requirement.
>
> Thanks
> Prasanna.
>
> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> You could easily filter/map/process the streams differently before
>> writing them to the sinks. Building on top of my previous example, this
>> should also work fine:
>>
>> DataStream myStream = env.addSource(…).foo().bar(); // for a custom source, but any source works
>>
>> myStream.baz().addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.qux().quuz().corge().addSink(sink3);
>>
>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions that
>> you wish. `foo` and `bar` would be applied once to the stream, before it's
>> split to the different sinks, while `baz`, `qux`, `quuz` and `corge` would
>> be applied to only one of the sinks AFTER the splitting.
>>
>> In your case, it could be:
>>
>> myStream.filter(...).addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>>
>> So sink2 and sink3 would get all of the records, while sink1 only a
>> portion of them.
>>
>> Piotrek
>>
>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>
>> Piotr,
>>
>> Thanks for the reply.
>>
>> There is one other case, where some events have to be written to multiple
>> sinks while others have to be written to just one sink.
>>
>> How could I have a common code flow / DAG for the same?
>>
>> I do not want multiple jobs to do this; I want to accomplish it in a single job.
>>
>> Could I add the stream code "myStream.addSink(sink1)" under a conditional
>> operator such as 'if' to determine this?
>>
>> But I suppose here the stream works differently compared to normal code
>> processing.
>>
>> Prasanna.
>>
>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> To the best of my knowledge the following pattern should work just fine:
>>>
>>> DataStream myStream = env.addSource(…).foo().bar(); // for a custom source, but any source works
>>> myStream.addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> All of the records from `myStream` would be passed to each of the sinks.
>>>
>>> Piotrek
>>>
>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > There is a single source of events for me in my system.
>>> >
>>> > I need to process and send the events to multiple destinations/sinks at
>>> > the same time [a Kafka topic, S3, Kinesis and a POST to an HTTP endpoint].
>>> >
>>> > I am able to send to one sink.
>>> >
>>> > By adding more sink streams to the source stream, could we achieve this?
>>> > Are there any shortcomings?
>>> >
>>> > Please let me know if anyone here has successfully implemented this.
>>> >
>>> > Thanks,
>>> > Prasanna.
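
Below is a minimal sketch of the generic job I have in mind, just to check the idea against side outputs. It assumes the registry JSON above is parsed once, at job-build time, into plain serializable config objects. Registry, EventConfig, SubscriberConfig, MyCdcSource, CdcRecord and the helper methods matches(), toPayload(), getParam() are hypothetical placeholders I made up for illustration, and the broker address is a dummy value. The idea: one OutputTag per event name, a ProcessFunction routes each table change to the side outputs of every event it matches, and the sinks listed for that event's subscribers are attached to the corresponding side output.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RegistryDrivenRoutingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical: parse the JSON registry into serializable config objects.
        final List<EventConfig> registry = Registry.load("event-registry.json");

        // Hypothetical CDC source emitting one CdcRecord per table change.
        DataStream<CdcRecord> changes = env.addSource(new MyCdcSource());

        // One side-output tag per event name defined in the registry.
        final Map<String, OutputTag<String>> tags = new HashMap<>();
        for (EventConfig event : registry) {
            tags.put(event.getEventName(), new OutputTag<String>(event.getEventName()) {});
        }

        // Route every table change to the side outputs of all events it matches.
        SingleOutputStreamOperator<String> routed = changes.process(
                new ProcessFunction<CdcRecord, String>() {
                    @Override
                    public void processElement(CdcRecord record, Context ctx, Collector<String> out) {
                        for (EventConfig event : registry) {
                            // matches() would check tablename, operation and the column conditions.
                            if (event.matches(record)) {
                                ctx.output(tags.get(event.getEventName()), event.toPayload(record));
                            }
                        }
                    }
                });

        // Attach the sinks listed for each event's subscribers to that event's side output.
        for (EventConfig event : registry) {
            DataStream<String> eventStream = routed.getSideOutput(tags.get(event.getEventName()));
            for (SubscriberConfig sub : event.getSubscribers()) {
                if ("Kafka".equalsIgnoreCase(sub.getMethod())) {
                    Properties props = new Properties();
                    props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
                    eventStream.addSink(new FlinkKafkaProducer<String>(
                            sub.getParam("topicname"), new SimpleStringSchema(), props));
                } else if ("S3".equalsIgnoreCase(sub.getMethod())) {
                    // e.g. a StreamingFileSink pointed at sub.getParam("folder")
                } else if ("kinesis".equalsIgnoreCase(sub.getMethod())) {
                    // e.g. a FlinkKinesisProducer built from the method params
                } else {
                    // REST/HTTP subscribers could be a custom SinkFunction doing the POST
                }
            }
        }

        env.execute("registry-driven-event-routing");
    }
}

The catch, as you pointed out, is that the registry is read once while the job graph is built, so the set of events and sinks is fixed per job submission; making the routing rules dynamic at runtime would need something like broadcast state, but adding a new sink type or endpoint would still require resubmitting the job.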