Hi all, I've finally managed to use the tool with the --command-config flag and a consumer.properties file containing the line "security.protocol=SASL_PLAINTEXT" (the security-protocol flag of the command line tool does not seem to have any effect for this particular tool).
However, I'm still at square one as the current offset and lag of my consumer is reported as "unknown"... GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER test, winserver, 0, unknown, 1046852790, unknown, consumer-1_/172.29.129.71 I've recompiled a version with the consumer manually committing the offset after each poll, and it continuously consumes message from the topic. Any idea ? Thanks, Pierre 2016-06-08 11:19 GMT+02:00 Pierre Labiausse <pierre.labiau...@gmail.com>: > The thread on the hortonworks community forum is there -> > https://community.hortonworks.com/questions/38409/not-able-to-monitor-consumer-group-lag-with-new-co.html > > 2016-06-08 9:59 GMT+02:00 Pierre Labiausse <pierre.labiau...@gmail.com>: > >> Hello again, >> >> I've tried to use the command line tool again without success, but I'm >> not seing the IOException anymore in the error stack, only an EOFException. >> Nothing is logged server-side, as if the request is not even received. >> >> Ewen, I've tried using the command-config flag as you suggested, but I am >> not sure what additional configuration the client would need. I don't need >> additional configuration files when using the console producer and consumer >> (just setting the security-protocol is okay), and when using the java >> consumer, I only need to provide a JAAS file to either use the kerberos >> ticket cache or a keytab to authenticate. >> >> I've found a consumer.properties file in kafka configuration folder, >> which I'll paste below, but I don't think it contains relevant information. >> >> I will create a thread on hortonworks community forum, and will add the >> link here when it goes through their validation process. >> >> Regards, >> Pierre >> >> >> *** consumer.properties file *** >> # Licensed to the Apache Software Foundation (ASF) under one or more >> # contributor license agreements. See the NOTICE file distributed with >> # this work for additional information regarding copyright ownership. >> # The ASF licenses this file to You under the Apache License, Version 2.0 >> # (the "License"); you may not use this file except in compliance with >> # the License. You may obtain a copy of the License at >> # >> # http://www.apache.org/licenses/LICENSE-2.0 >> # >> # Unless required by applicable law or agreed to in writing, software >> # distributed under the License is distributed on an "AS IS" BASIS, >> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> # See the License for the specific language governing permissions and >> # limitations under the License. >> # see kafka.consumer.ConsumerConfig for more details >> >> # Zookeeper connection string >> # comma separated host:port pairs, each corresponding to a zk >> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" >> zookeeper.connect=127.0.0.1:2181 >> >> # timeout in ms for connecting to zookeeper >> zookeeper.connection.timeout.ms=6000 >> >> #consumer group id >> group.id=test-consumer-group >> >> #consumer timeout >> #consumer.timeout.ms=5000 >> >> >> 2016-06-08 1:24 GMT+02:00 Ewen Cheslack-Postava <e...@confluent.io>: >> >>> Pierre, >>> >>> I think you'll need the rest of the security-related configs you'd use >>> for >>> a normal client as well. You can use the --command-config flag to include >>> additional settings stored in a property file. >>> >>> -Ewen >>> >>> On Tue, Jun 7, 2016 at 8:57 AM, Pierre LABIAUSSE < >>> plabiau...@i-tracing.com> >>> wrote: >>> >>> > Hi, >>> > >>> > I'm not able to use kafka-consumer-groups.sh to monitor the lag of my >>> > consumers when my cluster is kerberized. >>> > >>> > I'm using kafka version 0.9.0 installed on an hortonworks hdp 2.4.0 >>> > cluster. >>> > >>> > I've replicated my setup on two sandboxes, one without kerberos and one >>> > with kerberos. >>> > >>> > On the one without kerberos, I'm able to get an answer from th >>> > consumer-groups client with the following command: >>> > ./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server >>> > sandbox.hortonworks.com:6667 >>> > >>> > On the one with kerberos activated, i've tried the same command without >>> > specifying the security-protocoln which fails with a >>> java.io.EOFException >>> > when connecting to the broker. >>> > When specifying the security-protocol (PLAINTEXTSASL or >>> SASL_PLAINTEXT), >>> > I'm seeing a java.io.IOException: Connection reset by peer (full stack >>> is >>> > below). >>> > >>> > In both the cases with kerberos, without activating the debug logs in >>> the >>> > client, it appears to hang since it is stuck in an infinite loop of >>> > retrying to connect with the broker (and failing) >>> > >>> > >>> > Am I missing something in order to use this tool in a kerberized >>> > environment ? As of now, I'm not seeing any other way to monitor >>> consumer >>> > offsets, since they are not stored in zookeeper anymore. >>> > >>> > Thanks in advance, >>> > Pierre >>> > >>> > >>> > ****** Full stack of execution with kerberos and security-protocol >>> > specified ****** >>> > ./kafka-consumer-groups.sh --security-protocol PLAINTEXTSASL >>> > --new-consumer --bootstrap-server sandbox.hortonworks.com:6667 --list >>> > [2016-06-07 12:42:26,951] INFO Successfully logged in. >>> > (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,952] DEBUG It is a Kerberos ticket >>> > (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,971] INFO TGT refresh thread started. >>> > (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,981] DEBUG Found TGT Ticket (hex) = >>> > 0000: 61 82 01 5F 30 82 01 5B A0 03 02 01 05 A1 09 1B a.._0..[........ >>> > 0010: 07 49 54 52 2E 4C 41 4E A2 1C 30 1A A0 03 02 01 .ITR.LAN..0..... >>> > 0020: 02 A1 13 30 11 1B 06 6B 72 62 74 67 74 1B 07 49 ...0...krbtgt..I >>> > 0030: 54 52 2E 4C 41 4E A3 82 01 29 30 82 01 25 A0 03 TR.LAN...)0..%.. >>> > 0040: 02 01 12 A1 03 02 01 01 A2 82 01 17 04 82 01 13 ................ >>> > 0050: D9 9F 09 9C F7 96 72 D2 5F 84 20 B9 D7 5D DC 7B ......r._. ..].. >>> > 0060: 8D 4F A0 03 DC D5 85 54 86 4D A1 A6 F1 31 5A BF .O.....T.M...1Z. >>> > 0070: F9 40 71 43 20 97 7F 84 D6 F7 2D 93 16 27 06 B2 .@qC .....-..'.. >>> > 0080: 42 45 9D DE C3 4C 61 B6 8B 9B B3 E5 F8 F7 EB 3E BE...La........> >>> > 0090: D6 53 AE 9D 5D E0 06 DA 75 E0 43 DE 28 9C DE CB .S..]...u.C.(... >>> > 00A0: CF 72 00 5A CC 69 20 82 5F C6 4F 1D 7F D0 1F FB .r.Z.i ._.O..... >>> > 00B0: 92 55 B6 31 69 F1 E8 5B FD 2B 22 F8 15 E0 5D 84 .U.1i..[.+"...]. >>> > 00C0: 5A 1A 2B 6D 0B 90 93 97 5B 06 EC 30 37 3C BB 71 Z.+m....[..07<.q >>> > 00D0: 0B 23 24 67 F2 70 ED 1A E2 FF 6F 3A 12 0F B2 1D .#$g.p....o:.... >>> > 00E0: AD B9 C9 2C 24 B3 89 B3 90 22 8F 5C 1E AE 86 99 ...,$....".\.... >>> > 00F0: 1A B5 B4 4A 3E 1D 6F 73 FD CB 60 D3 E3 76 71 6C ...J>.os..`..vql >>> > 0100: 90 B5 EA 4A D3 74 87 0E 02 9E C4 6D 0E 49 A2 47 ...J.t.....m.I.G >>> > 0110: A4 2A FA CD D4 96 65 F3 FC E1 FB 9A 6F A1 0F 0E .*....e.....o... >>> > 0120: AF 6F 9F 9F D5 7C 5A 29 FE BF 84 18 2E CC 7F 0C .o....Z)........ >>> > 0130: 07 53 D2 F9 0A 44 DA 8E 3C B6 90 C0 71 69 5C CA .S...D..<...qi\. >>> > 0140: 9F E1 FE 23 71 C1 B7 B1 1A 7D 84 BD 33 AA ED A6 ...#q.......3... >>> > 0150: 9A CE 08 A9 9B 6E 29 54 B5 6B 06 9A 4D 4C 5F 3A .....n)T.k..ML_: >>> > 0160: CF A6 FF ... >>> > >>> > Client Principal = kafka/sandbox.hortonworks....@itr.lan >>> > Server Principal = krbtgt/itr....@itr.lan >>> > Session Key = EncryptionKey: keyType=18 keyBytes (hex dump)= >>> > 0000: 8E 4E 45 F8 0D B4 33 0C ED C5 7C A2 2D E2 C2 19 .NE...3.....-... >>> > 0010: 87 CC 27 68 72 B1 5B F8 C4 7D E8 BF EC F0 E9 F4 ..'hr.[......... >>> > >>> > >>> > Forwardable Ticket true >>> > Forwarded Ticket false >>> > Proxiable Ticket false >>> > Proxy Ticket false >>> > Postdated Ticket false >>> > Renewable Ticket true >>> > Initial Ticket true >>> > Auth Time = Tue Jun 07 12:36:46 UTC 2016 >>> > Start Time = Tue Jun 07 12:36:46 UTC 2016 >>> > End Time = Wed Jun 08 12:36:46 UTC 2016 >>> > Renew Till = Tue Jun 07 12:36:46 UTC 2016 >>> > Client Addresses Null . >>> (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,981] INFO TGT valid starting at: Tue Jun 07 >>> 12:36:46 >>> > UTC 2016 (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,981] INFO TGT expires: Wed Jun 08 12:36:46 UTC >>> 2016 >>> > (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:26,981] INFO TGT refresh sleeping until: Tue Jun 07 >>> > 14:40:41 UTC 2016 (org.apache.kafka.common.security.kerberos.Login) >>> > [2016-06-07 12:42:27,036] DEBUG Updated cluster metadata version 1 to >>> > Cluster(nodes = [Node(-1, sandbox.hortonworks.com, 6667)], partitions >>> = >>> > []) (org.apache.kafka.clients.Metadata) >>> > [2016-06-07 12:42:27,044] DEBUG Added sensor with name >>> connections-closed: >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,046] DEBUG Added sensor with name >>> > connections-created: (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,047] DEBUG Added sensor with name >>> > bytes-sent-received: (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,047] DEBUG Added sensor with name bytes-sent: >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name bytes-received: >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name select-time: >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name io-time: >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,082] DEBUG Initiating connection to node -1 at >>> > sandbox.hortonworks.com:6667. (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name >>> node--1.bytes-sent >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name >>> > node--1.bytes-received (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name node--1.latency >>> > (org.apache.kafka.common.metrics.Metrics) >>> > [2016-06-07 12:42:27,091] DEBUG Completed connection to node -1 >>> > (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,182] DEBUG Sending metadata request >>> > ClientRequest(expectResponse=true, callback=null, >>> > >>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=3,client_id=admin-1}, >>> > body={topics=[]}), isInitiatedByNetworkClient, >>> createdTimeMs=1465303347182, >>> > sendTimeMs=0) to node -1 (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,187] DEBUG Connection with >>> > sandbox.hortonworks.com/10.0.2.15 disconnected >>> > (org.apache.kafka.common.network.Selector) >>> > java.io.IOException: Connection reset by peer >>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) >>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) >>> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) >>> > at sun.nio.ch.IOUtil.read(IOUtil.java:197) >>> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384) >>> > at >>> > >>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108) >>> > at >>> > >>> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) >>> > at >>> > >>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) >>> > at >>> > >>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) >>> > at >>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) >>> > at org.apache.kafka.common.network.Selector.poll(Selector.java:286) >>> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) >>> > at >>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) >>> > at >>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) >>> > at >>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) >>> > at >>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) >>> > at >>> > >>> kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:52) >>> > at >>> > >>> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:67) >>> > at >>> > >>> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:64) >>> > at scala.collection.immutable.List.foreach(List.scala:318) >>> > at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:64) >>> > at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:93) >>> > at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:101) >>> > at >>> kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:122) >>> > at >>> > >>> kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:126) >>> > at >>> > >>> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:322) >>> > at >>> kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:73) >>> > at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala) >>> > [2016-06-07 12:42:27,190] DEBUG Node -1 disconnected. >>> > (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,190] TRACE Cancelled request >>> > ClientRequest(expectResponse=true, >>> > >>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3528b777 >>> , >>> > >>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=2,client_id=admin-1}, >>> > body={topics=[]}), createdTimeMs=1465303347182, >>> sendTimeMs=1465303347184) >>> > due to node -1 being disconnected >>> (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,190] TRACE Cancelled request >>> > ClientRequest(expectResponse=true, callback=null, >>> > >>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=3,client_id=admin-1}, >>> > body={topics=[]}), isInitiatedByNetworkClient, >>> createdTimeMs=1465303347182, >>> > sendTimeMs=1465303347182) due to node -1 being disconnected >>> > (org.apache.kafka.clients.NetworkClient) >>> > [2016-06-07 12:42:27,190] DEBUG Give up sending metadata request since >>> no >>> > node is available (org.apache.kafka.clients.NetworkClient) >>> > >>> >>> >>> >>> -- >>> Thanks, >>> Ewen >>> >> >> >