Hi, I have a flink pipeline which reads from a kafka topic does a map operation(builds an ElasticSearch model) and sinks it to Elasticsearch
*Pipeline-1:* Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism 8) -> Flink-Es-connector-Sink(es1) (parallelism 8) Now i want some messages to be prioritized(processed quickly not necessarily in any order). I am okay in creating a new topic and placing the priority messages in it (or) do a partition based buckets(Ex: https://github.com/riferrei/bucket-priority-pattern i don't think it's possible in flink kafka connector since partition assignment is present inside FlinkKafkaConsumerBase ). *I tried the below solution:* I created another topic (topic2 in which i placed the priority messages) and with it a new Flink pipeline *Pipeline-2:* Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism 8) -> Flink-Es-connector-Sink(es1) (parallelism 8) But the problem is, I want to consume topic2 as soon as possible. I can have a delay/slowness in topic1 because of that. If there is no message in topic2 then topic1 should be given more priority. But in the above case both the pipelines are getting processed equally. Increasing the parallelism of pipeline-2 to a big number doesn't help as when there is no message in topic2 then topic1 is still very slow(parallelism of topic 2 is wasted). How can i achieve this using Flink Kafka connector? Is it possible to achieve it in any other way? Regards, Vignesh