Re: Partition election on consumer

2013-10-11 Thread Markus Roder
Thanks Neha, really appreciate your assistance


2013/10/9 Neha Narkhede 

> Kafka's consumer rebalancing strategy is explained in detail here -
> http://kafka.apache.org/documentation.html#distributionimpl
> Hope that helps!
>
> -Neha
>
>
> > On Tue, Oct 8, 2013 at 11:42 PM, Markus Roder wrote:
>
> > Hi Neha,
> >
> > thanks for this information.
> > Can you give me a hint for implementing my own rebalancing strategy?
> >
> > Thanks in advance
> > Markus
> >
> >
> > 2013/10/8 Neha Narkhede 
> >
> > > Currently there is no way to invoke a callback on the rebalance
> > operation.
> > > But this is certainly something to consider for Kafka 0.9 since we are
> > > planning a client rewrite for that release. You can find the proposal
> in
> > > progress here -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> > >
> > > For now your best bet is to use the SimpleConsumer and implement your
> own
> > > rebalancing strategy. Another hacky approach is to register zookeeper
> > > watches on the /consumers//owners path that indicates the
> > partition
> > > ownership change.
> > >
> > > Thanks,
> > > Neha
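[A minimal sketch of the ZooKeeper-watch approach described above, assuming the 0.8 high-level consumer's registry layout (/consumers/<group>/owners/<topic>); the group, topic and connection string are placeholders, and the callback is only a hook for committing local aggregation state when ownership changes:]

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class OwnershipWatcher implements Watcher {
    private final ZooKeeper zk;
    private final String ownersPath;

    public OwnershipWatcher(String zkConnect, String group, String topic) throws Exception {
        this.zk = new ZooKeeper(zkConnect, 30000, this);
        // children of this path are the partition ids currently owned by consumers
        this.ownersPath = "/consumers/" + group + "/owners/" + topic;
        watchOwners();
    }

    private void watchOwners() throws Exception {
        // passing "this" re-arms the one-shot watch every time we read the children
        List<String> ownedPartitions = zk.getChildren(ownersPath, this);
        System.out.println("owned partitions: " + ownedPartitions);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                // a rebalance released/claimed partitions: commit local aggregation
                // state here, then re-register the watch for the next change
                watchOwners();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Note that the watch only fires after ownership has already started to move, which is why the approach is called hacky: it cannot block or delay the rebalance itself.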
> > > On Oct 8, 2013 2:12 AM, "Markus Roder" 
> wrote:
> > >
> > > > Hi,
> > > >
> > > > we currently face a "problem" on our consumer cluster, which may
> have a
> > > > simple solution. Never the less I could not find this solution yet.
> > > >
> > > > Description of problem:
> > > > 1 kafka topic with 24 partitions (kafka version 0.8 Beta1)
> > > > 2 or more consumers in the same consumer group. Each consumer processes its
> > > > partitions by aggregating topic data into a relational database. Each
> > > > consumer hashes the aggregation data locally before committing it into the
> > > > relational database. After the commit to the database, the ConsumerConnector
> > > > commits the offsets to kafka.
> > > >
> > > > The problem is: if I connect a new consumer, the ConsumerConnector
> > > > recalculates the partitions to read from on each consumer instance. That
> > > > currently causes our system to process topic data twice because of the
> > > > local aggregation within the consumer.
> > > >
> > > > Is there any possibility to catch the event of new partition selection
> > > > in the ConsumerConnector, to commit the offsets and data before
> > > > reconnecting to the new partitions?
> > > >
> > > > Thanks in advance
> > > > Markus
> > > >
> > > > --
> > > > Markus Roder
> > > > Distelweg 4
> > > > 97318 Kitzingen
> > > > Mail: roder.marku...@gmail.com
> > > > Profil: http://gplus.to/markusroder
> > > >
> > >
> >
> >
> >
> > --
> > Markus Roder
> > Distelweg 4
> > 97318 Kitzingen
> > Mail: roder.marku...@gmail.com
> > Profil: http://gplus.to/markusroder
> >
>



-- 
Markus Roder
Distelweg 4
97318 Kitzingen
Mail: roder.marku...@gmail.com
Profil: http://gplus.to/markusroder


Re: Best way to start / stop / restart a Consumer?

2013-10-11 Thread Tanguy tlrx
Thanks Jun,

Jira issue has been filed: https://issues.apache.org/jira/browse/KAFKA-1083

By the way, what is the recommended way to start, stop and restart a
ConsumerConnector in the same running JVM?

Thanks,


2013/10/10 Jun Rao 

> Each time we create a new consumer connector, we assign a random consumer
> id by default. You can try setting "consumer.id" to use a fixed consumer
> id. In any case, we probably should deregister those beans when shutting
> down the connector. Could you file a jira?
>
> Thanks,
>
> Jun
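[For reference, a sketch of where that setting goes when building the connector; only "consumer.id" is the knob Jun refers to, the other values are placeholders. Properties is java.util.Properties, and the Kafka classes are kafka.consumer.Consumer, kafka.consumer.ConsumerConfig and kafka.javaapi.consumer.ConsumerConnector.]

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my_consumer_group");
// pin the consumer id so a restarted connector registers under the same
// name instead of a freshly generated random id
props.put("consumer.id", "my_consumer_1");

ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));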
>
>
> On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx  wrote:
>
> > It's in 0.8.
> >
> > The JMX names are not exactly the same, but I see 2 beans with similar
> > names, something like:
> >
> >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> >
> > -- Tanguy
> >
> >
> >
> >
> >
> > 2013/10/10 Jun Rao 
> >
> > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name to be
> > > registered twice. Do you see 2 beans with similar names? What are the
> > exact
> > > bean names?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > In our application, we are currently starting a Kafka Consumer with
> the
> > > > following lines of code:
> > > >
> > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > streams = connector .createMessageStreams(map);
> > > >
> > > > Then, each KafkaStream is processed in a dedicated thread per topic
> and
> > > > partition, as documented here
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > >
> > > > We need to stop (halt) and restart the consumer. Today, we just call:
> > > >
> > > > connector.shutdown()
> > > >
> > > > and wait for threads to terminate.
> > > >
> > > > To restart the consumer, we create a new connector:
> > > >
> > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > > When restarting is complete, I can see that a JMX MBean (we  use
> > Metrics
> > > > JMXReporter) like "ZookeeperConsumerConnector" is registered twice.
> > This
> > > > bean is not registered when the previous connector instance is shut
> > down.
> > > >
> > > > What is the best way to stop/halt and restart a Consumer using the
> Java
> > > > API?
> > > >
> > > > Is it normal that the MBean is not unregistered at shutdown time?
> > > >
> > > > Thanks,
> > > >
> > > > -- Tanguy
> > > >
> > >
> >
> >
> >
> > --
> > -- Tanguy
> > twitter @tlrx
> > https://github.com/tlrx
> >
>



-- 
-- Tanguy
twitter @tlrx
https://github.com/tlrx


Re: Best way to start / stop / restart a Consumer?

2013-10-11 Thread Neha Narkhede
Best way to shutdown is to invoke the shutdown() API. To restart, you need
to createMessageStreams()

Thanks,
Neha
On Oct 11, 2013 6:10 AM, "Tanguy tlrx"  wrote:

> Thanks Jun,
>
> Jira issue has been filled:
> https://issues.apache.org/jira/browse/KAFKA-1083
>
> By the way, what is the recommended way to start, stop and restart a
> ConsumerConnector in the same running JMV?
>
> Thanks,
>
>
> 2013/10/10 Jun Rao 
>
> > Each time we create a new consumer connector, we assign a random consumer
> > id by default. You can try setting "consumer.id" to use a fixed consumer
> > id. In any case, we probably should deregister those beans when shutting
> > down the connector. Could you file a jira?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx  wrote:
> >
> > > It's in 0.8.
> > >
> > > The JMX names are not exactly the same, but I see 2 beans with similar
> > > names, something like:
> > >
> > >
> > >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> > >
> > > -- Tanguy
> > >
> > >
> > >
> > >
> > >
> > > 2013/10/10 Jun Rao 
> > >
> > > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name to
> be
> > > > registered twice. Do you see 2 beans with similar names? What are the
> > > exact
> > > > bean names?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx 
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > In our application, we are currently starting a Kafka Consumer with
> > the
> > > > > following lines of code:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > streams = connector .createMessageStreams(map);
> > > > >
> > > > > Then, each KafkaStream is processed in a dedicated thread per topic
> > and
> > > > > partition, as documented here
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > >
> > > > > We need to stop (halt) and restart the consumer. Today, we just
> call:
> > > > >
> > > > > connector.shutdown()
> > > > >
> > > > > and wait for threads to terminate.
> > > > >
> > > > > To restart the consumer, we create a new connector:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >
> > > > > When restarting is complete, I can see that a JMX MBean (we  use
> > > Metrics
> > > > > JMXReporter) like "ZookeeperConsumerConnector" is registered twice.
> > > This
> > > > > bean is not registered when the previous connector instance is shut
> > > down.
> > > > >
> > > > > What is the best way to stop/halt and restart a Consumer using the
> > Java
> > > > > API?
> > > > >
> > > > > Is it normal that the MBean is not unregistered at shutdown time?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -- Tanguy
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Tanguy
> > > twitter @tlrx
> > > https://github.com/tlrx
> > >
> >
>
>
>
> --
> -- Tanguy
> twitter @tlrx
> https://github.com/tlrx
>


Re: Best way to start / stop / restart a Consumer?

2013-10-11 Thread Tanguy tlrx
Hi Neha,

Thanks for the tip but calling again createMessageStreams() on a
ConsumerConnector object that has been shut down throws a
NullPointerException.

Did you manage to get it to work?



2013/10/11 Neha Narkhede 

> Best way to shutdown is to invoke the shutdown() API. To restart, you need
> to createMessageStreams()
>
> Thanks,
> Neha
> On Oct 11, 2013 6:10 AM, "Tanguy tlrx"  wrote:
>
> > Thanks Jun,
> >
> > Jira issue has been filled:
> > https://issues.apache.org/jira/browse/KAFKA-1083
> >
> > By the way, what is the recommended way to start, stop and restart a
> > ConsumerConnector in the same running JMV?
> >
> > Thanks,
> >
> >
> > 2013/10/10 Jun Rao 
> >
> > > Each time we create a new consumer connector, we assign a random
> consumer
> > > id by default. You can try setting "consumer.id" to use a fixed
> consumer
> > > id. In any case, we probably should deregister those beans when
> shutting
> > > down the connector. Could you file a jira?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx 
> wrote:
> > >
> > > > It's in 0.8.
> > > >
> > > > The JMX names are not exactly the same, but I see 2 beans with
> similar
> > > > names, something like:
> > > >
> > > >
> > > >
> > >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> > > >
> > > > -- Tanguy
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 2013/10/10 Jun Rao 
> > > >
> > > > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name to
> > be
> > > > > registered twice. Do you see 2 beans with similar names? What are
> the
> > > > exact
> > > > > bean names?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx 
> > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > In our application, we are currently starting a Kafka Consumer
> with
> > > the
> > > > > > following lines of code:
> > > > > >
> > > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > > streams = connector .createMessageStreams(map);
> > > > > >
> > > > > > Then, each KafkaStream is processed in a dedicated thread per
> topic
> > > and
> > > > > > partition, as documented here
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > > >
> > > > > > We need to stop (halt) and restart the consumer. Today, we just
> > call:
> > > > > >
> > > > > > connector.shutdown()
> > > > > >
> > > > > > and wait for threads to terminate.
> > > > > >
> > > > > > To restart the consumer, we create a new connector:
> > > > > >
> > > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > >
> > > > > > When restarting is complete, I can see that a JMX MBean (we  use
> > > > Metrics
> > > > > > JMXReporter) like "ZookeeperConsumerConnector" is registered
> twice.
> > > > This
> > > > > > bean is not registered when the previous connector instance is
> shut
> > > > down.
> > > > > >
> > > > > > What is the best way to stop/halt and restart a Consumer using
> the
> > > Java
> > > > > > API?
> > > > > >
> > > > > > Is it normal that the MBean is not unregistered at shutdown time?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > -- Tanguy
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Tanguy
> > > > twitter @tlrx
> > > > https://github.com/tlrx
> > > >
> > >
> >
> >
> >
> > --
> > -- Tanguy
> > twitter @tlrx
> > https://github.com/tlrx
> >
>



-- 
-- Tanguy
twitter @tlrx
https://github.com/tlrx


Re: Kafka cannot produce message

2013-10-11 Thread Jun Rao
Apache email doesn't allow attachment. Could you try the latest code in the
0.8 branch and see if the same issue exists?

Thanks,

Jun


On Thu, Oct 10, 2013 at 10:20 PM, Jiang Jacky  wrote:

>
> For those 2 logs, they are directly in my ~/kafka_2.8.0-0.8.0-beta1; maybe I did not
> set the log output to the indicated directory.
> I will attach my 3 logs to you. server.log, controller.log and
> state-change.log
>
> For testing, I changed the port to 9093, and each time I produce a message
> it gives me this error:
>
> [2013-10-11 05:12:16,948] INFO Fetching metadata from broker id:0,host:localhost,port:9093 with correlation id 0 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> [2013-10-11 05:12:16,966] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
> [2013-10-11 05:12:16,997] ERROR [KafkaApi-0] Error while fetching metadata for partition [my-replicated-topic,0] (kafka.server.KafkaApis)
> kafka.common.LeaderNotAvailableException: Leader not available for partition [my-replicated-topic,0]
> at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
> at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.List.map(List.scala:45)
> at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
> at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Set$Set1.map(Set.scala:68)
> at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:724)
> [2013-10-11 05:12:17,066] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
> [2013-10-11 05:12:17,069] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> [2013-10-11 05:12:17,099] WARN Error while fetching metadata partition 0 leader: none replicas: isr: isUnderReplicated: false for topic partition [my-replicated-topic,0]: [class kafka.common.LeaderNotAvailableException] (kafka.producer.BrokerPartitionInfo)
> [2013-10-11 05:12:17,117] WARN Failed to collate messages by topic,partition due to: No leader for any partition in topic my-replicated-topic (kafka.producer.async.DefaultEventHandler)
> [2013-10-11 05:12:17,119] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> [2013-10-11 05:12:17,222] INFO Fetching metadata from broker id:0,host:localhost,port:9093 with correlation id 2 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> [2013-10-11 05:12:17,223] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
> [2013-10-11 05:12:17,225] ERROR [KafkaApi-0] Error while fetching metadata for partition [my-replicated-topic,0] (kafka.server.KafkaApis)
> kafka.common.LeaderNotAvailableException: Leader not available for partition [my-replicated-topic,0]

Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread Jun Rao
You could use a broker list or a VIP to deal with broker failures.

Thanks,

Jun


On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> I'm trying to maintain a bunch of simple kafka consumer to consume messages
> from brokers. I know there is a way to send TopicMetadataRequest to broker
> and get the response from the broker. But you have to specify the broker
> list to query the information. But broker might not be available because of
> some failure. My question is is there any api I can call and query broker
> metadata for topic/partition directly from zookeeper? I know I can query
> that information using zookeeper API. But that's not friendly datastructure
>  like the TopicMetadata/PartitionMetadata.  Thank you!
>
> Best,
> Siyuan
>


Re: Question about auto-rebalancing

2013-10-11 Thread Jun Rao
If you are using simple consumer, you are responsible for dealing with
leader replica changes. When the leader changes, an error code will be
returned in the fetch response and you need to refresh metadata and retry
the fetch request to the new leader. For details, see
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
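[A rough sketch of that retry path, based on the SimpleConsumer example linked above; topic, partition, readOffset, clientName, port and the findNewLeader() helper are placeholders. FetchRequestBuilder is kafka.api.FetchRequestBuilder, FetchResponse is kafka.javaapi.FetchResponse, SimpleConsumer is kafka.javaapi.consumer.SimpleConsumer, and ErrorMapping is kafka.common.ErrorMapping.]

FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(topic, partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    short code = fetchResponse.errorCode(topic, partition);
    if (code == ErrorMapping.NotLeaderForPartitionCode()) {
        // the leader moved: drop this connection, refresh metadata to find the
        // new leader, and retry the fetch against it
        consumer.close();
        String newLeader = findNewLeader(topic, partition); // placeholder helper
        consumer = new SimpleConsumer(newLeader, port, 100000, 64 * 1024, clientName);
    }
}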

Thanks,

Jun


On Thu, Oct 10, 2013 at 11:08 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> Here is a case I observed, I have a single-node 3 broker instance cluster.
> I created 1 topic with 2 partitions and 2 replica for each partition. The
> initial distribution is like this topic1/partition0 ->(broker0, broker2)
>  topic1/partition1 ->(broker1,broker2). So broker0 is leader broker for
> partition0 and broker1 is the leader broker for partition1.  I then kill
> broker0, broker3 becomes leader broker for partition1, then I kill broker2,
> broker1 becomes leader broker of both partition0 and partition1 which is
> fine.  But when I restart broker0, broker2, after they synced with broker1,
> they are just replica broker for partition0 and partition1. So my
> consumers(simple consumer) really don't know which broker it should read
> from. I found a command that will force a re-balance after failover, but
> isn't there any automatic way to rebalance the leader broker?
>
> Best regards,
> Siyuan
>


Re: Messages from producer are immediately going to /tmp/logs in kafka

2013-10-11 Thread Jun Rao
Those messages could still be in file system pagecache and may not be
flushed to disks.
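The flush settings only control when the broker fsyncs a log segment; an appended message shows up in the log directory (through the OS pagecache) right away. A hedged sketch of the relevant server.properties entries, with purely illustrative values:

# flush after this many messages have accumulated in a log partition
log.flush.interval.messages=10000
# or after a message has sat unflushed for this many ms, whichever comes first
log.flush.interval.ms=1000
# how often the background flusher checks whether a flush is due
log.flush.scheduler.interval.ms=1000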

Thanks,

Jun


On Thu, Oct 10, 2013 at 11:20 PM, Monika Garg  wrote:

> Hi,
>
> In kafka-0.8 there are three important properties given for
>
> log.flush.interval.messages=1
>
> log.flush.interval.ms=90
>
> log.flush.scheduler.interval.ms=90
>
> I have set the above properties as mentioned. Then I started the Kafka
> Console Producer that ships with the kafka-0.8 bundle and sent some
> messages. The messages are going to the log partitions of the given topic
> immediately.
>
> I am confused why the messages are flushed to /tmp/logs immediately. They
> should wait as per log.flush.interval.messages=
> 1 or  log.flush.interval.ms=90.
>
> Please check.
>
> --
> *Moniii*
>
>
>
> --
> *Moniii*
>


Re: Hi, Can anyone tell me why I cannot produce any message to remote kafka server from my local machine

2013-10-11 Thread Neha Narkhede
The issue doesn't seem to point to anything. Do you mind sending it in a
pastebin?



On Thu, Oct 10, 2013 at 9:54 PM, Jiang Jacky  wrote:

> Hi, Thanks Joe
> I have disabled the firewall, but I did not tried that again, I changed
> another way by using Netty sending the socket package to kafka server, then
> it will produce in the local. But recently I suddenly ran into below issue,
> can you have a look?
> https://mail.google.com/mail/ca/u/0/#label/TechDiscuss/141a5c074641dd70
> Thanks.
>
>
> 2013/10/10 Joseph Lawson 
>
> > another thing to note is that your brokers may be running on port 6667
> > instead of 9092. it appears that the default recently changed from 9092
> to
> > 6667 so make sure if you have firewall rules for 9092 that you
> specifically
> > set the port setting in your configuration to what you expect.  -Joe
> >
> > Sent from my Droid Charge on Verizon 4G LTE Joseph Lawson wrote:
> > it may be that the broker is reporting its internal ip address to
> > zookeeper which then may interfere with your ability to submit the the
> > broker with a producer that isn't on AWS's network.  you can make the
> > broker report the public dns name instead of the internal host name
> which i
> > believe it reports by default.  just update the host.name setting on the
> > broker to the public dns address.  hope this helps ..
> >
> > -Joe
> >
> > Sent from my Droid Charge on Verizon 4G LTE Jiang Jacky wrote:
> > did anyone experience same issue? can you share with me?
> > Thanks a lot.
> >
> >
> > 2013/10/7 Jun Rao 
> >
> > > Kafka port is documented in
> > > http://kafka.apache.org/documentation.html#brokerconfigs
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Sat, Oct 5, 2013 at 12:05 PM, Jiang Jacky 
> > wrote:
> > >
> > > > Hi, I tried to set up the host.name in server.properties, it doesn't
> > > work.
> > > > I believe it is the network security issue. However, I create a new
> > > > instance in the same security group without kafka, zookeeper, it does
> > > work,
> > > > it can still produce to kafka server. but when I change to another
> ec2
> > > > account, then create the same instance, and it cannot produce to
> kafka
> > > > server.
> > > > I pay attention that there is no outbound port setting in the
> security
> > > > group configuration of kafka server ec2, I think if we shall have to
> > set
> > > > the outbound port for the firewall?
> > > > Do you guys know which outbound port should be opened for kafka
> server?
> > > > Thanks
> > > >
> > > >
> > > > 2013/10/5 Guozhang Wang 
> > > >
> > > > > Hello Jacky,
> > > > >
> > > > > Have you read this FAQ:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 4, 2013 at 10:41 PM, Jiang Jacky 
> > > > wrote:
> > > > >
> > > > > > It is very weird, I have a kafka cluster in EC2, There is no any
> > > > problem
> > > > > to
> > > > > > produce message from one of node by same producer. But when I
> move
> > > the
> > > > > > producer to my local machine at home, then it gives me the below
> > > error:
> > > > > > Failed to send messages after 3 tries.
> > > > > >
> > > > > > Can anyone tell me how do I fix this issue? I have opened all
> ports
> > > of
> > > > my
> > > > > > machine at my home, and the security group is also opened for
> kafka
> > > > > server
> > > > > > and zookeeper in EC2. Everything is fine, but I just cannot send
> > any
> > > > > > message to kafka server.
> > > > > > Please help me.
> > > > > > Thanks Everyone!
> > > > > >
> > > > > > Jacky
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>


Re: kafka.common.UnknownTopicOrPartitionException

2013-10-11 Thread Neha Narkhede
The state-change.log captures critical state change information about all
partitions in a Kafka cluster, and we have tools that you can use on the
state-change.log to self-troubleshoot any state/leader election issues with
particular partitions.




On Thu, Oct 10, 2013 at 10:37 PM, Snehalata Nagaje <
snehalata.nag...@harbingergroup.com> wrote:

> Yes, but I have commented out the option in log4j.properties
>
> Thanks,
> Snehalata
>
> --
> From: "Monika Garg" 
> Sent: Friday, October 11, 2013 11:01 AM
>
> To: 
> Subject: Re: kafka.common.UnknownTopicOrPartitionException
>
>  Hi Snehalata
>> state-change log are automatically created if you are using kafka-0.8
>> version.
>>
>>
>> On Wed, Oct 9, 2013 at 10:42 AM, Snehalata Nagaje <
>> snehalata.nagaje@harbingergroup.com> wrote:
>>
>>  Actually state-change log setting is not on. So could not find the
>>> state-change log.
>>>
>>> --
>>> From: "Jun Rao" 
>>> Sent: Tuesday, October 08, 2013 8:24 PM
>>> To: 
>>> Subject: Re: kafka.common.UnknownTopicOrPartitionException
>>>
>>>
>>>  Any error in the controller and state-change log?
>>>

 Thanks,

 Jun


 On Tue, Oct 8, 2013 at 5:41 AM, Snehalata Nagaje <
 snehalata.nagaje@harbingergroup.com> wrote:

  Hi ,

>
> I am getting the below exception when I consume any message from the given
> topic. When I checked the log file, the topic already exists in the kafka
> server, but it is still giving the exception.
>
>
> kafka.common.UnknownTopicOrPartitionException: Topic topic_c7d4daa8-cdfd-40e6-b3de-a31b3ae30de9 partition 0 doesn't exist on 0
> at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:170)
> at kafka.server.KafkaApis$$anonfun$14.apply(KafkaApis.scala:330)
> at kafka.server.KafkaApis$$anonfun$14.apply(KafkaApis.scala:325)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:325)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
> at java.lang.Thread.run(Thread.java:679)
>
> This problem has started after changing the zookeeper instance ,
> actually
> we have pointed kafka to new zookeeper instance.
>
> Thanks,
> Snehalata

Re: Kafka cannot produce message

2013-10-11 Thread Neha Narkhede
Does the change to /etc/hosts resolve this issue as well?


On Fri, Oct 11, 2013 at 8:16 AM, Jun Rao  wrote:

> Apache email doesn't allow attachment. Could you try the latest code in the
> 0.8 branch and see if the same issue exists?
>
> Thanks,
>
> Jun
>
>
> On Thu, Oct 10, 2013 at 10:20 PM, Jiang Jacky  wrote:
>
> >
> > for those 2 logs directly in my ~/kafka_2.8.0-0.8.0-beta1, Maybe I did
> not
> > set the log output to indicated directory.
> > I will attach my 3 logs to you. server.log, controller.log and
> > state-change.log
> >
> > for testing, I changed the port to 9093, and each time, I produce message
> > it gives me error
> >
> > [2013-10-11 05:12:16,948] INFO Fetching metadata from broker id:0,host:localhost,port:9093 with correlation id 0 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> > [2013-10-11 05:12:16,966] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
> > [2013-10-11 05:12:16,997] ERROR [KafkaApi-0] Error while fetching metadata for partition [my-replicated-topic,0] (kafka.server.KafkaApis)
> > kafka.common.LeaderNotAvailableException: Leader not available for partition [my-replicated-topic,0]
> > at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
> > at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > at scala.collection.immutable.List.foreach(List.scala:45)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > at scala.collection.immutable.List.map(List.scala:45)
> > at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
> > at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > at scala.collection.immutable.Set$Set1.map(Set.scala:68)
> > at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > at java.lang.Thread.run(Thread.java:724)
> > [2013-10-11 05:12:17,066] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
> > [2013-10-11 05:12:17,069] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > [2013-10-11 05:12:17,099] WARN Error while fetching metadata partition 0 leader: none replicas: isr: isUnderReplicated: false for topic partition [my-replicated-topic,0]: [class kafka.common.LeaderNotAvailableException] (kafka.producer.BrokerPartitionInfo)
> > [2013-10-11 05:12:17,117] WARN Failed to collate messages by topic,partition due to: No leader for any partition in topic my-replicated-topic (kafka.producer.async.DefaultEventHandler)
> > [2013-10-11 05:12:17,119] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> > [2013-10-11 05:12:17,222] INFO Fetching metadata from broker id:0,host:localhost,port:9093 with correlation id 2 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> > [2013-10-11 05:12:17,223] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)

Re: Best way to start / stop / restart a Consumer?

2013-10-11 Thread Neha Narkhede
Can you send around your code snippet and related NPE stack trace?


On Fri, Oct 11, 2013 at 8:01 AM, Tanguy tlrx  wrote:

> Hi Neha,
>
> Thanks for the tip but calling again createMessageStreams() on a
> ConsumerConnector object that has been shut down throws a
> NullPointerException.
>
> Did you manage to get it work?
>
>
>
> 2013/10/11 Neha Narkhede 
>
> > Best way to shutdown is to invoke the shutdown() API. To restart, you
> need
> > to createMessageStreams()
> >
> > Thanks,
> > Neha
> > On Oct 11, 2013 6:10 AM, "Tanguy tlrx"  wrote:
> >
> > > Thanks Jun,
> > >
> > > Jira issue has been filled:
> > > https://issues.apache.org/jira/browse/KAFKA-1083
> > >
> > > By the way, what is the recommended way to start, stop and restart a
> > > ConsumerConnector in the same running JMV?
> > >
> > > Thanks,
> > >
> > >
> > > 2013/10/10 Jun Rao 
> > >
> > > > Each time we create a new consumer connector, we assign a random
> > consumer
> > > > id by default. You can try setting "consumer.id" to use a fixed
> > consumer
> > > > id. In any case, we probably should deregister those beans when
> > shutting
> > > > down the connector. Could you file a jira?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx 
> > wrote:
> > > >
> > > > > It's in 0.8.
> > > > >
> > > > > The JMX names are not exactly the same, but I see 2 beans with
> > similar
> > > > > names, something like:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> > > > >
> > > > > -- Tanguy
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 2013/10/10 Jun Rao 
> > > > >
> > > > > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name
> to
> > > be
> > > > > > registered twice. Do you see 2 beans with similar names? What are
> > the
> > > > > exact
> > > > > > bean names?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx  >
> > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > In our application, we are currently starting a Kafka Consumer
> > with
> > > > the
> > > > > > > following lines of code:
> > > > > > >
> > > > > > > connector =
> Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > > > streams = connector .createMessageStreams(map);
> > > > > > >
> > > > > > > Then, each KafkaStream is processed in a dedicated thread per
> > topic
> > > > and
> > > > > > > partition, as documented here
> > > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > > > >
> > > > > > > We need to stop (halt) and restart the consumer. Today, we just
> > > call:
> > > > > > >
> > > > > > > connector.shutdown()
> > > > > > >
> > > > > > > and wait for threads to terminate.
> > > > > > >
> > > > > > > To restart the consumer, we create a new connector:
> > > > > > >
> > > > > > > connector =
> Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > > >
> > > > > > > When restarting is complete, I can see that a JMX MBean (we
>  use
> > > > > Metrics
> > > > > > > JMXReporter) like "ZookeeperConsumerConnector" is registered
> > twice.
> > > > > This
> > > > > > > bean is not registered when the previous connector instance is
> > shut
> > > > > down.
> > > > > > >
> > > > > > > What is the best way to stop/halt and restart a Consumer using
> > the
> > > > Java
> > > > > > > API?
> > > > > > >
> > > > > > > Is it normal that the MBean is not unregistered at shutdown
> time?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > -- Tanguy
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Tanguy
> > > > > twitter @tlrx
> > > > > https://github.com/tlrx
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Tanguy
> > > twitter @tlrx
> > > https://github.com/tlrx
> > >
> >
>
>
>
> --
> -- Tanguy
> twitter @tlrx
> https://github.com/tlrx
>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread Neha Narkhede
If, for some reason, you don't have access to a virtual IP or load
balancer, you need to round robin once through all the brokers before
failing a TopicMetadataRequest. So unless all the brokers in your cluster
are down, this should not be a problem.
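[A rough sketch of that round-robin lookup using the 0.8 javaapi classes; the broker list, topic name, timeouts and buffer sizes below are placeholders, not recommended values.]

import java.util.Collections;
import java.util.List;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataLookup {
    public static TopicMetadata findMetadata(List<String> brokers, String topic) {
        for (String broker : brokers) {                 // e.g. "host1:9092", "host2:9092"
            String[] hostPort = broker.split(":");
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(hostPort[0], Integer.parseInt(hostPort[1]),
                        100000, 64 * 1024, "metadata-lookup");
                TopicMetadataRequest request =
                        new TopicMetadataRequest(Collections.singletonList(topic));
                for (TopicMetadata metadata : consumer.send(request).topicsMetadata()) {
                    for (PartitionMetadata part : metadata.partitionsMetadata()) {
                        System.out.println("partition " + part.partitionId()
                                + " leader " + part.leader());
                    }
                    return metadata;   // the first broker that answers is enough
                }
            } catch (Exception e) {
                // broker unreachable or timed out; fall through to the next one
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        return null;                    // only if every broker in the list failed
    }
}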

Thanks,
Neha


On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> I'm trying to maintain a bunch of simple kafka consumer to consume messages
> from brokers. I know there is a way to send TopicMetadataRequest to broker
> and get the response from the broker. But you have to specify the broker
> list to query the information. But broker might not be available because of
> some failure. My question is is there any api I can call and query broker
> metadata for topic/partition directly from zookeeper? I know I can query
> that information using zookeeper API. But that's not friendly datastructure
>  like the TopicMetadata/PartitionMetadata.  Thank you!
>
> Best,
> Siyuan
>


Re: Anyone running kafka with a single broker in production? what about only 8GB ram?

2013-10-11 Thread Guozhang Wang
Hello,

In most Kafka deployments, the network bottleneck will be hit before the disk
bottleneck. So you may want to check your network capacity to see if it
has been saturated.

Guozhang


On Thu, Oct 10, 2013 at 3:57 PM, Bruno D. Rodrigues <
bruno.rodrig...@litux.org> wrote:

> On 10/10/2013, at 23:14, S Ahmed  wrote:
>
> > Is anyone out there running a single broker kafka setup?
> >
> > How about with only 8 GB RAM?
> >
> > I'm looking at one of the better dedicated server prodivers, and a 8GB
> > server is pretty much what I want to spend at the moment, would it make
> > sense going this route?
> > This same server would also potentially be running zookeeper also.
> >
> > In terms of messages per second, at most I would be seeing about 2000
> > messages per second, of 20KB to 200KB in size.
> >
> > I know the people at linkedin are running with I believe 24GB of ram.
>
> My personal newbie experience, which is surely completely wrong and
> miss-configured, got me up to 70MB/sec, either with controlled 1K messages
> (hence 70Kmsg/sec) as well as with more random data (test data from 100
> bytes to a couple MB). First I thought the 70MB were the hard disk limit,
> but when I got the same result both with a proper linux server with a 10K
> disk, as well as with a Mac mini with a 5400rpm disk, I got confused.
>
> The mini has 2G, the linux server has 8 or 16, can't recall at the moment.
>
> The test was performed both with single and multi producers and consumers.
> One producer = 70MB, two producers = 35MB each and so forth. Running
> standalone instances on each server, same value. Running both together in 2
> partition 2 replica crossed mode, same result.
>
> As far as I understood, more memory just means more kernel buffer space to
> speed up the lack of disk speed, as kafka seems to not really depend on
> memory for the queueing.
>
>
>


-- 
-- Guozhang


Re: Anyone running kafka with a single broker in production? what about only 8GB ram?

2013-10-11 Thread Bruno D. Rodrigues

> On Thu, Oct 10, 2013 at 3:57 PM, Bruno D. Rodrigues <
> bruno.rodrig...@litux.org> wrote:
> 
>> My personal newbie experience, which is surely completely wrong and
>> miss-configured, got me up to 70MB/sec, either with controlled 1K messages
>> (hence 70Kmsg/sec) as well as with more random data (test data from 100
>> bytes to a couple MB). First I thought the 70MB were the hard disk limit,
>> but when I got the same result both with a proper linux server with a 10K
>> disk, as well as with a Mac mini with a 5400rpm disk, I got confused.
>> 
>> The mini has 2G, the linux server has 8 or 16, can'r recall at the moment.
>> 
>> The test was performed both with single and multi producers and consumers.
>> One producer = 70MB, two producers = 35MB each and so forth. Running
>> standalone instances on each server, same value. Running both together in 2
>> partition 2 replica crossed mode, same result.
>> 
>> As far as I understood, more memory just means more kernel buffer space to
>> speed up the lack of disk speed, as kafka seems to not really depend on
>> memory for the queueing.

On 11/10/2013, at 17:28, Guozhang Wang  wrote:

> Hello,
> 
> In most cases of Kafka, network bottleneck will be hit before the disk
> bottleneck. So maybe you want to check your network capacity to see if it
> has been saturated.

They are all connected to Gbit ethernet cards and proper network routers. I can
easily get way above 950Mbps up and down between each machine and even between
multiple machines. Gbit is 128MB/s. 70MB/s is 560Mbps. So far so good, 56%
network capacity is a goodish value. But then I enable snappy, get the same
70MB on the input and output side, and 20MB/sec on the network, so it surely
isn't a network limit. It's also not on the input or output side - the input
reads a pre-processed mmapped file that reads at 150MB/sec without cache (SSD)
up to 3GB/sec when loaded into memory. The output simply counts the messages
and their sizes.

One weird thing is that the kafka process seems to not cross 100% cpu in
top or an equivalent command. Top shows 100% for each CPU, so a multi-threaded
process should go up to 400% (both the linux and mac mini are 2 CPU with
hyperthreading, so "almost" 4 cpus).






Re: Anyone running kafka with a single broker in production? what about only 8GB ram?

2013-10-11 Thread Magnus Edenhill
Make sure the fetch batch size and the local consumer queue sizes are large
enough, setting them too low will limit your throughput to the
broker<->client latency.

This would be controlled using the following properties:
- fetch.message.max.bytes
- queued.max.message.chunks

On the producer side you would want to play with:
 - queue.buffering.max.ms and .messages
 - batch.num.messages

Memory on the broker should only affect disk cache performance, the more
the merrier of course, but it depends on your use case, with a bit of luck
the disk caches are already hot for the data you are reading (e.g.,
recently produced).

Consuming millions of messages per second on quad core i7 with 8 gigs of
RAM is possible without a sweat, given the disk caches are hot.
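[For orientation, a sketch of where the knobs above are set, using the 0.8 property names; the values are purely illustrative, not recommendations.]

// consumer side
Properties consumerProps = new Properties();
consumerProps.put("fetch.message.max.bytes", "1048576");  // bytes fetched per partition per request
consumerProps.put("queued.max.message.chunks", "10");     // fetched chunks buffered per consumer stream

// producer side
Properties producerProps = new Properties();
producerProps.put("producer.type", "async");
producerProps.put("queue.buffering.max.ms", "100");       // how long the async producer may batch
producerProps.put("queue.buffering.max.messages", "10000");
producerProps.put("batch.num.messages", "500");           // messages sent per batch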


Regards,
Magnus


2013/10/11 Bruno D. Rodrigues 

>
> > On Thu, Oct 10, 2013 at 3:57 PM, Bruno D. Rodrigues <
> > bruno.rodrig...@litux.org> wrote:
> >
> >> My personal newbie experience, which is surely completely wrong and
> >> miss-configured, got me up to 70MB/sec, either with controlled 1K
> messages
> >> (hence 70Kmsg/sec) as well as with more random data (test data from 100
> >> bytes to a couple MB). First I thought the 70MB were the hard disk
> limit,
> >> but when I got the same result both with a proper linux server with a
> 10K
> >> disk, as well as with a Mac mini with a 5400rpm disk, I got confused.
> >>
> >> The mini has 2G, the linux server has 8 or 16, can'r recall at the
> moment.
> >>
> >> The test was performed both with single and multi producers and
> consumers.
> >> One producer = 70MB, two producers = 35MB each and so forth. Running
> >> standalone instances on each server, same value. Running both together
> in 2
> >> partition 2 replica crossed mode, same result.
> >>
> >> As far as I understood, more memory just means more kernel buffer space
> to
> >> speed up the lack of disk speed, as kafka seems to not really depend on
> >> memory for the queueing.
>
> A 11/10/2013, às 17:28, Guozhang Wang  escreveu:
>
> > Hello,
> >
> > In most cases of Kafka, network bottleneck will be hit before the disk
> > bottleneck. So maybe you want to check your network capacity to see if it
> > has been saturated.
>
> They are all connected to Gbit ethernet cards and proper network routers.
> I can easily get way above 950Mbps up and down between each machine and
> even between multiple machines. Gbit is 128MB/s. 70MB/s is 560Kbps. So far
> so good, 56% network capacity is a goodish value. But then I enable snappy,
> get the same 70MB on the input and output side, and 20MB/sec on the
> network, so it surely isn't network limits. It's also not on the input or
> output side - the input reads a pre-processed MMaped file that reads at
> 150MB/sec without cache (SSD) up to 3GB/sec when loaded into memory. The
> output simply counts the messages and size of them.
>
> One weird thing is that the kafka process seems to not cross the 100% cpu
> on the top or equivalent command. Top shows 100% for each CPU, so a
> multi-threaded process should go up to 400% (both the linux and mac mini
> are 2 CPU with hiperthreading, so "almost" 4 cpus).
>
>
>


Re: Question about auto-rebalancing

2013-10-11 Thread hsy...@gmail.com
Hi Jun,

Thanks for your reply, but in a real cluster one broker could serve
different topics and different partitions. The simple consumer only knows
which brokers are available; it has no knowledge to decide which broker is
best to consume messages from. If you don't choose carefully, multiple
simple consumers might end up reading from the same node, which is
definitely not good for performance.
An interesting thing is that I found the command
kafka-preferred-replica-election.sh, which will try to distribute the
leadership equally among different brokers. This is good because I can
always let my simple consumer read from the leader broker (even if it
fails, a replica will take over as leader, which is fine). But why doesn't
the kafka cluster run this command automatically when there is a broker
change (up/down), so that the leadership is always equally distributed
among the brokers as soon as possible? I think that would make it much
easier for a simple consumer to decide which broker to read from.

Another question: I'm also curious how the high-level consumer is balanced.
I assume each high-level consumer knows which brokers the other consumers
(in the same group) read messages from, and it can try to avoid those
brokers and pick up a free one? Is there a document for the balancing rule
among high-level consumers? Does it guarantee that after several leadership
changes or temporary broker failures it can always distribute the reads
equally among the brokers? Basically, I think it would be nice to have an
API that lets developers know which consumer reads from which broker;
otherwise I don't know anything about what happens behind the high-level
consumer.

Thanks!

Best,
Siyuan


Best way to start / stop / restart a Consumer?

2013-10-11 Thread Joel Koshy
Re: starting/stopping - right now you would have to call shutdown and
create a new consumer connector since you can call createMessageStreams
only once per connector.
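[A minimal sketch of that stop/start cycle; "executor", "props" and "topicCountMap" are placeholders carried over from the consumer group example linked earlier in the thread.]

// stop: release partition ownership (offsets are committed if auto-commit is on)
connector.shutdown();
executor.shutdownNow(); // stop the threads that were draining the KafkaStreams

// restart: createMessageStreams() can only be called once per connector,
// so build a brand-new connector instead of reusing the old instance
connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);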

On Friday, October 11, 2013, Tanguy tlrx wrote:

> Thanks Jun,
>
> Jira issue has been filled:
> https://issues.apache.org/jira/browse/KAFKA-1083
>
> By the way, what is the recommended way to start, stop and restart a
> ConsumerConnector in the same running JMV?
>
> Thanks,
>
>
> 2013/10/10 Jun Rao 
>
> > Each time we create a new consumer connector, we assign a random consumer
> > id by default. You can try setting "consumer.id" to use a fixed consumer
> > id. In any case, we probably should deregister those beans when shutting
> > down the connector. Could you file a jira?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx  wrote:
> >
> > > It's in 0.8.
> > >
> > > The JMX names are not exactly the same, but I see 2 beans with similar
> > > names, something like:
> > >
> > >
> > >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> > >
> > > -- Tanguy
> > >
> > >
> > >
> > >
> > >
> > > 2013/10/10 Jun Rao 
> > >
> > > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name to
> be
> > > > registered twice. Do you see 2 beans with similar names? What are the
> > > exact
> > > > bean names?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx 
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > In our application, we are currently starting a Kafka Consumer with
> > the
> > > > > following lines of code:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > streams = connector .createMessageStreams(map);
> > > > >
> > > > > Then, each KafkaStream is processed in a dedicated thread per topic
> > and
> > > > > partition, as documented here
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > >
> > > > > We need to stop (halt) and restart the consumer. Today, we just
> call:
> > > > >
> > > > > connector.shutdown()
> > > > >
> > > > > and wait for threads to terminate.
> > > > >
> > > > > To restart the consumer, we create a new connector:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >
> > > > > When restarting is complete, I can see that a JMX MBean (we  use
> > > Metrics
> > > > > JMXReporter) like "ZookeeperConsumerConnector" is registered twice.
> > > This
> > > > > bean is not registered when the previous connector instance is shut
> > > down.
> > > > >
> > > > > What is the best way to stop/halt and restart a Consumer using the
> > Java
> > > > > API?
> > > > >
> > > > > Is it normal that the MBean is not unregistered at shutdown time?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -- Tanguy
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Tanguy
> > > twitter @tlrx
> > > https://github.com/tlrx
> > >
> >
>
>
>
> --
> -- Tanguy
> twitter @tlrx
> https://github.com/tlrx
>


-- 
Sent from Gmail Mobile


Re: Anyone running kafka with a single broker in production? what about only 8GB ram?

2013-10-11 Thread Bruno D. Rodrigues
Producer:
props.put("batch.num.messages", "1000"); // 200
props.put("queue.buffering.max.messages", "2"); // 1
props.put("request.required.acks", "0");
props.put("producer.type", "async"); // sync

// return ++this.count % a_numPartitions; // just round-robin
props.put("partitioner.class", "main.SimplePartitioner"); // kafka.producer.DefaultPartitioner

// disabled = 70MB source, 70MB network, enabled = 70MB source, ~40-50MB network
props.put("compression.codec", "Snappy"); // none

The consumer uses default settings; I test separately without any consumer at
all, and then test the extra load of having 1..n consumers. I assume the top
speed would be without consumers at all. I'm measuring both the produced
messages as well as the consumer side.

On the kafka server I've changed the following, expecting fewer disk writes at
the cost of losing messages:

#log.flush.interval.messages=1
log.flush.interval.messages=1000
#log.flush.interval.ms=1000
log.flush.interval.ms=1
#log.segment.bytes=536870912
# is signed int 32, only up to 2^31-1!
log.segment.bytes=20 
#log.retention.hours=168
log.retention.hours=1


Basically I need high throughput of discardable messages, so having them
persisted temporarily on the disk, in a highly optimised manner like Kafka
shows, would be great not for the reliability (not losing messages), but
because it would allow me to get some previous messages even if the client
(kafka client or real consumer client) disconnects, as well as providing a way
to go back in time some seconds if needed.



On 11/10/2013, at 18:56, Magnus Edenhill  wrote:

> Make sure the fetch batch size and the local consumer queue sizes are large
> enough, setting them too low will limit your throughput to the
> broker<->client latency.
> 
> This would be controlled using the following properties:
> - fetch.message.max.bytes
> - queued.max.message.chunks
> 
> On the producer side you would want to play with:
> - queue.buffering.max.ms and .messages
> - batch.num.messages
> 
> Memory on the broker should only affect disk cache performance, the more
> the merrier of course, but it depends on your use case, with a bit of luck
> the disk caches are already hot for the data you are reading (e.g.,
> recently produced).
> 
> Consuming millions of messages per second on quad core i7 with 8 gigs of
> RAM is possible without a sweat, given the disk caches are hot.
> 
> 
> Regards,
> Magnus
> 
> 
> 2013/10/11 Bruno D. Rodrigues 
> 
>> 
>>> On Thu, Oct 10, 2013 at 3:57 PM, Bruno D. Rodrigues <
>>> bruno.rodrig...@litux.org> wrote:
>>> 
 My personal newbie experience, which is surely completely wrong and
 miss-configured, got me up to 70MB/sec, either with controlled 1K
>> messages
 (hence 70Kmsg/sec) as well as with more random data (test data from 100
 bytes to a couple MB). First I thought the 70MB were the hard disk
>> limit,
 but when I got the same result both with a proper linux server with a
>> 10K
 disk, as well as with a Mac mini with a 5400rpm disk, I got confused.
 
 The mini has 2G, the linux server has 8 or 16, can'r recall at the
>> moment.
 
 The test was performed both with single and multi producers and
>> consumers.
 One producer = 70MB, two producers = 35MB each and so forth. Running
 standalone instances on each server, same value. Running both together
>> in 2
 partition 2 replica crossed mode, same result.
 
 As far as I understood, more memory just means more kernel buffer space
>> to
 speed up the lack of disk speed, as kafka seems to not really depend on
 memory for the queueing.
>> 
>> A 11/10/2013, às 17:28, Guozhang Wang  escreveu:
>> 
>>> Hello,
>>> 
>>> In most cases of Kafka, network bottleneck will be hit before the disk
>>> bottleneck. So maybe you want to check your network capacity to see if it
>>> has been saturated.
>> 
>> They are all connected to Gbit ethernet cards and proper network routers.
>> I can easily get way above 950Mbps up and down between each machine and
>> even between multiple machines. Gbit is 128MB/s. 70MB/s is 560Kbps. So far
>> so good, 56% network capacity is a goodish value. But then I enable snappy,
>> get the same 70MB on the input and output side, and 20MB/sec on the
>> network, so it surely isn't network limits. It's also not on the input or
>> output side - the input reads a pre-processed MMaped file that reads at
>> 150MB/sec without cache (SSD) up to 3GB/sec when loaded into memory. The
>> output simply counts the messages and size of them.
>> 
>> One weird thing is that the kafka process seems to not cross the 100% cpu
>> on the top or equivalent command. Top shows 100% for each CPU, so a
>> multi-threaded process should go up to 400% (both the linux and mac mini
>> are 2 CPU with hiperthreading, so "almost" 4 cpus).
>> 
>> 
>> 




Re: Kafka startup, no broker partitions consumed by consumer thread

2013-10-11 Thread Viktor Kolodrevskiy
Jun,

Thank you! That helped.


Thanks,
Viktor


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
Thanks guys!
But it feels weird. Assume I have 20 brokers for 10 different topics with 2
partitions and 2 replicas each. For each consumer that consumes a different
topic/replica, I have to specify those 20 brokers and go over all of them to
find out which broker is alive. And even worse, what if I dynamically add a new
broker into the cluster and remove an old one? I think it would be nice to have
a way to get metadata from zookeeper (the centralized coordinator?) directly.

Best,
Siyuan


On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede wrote:

> If, for some reason, you don't have access to a virtual IP or load
> balancer, you need to round robin once through all the brokers before
> failing a TopicMetadataRequest. So unless all the brokers in your cluster
> are down, this should not be a problem.
>
> Thanks,
> Neha
>
>
> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> wrote:
>
> > Hi guys,
> >
> > I'm trying to maintain a bunch of simple kafka consumer to consume
> messages
> > from brokers. I know there is a way to send TopicMetadataRequest to
> broker
> > and get the response from the broker. But you have to specify the broker
> > list to query the information. But broker might not be available because
> of
> > some failure. My question is is there any api I can call and query broker
> > metadata for topic/partition directly from zookeeper? I know I can query
> > that information using zookeeper API. But that's not friendly
> datastructure
> >  like the TopicMetadata/PartitionMetadata.  Thank you!
> >
> > Best,
> > Siyuan
> >
>


Re: Anyone running kafka with a single broker in production? what about only 8GB ram?

2013-10-11 Thread Kane Kane
I'm also curious to know what the limiting factor of kafka write throughput
is.

I've never seen reports higher than 100MB/sec, though obviously disks can
provide much more. In my own test with a single broker, single partition,
single replica:
bin/kafka-producer-perf-test.sh --topics perf --threads 10 --broker-list
10.80.42.154:9092 --messages 500 --message-size 3000
It tops out around 90MB/sec. CPU, disk, memory, network, everything is
chilling, but I still can't get higher numbers.


On Fri, Oct 11, 2013 at 11:17 AM, Bruno D. Rodrigues <
bruno.rodrig...@litux.org> wrote:

> Producer:
> props.put("batch.num.messages", "1000"); // 200
> props.put("queue.buffering.max.messages", "2"); // 1
> props.put("request.required.acks", "0");
> props.put("producer.type", "async"); // sync
>
> // return ++this.count % a_numPartitions; // just round-robin
> props.put("partitioner.class", "main.SimplePartitioner"); //
> kafka.producer.DefaultPartitioner
>
> // disabled = 70MB source, 70MB network, enabled = 70MB source,
> ~40-50MB network
> props.put("compression.codec", "Snappy"); // none
>
> Consumer is with default settings, as I test separately without any
> consumer at all, and then test the extra load of having 1..n consumers. I
> assume the top speed would be without consumers at all. I'm measuring both
> the produced messages as well as the consumer side.
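(For reference, a minimal sketch of an 0.8 async producer wired with settings along these lines; the broker list, topic name and payloads are placeholders I made up, and the custom partitioner line is commented out since main.SimplePartitioner isn't shown in this thread.)

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        props.put("request.required.acks", "0");   // fire-and-forget, as in the thread
        props.put("compression.codec", "snappy");
        props.put("batch.num.messages", "1000");
        // props.put("partitioner.class", "main.SimplePartitioner"); // custom class from the thread

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 100000; i++) {
            // key is used only for partitioning; payload content is arbitrary here
            producer.send(new KeyedMessage<String, String>("perf", Integer.toString(i), "payload-" + i));
        }
        producer.close();
    }
}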
>
> On the kafka server I've changed the following, expecting less disk writes
> at the cost of losing messages:
>
> #log.flush.interval.messages=1
> log.flush.interval.messages=1000
> #log.flush.interval.ms=1000
> log.flush.interval.ms=1
> #log.segment.bytes=536870912
> # is signed int 32, only up to 2^31-1!
> log.segment.bytes=20
> #log.retention.hours=168
> log.retention.hours=1
>
>
> Basically I need high throughput of discardable messages, so having them
> persisted temporarily on the disk, in a highly optimised manner like Kafka
> shows, would be great not for the reliability (not losing messages), but
> because it would allow me to get some previous messages even if the client
> (kafka client or real consumer client) disconnects, as well as providing a
> way to go back in time some seconds if needed.
>
>
>
> On 11/10/2013, at 18:56, Magnus Edenhill wrote:
>
> Make sure the fetch batch size and the local consumer queue sizes are large
> enough; setting them too low will limit your throughput to the
> broker<->client round-trip latency.
>
> This would be controlled using the following properties:
> - fetch.message.max.bytes
> - queued.max.message.chunks
>
> On the producer side you would want to play with:
> - queue.buffering.max.ms and .messages
> - batch.num.messages
>
> Memory on the broker should only affect disk cache performance, the more
> the merrier of course, but it depends on your use case, with a bit of luck
> the disk caches are already hot for the data you are reading (e.g.,
> recently produced).
>
> Consuming millions of messages per second on quad core i7 with 8 gigs of
> RAM is possible without a sweat, given the disk caches are hot.
>
>
> Regards,
> Magnus
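(For reference, a sketch of where those 0.8 property names would plug in; the connection strings, group id and numeric values are illustrative assumptions, not recommendations from this thread.)

import java.util.Properties;

public class TuningProps {
    // Consumer-side knobs Magnus mentions: bigger fetches and a deeper local queue
    // between the fetcher threads and the consumer iterator.
    public static Properties consumerTuning() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");        // placeholder
        props.put("group.id", "perf-test");                // placeholder
        props.put("fetch.message.max.bytes", "2097152");
        props.put("queued.max.message.chunks", "20");
        return props;
    }

    // Producer-side knobs: how long and how much to buffer before a batch is sent.
    public static Properties producerTuning() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // placeholder
        props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "100");
        props.put("queue.buffering.max.messages", "20000");
        props.put("batch.num.messages", "1000");
        return props;
    }
}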
>
>
> 2013/10/11 Bruno D. Rodrigues 
>
>
> On Thu, Oct 10, 2013 at 3:57 PM, Bruno D. Rodrigues <
> bruno.rodrig...@litux.org> wrote:
>
> My personal newbie experience, which is surely completely wrong and
> misconfigured, got me up to 70MB/sec, either with controlled 1K messages
> (hence 70Kmsg/sec) as well as with more random data (test data from 100
> bytes to a couple MB). First I thought the 70MB were the hard disk limit,
> but when I got the same result both with a proper linux server with a 10K
> disk, as well as with a Mac mini with a 5400rpm disk, I got confused.
> The mini has 2G, the linux server has 8 or 16, can't recall at the moment.
>
> The test was performed both with single and multi producers and consumers.
> One producer = 70MB, two producers = 35MB each and so forth. Running
> standalone instances on each server, same value. Running both together in 2
> partition 2 replica crossed mode, same result.
>
> As far as I understood, more memory just means more kernel buffer space to
> speed up the lack of disk speed, as kafka seems to not really depend on
> memory for the queueing.
>
>
> On 11/10/2013, at 17:28, Guozhang Wang wrote:
>
> Hello,
>
> In most cases of Kafka, network bottleneck will be hit before the disk
> bottleneck. So maybe you want to check your network capacity to see if it
> has been saturated.
>
>
> They are all connected to Gbit ethernet cards and proper network routers.
> I can easily get way above 950Mbps up and down between each machine and
> even between multiple machines. Gbit is 128MB/s. 70MB/s is 560Mbps. So far
> so good, 56% network capacity is a goodish value. But then I enable snappy,
> get the same 70MB on the input and output side, and 20MB/sec on the
> network, so it surely isn't network limits. It's also not on the input or
> output side - the input reads a pre-processed MMaped file that reads at
> 150MB/sec without cache (SSD) up to 3GB/sec when loaded into memory. The
> output simply counts the messages and size of them.

Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread Neha Narkhede
>> For each consumer consumes different
topic/replica I have to specify those 20 brokers and go over all of them to
know which broker is alive. And even worse how about I dynamically add new
broker into the cluster and remove the old one

TopicMetadataRequest is a batch API: you can get metadata information
for either a specific list of topics or for all topics in the cluster, if you
specify an empty list of topics. Adding a broker is not a problem since the
metadata request also returns the list of brokers in a cluster. The reason
this is better than reading from zookeeper is that the same operation
would require multiple zookeeper roundtrips, instead of a single
TopicMetadataRequest roundtrip to some kafka broker.

Thanks,
Neha
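(For illustration, a hedged sketch of such an all-topics metadata dump using the 0.8 javaapi classes; the broker host, port and client id are placeholders.)

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataDump {
    public static void main(String[] args) {
        // Any live broker will do; host and port here are placeholders.
        SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "metadata-dump");
        try {
            // An empty topic list asks for metadata on every topic in the cluster.
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.<String>emptyList());
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    String leader = partition.leader() != null ? partition.leader().host() : "none";
                    System.out.println(topic.topic() + "-" + partition.partitionId() + " leader: " + leader);
                }
            }
        } finally {
            consumer.close();
        }
    }
}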


On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com  wrote:

> Thanks guys!
> But I feel weird. Assume I have 20 brokers for 10 different topics with 2
> partitions and  2 replicas for each. For each consumer consumes different
> topic/replica I have to specify those 20 brokers and go over all of them to
> know which broker is alive. And even worse how about I dynamically add new
> broker into the cluster and remove the old one. I think it's nice to have a
> way to get metadata from zookeeper(centralized coordinator?) directly.
>
> Best,
> Siyuan
>
>
> On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede  >wrote:
>
> > If, for some reason, you don't have access to a virtual IP or load
> > balancer, you need to round robin once through all the brokers before
> > failing a TopicMetadataRequest. So unless all the brokers in your cluster
> > are down, this should not be a problem.
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> > wrote:
> >
> > > Hi guys,
> > >
> > > I'm trying to maintain a bunch of simple kafka consumer to consume
> > messages
> > > from brokers. I know there is a way to send TopicMetadataRequest to
> > broker
> > > and get the response from the broker. But you have to specify the
> broker
> > > list to query the information. But broker might not be available
> because
> > of
> > > some failure. My question is is there any api I can call and query
> broker
> > > metadata for topic/partition directly from zookeeper? I know I can
> query
> > > that information using zookeeper API. But that's not friendly
> > datastructure
> > >  like the TopicMetadata/PartitionMetadata.  Thank you!
> > >
> > > Best,
> > > Siyuan
> > >
> >
>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
What if the broker it tries to get the TopicMetadata from is down?
I assume a broker is more likely to be down than a zookeeper.

Thanks,
Siyuan


On Fri, Oct 11, 2013 at 2:39 PM, Neha Narkhede wrote:

> >> For each consumer consumes different
> topic/replica I have to specify those 20 brokers and go over all of them to
> know which broker is alive. And even worse how about I dynamically add new
> broker into the cluster and remove the old one
>
> TopicMetadataRequest is a batch API and you can get metadata information
> for either a list of all topics or all topics in the cluster, if you
> specify an empty list of topics. Adding a broker is not a problem since the
> metadata request also returns the list of brokers in a cluster. The reason
> this is better than reading from zookeeper is because the same operation
> would require multiple zookeeper roundtrips, instead of a single
> TopicMetadataRequest roundtrip to some kafka broker.
>
> Thanks,
> Neha
>
>
> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com 
> wrote:
>
> > Thanks guys!
> > But I feel weird. Assume I have 20 brokers for 10 different topics with 2
> > partitions and  2 replicas for each. For each consumer consumes different
> > topic/replica I have to specify those 20 brokers and go over all of them
> to
> > know which broker is alive. And even worse how about I dynamically add
> new
> > broker into the cluster and remove the old one. I think it's nice to
> have a
> > way to get metadata from zookeeper(centralized coordinator?) directly.
> >
> > Best,
> > Siyuan
> >
> >
> > On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede  > >wrote:
> >
> > > If, for some reason, you don't have access to a virtual IP or load
> > > balancer, you need to round robin once through all the brokers before
> > > failing a TopicMetadataRequest. So unless all the brokers in your
> cluster
> > > are down, this should not be a problem.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> > > wrote:
> > >
> > > > Hi guys,
> > > >
> > > > I'm trying to maintain a bunch of simple kafka consumer to consume
> > > messages
> > > > from brokers. I know there is a way to send TopicMetadataRequest to
> > > broker
> > > > and get the response from the broker. But you have to specify the
> > broker
> > > > list to query the information. But broker might not be available
> > because
> > > of
> > > > some failure. My question is is there any api I can call and query
> > broker
> > > > metadata for topic/partition directly from zookeeper? I know I can
> > query
> > > > that information using zookeeper API. But that's not friendly
> > > datastructure
> > > >  like the TopicMetadata/PartitionMetadata.  Thank you!
> > > >
> > > > Best,
> > > > Siyuan
> > > >
> > >
> >
>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread Bruno D. Rodrigues
Why not ask zookeeper for the list of brokers and then ask a random
broker for the metadata (and repeat if the broker is down), even if
it's two calls.

Heck, it already makes extra connections. It connects to a broker,
gets the metadata, disconnects, and then connects again for the data.
If it's already assumed a producer or consumer will take some seconds
until it's ready, what harm would one more call do to the flow?

Producers and consumers would then be consistently configured. Or
allow the producers to also go to a broker instead of zookeeper.

This way the consumer needs to know and hardcode at least one node.
That node can fail, and it can be changed.

I thought zookeeper served to abstract this kind of complexity.
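(A rough sketch of that idea, assuming the 0.8 registry layout where live brokers appear as ephemeral znodes under /brokers/ids, each holding a small JSON blob with host and port; the zookeeper connect string is a placeholder and error handling is omitted.)

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerListFromZk {
    public static void main(String[] args) throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        // "zk1:2181" is a placeholder connect string.
        ZooKeeper zk = new ZooKeeper("zk1:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await();

        // One child per live broker; the node data is the broker's registration JSON.
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println("broker " + id + " -> " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}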







--
Bruno Rodrigues
Sent from my iPhone

On 11/10/2013, at 22:40, Neha Narkhede wrote:

>>> For each consumer consumes different
> topic/replica I have to specify those 20 brokers and go over all of them to
> know which broker is alive. And even worse how about I dynamically add new
> broker into the cluster and remove the old one
>
> TopicMetadataRequest is a batch API and you can get metadata information
> for either a list of all topics or all topics in the cluster, if you
> specify an empty list of topics. Adding a broker is not a problem since the
> metadata request also returns the list of brokers in a cluster. The reason
> this is better than reading from zookeeper is because the same operation
> would require multiple zookeeper roundtrips, instead of a single
> TopicMetadataRequest roundtrip to some kafka broker.
>
> Thanks,
> Neha
>
>
>> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com  wrote:
>>
>> Thanks guys!
>> But I feel weird. Assume I have 20 brokers for 10 different topics with 2
>> partitions and  2 replicas for each. For each consumer consumes different
>> topic/replica I have to specify those 20 brokers and go over all of them to
>> know which broker is alive. And even worse how about I dynamically add new
>> broker into the cluster and remove the old one. I think it's nice to have a
>> way to get metadata from zookeeper(centralized coordinator?) directly.
>>
>> Best,
>> Siyuan
>>
>>
>> On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede >> wrote:
>>
>>> If, for some reason, you don't have access to a virtual IP or load
>>> balancer, you need to round robin once through all the brokers before
>>> failing a TopicMetadataRequest. So unless all the brokers in your cluster
>>> are down, this should not be a problem.
>>>
>>> Thanks,
>>> Neha
>>>
>>>
>>> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
>>> wrote:
>>>
 Hi guys,

 I'm trying to maintain a bunch of simple kafka consumer to consume
>>> messages
 from brokers. I know there is a way to send TopicMetadataRequest to
>>> broker
 and get the response from the broker. But you have to specify the
>> broker
 list to query the information. But broker might not be available
>> because
>>> of
 some failure. My question is is there any api I can call and query
>> broker
 metadata for topic/partition directly from zookeeper? I know I can
>> query
 that information using zookeeper API. But that's not friendly
>>> datastructure
 like the TopicMetadata/PartitionMetadata.  Thank you!

 Best,
 Siyuan
>>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
That's why I'm asking: I would like to see a kafka zookeeper client api to
get TopicMetadata instead of my own hacky way of querying zookeeper.

Thanks!
Best,
Siyuan


On Fri, Oct 11, 2013 at 4:00 PM, Bruno D. Rodrigues <
bruno.rodrig...@litux.org> wrote:

> Why not ask zookeeper for the list of brokers and then ask a random
> broker for the metadata (and repeat if the broker is down), even if
> it's two calls.
>
> Heck it already does unnecessary connections. It connects to a broker,
> gets the metadata, disconnects, and then connects again for the data.
> If it's already assumed a producer or consumer will take some seconds
> until ready, what is another call gonna prejudice the flow.
>
> Then producers and consumers would then be consistently configured. Or
> allow the producers to also go to a broker instead of zookeeper.
>
> This way the consumer needs to know and hardcode at least one node.
> The node can fail. It can be changed.
>
> I thought zookeeper served to abstract this kind of complexity
>
>
>
>
>
>
>
> --
> Bruno Rodrigues
> Sent from my iPhone
>
> On 11/10/2013, at 22:40, Neha Narkhede wrote:
>
> >>> For each consumer consumes different
> > topic/replica I have to specify those 20 brokers and go over all of them
> to
> > know which broker is alive. And even worse how about I dynamically add
> new
> > broker into the cluster and remove the old one
> >
> > TopicMetadataRequest is a batch API and you can get metadata information
> > for either a list of all topics or all topics in the cluster, if you
> > specify an empty list of topics. Adding a broker is not a problem since
> the
> > metadata request also returns the list of brokers in a cluster. The
> reason
> > this is better than reading from zookeeper is because the same operation
> > would require multiple zookeeper roundtrips, instead of a single
> > TopicMetadataRequest roundtrip to some kafka broker.
> >
> > Thanks,
> > Neha
> >
> >
> >> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com 
> wrote:
> >>
> >> Thanks guys!
> >> But I feel weird. Assume I have 20 brokers for 10 different topics with
> 2
> >> partitions and  2 replicas for each. For each consumer consumes
> different
> >> topic/replica I have to specify those 20 brokers and go over all of
> them to
> >> know which broker is alive. And even worse how about I dynamically add
> new
> >> broker into the cluster and remove the old one. I think it's nice to
> have a
> >> way to get metadata from zookeeper(centralized coordinator?) directly.
> >>
> >> Best,
> >> Siyuan
> >>
> >>
> >> On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede  >>> wrote:
> >>
> >>> If, for some reason, you don't have access to a virtual IP or load
> >>> balancer, you need to round robin once through all the brokers before
> >>> failing a TopicMetadataRequest. So unless all the brokers in your
> cluster
> >>> are down, this should not be a problem.
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> >>> wrote:
> >>>
>  Hi guys,
> 
>  I'm trying to maintain a bunch of simple kafka consumer to consume
> >>> messages
>  from brokers. I know there is a way to send TopicMetadataRequest to
> >>> broker
>  and get the response from the broker. But you have to specify the
> >> broker
>  list to query the information. But broker might not be available
> >> because
> >>> of
>  some failure. My question is is there any api I can call and query
> >> broker
>  metadata for topic/partition directly from zookeeper? I know I can
> >> query
>  that information using zookeeper API. But that's not friendly
> >>> datastructure
>  like the TopicMetadata/PartitionMetadata.  Thank you!
> 
>  Best,
>  Siyuan
> >>
>


Re: Question about auto-rebalancing

2013-10-11 Thread Guozhang Wang
Hello Siyuan,

For the automatic leader re-election, yes, we are considering making that
work. Could you file a JIRA for this issue?

For the high-level consumer's rebalancing logic, you can find it at

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredicttheresultsoftheconsumerrebabalance%3F
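(As I understand the range-style assignment that FAQ entry describes for the 0.8 high-level consumer, the sketch below reproduces the arithmetic: partitions and consumer threads are both sorted, each consumer gets a contiguous range, and the first nParts % nConsumers consumers get one extra partition. This is an illustrative reconstruction, not code from Kafka itself.)

import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {
    public static List<Integer> partitionsFor(int consumerIndex, int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;
        int start = consumerIndex * perConsumer + Math.min(consumerIndex, withExtra);
        int count = perConsumer + (consumerIndex < withExtra ? 1 : 0);
        List<Integer> assigned = new ArrayList<Integer>();
        for (int p = start; p < start + count; p++) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // e.g. 24 partitions over 5 consumer threads: the first 4 threads get 5 partitions, the last gets 4.
        for (int i = 0; i < 5; i++) {
            System.out.println("consumer " + i + " -> " + partitionsFor(i, 24, 5));
        }
    }
}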

Guozhang


On Fri, Oct 11, 2013 at 11:06 AM, hsy...@gmail.com  wrote:

> Hi Jun,
>
> Thanks for your reply, but in a real cluster, one broker could serve
> different topics and different partitions, the simple consumer only has
> knowledge of brokers that are available but it has no knowledge to decide
> which broker is best to pick up to consume messages.  If you don't choose
> carefully, multiple simple consumer might end up with reading from same
> node which is definitely not good for performance.
> Interesting thing is I find out there is
> command kafka-preferred-replica-election.sh which will try to equally
> distribute the leadership among different brokers, this is good that I can
> always let my simple consumer reads from leader broker(even it fails, the
> replica will pick up as leader which is fine).  But why don't kafka cluster
> run this command automatically when there is a broker change(up/down) in
> the cluster so that the leadership can always be equally distributed among
> different brokers ASAP?  I think it's very good for simple consumer to
> decide which broker is good to read from.
>
> Another question is I'm also curious how high-level consumer is balanced. I
> assume each high-level consumer know other consumers(int the same group)
> which broker they read message from and it can try to avoid those brokers
> and to pick up a free one?  Is there a document for the balancing rule
> among high-level consumer. Does it always guarantee that after several
> leadership change/temporary broker fail, it can always equally distribute
> the read among the brokers. Basically I think it's nice to have a API to
> let dev know which consumer reads from which broker otherwise I don't know
> anything behind the high-level consumer
>
> Thanks!
>
> Best,
> Siyuan
>



-- 
-- Guozhang


Re: Question about auto-rebalancing

2013-10-11 Thread Sriram Subramanian
We already have a JIRA for auto rebalance. I will be working on this soon.

KAFKA-930 



On 10/11/13 5:39 PM, "Guozhang Wang"  wrote:

>Hello Siyuan,
>
>For the automatic leader re-election, yes we are considering to make it
>work. Could you file a JIRA for this issue?
>
>For the high-level consumer's rebalancing logic, you can find it at
>
>https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredictthere
>sultsoftheconsumerrebabalance%3F
>
>Guozhang
>
>
>On Fri, Oct 11, 2013 at 11:06 AM, hsy...@gmail.com 
>wrote:
>
>> Hi Jun,
>>
>> Thanks for your reply, but in a real cluster, one broker could serve
>> different topics and different partitions, the simple consumer only has
>> knowledge of brokers that are available but it has no knowledge to
>>decide
>> which broker is best to pick up to consume messages.  If you don't
>>choose
>> carefully, multiple simple consumer might end up with reading from same
>> node which is definitely not good for performance.
>> Interesting thing is I find out there is
>> command kafka-preferred-replica-election.sh which will try to equally
>> distribute the leadership among different brokers, this is good that I
>>can
>> always let my simple consumer reads from leader broker(even it fails,
>>the
>> replica will pick up as leader which is fine).  But why don't kafka
>>cluster
>> run this command automatically when there is a broker change(up/down) in
>> the cluster so that the leadership can always be equally distributed
>>among
>> different brokers ASAP?  I think it's very good for simple consumer to
>> decide which broker is good to read from.
>>
>> Another question is I'm also curious how high-level consumer is
>>balanced. I
>> assume each high-level consumer know other consumers(int the same group)
>> which broker they read message from and it can try to avoid those
>>brokers
>> and to pick up a free one?  Is there a document for the balancing rule
>> among high-level consumer. Does it always guarantee that after several
>> leadership change/temporary broker fail, it can always equally
>>distribute
>> the read among the brokers. Basically I think it's nice to have a API to
>> let dev know which consumer reads from which broker otherwise I don't
>>know
>> anything behind the high-level consumer
>>
>> Thanks!
>>
>> Best,
>> Siyuan
>>
>
>
>
>-- 
>-- Guozhang



Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread Guozhang Wang
Hello Siyuan,

Pre-0.8 Kafka did have a zookeeper-based producer api, but it has been
removed in 0.8. You can find some of the reasons in this JIRA:

https://issues.apache.org/jira/browse/KAFKA-369

Guozhang


On Fri, Oct 11, 2013 at 5:16 PM, hsy...@gmail.com  wrote:

> That's why I'm asking, I would like to see a kafka zookeeper client api to
> get TopicMetadata instead of my own hacky way to query the zookeeper
>
> Thanks!
> Best,
> Siyuan
>
>
> On Fri, Oct 11, 2013 at 4:00 PM, Bruno D. Rodrigues <
> bruno.rodrig...@litux.org> wrote:
>
> > Why not ask zookeeper for the list of brokers and then ask a random
> > broker for the metadata (and repeat if the broker is down), even if
> > it's two calls.
> >
> > Heck it already does unnecessary connections. It connects to a broker,
> > gets the metadata, disconnects, and then connects again for the data.
> > If it's already assumed a producer or consumer will take some seconds
> > until ready, what is another call gonna prejudice the flow.
> >
> > Then producers and consumers would then be consistently configured. Or
> > allow the producers to also go to a broker instead of zookeeper.
> >
> > This way the consumer needs to know and hardcode at least one node.
> > The node can fail. It can be changed.
> >
> > I thought zookeeper served to abstract this kind of complexity
> >
> >
> >
> >
> >
> >
> >
> > --
> > Bruno Rodrigues
> > Sent from my iPhone
> >
> > On 11/10/2013, at 22:40, Neha Narkhede wrote:
> >
> > >>> For each consumer consumes different
> > > topic/replica I have to specify those 20 brokers and go over all of
> them
> > to
> > > know which broker is alive. And even worse how about I dynamically add
> > new
> > > broker into the cluster and remove the old one
> > >
> > > TopicMetadataRequest is a batch API and you can get metadata
> information
> > > for either a list of all topics or all topics in the cluster, if you
> > > specify an empty list of topics. Adding a broker is not a problem since
> > the
> > > metadata request also returns the list of brokers in a cluster. The
> > reason
> > > this is better than reading from zookeeper is because the same
> operation
> > > would require multiple zookeeper roundtrips, instead of a single
> > > TopicMetadataRequest roundtrip to some kafka broker.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > >> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com 
> > wrote:
> > >>
> > >> Thanks guys!
> > >> But I feel weird. Assume I have 20 brokers for 10 different topics
> with
> > 2
> > >> partitions and  2 replicas for each. For each consumer consumes
> > different
> > >> topic/replica I have to specify those 20 brokers and go over all of
> > them to
> > >> know which broker is alive. And even worse how about I dynamically add
> > new
> > >> broker into the cluster and remove the old one. I think it's nice to
> > have a
> > >> way to get metadata from zookeeper(centralized coordinator?) directly.
> > >>
> > >> Best,
> > >> Siyuan
> > >>
> > >>
> > >> On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede <
> neha.narkh...@gmail.com
> > >>> wrote:
> > >>
> > >>> If, for some reason, you don't have access to a virtual IP or load
> > >>> balancer, you need to round robin once through all the brokers before
> > >>> failing a TopicMetadataRequest. So unless all the brokers in your
> > cluster
> > >>> are down, this should not be a problem.
> > >>>
> > >>> Thanks,
> > >>> Neha
> > >>>
> > >>>
> > >>> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com  >
> > >>> wrote:
> > >>>
> >  Hi guys,
> > 
> >  I'm trying to maintain a bunch of simple kafka consumer to consume
> > >>> messages
> >  from brokers. I know there is a way to send TopicMetadataRequest to
> > >>> broker
> >  and get the response from the broker. But you have to specify the
> > >> broker
> >  list to query the information. But broker might not be available
> > >> because
> > >>> of
> >  some failure. My question is is there any api I can call and query
> > >> broker
> >  metadata for topic/partition directly from zookeeper? I know I can
> > >> query
> >  that information using zookeeper API. But that's not friendly
> > >>> datastructure
> >  like the TopicMetadata/PartitionMetadata.  Thank you!
> > 
> >  Best,
> >  Siyuan
> > >>
> >
>



-- 
-- Guozhang


Re: Question about auto-rebalancing

2013-10-11 Thread hsy...@gmail.com
Oh, Sriram, Thank you very much!



On Fri, Oct 11, 2013 at 5:44 PM, Sriram Subramanian <
srsubraman...@linkedin.com> wrote:

> We already have a JIRA for auto rebalance. I would be working on this soon.
>
> KAFKA-930 
>
>
>
> On 10/11/13 5:39 PM, "Guozhang Wang"  wrote:
>
> >Hello Siyuan,
> >
> >For the automatic leader re-election, yes we are considering to make it
> >work. Could you file a JIRA for this issue?
> >
> >For the high-level consumer's rebalancing logic, you can find it at
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredictthere
> >sultsoftheconsumerrebabalance%3F
> >
> >Guozhang
> >
> >
> >On Fri, Oct 11, 2013 at 11:06 AM, hsy...@gmail.com 
> >wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for your reply, but in a real cluster, one broker could serve
> >> different topics and different partitions, the simple consumer only has
> >> knowledge of brokers that are available but it has no knowledge to
> >>decide
> >> which broker is best to pick up to consume messages.  If you don't
> >>choose
> >> carefully, multiple simple consumer might end up with reading from same
> >> node which is definitely not good for performance.
> >> Interesting thing is I find out there is
> >> command kafka-preferred-replica-election.sh which will try to equally
> >> distribute the leadership among different brokers, this is good that I
> >>can
> >> always let my simple consumer reads from leader broker(even it fails,
> >>the
> >> replica will pick up as leader which is fine).  But why don't kafka
> >>cluster
> >> run this command automatically when there is a broker change(up/down) in
> >> the cluster so that the leadership can always be equally distributed
> >>among
> >> different brokers ASAP?  I think it's very good for simple consumer to
> >> decide which broker is good to read from.
> >>
> >> Another question is I'm also curious how high-level consumer is
> >>balanced. I
> >> assume each high-level consumer know other consumers(int the same group)
> >> which broker they read message from and it can try to avoid those
> >>brokers
> >> and to pick up a free one?  Is there a document for the balancing rule
> >> among high-level consumer. Does it always guarantee that after several
> >> leadership change/temporary broker fail, it can always equally
> >>distribute
> >> the read among the brokers. Basically I think it's nice to have a API to
> >> let dev know which consumer reads from which broker otherwise I don't
> >>know
> >> anything behind the high-level consumer
> >>
> >> Thanks!
> >>
> >> Best,
> >> Siyuan
> >>
> >
> >
> >
> >--
> >-- Guozhang
>
>