Hello,

I'm using flink-connector-kafka version 1.15.2 to consume messages from a Kafka 
topic that has 3 partitions. This stream is later connected to another Kafka 
source and then processed in a BroadcastProcessFunction.

The Kafka source is created as follows:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "600000");
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000");

// StringDeserializationSchema is our own deserializer class.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
              .setBootstrapServers("localhost:9092")
              .setTopics("mytopic")
              .setGroupId("group-id")
              .setClientIdPrefix("client-id")
              .setStartingOffsets(OffsetsInitializer.latest())
              .setProperty("security.protocol", "SSL")
              .setProperty("partition.discovery.interval.ms", "300000")
              .setProperties(properties)
              .setDeserializer(new StringDeserializationSchema())
              .build();

DataStreamSource<String> myStreamSource =
              env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),
                      "myStreamSource");


Then I start sending 10 messages per second to the topic. The consumer starts 
reading messages, but after a few minutes it stops reading from the topic. For 
example, if I send 3000 messages to the topic, only around 1200 to 2000 are 
consumed.
I do not get any exception or error message in the task manager logs, and the 
job does not restart. The backpressure is around 15 to 20% while the consumer 
is reading messages and then drops to 0%.
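
To put rough numbers on the timing, here is a small sketch. It assumes a steady 
send rate of 10 messages per second and that the consumer keeps up with the 
producer until it stops; the class and method names are only illustrative and 
are not part of the job.

```java
import java.time.Duration;

public class ConsumerTimeline {
    // Values taken from the consumer configuration above.
    static final Duration SESSION_TIMEOUT = Duration.ofMillis(600_000);
    static final Duration MAX_POLL_INTERVAL = Duration.ofMillis(900_000);
    static final Duration PARTITION_DISCOVERY = Duration.ofMillis(300_000);

    // Time needed to send `messages` at a steady `ratePerSecond`
    // (integer division, so partial seconds are dropped).
    static Duration timeToSend(long messages, long ratePerSecond) {
        return Duration.ofSeconds(messages / ratePerSecond);
    }

    public static void main(String[] args) {
        long ratePerSecond = 10; // assumed steady send rate

        // Approximate elapsed time when consumption stops (1200 to 2000 messages)
        // and when the producer finishes (3000 messages).
        System.out.println("stall (earliest): " + timeToSend(1200, ratePerSecond).getSeconds() + " s");
        System.out.println("stall (latest):   " + timeToSend(2000, ratePerSecond).getSeconds() + " s");
        System.out.println("full run:         " + timeToSend(3000, ratePerSecond).getSeconds() + " s");

        // Configured intervals, for comparison.
        System.out.println("session timeout:     " + SESSION_TIMEOUT.getSeconds() + " s");
        System.out.println("max poll interval:   " + MAX_POLL_INTERVAL.getSeconds() + " s");
        System.out.println("partition discovery: " + PARTITION_DISCOVERY.getSeconds() + " s");
    }
}
```

Under those assumptions the consumer stops roughly 2 to 3.5 minutes into the 
run, which is earlier than the configured session timeout, max poll interval 
and partition discovery interval. I am not sure whether any of these settings 
are related to the problem.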

Please let me know if you have any suggestions, or if any additional 
information is needed to diagnose this issue.

Best.
