Hi,

I have an event router registry like the one below. Reading it as input, I need to create a job that redirects messages to the correct sink according to a condition.

{
  "eventRouterRegistry": [
    { "eventType": "billing", "outputTopic": "billing" },
    { "eventType": "cost", "outputTopic": "cost" },
    { "eventType": "marketing", "outputTopic": "marketing" }
  ]
}

I tried the following two approaches.

*1) Using the Filter method*

public static void setupRouterJobsFilter(
        List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    FlinkKafkaConsumer011 fkC =
            new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

    for (eventRouterRegistry record : registryList) {
        System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

        FlinkKafkaProducer011 fkp =
                new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

        inputStream.filter(msg -> msg.equals(record.getEventType()));
        //sideOutputStream.print();
        inputStream.addSink(fkp).name(record.getOutputTopic());
    }
}

Here I am getting the following error.

./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
Starting execution of program
---------------------------
The program finished with the following exception:

The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
    org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
    org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
    firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
    firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)

It looks like I should not use record.getEventType() inside the filter, since record lives outside of the stream. Is there any way to use an external variable here, mainly to generalise the process?

*2) Using the Side Output method*

The following code is my attempt at a generic way of creating side outputs. I am able to create the sink streams based on the list (eventRouterRegistry), but I could not generalise the output tag creation. The issue is that the output tag is fixed. My output tag needs to be the event type, and it needs to be available in the ProcessFunction too. How do I implement this? Should I write my own process function?

public static void setupRouterJobs(
        List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    FlinkKafkaConsumer011 fkC =
            new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

    // Even if I try to generalise the output tag here,
    // how do I do it inside the ProcessFunction?
    final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

    SingleOutputStreamOperator<String> mainDataStream = inputStream.process(
            new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out)
                        throws Exception {
                    // emit data to side output
                    ctx.output(outputTag, value);
                }
            });

    for (eventRouterRegistry record : registryList) {
        System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

        FlinkKafkaProducer011 fkp =
                new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

        DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
        sideOutputStream.print();
        sideOutputStream.addSink(fkp).name(record.getOutputTopic());
    }
}

Thanks,
Prasanna.

On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:

> Alexander,
>
> Thanks for the reply. Will implement and come back in case of any
> questions.
>
> Prasanna.
>
> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
>> Hi Prasanna,
>>
>> if the set of all possible sinks is known in advance, side outputs will
>> be generic enough to express your requirements. Side output produces a
>> stream. Create all of the side output tags, associate each of them with one
>> sink, add conditional logic around `ctx.output(outputTag, ...);` to
>> decide where to dispatch the messages (see [1]), collect to none or many
>> side outputs, depending on your logic.
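
A minimal sketch of the dispatch decision described in the advice above, written in plain Java so that it runs without Flink on the classpath. It builds one lookup table from the registry entries and returns the side outputs a message should go to (none, one, or many). `RegistryEntry` and `SideOutputRouter` are hypothetical names, and identifying each side output by its topic name is an assumption. In a Flink job, each returned name would correspond to an `OutputTag<String>` created up front, and the inner loop in `main` would sit inside `processElement` around `ctx.output(tag, value)`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one entry of the eventRouterRegistry JSON. */
final class RegistryEntry {
    final String eventType;
    final String outputTopic;

    RegistryEntry(String eventType, String outputTopic) {
        this.eventType = eventType;
        this.outputTopic = outputTopic;
    }
}

/**
 * Maps an event type to the side outputs (named by topic) it should be sent to.
 * Serializable, so that a function object holding it as a field could itself
 * be serialized.
 */
final class SideOutputRouter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Map<String, List<String>> topicsByEventType = new HashMap<>();

    SideOutputRouter(List<RegistryEntry> registry) {
        // One event type may appear in several entries, giving it several outputs.
        for (RegistryEntry entry : registry) {
            topicsByEventType
                    .computeIfAbsent(entry.eventType, k -> new ArrayList<>())
                    .add(entry.outputTopic);
        }
    }

    /** Returns the outputs for an event type, or an empty list if none match. */
    List<String> route(String eventType) {
        return topicsByEventType.getOrDefault(eventType, Collections.emptyList());
    }

    public static void main(String[] args) {
        List<RegistryEntry> registry = new ArrayList<>();
        registry.add(new RegistryEntry("billing", "billing"));
        registry.add(new RegistryEntry("cost", "cost"));
        registry.add(new RegistryEntry("marketing", "marketing"));

        SideOutputRouter router = new SideOutputRouter(registry);

        // "unknown" matches no registry entry, so nothing is emitted for it.
        for (String eventType : new String[] {"billing", "cost", "unknown"}) {
            for (String topic : router.route(eventType)) {
                System.out.println(eventType + " -> " + topic);
            }
        }
    }
}
```

How the event type is extracted from a raw message is left open here, since the thread does not show the message format.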
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> --
>>
>> Alexander Fedulov | Solutions Architect
>>
>> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Piotr,
>>>
>>> There is an event and subscriber registry as a JSON file, which has the
>>> table-event mapping and the event-subscriber mapping as shown below.
>>>
>>> Based on this JSON, we need the job to go through the table updates
>>> and create events, and for each event the JSON sets how to sink it.
>>>
>>> The sink streams have to be added based on this JSON. That's what I
>>> meant earlier by no predefined sink in code.
>>>
>>> You can see that each event has a different set of sinks.
>>>
>>> Just checking how generic side-output streams can be.
>>>
>>> Source -> generate events -> (find out sinks dynamically in code) ->
>>> write to the respective sinks.
>>>
>>> {
>>>   "tablename": "source.table1",
>>>   "events": [
>>>     {
>>>       "operation": "update",
>>>       "eventstobecreated": [
>>>         {
>>>           "eventname": "USERUPDATE",
>>>           "Columnoperation": "and",
>>>           "ColumnChanges": [
>>>             { "columnname": "name" },
>>>             { "columnname": "loginenabled", "value": "Y" }
>>>           ],
>>>           "Subscribers": [
>>>             {
>>>               "customername": "c1",
>>>               "method": "Kafka",
>>>               "methodparams": { "topicname": "USERTOPIC" }
>>>             },
>>>             {
>>>               "customername": "c2",
>>>               "method": "S3",
>>>               "methodparams": { "folder": "aws://folderC2" }
>>>             }
>>>           ]
>>>         }
>>>       ]
>>>     },
>>>     {
>>>       "operation": "insert",
>>>       "eventstobecreated": [
>>>         {
>>>           "eventname": "USERINSERT",
>>>           "operation": "insert",
>>>           "Subscribers": [
>>>             {
>>>               "teamname": "General",
>>>               "method": "Kafka",
>>>               "methodparams": { "topicname": "new_users" }
>>>             },
>>>             {
>>>               "teamname": "General",
>>>               "method": "kinesis",
>>>               "methodparams": {
>>>                 "URL": "new_users",
>>>                 "username": "uname",
>>>                 "password": "pwd"
>>>               }
>>>             }
>>>           ]
>>>         }
>>>       ]
>>>     },
>>>     {
>>>       "operation": "delete",
>>>       "eventstobecreated": [
>>>         {
>>>           "eventname": "USERDELETE",
>>>           "Subscribers": [
>>>             {
>>>               "customername": "c1",
>>>               "method": "Kafka",
>>>               "methodparams": { "topicname": "USERTOPIC" }
>>>             },
>>>             {
>>>               "customername": "c4",
>>>               "method": "Kafka",
>>>               "methodparams": { "topicname": "deleterecords" }
>>>             }
>>>           ]
>>>         }
>>>       ]
>>>     }
>>>   ]
>>> }
>>>
>>> Please let me know your thoughts on this.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm not sure if I fully understand what you mean by
>>>>
>>>> > The point is the sink are not predefined.
>>>>
>>>> You must know, before submitting the job, what sinks are going to be
>>>> used in the job. You can have some custom logic that would filter out
>>>> records before writing them to the sinks, as I proposed before, or
>>>> perhaps side outputs [1] would be better suited to your use case?
>>>>
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>>
>>>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks Piotr for the reply.
>>>>
>>>> I will explain my requirement in detail.
>>>>
>>>> Table Updates -> Generate Business Events -> Subscribers
>>>>
>>>> *Source Side*
>>>> There is CDC for 100 tables, which the framework needs to listen to.
>>>>
>>>> *Event Table Mapping*
>>>>
>>>> Events would be associated with tables in an *m:n* fashion.
>>>>
>>>> Say there are tables TA, TB, TC.
>>>>
>>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>>> EB generated from TB (based on conditions)
>>>> EC generated from TC (no conditions.)
>>>>
>>>> Say there are events EA, EB, EC generated from the tables TA, TB, TC
>>>>
>>>> *Event Sink Mapping*
>>>>
>>>> EA has the following sinks: Kafka topics SA, SA2, SAC.
>>>> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
>>>> EC has only the REST endpoint RC.
>>>>
>>>> The point is the sink are not predefined. [But I only see examples
>>>> online where the Flink code has an explicit myStream.addSink(sink2);]
>>>>
>>>> We expect around 500 types of events in our platform in another 2 years'
>>>> time.
>>>>
>>>> We are looking at writing a generic job for this, rather than
>>>> writing one for each new case.
>>>>
>>>> Let me know your thoughts on Flink's suitability for this requirement.
>>>>
>>>> Thanks
>>>> Prasanna.
>>>>
>>>>
>>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You could easily filter/map/process the streams differently before
>>>>> writing them to the sinks.
>>>>> Building on top of my previous example, this
>>>>> should also work fine:
>>>>>
>>>>>
>>>>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>>>>>
>>>>> myStream.baz().addSink(sink1);
>>>>> myStream.addSink(sink2);
>>>>> myStream.qux().quuz().corge().addSink(sink3);
>>>>>
>>>>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions
>>>>> that you wish. `foo` and `bar` would be applied once to the stream, before
>>>>> it is split to the different sinks, while `baz`, `qux`, `quuz` and
>>>>> `corge` would be applied to only one of the sinks, AFTER splitting.
>>>>>
>>>>> In your case, it could be:
>>>>>
>>>>> myStream.filter(...).addSink(sink1);
>>>>> myStream.addSink(sink2);
>>>>> myStream.addSink(sink3);
>>>>>
>>>>> So sink2 and sink3 would get all of the records, while sink1 would get
>>>>> only a portion of them.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>
>>>>> On 26 May 2020, at 06:45, Prasanna kumar <
>>>>> prasannakumarram...@gmail.com> wrote:
>>>>>
>>>>> Piotr,
>>>>>
>>>>> Thanks for the reply.
>>>>>
>>>>> There is one other case, where some events have to be written to
>>>>> multiple sinks while others have to be written to just one sink.
>>>>>
>>>>> How could I have a common codeflow/DAG for this?
>>>>>
>>>>> I do not want multiple jobs for this; I want to accomplish it in a
>>>>> single job.
>>>>>
>>>>> Could I add stream code such as "myStream.addSink(sink1)" under a
>>>>> conditional operator such as 'if' to determine this?
>>>>>
>>>>> But I suppose the stream works differently here compared to normal
>>>>> code processing.
>>>>>
>>>>> Prasanna.
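
Driving Piotr's `myStream.filter(...).addSink(sink1)` pattern from a registry list means one filter per entry, and each filter function has to be serializable because Flink ships it to the workers. The plain-Java sketch below, which does not use Flink, shows one likely cause of the "FilterFunction is not serializable" error reported at the top of the thread: a lambda that captures a whole registry object drags that object into serialization, whereas copying the needed `String` into a local variable first keeps the lambda serializable. `SerializableFilter` and `RegistryRecord` are hypothetical stand-ins (for Flink's `FilterFunction` and the `eventRouterRegistry` class), and it is an assumption that the registry class does not implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a filter interface that must be serializable. */
interface SerializableFilter<T> extends Serializable {
    boolean filter(T value);
}

/** Hypothetical registry record that, by assumption, is NOT Serializable. */
final class RegistryRecord {
    private final String eventType;

    RegistryRecord(String eventType) {
        this.eventType = eventType;
    }

    String getEventType() {
        return eventType;
    }
}

final class ClosureCaptureDemo {

    /** Returns true if plain Java serialization of the object succeeds. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Captures the whole record, so the record must be serialized with the lambda. */
    static SerializableFilter<String> capturingRecord(RegistryRecord record) {
        return msg -> msg.equals(record.getEventType());
    }

    /** Captures only a String copy of the event type, which is serializable. */
    static SerializableFilter<String> capturingString(RegistryRecord record) {
        final String eventType = record.getEventType();
        return msg -> msg.equals(eventType);
    }

    public static void main(String[] args) {
        RegistryRecord record = new RegistryRecord("billing");
        System.out.println("capturing record: " + isSerializable(capturingRecord(record)));
        System.out.println("capturing string: " + isSerializable(capturingString(record)));
        // prints:
        // capturing record: false
        // capturing string: true
    }
}
```

In the loop from approach 1, this would mean reading `record.getEventType()` into a local `String` before the `filter(...)` call. Making the registry class implement `Serializable` is an alternative.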
>>>>>
>>>>>
>>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> To the best of my knowledge the following pattern should work just
>>>>>> fine:
>>>>>>
>>>>>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>>>>>> myStream.addSink(sink1);
>>>>>> myStream.addSink(sink2);
>>>>>> myStream.addSink(sink3);
>>>>>>
>>>>>> All of the records from `myStream` would be passed to each of the
>>>>>> sinks.
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> > On 24 May 2020, at 19:34, Prasanna kumar <
>>>>>> prasannakumarram...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi,
>>>>>> >
>>>>>> > There is a single source of events in my system.
>>>>>> >
>>>>>> > I need to process and send the events to multiple destinations/sinks
>>>>>> > at the same time [Kafka topic, S3, Kinesis and POST to an HTTP endpoint].
>>>>>> >
>>>>>> > I am able to send to one sink.
>>>>>> >
>>>>>> > Could we achieve this by adding more sink streams to the source
>>>>>> > stream? Are there any shortcomings?
>>>>>> >
>>>>>> > Please let me know if anyone here has successfully implemented this.
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Prasanna.