Alexander,

Thanks for the reply. Will implement and come back in case of any
questions.

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com>
wrote:

> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be
> generic enough to express your requirements. Side output produces a stream.
> Create all of the side output tags, associate each of them with one sink,
> add conditional logic around `ctx.output(outputTag, ... *)*;`  to decide
> where to dispatch the messages  (see [1]), collect to none or many side
> outputs, depending on your logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Piotr,
>>
>> There is an event and subscriber registry as JSON file which has the
>> table event mapping and event-subscriber mapping as mentioned below.
>>
>> Based on the set JSON , we need to job to go through the table updates
>> and create events and for each event there is a way set how to sink them.
>>
>> The sink streams have to be added based on this JSON. Thats what i
>> mentioned as no predefined sink in code earlier.
>>
>> You could see that each event has different set of sinks.
>>
>> Just checking how much generic could Side-output streams be ?.
>>
>> Source -> generate events -> (find out sinks dynamically in code ) ->
>> write to the respective sinks.
>>
>> {
>>   " tablename ": "source.table1",
>>   "events": [
>>     {
>>       "operation": "update",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERUPDATE",
>>           "Columnoperation": "and",
>>           "ColumnChanges": [
>>             {
>>               "columnname": "name"
>>             },
>>             {
>>               "columnname": "loginenabled",
>>               "value": "Y"
>>             }
>>           ],
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "USERTOPIC"
>>               }
>>             },
>>             {
>>               "customername": "c2",
>>               "method": "S3",
>>               "methodparams": {
>>                 "folder": "aws://folderC2"
>>               }}, ]}]
>>     },
>>     {
>>       "operation": "insert",
>>       "eventstobecreated": [
>>           "eventname": "USERINSERT",
>>           "operation": "insert",
>>           "Subscribers": [
>>             {
>>               "teamname": "General",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "new_users"
>>               }
>>             },
>>             {
>>               "teamname": "General",
>>               "method": "kinesis",
>>               "methodparams": {
>>                 "URL": "new_users",
>>                 "username": "uname",
>>                 "password":  "pwd"
>>               }}, ]}]
>>     },
>>     {
>>       "operation": "delete",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERDELETE",
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "USERTOPIC"
>>               }
>>             },
>>             {
>>               "customername": "c4",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "deleterecords"
>>              }}, ]}]
>>      },
>> }
>>
>> Please let me know your thoughts on this.
>>
>> Thanks,
>> Prasanna.
>>
>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I’m not sure if I fully understand what do you mean by
>>>
>>> > The point is the sink are not predefined.
>>>
>>> You must know before submitting the job, what sinks are going to be used
>>> in the job. You can have some custom logic, that would filter out records
>>> before writing them to the sinks, as I proposed before, or you could use
>>> side outputs [1] would be better suited to your use case?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>
>>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com>
>>> wrote:
>>>
>>> Thanks Piotr for the Reply.
>>>
>>> I will explain my requirement in detail.
>>>
>>> Table Updates -> Generate Business Events -> Subscribers
>>>
>>> *Source Side*
>>> There are CDC of 100 tables which the framework needs to listen to.
>>>
>>> *Event Table Mapping*
>>>
>>> There would be Event associated with table in a *m:n* fashion.
>>>
>>> say there are tables TA, TB, TC.
>>>
>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>> EB generated from TB (based on conditions)
>>> EC generated from TC (no conditions.)
>>>
>>> Say there are events EA,EB,EC generated from the tables TA, TB, TC
>>>
>>> *Event Sink Mapping*
>>>
>>> EA has following sinks. kafka topic SA,SA2,SAC.
>>> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
>>> EC has only rest endpoint RC.
>>>
>>> The point is the sink are not predefined. [. But i only see the example
>>> online where , flink code having explicit myStream.addSink(sink2);   ]
>>>
>>> We expect around 500 types of events in our platform in another 2 years
>>> time.
>>>
>>> We are looking at writing a generic job for the same , rather than
>>> writing one for new case.
>>>
>>> Let me know your thoughts and flink suitability to this requirement.
>>>
>>> Thanks
>>> Prasanna.
>>>
>>>
>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> You could easily filter/map/process the streams differently before
>>>> writing them to the sinks. Building on top of my previous example, this
>>>> also should work fine:
>>>>
>>>>
>>>> DataStream myStream = env.addSource(…).foo().bar() // for custom
>>>> source, but any ;
>>>>
>>>> myStream.baz().addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.qux().quuz().corge().addSink(sink3);
>>>>
>>>> Where foo/bar/baz/quz/quuz/corge are any stream processing functions
>>>> that you wish. `foo` and `bar` would be applied once to the stream, before
>>>> it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and
>>>> `corge` would be applied to only of the sinks AFTER splitting.
>>>>
>>>> In your case, it could be:
>>>>
>>>> myStream.filter(...).addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>>
>>>> So sink2 and sink3 would get all of the records, while sink1 only a
>>>> portion of them.
>>>>
>>>> Piotrek
>>>>
>>>>
>>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com>
>>>> wrote:
>>>>
>>>> Piotr,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> There is one other case, where some events have to be written to
>>>> multiple sinks and while other have to be written to just one sink.
>>>>
>>>> How could i have a common codeflow/DAG for the same ?
>>>>
>>>> I do not want multiple jobs to do the same want to accomplish in a
>>>> single job .
>>>>
>>>> Could i add Stream code "myStream.addSink(sink1)" under a conditional
>>>> operator such as 'if' to determine .
>>>>
>>>> But i suppose here the stream works differently compared to normal code
>>>> processing.
>>>>
>>>> Prasanna.
>>>>
>>>>
>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> To the best of my knowledge the following pattern should work just
>>>>> fine:
>>>>>
>>>>> DataStream myStream = env.addSource(…).foo().bar() // for custom
>>>>> source, but any ;
>>>>> myStream.addSink(sink1);
>>>>> myStream.addSink(sink2);
>>>>> myStream.addSink(sink3);
>>>>>
>>>>> All of the records from `myStream` would be passed to each of the
>>>>> sinks.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> > On 24 May 2020, at 19:34, Prasanna kumar <
>>>>> prasannakumarram...@gmail.com> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > There is a single source of events for me in my system.
>>>>> >
>>>>> > I need to process and send the events to multiple destination/sink
>>>>> at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>>>>> >
>>>>> > I am able send to one sink.
>>>>> >
>>>>> > By adding more sink stream to the source stream could we achieve it
>>>>> . Are there any shortcomings.
>>>>> >
>>>>> > Please let me know if any one here has successfully implemented one .
>>>>> >
>>>>> > Thanks,
>>>>> > Prasanna.
>>>>>
>>>>>
>>>>
>>>

Reply via email to