Thanks, I'll check it out.

On Thu, Aug 29, 2019 at 1:08 PM David Morin <morin.david....@gmail.com> wrote:
> Vishwas,
>
> Here is a config that works on my Kerberized cluster (Flink on YARN).
> I hope this will help you.
>
> Flink conf:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /home/myuser/myuser.keytab
> security.kerberos.login.principal: myuser@XXXX
> security.kerberos.login.contexts: Client
>
> Security-related properties passed as an argument to the
> FlinkKafkaConsumerXX constructor:
> sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"myuser\" password=\"XXXX\";"
> sasl.mechanism=PLAIN
> security.protocol=SASL_SSL
>
> On Thu, Aug 29, 2019 at 6:20 PM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
>> Hey David,
>> My consumers are registered; here is the debug log. The problem is that the
>> broker does not belong to me, so I can’t see what is going on there. But
>> this is a new consumer group, so there is no state yet.
>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka:
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>
>> On Thu, Aug 29, 2019 at 11:39 AM David Morin <morin.david....@gmail.com> wrote:
>>
>>> Hello Vishwas,
>>>
>>> You can use a keytab if you prefer.
>>> You generate a keytab for your user and then reference it in the Flink
>>> configuration. Flink then handles this keytab securely, and a TGT is
>>> created from it.
>>> However, that seems to be working.
>>> Did you check the Kafka logs on the broker side?
>>> Or did you check the consumer offsets with the Kafka tools to validate that
>>> the consumers are registered on the different partitions of your topic?
>>> You could try switching to a different group ID for your consumer group
>>> in order to force parallel consumption.
>>>
>>> On Thu, Aug 29, 2019 at 9:57 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>>>
>>>> I see this log as well, but I can't see any messages. I know for a
>>>> fact that the topic I am subscribed to has messages, as I checked with a
>>>> simple Java consumer with a different group.
>>>>
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka:
>>>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>>>
>>>> On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara <vsirav...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>> I am using Kerberos for my Kafka source.
>>>>> I pass the JAAS config and krb5.conf in env.java.opts:
>>>>> -Dconfig.resource=qa.conf
>>>>> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.0000-Linux-x86_64-64b-r234867/lib/
>>>>> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
>>>>> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>>>>>
>>>>> When I look at the debug logs, I see that the consumer was created with the
>>>>> following properties.
>>>>>
>>>>> 2019-08-29 06:49:18,298 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
>>>>> auto.commit.interval.ms = 5000
>>>>> auto.offset.reset = latest
>>>>> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
>>>>> check.crcs = true
>>>>> client.id = consumer-2
>>>>> connections.max.idle.ms = 540000
>>>>> enable.auto.commit = true
>>>>> exclude.internal.topics = true
>>>>> fetch.max.bytes = 52428800
>>>>> fetch.max.wait.ms = 500
>>>>> fetch.min.bytes = 1
>>>>>
>>>>> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>>> heartbeat.interval.ms = 3000
>>>>> interceptor.classes = null
>>>>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>> max.partition.fetch.bytes = 1048576
>>>>> max.poll.interval.ms = 300000
>>>>> max.poll.records = 500
>>>>> metadata.max.age.ms = 300000
>>>>> metric.reporters = []
>>>>> metrics.num.samples = 2
>>>>> metrics.recording.level = INFO
>>>>> metrics.sample.window.ms = 30000
>>>>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>>>> receive.buffer.bytes = 65536
>>>>> reconnect.backoff.ms = 50
>>>>> request.timeout.ms = 305000
>>>>> retry.backoff.ms = 100
>>>>> sasl.jaas.config = null
>>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>> sasl.kerberos.service.name = null
>>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>> sasl.mechanism = GSSAPI
>>>>> security.protocol = SASL_PLAINTEXT
>>>>> send.buffer.bytes = 131072
>>>>> session.timeout.ms = 10000
>>>>> ssl.cipher.suites = null
>>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>> ssl.endpoint.identification.algorithm = null
>>>>> ssl.key.password = null
>>>>> ssl.keymanager.algorithm = SunX509
>>>>> ssl.keystore.location = null
>>>>> ssl.keystore.password = null
>>>>> ssl.keystore.type = JKS
>>>>> ssl.protocol = TLS
>>>>> ssl.provider = null
>>>>> ssl.secure.random.implementation = null
>>>>> ssl.trustmanager.algorithm = PKIX
>>>>> ssl.truststore.location = null
>>>>> ssl.truststore.password = null
>>>>> ssl.truststore.type = JKS
>>>>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>>
>>>>> I can also see that the Kerberos login is working fine. Here is the log
>>>>> for it:
>>>>>
>>>>> 2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
>>>>> 2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh thread started.
>>>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
>>>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
>>>>> 2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
>>>>> 2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
>>>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
>>>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
>>>>>
>>>>> I then see this log:
>>>>>
>>>>> INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>>>
>>>>> *The problem is that I do not see any error log, but no data is being
>>>>> processed by the consumer, and it has been a nightmare to debug.*
>>>>>
>>>>> Thanks for all the help.
>>>>>
>>>>> Thanks,
>>>>> Vishwas
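
As a rough illustration of the security properties David lists for the FlinkKafkaConsumerXX constructor, the following Java sketch builds them with java.util.Properties. The class name KafkaSecurityProps, the build method, and the broker, group, and credential values are hypothetical placeholders and not part of Flink or Kafka; only the property keys and the PLAIN / SASL_SSL values come from the thread.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of Flink or Kafka. */
public class KafkaSecurityProps {

    /**
     * Builds consumer properties using the SASL/PLAIN settings quoted in the thread.
     * All four arguments are placeholders supplied by the caller.
     */
    public static Properties build(String bootstrapServers, String groupId,
                                   String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Security settings as listed in David's message.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the real broker, group, and credentials.
        Properties props = build("broker.example.com:9092", "my-new-group", "myuser", "XXXX");
        // Print every entry except the JAAS line, which contains the password.
        for (String name : props.stringPropertyNames()) {
            if (!name.equals("sasl.jaas.config")) {
                System.out.println(name + "=" + props.getProperty(name));
            }
        }
    }
}
```

The resulting Properties object would then be passed as the properties argument of the FlinkKafkaConsumerXX constructor, as David describes. This sketch mirrors David's SASL/PLAIN setup; the original poster's ConsumerConfig dump shows sasl.mechanism = GSSAPI, so the mechanism and JAAS entry would need to match whatever the broker actually expects.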