Vishwas,

Here is a config that works on my Kerberized cluster (Flink on YARN). I hope it helps.
Flink conf:

  security.kerberos.login.use-ticket-cache: true
  security.kerberos.login.keytab: /home/myuser/myuser.keytab
  security.kerberos.login.principal: myuser@XXXX
  security.kerberos.login.contexts: Client

Security-related properties passed as an argument to the FlinkKafkaConsumerXX constructor:

  sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"myuser\" password=\"XXXX\";"
  sasl.mechanism=PLAIN
  security.protocol=SASL_SSL

On Thu, Aug 29, 2019 at 6:20 PM, Vishwas Siravara <vsirav...@gmail.com> wrote:

> Hey David,
> My consumers are registered; here is the debug log. The problem is that the
> broker does not belong to me, so I can’t see what is going on there. But
> this is a new consumer group, so there is no state yet.
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
> Consumer subtask 0 will start reading the following 40 partitions from the
> committed group offsets in Kafka:
> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>
> On Thu, Aug 29, 2019 at 11:39 AM David Morin <morin.david....@gmail.com>
> wrote:
>
>> Hello Vishwas,
>>
>> You can use a keytab if you prefer. You generate a keytab for your user
>> and then you can reference it in the Flink configuration.
>> Then this keytab will be handled by Flink in a secure way and TGT will be
>> created based on this keytab.
>> However, that seems to be working.
>> Did you check Kafka logs on the broker side?
>> Or did you check consumer offsets with Kafka tools in order to validate
>> that consumers are registered on the different partitions of your topic?
>> You could try switching to a different group id for your consumer group in
>> order to force parallel consumption.
>>
>> On Thu, Aug 29, 2019 at 9:57 AM, Vishwas Siravara <vsirav...@gmail.com>
>> wrote:
>>
>>> I see this log as well, but I can't see any messages. I know for a
>>> fact that the topic I am subscribed to has messages, as I checked with a
>>> simple Java consumer with a different group.
>>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
>>> Consumer subtask 0 will start reading the following 40 partitions from the
>>> committed group offsets in Kafka:
>>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>>
>>> On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>> I am using Kerberos for my Kafka source.
>>>> I pass the JAAS config and krb5.conf in the env.java.opts:
>>>> -Dconfig.resource=qa.conf
>>>> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.0000-Linux-x86_64-64b-r234867/lib/
>>>> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
>>>> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>>>>
>>>> When I look at debug logs I see that the consumer was created with the
>>>> following properties.
>>>>
>>>> 2019-08-29 06:49:18,298 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
>>>> auto.commit.interval.ms = 5000
>>>> auto.offset.reset = latest
>>>> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
>>>> check.crcs = true
>>>> client.id = consumer-2
>>>> connections.max.idle.ms = 540000
>>>> enable.auto.commit = true
>>>> exclude.internal.topics = true
>>>> fetch.max.bytes = 52428800
>>>> fetch.max.wait.ms = 500
>>>> fetch.min.bytes = 1
>>>> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>> heartbeat.interval.ms = 3000
>>>> interceptor.classes = null
>>>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>> max.partition.fetch.bytes = 1048576
>>>> max.poll.interval.ms = 300000
>>>> max.poll.records = 500
>>>> metadata.max.age.ms = 300000
>>>> metric.reporters = []
>>>> metrics.num.samples = 2
>>>> metrics.recording.level = INFO
>>>> metrics.sample.window.ms = 30000
>>>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>>> receive.buffer.bytes = 65536
>>>> reconnect.backoff.ms = 50
>>>> request.timeout.ms = 305000
>>>> retry.backoff.ms = 100
>>>> sasl.jaas.config = null
>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>> sasl.kerberos.service.name = null
>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> sasl.mechanism = GSSAPI
>>>> security.protocol = SASL_PLAINTEXT
>>>> send.buffer.bytes = 131072
>>>> session.timeout.ms = 10000
>>>> ssl.cipher.suites = null
>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> ssl.endpoint.identification.algorithm = null
>>>> ssl.key.password = null
>>>> ssl.keymanager.algorithm = SunX509
>>>> ssl.keystore.location = null
>>>> ssl.keystore.password = null
>>>> ssl.keystore.type = JKS
>>>> ssl.protocol = TLS
>>>> ssl.provider = null
>>>> ssl.secure.random.implementation = null
>>>> ssl.trustmanager.algorithm = PKIX
>>>> ssl.truststore.location = null
>>>> ssl.truststore.password = null
>>>> ssl.truststore.type = JKS
>>>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>
>>>> I can also see that the Kerberos login is working fine. Here is the log
>>>> for it:
>>>>
>>>> 2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
>>>> 2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh thread started.
>>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
>>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
>>>> 2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
>>>> 2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
>>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
>>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
>>>>
>>>> I then see this log:
>>>>
>>>> INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>>
>>>> *The problem is that I do not see any error log, but no data is being
>>>> processed by the consumer, and it has been a nightmare to debug.*
>>>>
>>>> Thanks for all the help.
>>>>
>>>> Thanks,
>>>> Vishwas
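
For reference, the security-related consumer properties from the reply at the top of this thread could be assembled in Java roughly as follows. This is only a sketch under assumptions: the class name KafkaSecurityProps, the method build, and the group id "my-new-group" are hypothetical names chosen for illustration, and the credentials are the same placeholders used above. It uses only java.util.Properties, so it does not depend on Flink or Kafka being on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): collects the
// security-related consumer properties quoted in the reply above.
public class KafkaSecurityProps {

    // Builds the properties; username, password and groupId are supplied by the caller.
    public static Properties build(String username, String password, String groupId) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        // Same JAAS line as in the reply, with the placeholders filled in.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        // A fresh group id, as suggested earlier in the thread.
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; replace with real credentials.
        Properties props = build("myuser", "XXXX", "my-new-group");
        props.list(System.out);
    }
}
```

The resulting Properties object would then be passed to the FlinkKafkaConsumerXX constructor together with the topic and the deserialization schema; bootstrap.servers and any other consumer settings still have to be added separately.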