Rajiya Mulani created FLINK-23360:
-------------------------------------

             Summary: Facing broker disconnected issue while establishing Kafka with Azure Event hubs in Spring boot application #543
                 Key: FLINK-23360
                 URL: https://issues.apache.org/jira/browse/FLINK-23360
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Runtime / Configuration
            Reporter: Rajiya Mulani

# Article I followed to configure Event Hubs: [https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs] (TLS/SSL)
# Configuration in the .properties file:

{code}
###############################################################################################################
## Properties For Azure Event Hub with Kafka
###############################################################################################################
kafkaConnectionUrl=******.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************************";
{code}

POM file:

{noformat}
4.0.0
******-processor
0.0.1-SNAPSHOT
******-common
jar
<project.root.directory>${basedir}/..</project.root.directory>

com.*.iot : *****-processor-data
org.apache.kafka : kafka-clients : 0.11.0.0
org.apache.commons : commons-lang3 : 3.9
org.apache.commons : commons-io : 1.3.2
commons-configuration : commons-configuration : 1.10
{noformat}

Kafka producer configuration:

{code:java}
@Bean
public Producer<String, IndexMessage> IndexMessageKafkaProducer() {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageSerializer"));
    return new KafkaProducer<>(configProps);
}
{code}

Kafka consumer configuration:

{code:java}
@Bean
public Consumer<String, IndexMessage> IndexMessageKafkaConsumer() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, new CommonUtility().getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, new CommonUtility().getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, new CommonUtility().getPropertyValue("autoCommit"));
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageDeserializer"));
    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer<>(configProps);
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
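
One observation on the configuration as posted: the producer and consumer maps set the bootstrap servers, serializers and tuning values, but the listing does not show `security.protocol`, `sasl.mechanism` or `sasl.jaas.config` from the .properties file being added to `configProps`. Kafka clients default to `PLAINTEXT` when `security.protocol` is unset, so if these settings are not applied elsewhere, that may be related to the disconnects on port 9093. The following is a minimal sketch in plain Java of copying those three keys into a client configuration map. It assumes the values have been loaded into a `java.util.Properties`; `EventHubsSecurityConfig` and `withSecurity` are hypothetical names for illustration, not part of the reporter's code or of any library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the reporter's code. */
final class EventHubsSecurityConfig {

    // Standard Kafka client setting names, matching the keys in the .properties file.
    private static final String[] SECURITY_KEYS = {
        "security.protocol", "sasl.mechanism", "sasl.jaas.config"
    };

    private EventHubsSecurityConfig() {}

    /**
     * Returns a copy of clientConfig with the three security settings from
     * props added. Throws if any of them is missing or blank, so that a
     * misconfiguration surfaces at startup rather than as a disconnect.
     */
    static Map<String, Object> withSecurity(Properties props, Map<String, Object> clientConfig) {
        Map<String, Object> merged = new HashMap<>(clientConfig);
        for (String key : SECURITY_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
            merged.put(key, value.trim());
        }
        return merged;
    }

    public static void main(String[] args) {
        // Placeholder values; real ones would come from the .properties file.
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", "<JAAS line from the .properties file>");

        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "<namespace>.servicebus.windows.net:9093");

        System.out.println(withSecurity(props, base).keySet());
    }
}
```

Under these assumptions, the map returned by `withSecurity` could be passed to the `KafkaProducer` or `KafkaConsumer` constructor in place of `configProps`.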