org.apache.kafka.common.KafkaException: Failed to construct kafka producer

2017-07-05 Thread 罗 辉
hi guys:

  I got an exception which I searched for on searchhadoop.com and in the archive as
well, with no matches. Here it is:

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to 
construct kafka producer
 at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
 at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:191)
 at com.zetyum.www.P$.main(P.scala:28)
 at com.zetyum.www.P.main(P.scala)
Caused by: org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.serialization.StringDeserializer is not an instance of 
org.apache.kafka.common.serialization.Serializer
 at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
 at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:239)
 ... 3 more


Also, I tried 0.11 and 0.10 for Scala 2.10; both got the above exception. Here is
my code:

import org.apache.kafka._
import org.apache.kafka.clients._
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization._
import java.util.Properties

object P {
  def main(args: Array[String]) {
val brokers = "localhost:9092"
val topic = "test"
// Zookeeper connection properties
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
while (true) {
  val message = new ProducerRecord[String, String](topic, "1", "1")
  println(message)
  producer.send(message)
}
Thread.sleep(1000)
  }
}

Thanks for any advice.


San



Re: org.apache.kafka.common.KafkaException: Failed to construct kafka producer

2017-07-05 Thread tao xiao
You need to use org.apache.kafka.common.serialization.StringSerializer as
your ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
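
For reference, a minimal sketch of the corrected serializer settings (only the value
serializer line changes from the code quoted below):

// Both key and value serializers must implement
// org.apache.kafka.common.serialization.Serializer; the original code accidentally
// configured the StringDeserializer as the value serializer.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)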

On Wed, 5 Jul 2017 at 19:18 罗 辉  wrote:

> hi guys:
>
>   I got an exception which I searched for on searchhadoop.com and in the archive as
> well, with no matches. Here it is:
>
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
> to construct kafka producer
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:191)
>  at com.zetyum.www.P$.main(P.scala:28)
>  at com.zetyum.www.P.main(P.scala)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.StringDeserializer is not an instance
> of org.apache.kafka.common.serialization.Serializer
>  at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:239)
>  ... 3 more
>
>
> Also, I tried 0.11 and 0.10 for Scala 2.10; both got the above exception. Here
> is my code:
>
> import org.apache.kafka._
> import org.apache.kafka.clients._
> import org.apache.kafka.clients.producer._
> import org.apache.kafka.clients.consumer._
> import org.apache.kafka.common.serialization._
> import java.util.Properties
>
> object P {
>   def main(args: Array[String]) {
> val brokers = "localhost:9092"
> val topic = "test"
> // Zookeeper connection properties
> val props = new Properties()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringDeserializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> val producer = new KafkaProducer[String, String](props)
> while (true) {
>   val message = new ProducerRecord[String, String](topic, "1", "1")
>   println(message)
>   producer.send(message)
> }
> Thread.sleep(1000)
>   }
> }
>
> Thanks for any advice.
>
>
> San
>
>


Kafka consumer: fair consumption from multiple partitions

2017-07-05 Thread Stas Chizhov
Hi,

I have to process a topic with a few thousand messages and a dozen partitions
from the very beginning. This topic is manually populated before consumption.
In this setup a consumer consuming from several partitions at the same time
tends to consume the assigned partitions sequentially: first all messages from
one partition, then all messages from another, and so on. I wonder if there is
a way to make consumption fairer? I tried playing with consumer properties
without any luck. My Kafka version is 0.10.2.

Thank you,
Stanislav.


why more open files

2017-07-05 Thread Satyavathi Anasuri
Hi,
   I have created a topic with 500 partitions in a 3-node cluster
with replication factor 3. The Kafka version is 0.11. I executed the lsof command and
it lists more than 1 lakh (100,000) open files. Why are there so many open files, and
how can I reduce them?
Regards,
Satya.


Re: [DISCUSS] KIP-175: Additional '--describe' views for ConsumerGroupCommand

2017-07-05 Thread Edoardo Comar
Hi Vahid,
No, we are not relying on parsing the current output.

I just thought that keeping the full output isn't necessarily that bad as 
it shows some sort of history of how a group was used.

ciao
Edo
--

Edoardo Comar

IBM Message Hub

IBM UK Ltd, Hursley Park, SO21 2JN

"Vahid S Hashemian"  wrote on 04/07/2017 
17:11:43:

> From: "Vahid S Hashemian" 
> To: d...@kafka.apache.org
> Cc: "Kafka User" 
> Date: 04/07/2017 17:12
> Subject: Re: [DISCUSS] KIP-175: Additional '--describe' views for 
> ConsumerGroupCommand
> 
> Hi Edo,
> 
> Thanks for reviewing the KIP.
> 
> Modifying the default behavior of `--describe` was suggested in the 
> related JIRA.
> We could poll the community to see whether they go for that option, or, 
> as
> you suggested, introducing a new `--only-xxx` ( can't also think of a 
> proper name right now :) ) option instead.
> 
> Are you making use of the current `--describe` output and relying on the 

> full data set?
> 
> Thanks.
> --Vahid
> 
> 
> 
> 
> From:   Edoardo Comar 
> To: d...@kafka.apache.org
> Cc: "Kafka User" 
> Date:   07/04/2017 03:17 AM
> Subject:Re: [DISCUSS] KIP-175: Additional '--describe' views for 

> ConsumerGroupCommand
> 
> 
> 
> Thanks Vahid, I like the KIP.
> 
> One question - could we keep the current "--describe" behavior unchanged 

> and introduce "--only-xxx" options to filter down the full output as you 

> proposed ?
> 
> ciao,
> Edo
> --
> 
> Edoardo Comar
> 
> IBM Message Hub
> 
> IBM UK Ltd, Hursley Park, SO21 2JN
> 
> 
> 
> From:   "Vahid S Hashemian" 
> To: dev , "Kafka User" 

> Date:   04/07/2017 00:06
> Subject:[DISCUSS] KIP-175: Additional '--describe' views for 
> ConsumerGroupCommand
> 
> 
> 
> Hi,
> 
> I created KIP-175 to make some improvements to the ConsumerGroupCommand 
> tool.
> The KIP can be found here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A
> +Additional+%27--describe%27+views+for+ConsumerGroupCommand
> 
> 
> 
> Your review and feedback is welcome!
> 
> Thanks.
> --Vahid
> 
> 
> 
> 
> 
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number 

> 741598. 
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 
3AU
> 
> 
> 
> 

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


Re: [DISCUSS] KIP-175: Additional '--describe' views for ConsumerGroupCommand

2017-07-05 Thread Edoardo Comar
Thanks Vahid, I like the KIP.

One question - could we keep the current "--describe" behavior unchanged 
and introduce "--only-xxx" options to filter down the full output as you 
proposed ?

ciao,
Edo
--

Edoardo Comar

IBM Message Hub

IBM UK Ltd, Hursley Park, SO21 2JN



From:   "Vahid S Hashemian" 
To: dev , "Kafka User" 
Date:   04/07/2017 00:06
Subject:[DISCUSS] KIP-175: Additional '--describe' views for 
ConsumerGroupCommand



Hi,

I created KIP-175 to make some improvements to the ConsumerGroupCommand 
tool.
The KIP can be found here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand


Your review and feedback is welcome!

Thanks.
--Vahid





Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


Re: why more open files

2017-07-05 Thread M. Manna
This is quite vague.

What commands have you executed?

What do you refer to by open files? Is it the log partition or consumer
offsets?

On 5 Jul 2017 3:21 pm, "Satyavathi Anasuri" 
wrote:

> Hi,
>    I have created a topic with 500 partitions in a 3-node
> cluster with replication factor 3. The Kafka version is 0.11. I executed the lsof
> command and it lists more than 1 lakh (100,000) open files. Why are there so many
> open files, and how can I reduce them?
> Regards,
> Satya.
>


Re: why more open files

2017-07-05 Thread Kaufman Ng
Keep in mind Kafka brokers can use many file descriptors/handles. You may
need to increase the OS file descriptor limits.

http://kafka.apache.org/documentation/#os

"File descriptor limits: Kafka uses file descriptors for log segments and
open connections. If a broker hosts many partitions, consider that the
broker needs at least (number_of_partitions)*(partition_size/segment_size)
to track all log segments in addition to the number of connections the
broker makes. We recommend at least 100000 allowed file descriptors for the
broker processes as a starting point."
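
As a rough, hypothetical illustration of that arithmetic for the setup described in
this thread (the segment count is made up; it depends on data volume, retention, and
segment.bytes):

// Back-of-the-envelope file-descriptor estimate for one broker.
// 500 partitions x replication factor 3, spread over 3 brokers,
// works out to roughly 500 partition replicas hosted per broker.
object FdEstimate extends App {
  val replicasPerBroker  = 500 * 3 / 3  // partition replicas hosted by each broker
  val segmentsPerReplica = 20           // hypothetical; depends on retention and segment.bytes
  val filesPerSegment    = 3            // .log, .index and (0.10.1+) .timeindex
  val total = replicasPerBroker * segmentsPerReplica * filesPerSegment
  println(s"~$total descriptors for log segments, before sockets and mmapped index files")
}

Even with modest segment counts this lands in the tens of thousands per broker, and lsof
output also includes memory-mapped index files and network sockets, so a six-figure line
count can be plausible rather than a sign of a leak.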

On Wed, Jul 5, 2017 at 10:24 AM, M. Manna  wrote:

> This is quite vague.
>
> What commands have you executed?
>
> What do you refer to by open files? Is it the log partition or consumer
> offsets?
>
> On 5 Jul 2017 3:21 pm, "Satyavathi Anasuri" 
> wrote:
>
> > Hi,
> >    I have created a topic with 500 partitions in a 3-node
> > cluster with replication factor 3. The Kafka version is 0.11. I executed the lsof
> > command and it lists more than 1 lakh (100,000) open files. Why are there so many
> > open files, and how can I reduce them?
> > Regards,
> > Satya.
> >
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Re: Kafka Authorization and ACLs Broken

2017-07-05 Thread Raghav
Hi Rajini

Now that 0.11.0 is out, can we use the AdminClient? Is there some
example code for this?

Thanks.

On Wed, May 24, 2017 at 9:06 PM, Rajini Sivaram 
wrote:

> Hi Raghav,
>
> Yes, you can create ACLs programmatically. Take a look at the use of
> AclCommand.main in https://github.com/apache/kafka/blob/trunk/core/src/
> test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
>
> If you can wait for the next release 0.11.0 that will be out next month,
> you can use the new Java AdminClient, which allows you to do this in a much
> neater way. Take a look at the interface https://github.com/
> apache/kafka/blob/trunk/clients/src/main/java/org/
> apache/kafka/clients/admin/AdminClient.java
> 
>
> If your release is not imminent, then you could build Kafka from the
> 0.11.0 branch and use the new AdminClient. When the release is out, you can
> switch over to the binary release.
>
> Regards,
>
> Rajini
>
>
>
> On Wed, May 24, 2017 at 4:13 PM, Raghav  wrote:
>
>> Hi Rajini
>>
>> Quick question on Configuring ACLs: We used bin/kafka-acls.sh to
>> configure ACL rules, which internally uses Kafka Admin APIs to configure
>> the ACLs.
>>
>> Can I add, remove and list ACLs via zk client libraries ? I want to be
>> able to add, remove, list ACLs via my code rather than using Kafka-acl.sh.
>> Is there a guideline for recommended set of libraries to use to do such
>> operations ?
>>
>> As always thanks so much.
>>
>>
>>
>> On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram 
>> wrote:
>>
>>> Raghav/Darshan,
>>>
>>> Can you try these steps on a clean installation of Kafka? It works for
>>> me, so hopefully it will work for you. And then you can adapt to your
>>> scenario.
>>>
>>> *Create keystores and truststores:*
>>>
>>> keytool -genkey -alias kafka -keystore server.keystore.jks -dname
>>> "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
>>> -keypass server-key-password
>>>
>>> keytool -exportcert -file server-cert-file -keystore server.keystore.jks
>>> -alias kafka -storepass server-keystore-password
>>>
>>> keytool -importcert -file server-cert-file -keystore
>>> server.truststore.jks -alias kafka -storepass server-truststore-password
>>> -noprompt
>>>
>>> keytool -importcert -file server-cert-file -keystore
>>> client.truststore.jks -alias kafkaclient -storepass
>>> client-truststore-password -noprompt
>>>
>>>
>>> keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
>>> "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
>>> -keypass client-key-password
>>>
>>> keytool -exportcert -file client-cert-file -keystore client.keystore.jks
>>> -alias kafkaclient -storepass client-keystore-password
>>>
>>> keytool -importcert -file client-cert-file -keystore
>>> server.truststore.jks -alias kafkaclient -storepass
>>> server-truststore-password -noprompt
>>>
>>> *Configure broker: Add these lines at the end of your server.properties*
>>>
>>> listeners=SSL://:9093
>>>
>>> advertised.listeners=SSL://127.0.0.1:9093
>>>
>>> ssl.keystore.location=/tmp/acl/server.keystore.jks
>>>
>>> ssl.keystore.password=server-keystore-password
>>>
>>> ssl.key.password=server-key-password
>>>
>>> ssl.truststore.location=/tmp/acl/server.truststore.jks
>>>
>>> ssl.truststore.password=server-truststore-password
>>>
>>> security.inter.broker.protocol=SSL
>>>
>>> security.protocol=SSL
>>>
>>> ssl.client.auth=required
>>>
>>> allow.everyone.if.no.acl.found=false
>>>
>>> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>>>
>>> super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>>>
>>> *Configure producer: producer.properties*
>>>
>>> security.protocol=SSL
>>>
>>> ssl.truststore.location=/tmp/acl/client.truststore.jks
>>>
>>> ssl.truststore.password=client-truststore-password
>>>
>>> ssl.keystore.location=/tmp/acl/client.keystore.jks
>>>
>>> ssl.keystore.password=client-keystore-password
>>>
>>> ssl.key.password=client-key-password
>>>
>>>
>>> *Configure consumer: consumer.properties*
>>>
>>> security.protocol=SSL
>>>
>>> ssl.truststore.location=/tmp/acl/client.truststore.jks
>>>
>>> ssl.truststore.password=client-truststore-password
>>>
>>> ssl.keystore.location=/tmp/acl/client.keystore.jks
>>>
>>> ssl.keystore.password=client-keystore-password
>>>
>>> ssl.key.password=client-key-password
>>>
>>> group.id=testgroup
>>>
>>> *Create topic:*
>>>
>>> bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
>>> --replication-factor 1 --partitions 1
>>>
>>>
>>> *Configure ACLs:*
>>>
>>> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
>>> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --producer
>>> --topic testtopic
>>>
>>> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181
>>> --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK" --consumer
>>> --topic testtopic --group testgroup
>>>

How to batch fetch/commit offset in Kafka 0.10 API

2017-07-05 Thread Martin Peng
Hi,

Does anyone know how to batch fetch/commit Kafka topic offsets using
the new Kafka 0.10 API?
When we were using Kafka 0.8.1, we used BlockingChannel to send
OffsetCommitRequest and OffsetFetchRequest to do it in a batch via ZooKeeper.

However, in 0.10 everything is built around a single consumer. If I use
committed() or commitAsync(), it is only for a single consumer. I cannot do
batch commits anymore, which I believe will slow down the system as we have
hundreds of partitions for each topic.

Thanks
Martin
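
For reference, the new consumer does let you commit offsets for many partitions in a
single call via the commitSync overload that takes a map; a minimal sketch (broker
address, group, topics, and offsets are made up):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object BatchCommitSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "my-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)

  // One commit request carrying offsets for several partitions.
  val offsets = Map(
    new TopicPartition("topicA", 0) -> new OffsetAndMetadata(42L),
    new TopicPartition("topicA", 1) -> new OffsetAndMetadata(100L)
  ).asJava
  consumer.commitSync(offsets)

  // Fetching committed offsets is still one partition per call on this consumer version.
  println(consumer.committed(new TopicPartition("topicA", 0)))
  consumer.close()
}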


Kafka broker stays up but leaves cluster

2017-07-05 Thread John Yost
Hi Everyone,

We just upgraded to 0.10.0, and we've repeatedly seen a situation where a
broker is up and appears to be fetching replica state from the lead
replicas but the broker is not listed as part of the cluster. Any ideas as
to why this is happening? Anything I should grep for in the logs?

Thanks

--John


Re: Consumer blocks forever on unreachable ports

2017-07-05 Thread Raghu Angadi
Quick update: this can be unblocked with consumer.wakeup(). So my current
workaround is to run this in a separate thread and cancel it after a
timeout.
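
A minimal sketch of that workaround (broker address, topic, and the 30-second timeout
are made up; only consumer.wakeup() is safe to call from another thread):

import java.util.{Collections, Properties}
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException

object PollWithTimeoutSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "google.com:9092")  // unreachable on purpose, as in the example below
  props.put("group.id", "probe")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("queries"))

  // Watchdog thread that interrupts a blocked poll after 30 seconds.
  val watchdog = Executors.newSingleThreadScheduledExecutor()
  val wakeupTask = watchdog.schedule(
    new Runnable { def run(): Unit = consumer.wakeup() }, 30, TimeUnit.SECONDS)

  try {
    val records = consumer.poll(1000)  // blocks indefinitely here if no broker is reachable
    wakeupTask.cancel(false)           // normal path: disarm the watchdog
    println(s"fetched ${records.count()} records")
  } catch {
    case _: WakeupException => println("gave up: brokers unreachable within the timeout")
  } finally {
    // A real implementation should also guard close() against a late wakeup.
    watchdog.shutdownNow()
    consumer.close()
  }
}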

On Fri, Jun 30, 2017 at 11:14 AM, Raghu Angadi  wrote:

> Consumer blocks forever during initialization if the brokers are not
> reachable. 'request.timeout.ms' does not seem to be effective. How are
> users expected to detect this and handle?
>
> e.g:
> $ ./kafka-console-consumer.sh --bootstrap-server google.com:9092 --topic
> queries
>
> This blocks forever since google.com:9092 is not reachable. Consumer
> stack trace :
>
> [...]
> - locked <0xfdcba750> (a sun.nio.ch.Util$3)
> - locked <0xfdcba740> (a java.util.Collections$
> UnmodifiableSet)
> - locked <0xfdcba618> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(
> Selector.java:470)
> at org.apache.kafka.common.network.Selector.poll(
> Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:260)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> - locked <0xfdceacd0> (a org.apache.kafka.clients.
> consumer.internals.ConsumerNetworkClient)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197)
> - locked <0xfdcea350> (a org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:67)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
>
>


Re: Kafka Authorization and ACLs Broken

2017-07-05 Thread Rajini Sivaram
Hi Raghav,

Yes, you should be able to use AdminClient from 0.11.0. Take a look at the
Javadocs (
https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/package-summary.html).
The integration tests may be useful too (
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
,
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
).

Regards,

Rajini
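
As a hedged sketch of what that looks like with the 0.11.0 AdminClient (broker address,
topic, and principal are carried over from the earlier walkthrough and are assumptions;
SSL client properties are omitted):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation, AclPermissionType}
import org.apache.kafka.common.resource.{Resource, ResourceType}

object CreateAclSketch extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")
  // add the same ssl.* client properties shown earlier if the listener uses SSL

  val admin = AdminClient.create(props)

  // Allow the client principal to write (produce) to "testtopic" from any host.
  val binding = new AclBinding(
    new Resource(ResourceType.TOPIC, "testtopic"),
    new AccessControlEntry("User:CN=KafkaClient,O=Pivotal,C=UK", "*",
      AclOperation.WRITE, AclPermissionType.ALLOW))

  admin.createAcls(Collections.singleton(binding)).all().get()  // wait for the broker to apply it
  admin.close()
}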

On Wed, Jul 5, 2017 at 4:10 PM, Raghav  wrote:

> Hi Rajini
>
> Now that 0.11.0 is out, can we use the Admin client ? Are there some
> example code for these ?
>
> Thanks.
>
> On Wed, May 24, 2017 at 9:06 PM, Rajini Sivaram 
> wrote:
>
>> Hi Raghav,
>>
>> Yes, you can create ACLs programmatically. Take a look at the use of
>> AclCommand.main in https://github.com/apache/kafk
>> a/blob/trunk/core/src/test/scala/integration/kafka/api/
>> EndToEndAuthorizationTest.scala
>>
>> If you can wait for the next release 0.11.0 that will be out next month,
>> you can use the new Java AdminClient, which allows you to do this in a much
>> neater way. Take a look at the interface https://github.com/a
>> pache/kafka/blob/trunk/clients/src/main/java/org/apache/
>> kafka/clients/admin/AdminClient.java
>> 
>>
>> If your release is not imminent, then you could build Kafka from the
>> 0.11.0 branch and use the new AdminClient. When the release is out, you can
>> switch over to the binary release.
>>
>> Regards,
>>
>> Rajini
>>
>>
>>
>> On Wed, May 24, 2017 at 4:13 PM, Raghav  wrote:
>>
>>> Hi Rajini
>>>
>>> Quick question on Configuring ACLs: We used bin/kafka-acls.sh to
>>> configure ACL rules, which internally uses Kafka Admin APIs to configure
>>> the ACLs.
>>>
>>> Can I add, remove and list ACLs via zk client libraries ? I want to be
>>> able to add, remove, list ACLs via my code rather than using Kafka-acl.sh.
>>> Is there a guideline for recommended set of libraries to use to do such
>>> operations ?
>>>
>>> As always thanks so much.
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram >> > wrote:
>>>
 Raghav/Darshan,

 Can you try these steps on a clean installation of Kafka? It works for
 me, so hopefully it will work for you. And then you can adapt to your
 scenario.

 *Create keystores and truststores:*

 keytool -genkey -alias kafka -keystore server.keystore.jks -dname
 "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
 -keypass server-key-password

 keytool -exportcert -file server-cert-file -keystore
 server.keystore.jks -alias kafka -storepass server-keystore-password

 keytool -importcert -file server-cert-file -keystore
 server.truststore.jks -alias kafka -storepass server-truststore-password
 -noprompt

 keytool -importcert -file server-cert-file -keystore
 client.truststore.jks -alias kafkaclient -storepass
 client-truststore-password -noprompt


 keytool -genkey -alias kafkaclient -keystore client.keystore.jks -dname
 "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
 -keypass client-key-password

 keytool -exportcert -file client-cert-file -keystore
 client.keystore.jks -alias kafkaclient -storepass client-keystore-password

 keytool -importcert -file client-cert-file -keystore
 server.truststore.jks -alias kafkaclient -storepass
 server-truststore-password -noprompt

 *Configure broker: Add these lines at the end of your server.properties*

 listeners=SSL://:9093

 advertised.listeners=SSL://127.0.0.1:9093

 ssl.keystore.location=/tmp/acl/server.keystore.jks

 ssl.keystore.password=server-keystore-password

 ssl.key.password=server-key-password

 ssl.truststore.location=/tmp/acl/server.truststore.jks

 ssl.truststore.password=server-truststore-password

 security.inter.broker.protocol=SSL

 security.protocol=SSL

 ssl.client.auth=required

 allow.everyone.if.no.acl.found=false

 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

 super.users=User:CN=KafkaBroker,O=Pivotal,C=UK

 *Configure producer: producer.properties*

 security.protocol=SSL

 ssl.truststore.location=/tmp/acl/client.truststore.jks

 ssl.truststore.password=client-truststore-password

 ssl.keystore.location=/tmp/acl/client.keystore.jks

 ssl.keystore.password=client-keystore-password

 ssl.key.password=client-key-password


 *Configure consumer: consumer.properties*

 security.protocol=SSL

 ssl.truststore.location=/tmp/acl/client.truststore.jks

 ssl.truststore.password=client-truststore-password

 ssl.keystore.location=/tmp/acl/client.ke

Kafka shutdown gracefully

2017-07-05 Thread Ghosh, Achintya (Contractor)
Hi team,

What is the command to shut down the Kafka server gracefully instead of using 'kill
-9 PID'?

If we use bin/kafka-server-stop.sh it shows "No kafka server to stop", but the
service is actually running and I can see the PID by using "ps -ef|grep kafka".


Thanks
Achintya


Mirroring multiple clusters into one

2017-07-05 Thread Vahid S Hashemian
The literature suggests running MirrorMaker (MM) on the target cluster when possible
(except when encryption is required for the transferred data).
I am wondering if this is still the recommended approach when mirroring 
from multiple clusters to a single cluster (i.e. multiple MM instances).
Is there anything in particular (metric, specification, etc.) to consider 
before making a decision?

Thanks.
--Vahid




Re: exception processing streams ..

2017-07-05 Thread Guozhang Wang
I think Damian's finding is correct regarding the consumer bug, and there
is a PR being worked on already:
https://github.com/apache/kafka/pull/3489/files


Guozhang
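
A minimal sketch of the handler Damian suggests further down the thread (application id,
topics, and broker address are hypothetical); the handler just logs and exits so the
scheduler, Mesos/Marathon in this case, restarts the process:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

object ExitOnStreamErrorSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

  val builder = new KStreamBuilder()
  builder.stream[String, String]("input-topic").to("output-topic")

  val streams = new KafkaStreams(builder, props)
  streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit = {
      // streams.close() alone only stops the remaining stream threads, so exit the JVM
      // and let the scheduler restart the whole process.
      System.err.println(s"Stream thread ${t.getName} died: $e")
      System.exit(1)
    }
  })
  streams.start()
}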

On Tue, Jul 4, 2017 at 10:04 AM, Debasish Ghosh 
wrote:

> Thanks!
>
> On Tue, Jul 4, 2017 at 10:28 PM, Damian Guy  wrote:
>
> > Yes, System.exit(..)
> > streams.close(..) just attempts to stop any running stream threads.
> >
> > On Tue, 4 Jul 2017 at 17:49 Debasish Ghosh 
> > wrote:
> >
> >> Hi Damien -
> >>
> >> Just 1 question .. by "terminate the process" do you mean
> System.exit(..)
> >> ?
> >> Because streams.close() will not terminate the process - right ?
> >>
> >> regards.
> >>
> >> On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <
> ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >> > Hi Damian -
> >> >
> >> > I also thought so .. yes, I will add `KafkaStreams#setUncaughtE
> >> > xceptionHandler(...)` and Mesos should restart the process .. Thanks
> for
> >> > your prompt response ..
> >> >
> >> > regards.
> >> >
> >> > On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy 
> >> wrote:
> >> >
> >> >> Hi Debasish,
> >> >>
> >> >> It looks like it is possibly a bug in the Kafka Consumer code.
> >> >> In your streams app you probably want to add an
> >> UncaughtExceptionHandler,
> >> >> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and
> terminate
> >> >> the
> >> >> process when you receive an uncaught exception. I guess Mesos should
> >> >> automatically restart it for you, then?
> >> >>
> >> >> Thanks,
> >> >> Damian
> >> >>
> >> >> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh  >
> >> >> wrote:
> >> >>
> >> >> > Hi -
> >> >> >
> >> >> > I have been running a streaming application on some data set.
> Things
> >> >> > usually run ok. Today I was trying to run the same application on
> >> Kafka
> >> >> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster.
> >> After
> >> >> > running for quite some time, I got the following exception ..
> >> >> >
> >> >> > Exception in thread "StreamThread-1" java.lang.
> >> IllegalStateException:
> >> >> > > Attempt to retrieve exception from future which hasn't failed
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
> >> >> exception(RequestFuture.java:99)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
> >> >> isRetriable(RequestFuture.java:89)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> >> tor.commitOffsetsSync(ConsumerCoordinator.java:590)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> >> >> KafkaConsumer.java:1124)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamTask.
> >> >> commitOffsets(StreamTask.java:296)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamTask$1.
> >> >> run(StreamTask.java:79)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
> >> >> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamTask.
> >> >> commit(StreamTask.java:280)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamThread.
> >> >> commitOne(StreamThread.java:807)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamThread.
> >> >> commitAll(StreamThread.java:794)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamThread.
> >> >> maybeCommit(StreamThread.java:769)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamThread.
> >> >> runLoop(StreamThread.java:647)
> >> >> > > at
> >> >> > >
> >> >> > org.apache.kafka.streams.processor.internals.StreamThread.
> >> >> run(StreamThread.java:361)
> >> >> >
> >> >> >
> >> >> > Looks like some internal processing failed and control went to an
> >> >> > unexpected path. I have 2 questions ..
> >> >> >
> >> >> >1. any idea why this could happen ? I don't think it's related
> to
> >> >> Mesos
> >> >> >DC/OS though - may be some concurrency issue ?
> >> >> >2. how do I recover from such errors ? The stream processor has
> >> >> stopped
> >> >> >and the only way out is to restart the application.
> >> >> >
> >> >> > regards.
> >> >> >
> >> >> > --
> >> >> > Debasish Ghosh
> >> >> > http://manning.com/ghosh2
> >> >> > http://manning.com/ghosh
> >> >> >
> >> >> > Twttr: @debasishg
> >> >> > Blog: http://debasishg.blogspot.com
> >> >> > Code: http://github.com/debasishg
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > Debasish Ghosh
> >> > http://manning.com/ghosh2
> >> > http://manning.com/ghosh
> >> >
> >> > Twttr: @debasishg
> >> > Blog: http://debasishg.blogspot.com
> >> > Code: http://github.com/debasishg
> >> >
> >>
> >>
> >>
> >> --
> >> Debasish Ghosh
> >> http://manning.com/ghosh2
> >> http

Re: Kafka metrics

2017-07-05 Thread Guozhang Wang
Hello Tom,

Currently only the client-side packages use o.a.k.common.metrics for
metrics reporting; broker-side metrics (including
LeaderElectionRateAndTimeMs) are still implemented with Yammer metrics
(com.yammer.metrics).


Guozhang


On Mon, Jul 3, 2017 at 4:20 AM, Tom Dearman  wrote:

> I have implemented org.apache.kafka.common.metrics.MetricsReporter and
> set it up using metric.reporters in the server properties. I don’t see all
> the metrics that I was expecting, for example I don’t see
> ‘LeaderElectionRateAndTimeMs’.  There seems to be another reporter you can
> implement and then register on kafka.metrics.reporters.  What is the
> difference between the metrics of the first and the second, and is it
> expected that I don’t see all metrics with the first.
>
> Tom




-- 
-- Guozhang


Re: org.apache.kafka.common.KafkaException: Failed to construct kafka producer

2017-07-05 Thread 罗 辉
Thank you, I got this problem solved with your advice.


From: tao xiao
Sent: July 5, 2017 20:55:45
To: users@kafka.apache.org
Subject: Re: org.apache.kafka.common.KafkaException: Failed to construct kafka
producer

You need to use org.apache.kafka.common.serialization.StringSerializer as
your ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.

On Wed, 5 Jul 2017 at 19:18 罗 辉  wrote:

> hi guys:
>
>   I got an exception which I searched for on searchhadoop.com and in the archive as
> well, with no matches. Here it is:
>
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
> to construct kafka producer
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:191)
>  at com.zetyum.www.P$.main(P.scala:28)
>  at com.zetyum.www.P.main(P.scala)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.StringDeserializer is not an instance
> of org.apache.kafka.common.serialization.Serializer
>  at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
>  at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:239)
>  ... 3 more
>
>
> Also, I tried 0.11 and 0.10 for Scala 2.10; both got the above exception. Here
> is my code:
>
> import org.apache.kafka._
> import org.apache.kafka.clients._
> import org.apache.kafka.clients.producer._
> import org.apache.kafka.clients.consumer._
> import org.apache.kafka.common.serialization._
> import java.util.Properties
>
> object P {
>   def main(args: Array[String]) {
> val brokers = "localhost:9092"
> val topic = "test"
> // Zookeeper connection properties
> val props = new Properties()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringDeserializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> val producer = new KafkaProducer[String, String](props)
> while (true) {
>   val message = new ProducerRecord[String, String](topic, "1", "1")
>   println(message)
>   producer.send(message)
> }
> Thread.sleep(1000)
>   }
> }
>
> Thanks for any advice.
>
>
> San
>
>


Re: KStreams to KTable join

2017-07-05 Thread Guozhang Wang
When a KStream / KTable is created from a source topic, each record is a
key-value pair, and the key is read from Kafka as the message key.

What you showed in JSON seems to be only the value of the message, and hence
I'm asking what the key of the message is, which will be the key of the
streams record.


Guozhang
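
To make that concrete, a small sketch of producing keyed records so the leftJoin in the
quoted code can match (topic names come from the thread; using the user name as the
message key is an assumption):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KeyedRecordsSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // Table side ("cache" topic): key = user name, value = profile JSON.
  producer.send(new ProducerRecord("cache", "Joe",
    """{"user_name": "Joe", "location": "US", "gender": "male"}"""))

  // Stream side ("raw" topic): the same key, so the join processor can look up the table row.
  // A record with a null key would be dropped before it reaches the join.
  producer.send(new ProducerRecord("raw", "Joe",
    """{"user": "Joe", "custom": {"choice": "vegan"}}"""))

  producer.close()
}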

On Thu, Jun 29, 2017 at 2:34 PM, Shekar Tippur  wrote:

> Guozhang,
>
> "1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream."
>
> Can you please elaborate on the notion of key? By keys, do you mean kafka
> partition keys?
> For a json kstream to ktable example, can you please show me a sample
> input?
>
> For me, the ktable has:
>
> {"user_name": "Joe", "location": "US", "gender": "male"}
> {"user_name": "Julie", "location": "US", "gender": "female"}
>
> {"user_name": "Kawasaki", "location": "Japan", "gender": "male"}
>
> The kstream gets a event (KStreams)
>
> {"user": "Joe", "custom": {"choice":"vegan"}}
>
> Is this data right or do I need to have a key and then a json - as in:
>
>
> "joe", {"user_name": "Joe", "location": "US", "gender": "male"}
>
>
>
>
> On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang  wrote:
>
> > I think your issue is in two folds:
> >
> > 1) if the coming record's key is null, then when it flows into the join
> > processor inside the topology this record will be dropped as it cannot be
> > joined with any records from the other stream.
> >
> > 2) the NPE you are getting when giving it the non-null keyed record seems
> > because, you are using "SnowServerDeserialzer" (is it set as the default
> > key deserializer) which expects a SnowServerPOJOClass while the key "joe"
> > is typed String. You need to override the key deserialize when
> constructing
> > the "cache" KTable as well:
> >
> > 
> > KTable  cache = builder.table(Serdes.String(),
> > rawSerde, "cache", "local-cache");
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur 
> wrote:
> >
> > > Guozhang
> > >
> > > I am using 0.10.2.1 version
> > >
> > > - Shekar
> > >
> > > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang 
> > > wrote:
> > >
> > > > Hi Shekar,
> > > >
> > > > Could you demonstrate your input data. More specifically, what are
> the
> > > key
> > > > types of your input streams, and are they not-null values? It seems
> the
> > > > root cause is similar to the other thread you asked on the mailing
> > list.
> > > >
> > > > Also, could you provide your used Kafka Streams version?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur 
> > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I am having trouble implementing streams to table join.
> > > > >
> > > > > I have 2 POJO's each representing streams and table data
> structures.
> > > raw
> > > > > topic contains streams and cache topic contains table structure.
> The
> > > join
> > > > > is not happening since the print statement is not being called.
> > > > >
> > > > > Appreciate any pointers.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass, CachePOJOClass, RawPOJOClass>() {
> > > > >
> > > > > @Override
> > > > > public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > > >
> > > > > String src=r.getSource();
> > > > > String cSrc=c.getSnowHost();
> > > > > Custom custom=new Custom();
> > > > >
> > > > > if (src.matches(snowSrc)){
> > > > > System.out.println("In apply code");
> > > > > custom.setAdditionalProperty("custom",cSrc.getAll());
> > > > > r.setCustom(custom);
> > > > > }
> > > > > return r;
> > > > > }
> > > > > }).to("parser");
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Fwd: kafka installation

2017-07-05 Thread Elahe Bagheri
-- Forwarded message --
From: Elahe Bagheri 
Date: Sat, Jul 1, 2017 at 4:16 PM
Subject: kafka installation
To: users@kafka.apache.org


Dear,


Recently I tried to install Kafka on Ubuntu Server 16.04. I installed JDK 8
and ZooKeeper successfully. There was no problem with installing Kafka,
except that I couldn't see the "" message. I attached the terminal screenshot.
I would really appreciate it if you could help me get past it.

Cheers and have a nice day.
Elahe,