Rajiya Mulani created FLINK-23360:
-------------------------------------

             Summary: Facing broker disconnected issue while establishing Kafka 
with Azure Event hubs in Spring boot application #543
                 Key: FLINK-23360
                 URL: https://issues.apache.org/jira/browse/FLINK-23360
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Runtime / Configuration
            Reporter: Rajiya Mulani


# Article link I followed to configure Event Hubs:
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs] (TLS/SSL)

 # code
Configurations in .properties file

###############################################################################################################
h2. Properties For Azure Event Hub with Kafka

###############################################################################################################

kafkaConnectionUrl=******.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************************";
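For reference, the four connection settings above map onto the standard Kafka client keys `bootstrap.servers`, `security.protocol`, `sasl.mechanism` and `sasl.jaas.config`. The following is a minimal sketch, using only `java.util.Properties`, of how they could be assembled programmatically. The class name `EventHubsKafkaProps`, the helper names, and the placeholder namespace and key values are hypothetical and are not taken from the reported application.

```java
import java.util.Properties;

final class EventHubsKafkaProps {

    // Builds the JAAS line for PlainLoginModule. The user name is the literal
    // string "$ConnectionString", as in the properties file above.
    static String jaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }

    // Hypothetical helper: collects the connection settings under the
    // standard Kafka client configuration keys.
    static Properties clientProps(String bootstrapServers, String connectionString) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", jaasConfig(connectionString));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; substitute a real namespace and key.
        Properties props = clientProps(
                "NAMESPACE.servicebus.windows.net:9093",
                "Endpoint=sb://NAMESPACE.servicebus.windows.net/;"
                        + "SharedAccessKeyName=KEYNAME;SharedAccessKey=KEY");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A `Properties` object built this way can be passed to a Kafka producer or consumer constructor, or its entries copied into an existing configuration map.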
POM file:

4.0.0

*******-processor*
*0.0.1-SNAPSHOT*

*******-common*
*jar*

*<project.root.directory>${basedir}/..</project.root.directory>*



*com.*.iot
*****-processor-data


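<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.9</version>
</dependency>

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-io</artifactId>
  <version>1.3.2</version>
</dependency>

<dependency>
  <groupId>commons-configuration</groupId>
  <artifactId>commons-configuration</artifactId>
  <version>1.10</version>
</dependency>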

Kafka producer configuration:

@Bean
public Producer<String, IndexMessage> IndexMessageKafkaProducer() {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    Map<String, Object> configProps = new HashMap<>();

    // Every setting is looked up by name from the .properties file.
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageSerializer"));

    return new KafkaProducer<>(configProps);
}

Kafka consumer configuration:

@Bean
public Consumer<String, IndexMessage> IndexMessageKafkaConsumer() {
    Map<String, Object> configProps = new HashMap<>();

    // Every setting is looked up by name from the .properties file.
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, new CommonUtility().getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, new CommonUtility().getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, new CommonUtility().getPropertyValue("autoCommit"));

    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageDeserializer"));

    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer<>(configProps);
}
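
The producer and consumer maps shown above contain bootstrap servers, serializers and tuning values, but no `security.protocol` or `sasl.*` entries. Whether those are supplied elsewhere in the application is not visible from this report. As a sketch only, a small check like the following could confirm which of the security keys are present in a configuration map before a client is constructed. The class name `SecurityKeyCheck` and its method are hypothetical, and the sample map values are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SecurityKeyCheck {

    // Standard Kafka client keys that carry the SASL_SSL settings.
    static final List<String> SECURITY_KEYS =
            Arrays.asList("security.protocol", "sasl.mechanism", "sasl.jaas.config");

    // Returns the security keys that are absent from the given config map.
    static List<String> missingSecurityKeys(Map<String, Object> configProps) {
        List<String> missing = new ArrayList<>();
        for (String key : SECURITY_KEYS) {
            if (!configProps.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder map holding only non-security settings.
        Map<String, Object> configProps = new HashMap<>();
        configProps.put("bootstrap.servers", "NAMESPACE.servicebus.windows.net:9093");
        configProps.put("group.id", "example-group");
        System.out.println("Missing security keys: " + missingSecurityKeys(configProps));
    }
}
```

An empty result would mean all three security settings reach the client; a non-empty one lists the keys that never made it into the map.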



--
This message was sent by Atlassian Jira
(v8.3.4#803005)