Hi,

I have a flink pipeline which reads from a kafka topic does a map
operation(builds an ElasticSearch model) and sinks it to Elasticsearch

*Pipeline-1:*

Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

Now i want some messages to be prioritized(processed quickly not
necessarily in any order). I am okay in creating a new topic and placing
the priority messages in it (or) do a partition based buckets(Ex:
https://github.com/riferrei/bucket-priority-pattern i don't think it's
possible in flink kafka connector since partition assignment is present
inside FlinkKafkaConsumerBase ).

*I tried the below solution:*

I created another topic (topic2 in which i placed the priority messages)
and with it a new Flink pipeline

*Pipeline-2:*

Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

But the problem is, I want to consume topic2 as soon as possible. I can
have a delay/slowness in topic1 because of that. If there is no message in
topic2 then topic1 should be given more priority. But in the above case
both the pipelines are getting processed equally. Increasing the
parallelism of pipeline-2 to a big number doesn't help as when there is no
message in topic2 then topic1 is still very slow(parallelism of topic 2 is
wasted).

How can i achieve this using Flink Kafka connector? Is it possible to
achieve it in any other way?


Regards,

Vignesh

Reply via email to