Piotr,

There is an event and subscriber registry, kept as a JSON file, which holds
the table-event mapping and the event-subscriber mapping shown below.

Based on this JSON, we need the job to go through the table updates and
create events; for each event, the JSON specifies how it is to be sunk.

The sink streams have to be added based on this JSON. That's what I
meant earlier by no predefined sinks in the code.

You can see that each event has a different set of sinks.

Just checking: how generic can side-output streams be?

Source -> generate events -> (find out sinks dynamically in code) -> write
to the respective sinks.

{
  "tablename": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }
            }
          ]
        }
      ]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
        {
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password": "pwd"
              }
            }
          ]
        }
      ]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
              }
            }
          ]
        }
      ]
    }
  ]
}
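
The lookup step in the flow (event -> sinks) can be sketched in plain Java,
without any Flink dependency. This is only an illustration under
assumptions: the class and method names (SinkRouter, Subscriber, register,
route, sample) are hypothetical, the registry entries are copied by hand
rather than parsed from the JSON file, and each subscriber is reduced to a
single routing value taken from its "methodparams".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: SinkRouter, Subscriber, register, route and sample
// are illustrative names, not part of Flink or of the registry format.
final class SinkRouter {

    /** One "Subscribers" entry: who receives the event, and through which method. */
    static final class Subscriber {
        final String name;    // "customername" or "teamname" in the registry
        final String method;  // "Kafka", "S3" or "kinesis"
        final String target;  // assumed: the single routing value from "methodparams"

        Subscriber(String name, String method, String target) {
            this.name = name;
            this.method = method;
            this.target = target;
        }

        /** Key identifying the physical sink, e.g. "Kafka:USERTOPIC". */
        String sinkKey() {
            return method + ":" + target;
        }
    }

    // Event name -> subscribers, kept in registration order.
    private final Map<String, List<Subscriber>> byEvent = new LinkedHashMap<>();

    void register(String eventName, Subscriber subscriber) {
        byEvent.computeIfAbsent(eventName, k -> new ArrayList<>()).add(subscriber);
    }

    /** Sink keys an event must be written to; empty if the event is unknown. */
    List<String> route(String eventName) {
        List<String> keys = new ArrayList<>();
        for (Subscriber s : byEvent.getOrDefault(eventName, Collections.<Subscriber>emptyList())) {
            keys.add(s.sinkKey());
        }
        return keys;
    }

    /** Registry contents copied by hand from the JSON above (no JSON parsing here). */
    static SinkRouter sample() {
        SinkRouter r = new SinkRouter();
        r.register("USERUPDATE", new Subscriber("c1", "Kafka", "USERTOPIC"));
        r.register("USERUPDATE", new Subscriber("c2", "S3", "aws://folderC2"));
        r.register("USERINSERT", new Subscriber("General", "Kafka", "new_users"));
        r.register("USERINSERT", new Subscriber("General", "kinesis", "new_users"));
        r.register("USERDELETE", new Subscriber("c1", "Kafka", "USERTOPIC"));
        r.register("USERDELETE", new Subscriber("c4", "Kafka", "deleterecords"));
        return r;
    }

    public static void main(String[] args) {
        SinkRouter router = sample();
        System.out.println(router.route("USERUPDATE")); // [Kafka:USERTOPIC, S3:aws://folderC2]
        System.out.println(router.route("USERDELETE")); // [Kafka:USERTOPIC, Kafka:deleterecords]
    }
}
```

Note that USERUPDATE and USERDELETE both resolve to "Kafka:USERTOPIC", so
the number of distinct sinks is smaller than the number of subscriber
entries. In a job, the distinct keys would presumably be collected from the
registry once, when the job is built, and the per-event lookup would then
only choose among sinks that already exist.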

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I’m not sure if I fully understand what you mean by
>
> > The point is the sinks are not predefined.
>
> You must know, before submitting the job, what sinks are going to be used
> in the job. You can have some custom logic that would filter out records
> before writing them to the sinks, as I proposed before, or you could use
> side outputs [1]. Maybe they would be better suited to your use case?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com>
> wrote:
>
> Thanks, Piotr, for the reply.
>
> I will explain my requirement in detail.
>
> Table Updates -> Generate Business Events -> Subscribers
>
> *Source Side*
> There is CDC for 100 tables that the framework needs to listen to.
>
> *Event Table Mapping*
>
> Events would be associated with tables in an *m:n* fashion.
>
> Say there are tables TA, TB, TC.
>
> EA, EA2 and EA3 are generated from TA (based on conditions).
> EB is generated from TB (based on conditions).
> EC is generated from TC (no conditions).
>
> Say there are events EA, EB, EC generated from the tables TA, TB, TC.
>
> *Event Sink Mapping*
>
> EA has the following sinks: Kafka topics SA, SA2, SAC.
> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
> EC has only the REST endpoint RC.
>
> The point is the sinks are not predefined. [But I only see examples
> online where the Flink code has an explicit myStream.addSink(sink2);]
>
> We expect around 500 types of events on our platform in another 2 years'
> time.
>
> We are looking at writing a generic job for this, rather than writing
> one for each new case.
>
> Let me know your thoughts on Flink's suitability for this requirement.
>
> Thanks
> Prasanna.
>
>
> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> You could easily filter/map/process the streams differently before
>> writing them to the sinks. Building on top of my previous example, this
>> also should work fine:
>>
>>
>> // for a custom source, but any source works
>> DataStream myStream = env.addSource(…).foo().bar();
>>
>> myStream.baz().addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.qux().quuz().corge().addSink(sink3);
>>
>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions that
>> you wish. `foo` and `bar` would be applied once to the stream, before it is
>> split to the different sinks, while `baz`, `qux`, `quuz` and `corge` would
>> each be applied to only one of the sinks, AFTER splitting.
>>
>> In your case, it could be:
>>
>> myStream.filter(...).addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>>
>> So sink2 and sink3 would get all of the records, while sink1 would get
>> only a portion of them.
>>
>> Piotrek
>>
>>
>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com>
>> wrote:
>>
>> Piotr,
>>
>> Thanks for the reply.
>>
>> There is one other case, where some events have to be written to multiple
>> sinks while others have to be written to just one sink.
>>
>> How could I have a common code flow/DAG for this?
>>
>> I do not want multiple jobs to do this; I want to accomplish it in a single
>> job.
>>
>> Could I put the stream code "myStream.addSink(sink1)" under a conditional
>> such as 'if' to determine the sink?
>>
>> But I suppose the stream works differently here, compared to normal code
>> execution.
>>
>> Prasanna.
>>
>>
>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> To the best of my knowledge the following pattern should work just fine:
>>>
>>> // for a custom source, but any source works
>>> DataStream myStream = env.addSource(…).foo().bar();
>>> myStream.addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> All of the records from `myStream` would be passed to each of the sinks.
>>>
>>> Piotrek
>>>
>>> > On 24 May 2020, at 19:34, Prasanna kumar <
>>> prasannakumarram...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > There is a single source of events for me in my system.
>>> >
>>> > I need to process and send the events to multiple destinations/sinks at
>>> > the same time [Kafka topic, S3, Kinesis and POST to an HTTP endpoint].
>>> >
>>> > I am able to send to one sink.
>>> >
>>> > Could we achieve this by adding more sinks to the source stream?
>>> > Are there any shortcomings?
>>> >
>>> > Please let me know if anyone here has successfully implemented this.
>>> >
>>> > Thanks,
>>> > Prasanna.
>>>
>>>
>>
>
