Igal Shilman created FLINK-16123:
------------------------------------

             Summary: Add routable Kafka connector
                 Key: FLINK-16123
                 URL: https://issues.apache.org/jira/browse/FLINK-16123
             Project: Flink
          Issue Type: Task
          Components: Stateful Functions
            Reporter: Igal Shilman

In some cases it is beneficial to associate a stateful function instance with a key in a Kafka topic. In that case, a simplified Kafka ingress definition can be introduced.

Consider the following example:

Imagine a Kafka topic named "signups" (1), where the keys are UTF-8 strings representing user ids and the values are Protobuf messages of type (2) com.user.foo.bar.greeter.SingupMessage.
We would like a stateful function of type (3)
{code:java}
FunctionType(com.user.foo.bar, SingupProcessor)
{code}
to be invoked for each incoming signup message.

The following spec definition:
{code:java}
- ingress:
    meta:
      type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
      id: com.user.foo.bar/greeter
    spec:
      properties:
        - consumer.group: greeter
      topics:
        - signups: (1)
            typeUrl: (2) "com.user.foo.bar.greeter.SingupMessage"
            target: (3) "com.user.foo.bar/SingupProcessor"
{code}
defines a Kafka ingress that consumes <UTF-8 string, bytes> records from the signups topic and produces a Routable Protobuf message with the following type and properties:
{code}
message Routable {
  Address target; (1)
  Any payload;    (2)
}
{code}
Where:
(1) is Address(FunctionType(com.user.foo.bar, SingupProcessor), <a consumer record's key>)
(2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage, and the value bytes would come directly from the consumer record's value bytes.

This would require an additional AutoRoutable router that simply forwards the payload to the target address.
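
To make the proposed flow concrete, here is a rough, self-contained sketch of how a consumer record could be turned into a Routable and then forwarded. It is only an illustration under stated assumptions: every type below (FunctionType, Address, AnyPayload, Routable, TopicRouting, Downstream) is a hypothetical stand-in written for this example, not a class from the Stateful Functions SDK, Protobuf, or the Kafka client, and rejecting records without a key is an assumption the issue does not specify.

```java
import java.nio.charset.StandardCharsets;

/** Sketch only: all types here are hypothetical stand-ins, not Stateful Functions SDK classes. */
final class AutoRoutableSketch {

    /** Stand-in for a function type: namespace plus name. */
    static final class FunctionType {
        final String namespace;
        final String name;

        FunctionType(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        /** Parses the "namespace/name" form used by the `target` field of the spec. */
        static FunctionType parse(String target) {
            int slash = target.lastIndexOf('/');
            if (slash <= 0 || slash == target.length() - 1) {
                throw new IllegalArgumentException("expected <namespace>/<name>: " + target);
            }
            return new FunctionType(target.substring(0, slash), target.substring(slash + 1));
        }
    }

    /** Stand-in for an address: function type plus instance id. */
    static final class Address {
        final FunctionType type;
        final String id;

        Address(FunctionType type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    /** Stand-in for a Protobuf Any: a type URL plus opaque value bytes. */
    static final class AnyPayload {
        final String typeUrl;
        final byte[] value;

        AnyPayload(String typeUrl, byte[] value) {
            this.typeUrl = typeUrl;
            this.value = value;
        }
    }

    /** Stand-in for the Routable message described in the issue. */
    static final class Routable {
        final Address target;
        final AnyPayload payload;

        Routable(Address target, AnyPayload payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    /** Per-topic settings taken from the spec: `typeUrl` (2) and `target` (3). */
    static final class TopicRouting {
        final String typeUrl;
        final String target;

        TopicRouting(String typeUrl, String target) {
            this.typeUrl = typeUrl;
            this.target = target;
        }
    }

    /** Hypothetical sink standing in for whatever delivers messages to functions. */
    interface Downstream {
        void forward(Address target, AnyPayload payload);
    }

    /** Ingress side: the record key becomes the function id, the value bytes pass through untouched. */
    static Routable toRoutable(TopicRouting config, byte[] key, byte[] value) {
        if (key == null) {
            // Assumption: a record without a key cannot be routed, so it is rejected here.
            throw new IllegalArgumentException("record has no key to route by");
        }
        String id = new String(key, StandardCharsets.UTF_8);
        Address target = new Address(FunctionType.parse(config.target), id);
        return new Routable(target, new AnyPayload(config.typeUrl, value));
    }

    /** Router side: forwards the payload to the target address without inspecting it. */
    static void route(Routable message, Downstream downstream) {
        downstream.forward(message.target, message.payload);
    }

    public static void main(String[] args) {
        TopicRouting config = new TopicRouting(
                "com.user.foo.bar.greeter.SingupMessage", "com.user.foo.bar/SingupProcessor");
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        Routable message = toRoutable(config, key, new byte[] {1, 2, 3});
        route(message, (target, payload) -> System.out.println(
                target.type.namespace + "/" + target.type.name
                        + " id=" + target.id + " typeUrl=" + payload.typeUrl));
    }
}
```

The point of the split is that the ingress never deserializes the value: it only attaches the configured typeUrl and derives the address from the key, so the router can stay generic across topics.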