Igal Shilman created FLINK-16123:
------------------------------------

             Summary: Add routable Kafka connector
                 Key: FLINK-16123
                 URL: https://issues.apache.org/jira/browse/FLINK-16123
             Project: Flink
          Issue Type: Task
          Components: Stateful Functions
            Reporter: Igal Shilman


In some cases it is beneficial to associate a stateful function instance with a 
key in a Kafka topic.

In that case, a simplified Kafka ingress definition can be introduced. 

Consider the following example:

Imagine a Kafka topic named "signups" (1) where the keys are ut8 strings 
representing user ids,

and the values are Protobuf messages of type (2) 
com.user.foo.bar.greeter.SingupMessage.

We would like to have a stateful function of type(3)
{code:java}
FunctionType( com.user.foo.bar, SingupProcessor{code}
to be invoked for each incoming signup message.

The following spec definition:
{code:java}
    
  - ingress:
          meta:
            type:  org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
            id: com.user.foo.bar/greeter
          spec:
            properties:
              - consumer.group: greeter
            topics: 
              - singups: (1)
                  typeUrl: (2) "com.user.foo.bar.greeter.SingupMessage"
                  target: (3) "com.user.foo.bar/SingupProcessor"

{code}


Defines a Kafka ingress that consumes  <utf8 strings, bytes > from a singups 
topic,
and produces an Routable Protobuf message with the following type and 
properties:

{code}
message Routable {
       Address target; (1)
       Any payload;
}
{code}
Where:
(1) is Address(FunctionType(com.user.foo.bar, SingupProcessor),  <a consumer 
record's key>)
(2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage and the 
value bytes
would come directly from the consumer record value bytes

This would require an additional AutoRoutable router,
that basically forwards the payload to the target address.

 




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to