The thread on the hortonworks community forum is there -> https://community.hortonworks.com/questions/38409/not-able-to-monitor-consumer-group-lag-with-new-co.html
2016-06-08 9:59 GMT+02:00 Pierre Labiausse <pierre.labiau...@gmail.com>: > Hello again, > > I've tried to use the command line tool again without success, but I'm not > seing the IOException anymore in the error stack, only an EOFException. > Nothing is logged server-side, as if the request is not even received. > > Ewen, I've tried using the command-config flag as you suggested, but I am > not sure what additional configuration the client would need. I don't need > additional configuration files when using the console producer and consumer > (just setting the security-protocol is okay), and when using the java > consumer, I only need to provide a JAAS file to either use the kerberos > ticket cache or a keytab to authenticate. > > I've found a consumer.properties file in kafka configuration folder, which > I'll paste below, but I don't think it contains relevant information. > > I will create a thread on hortonworks community forum, and will add the > link here when it goes through their validation process. > > Regards, > Pierre > > > *** consumer.properties file *** > # Licensed to the Apache Software Foundation (ASF) under one or more > # contributor license agreements. See the NOTICE file distributed with > # this work for additional information regarding copyright ownership. > # The ASF licenses this file to You under the Apache License, Version 2.0 > # (the "License"); you may not use this file except in compliance with > # the License. You may obtain a copy of the License at > # > # http://www.apache.org/licenses/LICENSE-2.0 > # > # Unless required by applicable law or agreed to in writing, software > # distributed under the License is distributed on an "AS IS" BASIS, > # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > # See the License for the specific language governing permissions and > # limitations under the License. > # see kafka.consumer.ConsumerConfig for more details > > # Zookeeper connection string > # comma separated host:port pairs, each corresponding to a zk > # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" > zookeeper.connect=127.0.0.1:2181 > > # timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=6000 > > #consumer group id > group.id=test-consumer-group > > #consumer timeout > #consumer.timeout.ms=5000 > > > 2016-06-08 1:24 GMT+02:00 Ewen Cheslack-Postava <e...@confluent.io>: > >> Pierre, >> >> I think you'll need the rest of the security-related configs you'd use for >> a normal client as well. You can use the --command-config flag to include >> additional settings stored in a property file. >> >> -Ewen >> >> On Tue, Jun 7, 2016 at 8:57 AM, Pierre LABIAUSSE < >> plabiau...@i-tracing.com> >> wrote: >> >> > Hi, >> > >> > I'm not able to use kafka-consumer-groups.sh to monitor the lag of my >> > consumers when my cluster is kerberized. >> > >> > I'm using kafka version 0.9.0 installed on an hortonworks hdp 2.4.0 >> > cluster. >> > >> > I've replicated my setup on two sandboxes, one without kerberos and one >> > with kerberos. >> > >> > On the one without kerberos, I'm able to get an answer from th >> > consumer-groups client with the following command: >> > ./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server >> > sandbox.hortonworks.com:6667 >> > >> > On the one with kerberos activated, i've tried the same command without >> > specifying the security-protocoln which fails with a >> java.io.EOFException >> > when connecting to the broker. >> > When specifying the security-protocol (PLAINTEXTSASL or SASL_PLAINTEXT), >> > I'm seeing a java.io.IOException: Connection reset by peer (full stack >> is >> > below). >> > >> > In both the cases with kerberos, without activating the debug logs in >> the >> > client, it appears to hang since it is stuck in an infinite loop of >> > retrying to connect with the broker (and failing) >> > >> > >> > Am I missing something in order to use this tool in a kerberized >> > environment ? As of now, I'm not seeing any other way to monitor >> consumer >> > offsets, since they are not stored in zookeeper anymore. >> > >> > Thanks in advance, >> > Pierre >> > >> > >> > ****** Full stack of execution with kerberos and security-protocol >> > specified ****** >> > ./kafka-consumer-groups.sh --security-protocol PLAINTEXTSASL >> > --new-consumer --bootstrap-server sandbox.hortonworks.com:6667 --list >> > [2016-06-07 12:42:26,951] INFO Successfully logged in. >> > (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,952] DEBUG It is a Kerberos ticket >> > (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,971] INFO TGT refresh thread started. >> > (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,981] DEBUG Found TGT Ticket (hex) = >> > 0000: 61 82 01 5F 30 82 01 5B A0 03 02 01 05 A1 09 1B a.._0..[........ >> > 0010: 07 49 54 52 2E 4C 41 4E A2 1C 30 1A A0 03 02 01 .ITR.LAN..0..... >> > 0020: 02 A1 13 30 11 1B 06 6B 72 62 74 67 74 1B 07 49 ...0...krbtgt..I >> > 0030: 54 52 2E 4C 41 4E A3 82 01 29 30 82 01 25 A0 03 TR.LAN...)0..%.. >> > 0040: 02 01 12 A1 03 02 01 01 A2 82 01 17 04 82 01 13 ................ >> > 0050: D9 9F 09 9C F7 96 72 D2 5F 84 20 B9 D7 5D DC 7B ......r._. ..].. >> > 0060: 8D 4F A0 03 DC D5 85 54 86 4D A1 A6 F1 31 5A BF .O.....T.M...1Z. >> > 0070: F9 40 71 43 20 97 7F 84 D6 F7 2D 93 16 27 06 B2 .@qC .....-..'.. >> > 0080: 42 45 9D DE C3 4C 61 B6 8B 9B B3 E5 F8 F7 EB 3E BE...La........> >> > 0090: D6 53 AE 9D 5D E0 06 DA 75 E0 43 DE 28 9C DE CB .S..]...u.C.(... >> > 00A0: CF 72 00 5A CC 69 20 82 5F C6 4F 1D 7F D0 1F FB .r.Z.i ._.O..... >> > 00B0: 92 55 B6 31 69 F1 E8 5B FD 2B 22 F8 15 E0 5D 84 .U.1i..[.+"...]. >> > 00C0: 5A 1A 2B 6D 0B 90 93 97 5B 06 EC 30 37 3C BB 71 Z.+m....[..07<.q >> > 00D0: 0B 23 24 67 F2 70 ED 1A E2 FF 6F 3A 12 0F B2 1D .#$g.p....o:.... >> > 00E0: AD B9 C9 2C 24 B3 89 B3 90 22 8F 5C 1E AE 86 99 ...,$....".\.... >> > 00F0: 1A B5 B4 4A 3E 1D 6F 73 FD CB 60 D3 E3 76 71 6C ...J>.os..`..vql >> > 0100: 90 B5 EA 4A D3 74 87 0E 02 9E C4 6D 0E 49 A2 47 ...J.t.....m.I.G >> > 0110: A4 2A FA CD D4 96 65 F3 FC E1 FB 9A 6F A1 0F 0E .*....e.....o... >> > 0120: AF 6F 9F 9F D5 7C 5A 29 FE BF 84 18 2E CC 7F 0C .o....Z)........ >> > 0130: 07 53 D2 F9 0A 44 DA 8E 3C B6 90 C0 71 69 5C CA .S...D..<...qi\. >> > 0140: 9F E1 FE 23 71 C1 B7 B1 1A 7D 84 BD 33 AA ED A6 ...#q.......3... >> > 0150: 9A CE 08 A9 9B 6E 29 54 B5 6B 06 9A 4D 4C 5F 3A .....n)T.k..ML_: >> > 0160: CF A6 FF ... >> > >> > Client Principal = kafka/sandbox.hortonworks....@itr.lan >> > Server Principal = krbtgt/itr....@itr.lan >> > Session Key = EncryptionKey: keyType=18 keyBytes (hex dump)= >> > 0000: 8E 4E 45 F8 0D B4 33 0C ED C5 7C A2 2D E2 C2 19 .NE...3.....-... >> > 0010: 87 CC 27 68 72 B1 5B F8 C4 7D E8 BF EC F0 E9 F4 ..'hr.[......... >> > >> > >> > Forwardable Ticket true >> > Forwarded Ticket false >> > Proxiable Ticket false >> > Proxy Ticket false >> > Postdated Ticket false >> > Renewable Ticket true >> > Initial Ticket true >> > Auth Time = Tue Jun 07 12:36:46 UTC 2016 >> > Start Time = Tue Jun 07 12:36:46 UTC 2016 >> > End Time = Wed Jun 08 12:36:46 UTC 2016 >> > Renew Till = Tue Jun 07 12:36:46 UTC 2016 >> > Client Addresses Null . >> (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,981] INFO TGT valid starting at: Tue Jun 07 >> 12:36:46 >> > UTC 2016 (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,981] INFO TGT expires: Wed Jun 08 12:36:46 UTC 2016 >> > (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:26,981] INFO TGT refresh sleeping until: Tue Jun 07 >> > 14:40:41 UTC 2016 (org.apache.kafka.common.security.kerberos.Login) >> > [2016-06-07 12:42:27,036] DEBUG Updated cluster metadata version 1 to >> > Cluster(nodes = [Node(-1, sandbox.hortonworks.com, 6667)], partitions = >> > []) (org.apache.kafka.clients.Metadata) >> > [2016-06-07 12:42:27,044] DEBUG Added sensor with name >> connections-closed: >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,046] DEBUG Added sensor with name >> > connections-created: (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,047] DEBUG Added sensor with name >> > bytes-sent-received: (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,047] DEBUG Added sensor with name bytes-sent: >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name bytes-received: >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name select-time: >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,048] DEBUG Added sensor with name io-time: >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,082] DEBUG Initiating connection to node -1 at >> > sandbox.hortonworks.com:6667. (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name >> node--1.bytes-sent >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name >> > node--1.bytes-received (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,091] DEBUG Added sensor with name node--1.latency >> > (org.apache.kafka.common.metrics.Metrics) >> > [2016-06-07 12:42:27,091] DEBUG Completed connection to node -1 >> > (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,182] DEBUG Sending metadata request >> > ClientRequest(expectResponse=true, callback=null, >> > >> request=RequestSend(header={api_key=3,api_version=0,correlation_id=3,client_id=admin-1}, >> > body={topics=[]}), isInitiatedByNetworkClient, >> createdTimeMs=1465303347182, >> > sendTimeMs=0) to node -1 (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,187] DEBUG Connection with >> > sandbox.hortonworks.com/10.0.2.15 disconnected >> > (org.apache.kafka.common.network.Selector) >> > java.io.IOException: Connection reset by peer >> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) >> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) >> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) >> > at sun.nio.ch.IOUtil.read(IOUtil.java:197) >> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384) >> > at >> > >> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108) >> > at >> > >> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) >> > at >> > >> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) >> > at >> > >> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) >> > at >> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) >> > at org.apache.kafka.common.network.Selector.poll(Selector.java:286) >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) >> > at >> > >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) >> > at >> > >> kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:52) >> > at >> > >> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:67) >> > at >> > >> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:64) >> > at scala.collection.immutable.List.foreach(List.scala:318) >> > at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:64) >> > at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:93) >> > at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:101) >> > at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:122) >> > at >> > >> kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:126) >> > at >> > >> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:322) >> > at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:73) >> > at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala) >> > [2016-06-07 12:42:27,190] DEBUG Node -1 disconnected. >> > (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,190] TRACE Cancelled request >> > ClientRequest(expectResponse=true, >> > >> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3528b777 >> , >> > >> request=RequestSend(header={api_key=3,api_version=0,correlation_id=2,client_id=admin-1}, >> > body={topics=[]}), createdTimeMs=1465303347182, >> sendTimeMs=1465303347184) >> > due to node -1 being disconnected >> (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,190] TRACE Cancelled request >> > ClientRequest(expectResponse=true, callback=null, >> > >> request=RequestSend(header={api_key=3,api_version=0,correlation_id=3,client_id=admin-1}, >> > body={topics=[]}), isInitiatedByNetworkClient, >> createdTimeMs=1465303347182, >> > sendTimeMs=1465303347182) due to node -1 being disconnected >> > (org.apache.kafka.clients.NetworkClient) >> > [2016-06-07 12:42:27,190] DEBUG Give up sending metadata request since >> no >> > node is available (org.apache.kafka.clients.NetworkClient) >> > >> >> >> >> -- >> Thanks, >> Ewen >> > >