Piotr and Alexander ,

I have fixed the programmatic error in filter method and it is working now.

Thanks for the detailed help from both of you. Am able to add the sinks
based on the JSON and create DAG.

Thanks,
Prasanna.

On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Prasanna,
>
> 1.
>
> > The object probably contains or references non serializable fields.
>
> That should speak for itself. Flink was not able to distribute your code
> to the worker nodes.
>
> You have used a lambda function that turned out to be non serialisable.
> You should unit test your code and in this case add a
> serialisation/deserialisation round trip unit test for the filter function.
> For starters I would suggest to not use lambda function, but a full/proper
> named class and work from there.
>
> 2.
>
> Can not you create an array/map/collection of OutputTags corresponding to
> the the sinks/topics combinations. One OutputTag per sink(/topic) and use
> this array/map/collection inside your process function?
>
> Piotrek
>
> On 2 Jun 2020, at 13:49, Prasanna kumar <prasannakumarram...@gmail.com>
> wrote:
>
> Hi ,
>
> I have a Event router Registry as this. By reading this as input i need to
> create a Job which would redirect the messages to the correct sink as per
> condition.
>
> {
>   "eventRouterRegistry": [
>     { "eventType": "biling", "outputTopic": "billing" },
>     { "eventType": "cost", "outputTopic": "cost" },
>     { "eventType": "marketing", "outputTopic": "marketing" }
>   ]
> }
>
> I tried the following two approaches.
> *1) Using the Filter method*
>
> public static void setupRouterJobsFilter(
>             List<eventRouterRegistry> registryList, 
> StreamExecutionEnvironment env) {
>
>    Properties props = new Properties();
>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>    props.put("client.id", "flink-example1");
>
>    FlinkKafkaConsumer011 fkC =
>                new FlinkKafkaConsumer011<>("EVENTTOPIC", new 
> SimpleStringSchema(), props);
>
>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>
>    for (eventRouterRegistry record : registryList) {
>       System.out.print(record.getEventType() + " <==> " + 
> record.getOutputTopic());
>
>       FlinkKafkaProducer011 fkp =
>                   new FlinkKafkaProducer011<>(record.getOutputTopic(), new 
> SimpleStringSchema(), props);
>
>       inputStream.filter(msg -> msg.equals(record.getEventType()) );
>       //sideOutputStream.print();
>       inputStream.addSink(fkp).name(record.getOutputTopic());
>    }
> }
>
> Here am getting the following error.
>  ./bin/flink run -c firstflinkpackage.GenericStreamRouter
> ../../myrepository/flink001/target/flink001-1.0.jar
> Starting execution of program
> ---------------------------
>  The program finished with the following exception:
>
> The implementation of the FilterFunction is not serializable. The object
> probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>
> org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>
> firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
> firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
>
> Looks like I should not use  record.getEventType() as this is outside of
> the stream.
>
> Is there a any way to use external variable here mainly to Generalise the
> process.
>
> *2) Using the Side Output method*
>
> Following code is my attempt in creating a generic way for sideoutput
> creation.
>
> I am able to create the Sink Streams based on the list
> (eventRouterRegistry).
>
> But i could not generalise the Output tag creation.
> The issue here is the output tag is fixed.
> My output tag need to be the Event Type and that needs to be in Process
> Function too.
>
> How do i implement. Should I write my own process function ?
>
>  public static void setupRouterJobs(
>      List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
>
>    Properties props = new Properties();
>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>    props.put("client.id", "flink-example1");
>
>    FlinkKafkaConsumer011 fkC =
>        new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), 
> props);
>
>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>
>      //Even if i try to generalise OUtput tag here. How do i do it inside 
> ProcessFunction
>
>      final OutputTag<String> outputTag = new OutputTag<String>("side-output") 
> {};
>
> SingleOutputStreamOperator<String> mainDataStream =
> inputStream.process(
> new ProcessFunction<String, String>() {
>
> @Override
> public void processElement(String value, Context ctx, Collector<String>
> out)
> throws Exception {
> // emit data to side output
> ctx.output(OutputTag, value);
> }
> });
>
> for (eventRouterRegistry record : registryList) {
> System.out.print(record.getEventType() + " <==> " +
> record.getOutputTopic());
>
> FlinkKafkaProducer011 fkp =
> new FlinkKafkaProducer011<>(record.getOutputTopic(), new
> SimpleStringSchema(), props);
>
> DataStream<String> sideOutputStream =
> mainDataStream.getSideOutput(outputTag);
> sideOutputStream.print();
> sideOutputStream.addSink(fkp).name(record.getOutputTopic());
> }
> }
>
>
> Thanks,
> Prasanna.
>
>
>
> On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Alexander,
>>
>> Thanks for the reply. Will implement and come back in case of any
>> questions.
>>
>> Prasanna.
>>
>> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <
>> alexan...@ververica.com> wrote:
>>
>>> Hi Prasanna,
>>>
>>> if the set of all possible sinks is known in advance, side outputs will
>>> be generic enough to express your requirements. Side output produces a
>>> stream. Create all of the side output tags, associate each of them with one
>>> sink, add conditional logic around `ctx.output(outputTag, ... *)*;`  to
>>> decide where to dispatch the messages  (see [1]), collect to none or many
>>> side outputs, depending on your logic.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>> --
>>> Alexander Fedulov | Solutions Architect
>>>
>>> <image.png>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>> --
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>> Stream Processing | Event Driven | Real Time
>>>
>>>
>>> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
>>> prasannakumarram...@gmail.com> wrote:
>>>
>>>> Piotr,
>>>>
>>>> There is an event and subscriber registry as JSON file which has the
>>>> table event mapping and event-subscriber mapping as mentioned below.
>>>>
>>>> Based on the set JSON , we need to job to go through the table updates
>>>> and create events and for each event there is a way set how to sink them.
>>>>
>>>> The sink streams have to be added based on this JSON. Thats what i
>>>> mentioned as no predefined sink in code earlier.
>>>>
>>>> You could see that each event has different set of sinks.
>>>>
>>>> Just checking how much generic could Side-output streams be ?.
>>>>
>>>> Source -> generate events -> (find out sinks dynamically in code ) ->
>>>> write to the respective sinks.
>>>>
>>>> {
>>>>   " tablename ": "source.table1",
>>>>   "events": [
>>>>     {
>>>>       "operation": "update",
>>>>       "eventstobecreated": [
>>>>         {
>>>>           "eventname": "USERUPDATE",
>>>>           "Columnoperation": "and",
>>>>           "ColumnChanges": [
>>>>             {
>>>>               "columnname": "name"
>>>>             },
>>>>             {
>>>>               "columnname": "loginenabled",
>>>>               "value": "Y"
>>>>             }
>>>>           ],
>>>>           "Subscribers": [
>>>>             {
>>>>               "customername": "c1",
>>>>               "method": "Kafka",
>>>>               "methodparams": {
>>>>                 "topicname": "USERTOPIC"
>>>>               }
>>>>             },
>>>>             {
>>>>               "customername": "c2",
>>>>               "method": "S3",
>>>>               "methodparams": {
>>>>                 "folder": "aws://folderC2"
>>>>               }}, ]}]
>>>>     },
>>>>     {
>>>>       "operation": "insert",
>>>>       "eventstobecreated": [
>>>>           "eventname": "USERINSERT",
>>>>           "operation": "insert",
>>>>           "Subscribers": [
>>>>             {
>>>>               "teamname": "General",
>>>>               "method": "Kafka",
>>>>               "methodparams": {
>>>>                 "topicname": "new_users"
>>>>               }
>>>>             },
>>>>             {
>>>>               "teamname": "General",
>>>>               "method": "kinesis",
>>>>               "methodparams": {
>>>>                 "URL": "new_users",
>>>>                 "username": "uname",
>>>>                 "password":  "pwd"
>>>>               }}, ]}]
>>>>     },
>>>>     {
>>>>       "operation": "delete",
>>>>       "eventstobecreated": [
>>>>         {
>>>>           "eventname": "USERDELETE",
>>>>           "Subscribers": [
>>>>             {
>>>>               "customername": "c1",
>>>>               "method": "Kafka",
>>>>               "methodparams": {
>>>>                 "topicname": "USERTOPIC"
>>>>               }
>>>>             },
>>>>             {
>>>>               "customername": "c4",
>>>>               "method": "Kafka",
>>>>               "methodparams": {
>>>>                 "topicname": "deleterecords"
>>>>              }}, ]}]
>>>>      },
>>>> }
>>>>
>>>> Please let me know your thoughts on this.
>>>>
>>>> Thanks,
>>>> Prasanna.
>>>>
>>>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I’m not sure if I fully understand what do you mean by
>>>>>
>>>>> > The point is the sink are not predefined.
>>>>>
>>>>> You must know before submitting the job, what sinks are going to be
>>>>> used in the job. You can have some custom logic, that would filter out
>>>>> records before writing them to the sinks, as I proposed before, or you
>>>>> could use side outputs [1] would be better suited to your use case?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>>>
>>>>> On 26 May 2020, at 12:20, Prasanna kumar <
>>>>> prasannakumarram...@gmail.com> wrote:
>>>>>
>>>>> Thanks Piotr for the Reply.
>>>>>
>>>>> I will explain my requirement in detail.
>>>>>
>>>>> Table Updates -> Generate Business Events -> Subscribers
>>>>>
>>>>> *Source Side*
>>>>> There are CDC of 100 tables which the framework needs to listen to.
>>>>>
>>>>> *Event Table Mapping*
>>>>>
>>>>> There would be Event associated with table in a *m:n* fashion.
>>>>>
>>>>> say there are tables TA, TB, TC.
>>>>>
>>>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>>>> EB generated from TB (based on conditions)
>>>>> EC generated from TC (no conditions.)
>>>>>
>>>>> Say there are events EA,EB,EC generated from the tables TA, TB, TC
>>>>>
>>>>> *Event Sink Mapping*
>>>>>
>>>>> EA has following sinks. kafka topic SA,SA2,SAC.
>>>>> EB has following sinks. kafka topic SB , S3 sink and a rest
>>>>> endpoint RB.
>>>>> EC has only rest endpoint RC.
>>>>>
>>>>> The point is the sink are not predefined. [. But i only see the
>>>>> example online where , flink code having explicit myStream.addSink(sink2);
>>>>>  ]
>>>>>
>>>>> We expect around 500 types of events in our platform in another 2
>>>>> years time.
>>>>>
>>>>> We are looking at writing a generic job for the same , rather than
>>>>> writing one for new case.
>>>>>
>>>>> Let me know your thoughts and flink suitability to this requirement.
>>>>>
>>>>> Thanks
>>>>> Prasanna.
>>>>>
>>>>>
>>>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You could easily filter/map/process the streams differently before
>>>>>> writing them to the sinks. Building on top of my previous example, this
>>>>>> also should work fine:
>>>>>>
>>>>>>
>>>>>> DataStream myStream = env.addSource(…).foo().bar() // for custom
>>>>>> source, but any ;
>>>>>>
>>>>>> myStream.baz().addSink(sink1);
>>>>>> myStream.addSink(sink2);
>>>>>> myStream.qux().quuz().corge().addSink(sink3);
>>>>>>
>>>>>> Where foo/bar/baz/quz/quuz/corge are any stream processing functions
>>>>>> that you wish. `foo` and `bar` would be applied once to the stream, 
>>>>>> before
>>>>>> it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and
>>>>>> `corge` would be applied to only of the sinks AFTER splitting.
>>>>>>
>>>>>> In your case, it could be:
>>>>>>
>>>>>> myStream.filter(...).addSink(sink1);
>>>>>> myStream.addSink(sink2);
>>>>>> myStream.addSink(sink3);
>>>>>>
>>>>>> So sink2 and sink3 would get all of the records, while sink1 only a
>>>>>> portion of them.
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>
>>>>>> On 26 May 2020, at 06:45, Prasanna kumar <
>>>>>> prasannakumarram...@gmail.com> wrote:
>>>>>>
>>>>>> Piotr,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> There is one other case, where some events have to be written to
>>>>>> multiple sinks and while other have to be written to just one sink.
>>>>>>
>>>>>> How could i have a common codeflow/DAG for the same ?
>>>>>>
>>>>>> I do not want multiple jobs to do the same want to accomplish in a
>>>>>> single job .
>>>>>>
>>>>>> Could i add Stream code "myStream.addSink(sink1)" under a conditional
>>>>>> operator such as 'if' to determine .
>>>>>>
>>>>>> But i suppose here the stream works differently compared to normal
>>>>>> code processing.
>>>>>>
>>>>>> Prasanna.
>>>>>>
>>>>>>
>>>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> To the best of my knowledge the following pattern should work just
>>>>>>> fine:
>>>>>>>
>>>>>>> DataStream myStream = env.addSource(…).foo().bar() // for custom
>>>>>>> source, but any ;
>>>>>>> myStream.addSink(sink1);
>>>>>>> myStream.addSink(sink2);
>>>>>>> myStream.addSink(sink3);
>>>>>>>
>>>>>>> All of the records from `myStream` would be passed to each of the
>>>>>>> sinks.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> > On 24 May 2020, at 19:34, Prasanna kumar <
>>>>>>> prasannakumarram...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > There is a single source of events for me in my system.
>>>>>>> >
>>>>>>> > I need to process and send the events to multiple destination/sink
>>>>>>> at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>>>>>>> >
>>>>>>> > I am able send to one sink.
>>>>>>> >
>>>>>>> > By adding more sink stream to the source stream could we achieve
>>>>>>> it . Are there any shortcomings.
>>>>>>> >
>>>>>>> > Please let me know if any one here has successfully implemented
>>>>>>> one .
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Prasanna.
>>>>>>
>>>>>>
>

Reply via email to