Re: Migrating data from old brokers to new brokers question

2014-09-16 Thread Gwen Shapira
Since these tools are so useful, I wonder what it would take (from both
Airbnb and Kafka) to merge this into the Kafka project. I think there are
a couple of Jiras about improved tool usability that this would resolve.

On Mon, Sep 15, 2014 at 11:45 AM, Alexis Midon
 wrote:
> distribution will be even based on the number of partitions.
> It is the same logic as AdminUtils.
> see
> https://github.com/airbnb/kafkat/blob/master/lib/kafkat/command/reassign.rb#L39
>
> On Sun, Sep 14, 2014 at 6:05 PM, Neha Narkhede 
> wrote:
>
>> This is great. Thanks for sharing! Does kafkat automatically figure out the
>> right reassignment strategy based on even data distribution?
>>
>> On Wed, Sep 3, 2014 at 12:12 AM, Alexis Midon <
>> alexis.mi...@airbedandbreakfast.com> wrote:
>>
>> > Hi Marcin,
>> >
>> > A few weeks ago, I did an upgrade to 0.8.1.1 and then augmented the
>> cluster
>> > from 3 to 9 brokers. All went smoothly.
>> > In a dev environment, we found out that the biggest pain point is to have
>> > to deal with the json file and the error-prone command line interface.
>> > So to make our life easier, my team mate Nelson [1] came up with kafkat:
>> > https://github.com/airbnb/kafkat
>> >
>> > We now install kafkat on every broker. Note that kafkat does NOT connect
>> to
>> > a broker, but to zookeeper. So you can actually use it from any machine.
>> >
>> > For reassignment, please see:
>> > `kafkat reassign [topic] [--brokers ] [--replicas ] `
>> > It will transparently generate and kick off a balanced assignment.
>> >
>> > feedback and contributions welcome! Enjoy!
>> >
>> > Alexis
>> >
>> > [1] https://github.com/nelgau
>> >
>> >
>> >
>> > On Tue, Aug 26, 2014 at 10:27 AM, Marcin Michalski <
>> mmichal...@tagged.com>
>> > wrote:
>> >
>> > > I am running on 0.8.1.1 and I thought that the partition reassignment
>> > tools
>> > > can do this job. Just was not sure if this is the best way to do this.
>> > > I will try this out in stage env first and will perform the same in
>> prod.
>> > >
>> > > Thanks,
>> > > marcin
>> > >
>> > >
>> > > On Mon, Aug 25, 2014 at 7:23 PM, Joe Stein 
>> wrote:
>> > >
>> > > > Marcin, that is a typical task now.  What version of Kafka are you
>> > > running?
>> > > >
>> > > > Take a look at
>> > > >
>> > https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
>> > > > and
>> > > >
>> > > >
>> > >
>> >
>> https://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>> > > >
>> > > > Basically you can do a --generate to get existing JSON topology and
>> > with
>> > > > that take the results of "Current partition replica assignment" (the
>> > > first
>> > > > JSON that outputs) and make whatever changes (like sed old node for
>> new
>> > > > node and add more replica's which increase the replication factor,
>> > > whatever
>> > > > you want) and then --execute.
>> > > >
>> > > > With lots of data this takes time so you will want to run --verify to
>> > see
>> > > > what is in progress... good thing do a node at a time (even topic at
>> a
>> > > > time) however you want to manage and wait for it as such.
>> > > >
>> > > > The "preferred" replica is simply the first one in the list of
>> > replicas.
>> > > >  The kafka-preferred-replica-election.sh just makes that replica the
>> > > leader
>> > > > as this is not automatic yet.
>> > > >
>> > > > If you are running a version prior to 0.8.1.1 it might make sense to
>> > > > upgrade the old nodes first then run reassign to the new servers.
>> > > >
>> > > >
>> > > > /***
>> > > >  Joe Stein
>> > > >  Founder, Principal Consultant
>> > > >  Big Data Open Source Security LLC
>> > > >  http://www.stealth.ly
>> > > >  Twitter: @allthingshadoop 
>> > > > /
>> > > >
>> > > >
>> > > > On Mon, Aug 25, 2014 at 8:59 PM, Marcin Michalski <
>> > mmichal...@tagged.com
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Hi, I would like to migrate my Kafka setup from old servers to new
>> > > > servers.
>> > > > > Let say I have 8 really old servers that have the kafka
>> > > topics/partitions
>> > > > > replicated 4 ways and want to migrate the data to 4 brand new
>> servers
>> > > and
>> > > > > want the replication factor be 3. I wonder if anyone has ever
>> > performed
>> > > > > this type of migration?
>> > > > >
>> > > > > Will auto rebalancing take care of this automatically if I do the
>> > > > > following?
>> > > > >
>> > > > > Let say I bring down old broker id 1 down and startup new server
>> > broker
>> > > > id
>> > > > > 100 up, is there a way to migrate all of the data of the topic that
>> > had
>> > > > the
>> > > > > topic (where broker id 1 was the leader) over to the new broker
>> 100?
>> > > > >
>> > > > > Or do I need to use *bin/kafka-preferred-replica-election.sh *to
>> > > reassign
>> > > > > the topics/partitions from old broker 1 to broker 100? And then
>> just
>> > > keep
>> > > > > doing

Re: Non-blocking High-Level Consumer

2014-09-16 Thread Gwen Shapira
For Fluffka, I created a wrapping function:

  IterStatus timedHasNext() {
    try {
      long startTime = System.currentTimeMillis();
      // Blocks until a message arrives or consumer.timeout.ms elapses
      it.hasNext();
      long endTime = System.currentTimeMillis();
      return new IterStatus(true, endTime - startTime);
    } catch (ConsumerTimeoutException e) {
      return new IterStatus(false, consumerTimeout);
    }
  }

IterStatus is basically a pair of whether there is data and how long we waited.

Calling this in a loop like this:
  while (timeWaited < timeUpperLimit) {
    iterStatus = timedHasNext();
    if (iterStatus.hasData()) {
      // handling data
    }
    timeWaited += iterStatus.getWaitTime();
  }

Allows us to read all messages that are available within a certain
amount of time.

You can do other cool stuff (exit after a certain number of messages,
or when you have waited X amount of time with no new data, etc).
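
A minimal sketch of those two exit conditions (stop after N messages, or
after a number of consecutive timed-out polls), written against a plain
Iterator so it runs without Kafka on the classpath. BoundedDrain,
PollTimeoutException and fakeSource are hypothetical names used only for
illustration; with the real high-level consumer you would catch
kafka.consumer.ConsumerTimeoutException, as in the wrapper above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException.
class PollTimeoutException extends RuntimeException {}

final class BoundedDrain {
    /**
     * Reads from a blocking iterator until maxMessages have been read or
     * maxIdlePolls consecutive polls timed out without data.
     */
    static List<String> drain(Iterator<String> it, int maxMessages, int maxIdlePolls) {
        List<String> out = new ArrayList<>();
        int idlePolls = 0;
        while (out.size() < maxMessages && idlePolls < maxIdlePolls) {
            try {
                if (!it.hasNext()) {
                    break; // the iterator was shut down
                }
                out.add(it.next());
                idlePolls = 0; // data arrived, so reset the idle counter
            } catch (PollTimeoutException e) {
                idlePolls++; // one poll elapsed with no new data
            }
        }
        return out;
    }

    /** Fake source: each slot is a message, or null to simulate one timed-out poll. */
    static Iterator<String> fakeSource(List<String> slots) {
        final Iterator<String> inner = slots.iterator();
        return new Iterator<String>() {
            private String pending;

            @Override
            public boolean hasNext() {
                if (pending != null) {
                    return true;
                }
                if (!inner.hasNext()) {
                    throw new PollTimeoutException(); // nothing more will arrive
                }
                pending = inner.next();
                if (pending == null) {
                    throw new PollTimeoutException(); // simulated idle poll
                }
                return true;
            }

            @Override
            public String next() {
                hasNext();
                String message = pending;
                pending = null;
                return message;
            }
        };
    }
}
```

Counting idle polls rather than wall-clock time keeps the sketch
deterministic; the timeWaited bookkeeping shown earlier would work just
as well as the second condition.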

Gwen

On Mon, Sep 15, 2014 at 8:54 AM, Neha Narkhede  wrote:
> There isn't a very clean way to do this in the 0.8.x high level consumer.
> You can configure consumer.timeout.ms to a certain value so that the
> consumer's blocking iterator returns if no message arrives for
> consumer.timeout.ms.
>
> Thanks
> Neha
>
> On Mon, Sep 15, 2014 at 6:33 AM, Ivan Balashov  wrote:
>
>> Hi,
>>
>> Is it possible to read all available messages with HLC in a
>> non-blocking way? E.g. read all messages and not wait for more
>> messages to appear in the topic.
>>
>> As far as I understand, currently one has to keep high-level consumer
>> in a separate thread until it is shut down explicitly, but how can one
>> check if all available messages are in fact consumed?
>>
>> Thanks,
>>


Re: How to use kafka as flume source.

2014-09-19 Thread Gwen Shapira
Just to update (better late than never!):
The Kafka source & sink for Flume were updated to latest Kafka version
and improved a bit (offsets are now committed after data is written to
Flume channel).
If you build Flume from trunk, you'll get these.

Gwen

On Sun, Aug 3, 2014 at 10:31 AM, Andrew Ehrlich  wrote:
> The author may or may not have upgraded the project to be compatible with
> 0.8.1. You could message the author directly to ask.
>
>
> On Sun, Aug 3, 2014 at 6:53 AM, rafeeq s  wrote:
>
>> Thanks.
>> But the example at the above link uses flume-ng to
>> communicate with *kafka 0.7.2*.
>>
>> I am using *kafka 0.8.1*. Please advice an example of flume-ng with kafka
>> 0.8.1.
>>
>> Thanks in advance
>>
>> Regards,
>>
>> Rafeeq S
>> *(“What you do is what matters, not what you think or say or plan.” )*
>>
>>
>>
>> On Sun, Aug 3, 2014 at 4:40 PM, Sharninder  wrote:
>>
>> > https://github.com/baniuyao/flume-ng-kafka-source
>> >
>> >
>> >
>> > On Sun, Aug 3, 2014 at 6:15 PM, rafeeq s  wrote:
>> >
>> > > Hi,
>> > >
>> > > We are planning to use kafka as *flume source*. Please advice me, how
>> to
>> > > use kafka as source in flume.
>> > >
>> > > please share if there is any best example of *flume- kafka source- hdfs
>> > > sink*.
>> > >
>> > > Regards,
>> > >
>> > > Rafeeq S
>> > > *("What you do is what matters, not what you think or say or plan." )*
>> > >
>> >
>>


Re: Read a specific number of messages using kafka

2014-09-25 Thread Gwen Shapira
Using high level consumer and assuming you already created an iterator:

while (msgCount < maxMessages && it.hasNext()) {
  bytes = it.next().message();
  eventList.add(bytes);
  msgCount++;  // count each message so the loop stops at maxMessages
}

(See a complete example here:
https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java)
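
To cover the "write to a file and then stop" part of the question, here
is a rough, Kafka-free sketch. It assumes the payloads are UTF-8 JSON
text and that one message per line is an acceptable file layout;
ReadNMessages, take and writeLines are made-up names, and the Iterator
of byte arrays stands in for the consumer iterator's message payloads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ReadNMessages {
    /** Takes at most maxMessages payloads from the iterator, then stops. */
    static List<byte[]> take(Iterator<byte[]> it, int maxMessages) {
        List<byte[]> batch = new ArrayList<>();
        // The count is checked first, so hasNext() is not called (and cannot
        // block) once the quota has been reached.
        while (batch.size() < maxMessages && it.hasNext()) {
            batch.add(it.next());
        }
        return batch;
    }

    /** Writes each payload as one UTF-8 line (assumes the payloads are text). */
    static void writeLines(List<byte[]> batch, Path file) {
        List<String> lines = new ArrayList<>();
        for (byte[] payload : batch) {
            lines.add(new String(payload, StandardCharsets.UTF_8));
        }
        try {
            Files.write(file, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Using the size of the batch as the counter avoids keeping a separate
msgCount variable in sync with the list.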

Gwen

On Thu, Sep 25, 2014 at 9:15 AM, pankaj ojha  wrote:
> Hi,
>
> My requirement is to read a specific number of messages from a kafka topic
> which contains data in json format, and after reading that number of messages, I
> need to write them to a file and then stop. How can I count the number of
> messages read by my consumer code (either simpleconsumer or high level)?
>
> Please help.
>
> --
> Thanks,
> Pankaj Ojha


Re: programmatically get number of items in topic/partition

2014-10-01 Thread Gwen Shapira
Take a look at ConsumerOffsetChecker. It does just that: it prints the
offset and lag for each consumer and partition.

You can either use that class directly, or use it as a guideline for
your implementation.
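
If you roll your own, the arithmetic is just "log end offset minus
committed offset" per partition. A small sketch of that calculation,
assuming both maps have already been fetched somehow (LagSketch and its
method names are invented for this example and are not part of Kafka):

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LagSketch {
    /** lag = log end offset - committed consumer offset, per partition. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            // Assumption of this sketch: a partition without a committed
            // offset counts as unread from offset 0. That overestimates the
            // lag if retention has already deleted old segments.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** Total number of messages pending across all partitions. */
    static long totalLag(Map<Integer, Long> lag) {
        long total = 0L;
        for (long value : lag.values()) {
            total += value;
        }
        return total;
    }
}
```

The total is the "number of items pending for consumption" for one
consumer group on one topic.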

On Wed, Oct 1, 2014 at 2:10 AM, Shlomi Hazan  wrote:
> Hi,
> How can I programmatically get the number of items in a topic, pending for
> consumption?
> If no programmatic way is avail, what other method is available?
> Shlomi


Re: Reassigning Partition Failing

2014-10-06 Thread Gwen Shapira
Do we have a jira to support removal of dead brokers without having to
start a new broker with the same id?

I think it's something we'll want to allow.

On Thu, Oct 2, 2014 at 7:45 AM, Jun Rao  wrote:
> The reassign partition process only completes after the new replicas are
> fully caught up and the old replicas are deleted. So, if the old replica is
> down, the process can never complete, which is what you observed. In your
> case, if you just want to replace a broker host with a new one, instead of
> using the reassign partition tool, simply start a new broker with the same
> broker id as the old one, the new broker will replicate all the data
> automatically.
>
> Thanks,
>
> Jun
>
> On Wed, Oct 1, 2014 at 3:43 PM, Lung, Paul  wrote:
>
>> Hi All,
>>
>> I had a 0.8.1.1 Kafka Broker go down, and I was trying to use the reassign
>> partition script to move topics off that broker. When I describe the
>> topics, I see the following:
>>
>> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> Replicas: 2131118,2166601,2163421 Isr: 2131118,2166601
>>
>> This shows that the broker “2163421” is down. So I create the following
>> file /tmp/move_topic.json:
>> {
>> "version": 1,
>> "partitions": [
>> {
>> "topic": "mini__022active_120__33__mini",
>> "partition": 0,
>> "replicas": [
>> 2131118, 2166601,  2156998
>> ]
>> }
>> ]
>> }
>>
>> And then do this:
>>
>> ./kafka-reassign-partitions.sh --execute --reassignment-json-file
>> /tmp/move_topic.json
>> Successfully started reassignment of partitions
>> {"version":1,"partitions":[{"topic":"mini__022active_120__33__mini","partition":0,"replicas":[2131118,2166601,2156998]}]}
>>
>> However, when I try to verify this, I get the following error:
>> ./kafka-reassign-partitions.sh --verify --reassignment-json-file
>> /tmp/move_topic.json
>> Status of partition reassignment:
>> ERROR: Assigned replicas (2131118,2166601,2156998,2163421) don't match the
>> list of replicas for reassignment (2131118,2166601,2156998) for partition
>> [mini__022active_120__33__mini,0]
>> Reassignment of partition [mini__022active_120__33__mini,0] failed
>>
>> If I describe the topics, I now see there are 4 replicas. This has been
>> like this for many hours now, so it seems to have permanently moved to 4
>> replicas for some reason.
>> Topic:mini__022active_120__33__mini PartitionCount:1
>> ReplicationFactor:4 Configs:
>> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118
>> Replicas: 2131118,2166601,2156998,2163421 Isr: 2131118,2166601
>>
>> If I re-execute and re-verify, I get the same error. So it seems to be
>> wedged.
>>
>> Can someone help?
>>
>> Paul Lung
>>
>>
>>


Re: Kafka AWS deployment + UI console

2014-10-07 Thread Gwen Shapira
I'm using Hue's ZooKeeper app: http://gethue.com/new-zookeeper-browser-app/

This UI looks very cute, but I haven't tried it yet:
https://github.com/claudemamo/kafka-web-console

Gwen

On Tue, Oct 7, 2014 at 5:08 PM, Shafaq  wrote:
> We are going to deploy Kafka in Production and also monitor it via console.
> (e.g. State of partitions in Broker- leader and slaves, state of consumers )
>
> Is there out-of-the-box solution?
>
> What is the best and efficient way of deployment and monitoring
>
> Has someone tried this- looks promising
> http://www.michael-noll.com/blog/2014/03/17/wirbelsturm-one-click-deploy-storm-kafka-clusters-with-vagrant-puppet/
>
> --
> Kind Regards,
> Shafaq


Re: Producer connection timing out

2014-10-08 Thread Gwen Shapira
Can you check that you can connect on port 9092 from the producer to the
broker? (Check with telnet or something similar.)
Ping may succeed even when a port is blocked.
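
If telnet isn't installed on the producer machine, the same check can be
done from the JVM. A quick sketch using only java.net (PortCheckSketch
and canConnect are names I made up; the host and port are whatever your
broker advertises):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheckSketch {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }
}
```

A false result here while ping works usually points at a firewall or
security-group rule rather than at Kafka itself.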

On Wed, Oct 8, 2014 at 9:40 AM, ravi singh  wrote:
> Even though I am able to ping the broker machine from my producer
> machine, the producer is throwing the exception below while connecting to
> the broker.
> I wanted to increase the timeout for the producer but couldn't find any parameter
> for that in kafka 8.
> Any idea what's wrong here?
>
> [2014-10-08 09:29:47,762] ERROR Producer connection to 10.22.44.555:9092
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection timed out
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2014-10-08 09:29:47,766] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(test_new)] from broker [id:0,host:10.22.33.444,port:9092]
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: Connection timed out
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> --
> *Regards,*
> *Ravi*


Re: Load Balancing Consumers or Multiple consumers reading off same topic

2014-10-08 Thread Gwen Shapira
If you use the high level consumer implementation, and register all
consumers as part of the same group - they will load-balance
automatically.

When you add a consumer to the group, if there are enough partitions
in the topic, some of the partitions will be assigned to the new
consumer.
When a consumer crashes, once its node in ZK times out, other
consumers will get its partitions.
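
To make the "if there are enough partitions" caveat concrete, here is a
simplified sketch of a range-style split of partitions across the sorted
members of a group. It is only an illustration of the idea, not the
consumer's actual rebalance code; RangeAssignmentSketch and assign are
hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class RangeAssignmentSketch {
    /** Splits partitions 0..numPartitions-1 into contiguous ranges over sorted consumer ids. */
    static Map<String, List<Integer>> assign(int numPartitions, Collection<String> consumerIds) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(consumerIds));
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return owned;
        }
        int base = numPartitions / sorted.size();   // every consumer gets at least this many
        int extra = numPartitions % sorted.size();  // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = next; p < next + count; p++) {
                partitions.add(p);
            }
            owned.put(sorted.get(i), partitions);
            next += count;
        }
        return owned;
    }
}
```

With 3 partitions and 4 consumers the last consumer ends up with an
empty list, which is why adding consumers beyond the partition count
does not add throughput.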

Gwen

On Wed, Oct 8, 2014 at 10:39 AM, Sharninder  wrote:
> Hi,
>
> I'm not even sure if this is a valid use-case, but I really wanted to run
> it by you guys. How do I load balance my consumers? For example, if my
> consumer machine is under load, I'd like to spin up another VM with another
> consumer process to keep reading messages off any topic. On similar lines,
> how do you guys handle consumer failures? Suppose one consumer process gets
> an exception and crashes, is it possible for me to somehow make sure that
> there is another process that is still reading the queue for me?
>
> --
> Sharninder


Re: Load Balancing Consumers or Multiple consumers reading off same topic

2014-10-08 Thread Gwen Shapira
yep. exactly.

On Wed, Oct 8, 2014 at 11:07 AM, Sharninder  wrote:
> Thanks Gwen.
>
> When you're saying that I can add consumers to the same group, does that
> also hold true if those consumers are running on different machines? Or in
> different JVMs?
>
> --
> Sharninder
>
>
> On Wed, Oct 8, 2014 at 11:35 PM, Gwen Shapira  wrote:
>
>> If you use the high level consumer implementation, and register all
>> consumers as part of the same group - they will load-balance
>> automatically.
>>
>> When you add a consumer to the group, if there are enough partitions
>> in the topic, some of the partitions will be assigned to the new
>> consumer.
>> When a consumer crashes, once its node in ZK times out, other
>> consumers will get its partitions.
>>
>> Gwen
>>
>> On Wed, Oct 8, 2014 at 10:39 AM, Sharninder  wrote:
>> > Hi,
>> >
>> > I'm not even sure if this is a valid use-case, but I really wanted to run
>> > it by you guys. How do I load balance my consumers? For example, if my
>> > consumer machine is under load, I'd like to spin up another VM with
>> another
>> > consumer process to keep reading messages off any topic. On similar
>> lines,
>> > how do you guys handle consumer failures? Suppose one consumer process
>> gets
>> > an exception and crashes, is it possible for me to somehow make sure that
>> > there is another process that is still reading the queue for me?
>> >
>> > --
>> > Sharninder
>>


Re: Load Balancing Consumers or Multiple consumers reading off same topic

2014-10-08 Thread Gwen Shapira
Here's an example (from ConsumerOffsetChecker tool) of 1 topic (t1)
and 1 consumer group (flume), each of the 3 topic partitions is being
read by a different machine running the flume consumer:
Group   Topic  Pid  Offset    logSize    Lag       Owner
flume   t1     0    50172068  100210042  50037974  flume_kafkacdh-1.ent.cloudera.com-1412722833783-3d6d80db-0
flume   t1     1    49914701  49914701   0         flume_kafkacdh-2.ent.cloudera.com-1412722838536-a6a4915d-0
flume   t1     2    54218841  82733380   28514539  flume_kafkacdh-3.ent.cloudera.com-1412722832793-b23eaa63-0

If flume_kafkacdh-1 crashes, another consumer will pick up the partition:
Group   Topic  Pid  Offset    logSize    Lag       Owner
flume   t1     0    59669715  100210042  40540327  flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
flume   t1     1    49914701  49914701   0         flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
flume   t1     2    65796205  82733380   16937175  flume_kafkacdh-3.ent.cloudera.com-1412792871089-cabd4934-0

Then I can start flume_kafkacdh-4 and see things rebalance again:
flume   t1     0    60669715  100210042  39540327  flume_kafkacdh-2.ent.cloudera.com-1412792880818-b4aa6feb-0
flume   t1     1    49914701  49914701   0         flume_kafkacdh-3.ent.cloudera.com-1412792871089-cabd4934-0
flume   t1     2    66829740  82733380   15903640  flume_kafkacdh-4.ent.cloudera.com-1412793053882-9bfddff9-0

Isn't Kafka the best thing ever? :)

Gwen

On Wed, Oct 8, 2014 at 11:23 AM, Gwen Shapira  wrote:
> yep. exactly.
>
> On Wed, Oct 8, 2014 at 11:07 AM, Sharninder  wrote:
>> Thanks Gwen.
>>
>> When you're saying that I can add consumers to the same group, does that
>> also hold true if those consumers are running on different machines? Or in
>> different JVMs?
>>
>> --
>> Sharninder
>>
>>
>> On Wed, Oct 8, 2014 at 11:35 PM, Gwen Shapira  wrote:
>>
>>> If you use the high level consumer implementation, and register all
>>> consumers as part of the same group - they will load-balance
>>> automatically.
>>>
>>> When you add a consumer to the group, if there are enough partitions
>>> in the topic, some of the partitions will be assigned to the new
>>> consumer.
>>> When a consumer crashes, once its node in ZK times out, other
>>> consumers will get its partitions.
>>>
>>> Gwen
>>>
>>> On Wed, Oct 8, 2014 at 10:39 AM, Sharninder  wrote:
>>> > Hi,
>>> >
>>> > I'm not even sure if this is a valid use-case, but I really wanted to run
>>> > it by you guys. How do I load balance my consumers? For example, if my
>>> > consumer machine is under load, I'd like to spin up another VM with
>>> another
>>> > consumer process to keep reading messages off any topic. On similar
>>> lines,
>>> > how do you guys handle consumer failures? Suppose one consumer process
>>> gets
>>> > an exception and crashes, is it possible for me to somehow make sure that
>>> > there is another process that is still reading the queue for me?
>>> >
>>> > --
>>> > Sharninder
>>>


Re: Auto Purging Consumer Group Configuration [Especially Kafka Console Group]

2014-10-09 Thread Gwen Shapira
The problem with Kafka is that we never know when a consumer is
"truly" inactive.

But - if you decide to define inactive as a consumer whose last offset
is lower than anything available on the log (or perhaps lagging by
over X messages?), it's fairly easy to write a script to detect and
clean them directly on ZK.
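
The check itself could look something like the sketch below. It assumes
you have already looked up the group's committed offset and the
partition's earliest and latest offsets; InactiveGroupSketch, isInactive
and the maxLag threshold are all invented for illustration, and the
actual ZK cleanup is deliberately left out.

```java
final class InactiveGroupSketch {
    /**
     * A consumer counts as inactive for a partition if its committed offset
     * has fallen off the start of the log, or if it lags by more than maxLag.
     */
    static boolean isInactive(long committedOffset, long earliestOffset,
                              long latestOffset, long maxLag) {
        boolean fellOffTheLog = committedOffset < earliestOffset;
        boolean tooFarBehind = (latestOffset - committedOffset) > maxLag;
        return fellOffTheLog || tooFarBehind;
    }
}
```

A group would only be a candidate for deletion if this holds for every
partition it has offsets for; a single lagging partition may just mean a
slow consumer.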

BTW. Why do you need to clean them? What issue do you see with just
letting them hang around?

Gwen

On Thu, Oct 9, 2014 at 9:18 PM, Bhavesh Mistry
 wrote:
> Hi Kafka,
>
> We have lots of lingering console consumer groups that people have created for
> testing or debugging purposes, for one-time use via
> bin/kafka-console-consumer.sh.  Is there an auto-purging cleanup script
> that Kafka provides?  Is there any API to find inactive consumer groups
> and delete their configuration?
>
> Thanks,
>
> Bhavesh


Re: kafka java api (written in 100% clojure)

2014-10-13 Thread Gwen Shapira
Out of curiosity: did you choose Redis because ZooKeeper is not well
supported in Clojure? Or were there other reasons?

On Mon, Oct 13, 2014 at 2:04 PM, Gerrit Jansen van Vuuren
 wrote:
> Hi Steven,
>
> Redis:
>
>   I've had a discussion on redis today, and one architecture that does come
> up is using a master slave, then if the master fails the have the
> application start writing to the slave. Writing to a slave is possible in
> redis, albeit you cannot fail back to the master because writes to a slave
> will not be automatically replicated to the master.
>
>   Any suggestions are welcome.
>
> Java:
>
>   I like Java, have been with it a long time, did groovy, went back to
> Java, tried scala, went back to Java, then tried clojure and got sold on it
> (mainly because it fits my way of thinking). OtherJVMLang -> Java interop
> is always better than Java -> OtherJVMLang interop. Clojure interop to Java
> is really great, but Java to Clojure you need to use things like:
>   RT.var("kafka-clj.consumer.node", "read-msg!").invoke(connector,
> timeoutMillis))
>
>
>  I've refactored the Java API to be more "Java like" (plus made Consumer
> Iterable), and made a new release "2.3.9", see the updated examples, also
> have a look at
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/doc/vagrant.md
> .
>
> My idea for this library is that from Java/Groovy etc you do not need to
> know about Clojure behind the scenes (barring the stacktraces), you just
> get Java, and obviously if your using Clojure you just get Clojure ;).
>
> Cheers,
>  Gerrit
>
>
> On Mon, Oct 13, 2014 at 6:52 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
>
>> Couple of mostly-uninformed comments inline,
>>
>>
>> On Oct 13, 2014, at 2:00 AM, Gerrit Jansen van Vuuren 
>> wrote:
>>
>> > Hi Daniel,
>> >
>> > At the moment redis is a spof in the architecture, but you can setup
>> > replication and I'm seriously looking into using redis cluster to
>> eliminate
>> > this.
>> >   Some docs that point to this are:
>> >   http://redis.io/topics/cluster-tutorial
>> >   http://redis.io/topics/sentinel
>>
>> There's some evidence that redis clusters are *not* good for managing state
>> in the way that you are using it:
>>
>> http://aphyr.com/posts/283-call-me-maybe-redis
>>
>> > If you can’t tolerate data loss, Redis Sentinel (and by extension Redis
>> Cluster) is not safe for use as:
>> >
>> >   • A lock service
>> >   • A queue
>> >   • A database
>>
>>
>> >>
>> >>> On 13/10/2014, at 10:22 am, Gerrit Jansen van Vuuren <
>> >> gerrit...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> Just thought I'll put this out for the kafka community to see (if
>> anyone
>> >>> finds it useful great!!).
>> >>>
>> >>> Kafka-fast is 100% pure clojure implementation for kafka, but not just
>> >>> meant for clojure because it has a Java API wrapper that can be used
>> from
>> >>> Java, Groovy, JRuby or Scala.
>>
>> One thing that frustrates me with the Kafka library is that despite it
>> claiming
>> that the Scala code is interoperable with Java, it really isn't.  You end
>> up
>> having to work around the Scala compiler 'magic' in increasingly bizarre
>> ways,
>> e.g. default arguments:
>>
>> kafka = new KafkaServer(createConfig(), KafkaServer.init$default$2());
>>
>> which is both magical and fragile.  I don't know whether Clojure is the
>> same way,
>> just want to point out that if you don't take particular care of us old
>> fart Java
>> nuts, you'll lose us quickly :)
>>
>> Another example, from your docs:
>>
>> Object connector = Producer.createConnector(new BrokerConf("192.168.4.40",
>> 9092));
>> Producer.sendMsg(connector, "my-topic", "Hi".getBytes("UTF-8"));
>>
>> This is downright bizarre to me, I would instead expect:
>>
>> Producer connector = Producer.createConnector(...)
>> connector.sendMsg("my-topic", bytes)
>>
>> which is IMO shorter, cleaner, and easier for testing (especially mocking).
>>
>>
>> Hope some of my ravings are helpful,
>> Steven
>>
>>


Re: Achieving Consistency and Durability

2014-10-14 Thread Gwen Shapira
ack = 2 *will* throw an exception when there's only one node in ISR.

The problem with ack=2 is that if you have 3 replicas and you got acks
from 2 of them, the one replica which did not get the message can
still be in ISR and get elected as leader, leading to the loss of the
message. If you specify ack=3, you can't tolerate the failure of a
single replica. Not amazing either.

To make things even worse, when specifying the number of acks you
want, you don't always know how many replicas the topic should have,
so it's difficult to pick the correct number.

acks = -1 solves that problem (since all messages need to get acked by
all replicas), but introduces the new problem of not getting an
exception if ISR shrank to 1 replica.

That's why the min.isr configuration was added.
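
As a toy model of the rules described above (not broker code, and
simplified to "is the write accepted right now"), the interaction
between acks, the ISR size and min.isr can be written down like this.
AckModelSketch and writeAccepted are hypothetical names.

```java
final class AckModelSketch {
    /**
     * requiredAcks: the producer's acks setting (-1 means "all in-sync replicas").
     * isrSize:      how many replicas are currently in the ISR.
     * minIsr:       the min.isr setting; 1 models the behaviour before it existed.
     */
    static boolean writeAccepted(int requiredAcks, int isrSize, int minIsr) {
        if (requiredAcks == -1) {
            // Every in-sync replica must ack, and min.isr puts a floor on
            // how small the ISR may become before writes are refused.
            return isrSize >= minIsr;
        }
        // With an explicit count, there must be at least that many replicas
        // available to acknowledge the write.
        return isrSize >= requiredAcks;
    }
}
```

The four interesting cases from this thread: (2, 1, 1) is rejected,
(-1, 1, 1) is silently accepted, (-1, 1, 2) is rejected, and (-1, 2, 2)
is accepted.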

I hope this clarifies things :)
I'm planning to add this to the docs in a day or two, so let me know
if there are any additional explanations or scenarios you think we
need to include.

Gwen

On Tue, Oct 14, 2014 at 12:27 PM, Scott Reynolds  wrote:
> A question about 0.8.1.1 and acks. I was under the impression that setting
> acks to 2 will not throw an exception when there is only one node in ISR.
> Am I incorrect ? Thus the need for min_isr.
>
> On Tue, Oct 14, 2014 at 11:50 AM, Kyle Banker  wrote:
>
>> It's quite difficult to infer from the docs the exact techniques required
>> to ensure consistency and durability in Kafka. I propose that we add a doc
>> section detailing these techniques. I would be happy to help with this.
>>
>> The basic question is this: assuming that I can afford to temporarily halt
>> production to Kafka, how do I ensure that no message written to Kafka is
>> ever lost under typical failure scenarios (i.e., the loss of a single
>> broker)?
>>
>> Here's my understanding of this for Kafka v0.8.1.1:
>>
>> 1. Create a topic with a replication factor of 3.
>> 2. Use a sync producer and set acks to 2. (Setting acks to -1 may
>> successfully write even in a case where the data is written only to a
>> single node).
>>
>> Even with these two precautions, there's always the possibility of an
>> "unclean leader election." Can data loss still occur in this scenario? Is
>> it possible to achieve this level of durability on v0.8.1.1?
>>
>> In Kafka v0.8.2, in addition to the above:
>>
>> 3. Ensure that the triple-replicated topic also disallows unclean leader
>> election (https://issues.apache.org/jira/browse/KAFKA-1028).
>>
>> 4. Set the min.isr value of the producer to 2 and acks to -1 (
>> https://issues.apache.org/jira/browse/KAFKA-1555). The producer will then
>> throw an exception if data can't be written to 2 out of 3 nodes.
>>
>> In addition to producer configuration and usage, there are also monitoring
>> and operations considerations for achieving durability and consistency. As
>> those are rather nuanced, it'd probably be easiest to just start iterating
>> on a document to flesh those out.
>>
>> If anyone has any advice on how to better specify this, or how to get
>> started on improving the docs, I'm happy to help out.
>>


Re: Consistency and Availability on Node Failures

2014-10-16 Thread Gwen Shapira
Just note that this is not a universal solution. Many use-cases care
about which partition you end up writing to since partitions are used
to... well, partition logical entities such as customers and users.
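
A small sketch of why rerouting hurts keyed data. The hash below is a
generic "hash modulo partition count" scheme chosen for illustration,
not necessarily what any particular Kafka partitioner does, and
KeyPartitionSketch, partitionFor and reroute are made-up names.

```java
import java.util.Set;

final class KeyPartitionSketch {
    /** Maps a key to a partition; the same key always lands on the same partition. */
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Picks the first available partition at or after the preferred one, or -1 if none. */
    static int reroute(int preferred, Set<Integer> unavailable, int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            int candidate = (preferred + i) % numPartitions;
            if (!unavailable.contains(candidate)) {
                return candidate;
            }
        }
        return -1;
    }
}
```

Once reroute returns something other than partitionFor(key, n), messages
for one customer are spread over two partitions, and any consumer that
relies on per-key ordering or locality will see them out of order or in
the wrong place.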



On Wed, Oct 15, 2014 at 9:03 PM, Jun Rao  wrote:
> Kyle,
>
> What you wanted is not supported out of box. You can achieve this using the
> new java producer. The new java producer allows you to pick an arbitrary
> partition when sending a message. If you receive NotEnoughReplicasException
> when sending a message, you can resend it to another partition.
>
> Thanks,
>
> Jun
>
> On Tue, Oct 14, 2014 at 1:51 PM, Kyle Banker  wrote:
>
>> Consider a 12-node Kafka cluster with a 200-partition topic having a
>> replication factor of 3. Let's assume, in addition, that we're running
>> Kafka v0.8.2, we've disabled unclean leader election, acks is -1, and
>> min.isr is 2.
>>
>> Now suppose we lose 2 nodes. In this case, there's a good chance that 2/3
>> replicas of one or more partitions will be unavailable. This means that
>> messages assigned to those partitions will not be writable. If we're
>> writing a large number of messages, I would expect that all producers would
>> eventually halt. It is somewhat surprising that, if we rely on a basic
>> durability setting, the cluster would likely be unavailable even after
>> losing only 2 / 12 nodes.
>>
>> It might be useful in this scenario for the producer to be able to detect
>> which partitions are no longer available and reroute messages that would
>> have hashed to the unavailable partitions (as defined by our acks and
>> min.isr settings). This way, the cluster as a whole would remain available
>> for writes at the cost of a slightly higher load on the remaining machines.
>>
>> Is this limitation accurately described? Is the proposed producer
>> functionality worth pursuing?
>>
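
The scenario in the original question can be put into rough numbers. This is
a back-of-the-envelope sketch, assuming each partition's replicas are placed
uniformly at random and independently of other partitions (real Kafka
assignment is more structured, so treat the result as an estimate). The
function name is hypothetical:

```python
from math import comb


def p_partition_below_min_isr(nodes, rf, failed, min_isr):
    """Probability that one partition keeps fewer than `min_isr` live replicas
    after `failed` of `nodes` brokers are lost, assuming its `rf` replicas sit
    on a uniformly random set of brokers (hypergeometric model)."""
    total = comb(nodes, failed)
    p = 0.0
    for lost in range(0, min(rf, failed) + 1):
        if rf - lost < min_isr:
            p += comb(rf, lost) * comb(nodes - rf, failed - lost) / total
    return p


p_one = p_partition_below_min_isr(nodes=12, rf=3, failed=2, min_isr=2)
# Treating the 200 partitions as independent (an assumption):
p_any = 1 - (1 - p_one) ** 200
```

Under these assumptions a single partition has a 3/66, or about 1 in 22,
chance of dropping below min.isr, and with 200 partitions it is close to
certain that at least one of them does.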


Re: Cross-Data-Center Mirroring, and Guaranteed Minimum Time Period on Data

2014-10-16 Thread Gwen Shapira
I assume the messages themselves contain the timestamp?

If you use Flume, you can configure a Kafka source to pull data from
Kafka, use an interceptor to pull the date out of your message and
place it in the event header and then the HDFS sink can write to a
partition based on the timestamp.
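
A minimal sketch of the bucketing logic, independent of Flume. It assumes
each message is JSON with a "ts" field holding epoch seconds, and the
/data/events root and both function names are made up for illustration:

```python
import json
from datetime import datetime, timezone


def hdfs_dir_for(message, root="/data/events"):
    """Pull the event timestamp out of the message and map it to an hourly
    directory, which is roughly what the interceptor plus HDFS sink would do."""
    event = json.loads(message)
    ts = datetime.fromtimestamp(event["ts"], tz=timezone.utc)
    return f"{root}/{ts:%Y/%m/%d/%H}"


def complete_up_to(latest_ts_by_dc):
    """Data is only complete up to the slowest data center's latest timestamp,
    so that minimum is the point before which directories can be opened."""
    return min(latest_ts_by_dc.values())
```

With per-data-center progress tracked this way, directories older than
complete_up_to(...) can be opened for reading while newer ones keep filling.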

Gwen

On Wed, Oct 15, 2014 at 8:47 PM, Jun Rao  wrote:
> One way you can do that is to continually load data from Kafka to Hadoop.
> During load, you put data into different HDFS directories based on the
> timestamp. The Hadoop admin can decide when to open up those directories
> for read based on whether data from all data centers have arrived.
>
> Thanks,
>
> Jun
>
> On Tue, Oct 14, 2014 at 11:54 PM, Alex Melville  wrote:
>
>> Hi Apache Community,
>>
>>
>> My company has the following use case. We have multiple geographically
>> disparate data centers each with their own Kafka cluster, and we want to
>> aggregate all of these centers' data to one central Kafka cluster located
>> in a data center distinct from the rest using MirrorMaker. Once in the
>> central cluster, most of this data will be fed into Hadoop for analytics
>> purposes. However, with how we have Hadoop working right now, it must wait
>> until it has received data from all of the other data centers for a
>> specific time period before it has the green light to load that data into
>> HDFS and process it. For example, say we have 3 remote (as in not central)
>> data centers, and DC1 has pushed to the central data center all of its data
>> up to 4:00 PM, DC2 has pushed everything up to 3:30 PM, and DC3 is lagging
>> behind and has only pushed data up to the 2:00 PM time period. Then Hadoop
>> processes all data tagged with modification times before 2:00 PM, and it
>> must wait until DC3 catches up by pushing 2:15, 2:30, etc. data to the
>> central cluster before it can process the 3:00 PM data.
>>
>> So our question is: What is the best way to handle this time-period-ordered
>> requirement on our data using a distributed messaging log like Kafka? We
>> originally started using Kafka to move away from a batch-oriented backend
>> data pipeline transport system in favor of a more streaming-focused system,
>> but we still need to keep track of the latest common time period of data
>> streaming in from the remote clusters.
>>
>>
>> Cheers,
>>
>> Alex M.
>>


Re: Topic doesn't exist exception

2014-10-17 Thread Gwen Shapira
If you have "auto.create.topics.enable" set to "true" (default),
producing to a topic creates it.

It's a bit tricky because the "send" that creates the topic can fail
with "leader not found" or a similar issue. Retrying a few times will
eventually succeed as the topic gets created and the leader gets
elected.
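
A minimal sketch of such a retry loop. It is not tied to any Kafka client:
`send` is any callable, and LeaderNotAvailable is a stand-in exception
defined here for illustration. The 0.8 producer also has its own retry
settings (message.send.max.retries and retry.backoff.ms), which may be
enough on their own:

```python
import time


class LeaderNotAvailable(Exception):
    """Stand-in for the transient error seen while a new topic elects a leader."""


def send_with_retry(send, message, attempts=5, backoff_s=0.1, sleep=time.sleep):
    """Call send(message), retrying on LeaderNotAvailable with a growing pause.

    Re-raises the last error if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            return send(message)
        except LeaderNotAvailable:
            if attempt == attempts:
                raise
            sleep(backoff_s * attempt)
```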

Is it possible that you are not getting errors because you are using
the async producer?

Also "no messages are delivered" can have many causes. Check if the
topic exists using:
bin/kafka-topics.sh --list --zookeeper localhost:2181

Perhaps the topic was created and the issue is elsewhere (the consumer
is a usual suspect; the FAQ has tips for that issue).

Gwen

On Fri, Oct 17, 2014 at 12:56 PM, Mohit Anchlia  wrote:
> Is Kafka supposed to throw exception if topic doesn't exist? It appears
> that there is no exception thrown even though no messages are delivered and
> there are errors logged in Kafka logs.


Re: Topic doesn't exist exception

2014-10-17 Thread Gwen Shapira
The 0.8.1.1 producer is sync by default, and you can set producer.type to
async if needed.

On Fri, Oct 17, 2014 at 2:57 PM, Mohit Anchlia  wrote:
> Thanks! How can I tell if I am using async producer? I thought all the
> sends are async in nature
> On Fri, Oct 17, 2014 at 11:44 AM, Gwen Shapira 
> wrote:
>
>> If you have "auto.create.topics.enable" set to "true" (default),
>> producing to a topic creates it.
>>
>> Its a bit tricky because the "send" that creates the topic can fail
>> with "leader not found" or similar issue. retrying few times will
>> eventually succeed as the topic gets created and the leader gets
>> elected.
>>
>> Is it possible that you are not getting errors because you are using
>> async producer?
>>
>> Also "no messages are delivered" can have many causes. Check if the
>> topic exists using:
>> bin/kafka-topics.sh --list --zookeeper localhost:2181
>>
>> Perhaps the topic was created and the issue is elsewhere (the consumer
>> is a usual suspect! perhaps look in the FAQ for tips with that issue)
>>
>> Gwen
>>
>> On Fri, Oct 17, 2014 at 12:56 PM, Mohit Anchlia 
>> wrote:
>> > Is Kafka supposed to throw exception if topic doesn't exist? It appears
>> > that there is no exception thrown even though no messages are delivered
>> and
>> > there are errors logged in Kafka logs.
>>


Re: read N items from topic

2014-10-17 Thread Gwen Shapira
By the way, I have a blog post showing how I work around the blocking
hasNext() behavior.
It may be helpful:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
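
A minimal sketch of the "read N but don't block forever" idea, using a plain
queue.Queue as a stand-in for the consumer stream (an assumption made so the
example runs without a broker). With the real high-level consumer, I believe
the equivalent is setting consumer.timeout.ms and catching the resulting
timeout exception:

```python
import queue


def read_up_to(stream, n, timeout_s):
    """Collect at most n messages from `stream`.

    Stops early if no message arrives within timeout_s, so the caller is
    never stuck waiting for messages that may not come.
    """
    batch = []
    while len(batch) < n:
        try:
            batch.append(stream.get(timeout=timeout_s))
        except queue.Empty:
            break
    return batch
```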

On Thu, Oct 16, 2014 at 12:52 PM, Neha Narkhede  wrote:
> Josh,
>
> The consumer's API doesn't allow you to specify N messages, but you can
> invoke iter.next() as Gwen suggested and count the messages. Note that the
> iterator can block if you have fewer than N messages, so you will have to
> carefully design around it. The new consumer's API provides a non-blocking
> poll() API so this sort of use case is better handled. In any case, getting
> messages based on a count is something that has to happen on the consumer
> side since the server sends the bytes using the sendfile API that doesn't
> allow it to inspect the bytes.
>
> Thanks,
> Neha
>
> On Thu, Oct 16, 2014 at 8:37 AM,  wrote:
>
>> Using the high level consumer, each consumer in the group can call
>> iter.next () in a loop until they get the number of messages you need.
>>
>> —
>> Sent from Mailbox
>>
>> On Thu, Oct 16, 2014 at 10:18 AM, Josh J  wrote:
>>
>> > hi,
>> > How do I read N items from a topic? I also would like to do this for a
>> > consumer group, so that each consumer can specify an N number of tuples
>> to
>> > read, and each consumer reads distinct tuples.
>> > Thanks,
>> > Josh
>>


Re: Topic doesn't exist exception

2014-10-17 Thread Gwen Shapira
Sorry if I'm confusing you :)

Kafka 0.8.1.1 has two producers: sync and async. You are using the sync
producer without waiting for acks. I hope this helps?

Regardless, did you check if the partition got created? Are you able
to produce messages? Are you able to consume them?

Gwen

On Fri, Oct 17, 2014 at 4:13 PM, Mohit Anchlia  wrote:
> Still don't understand the difference. If it's not waiting for the ack then
> doesn't it make async?
> On Fri, Oct 17, 2014 at 12:55 PM,  wrote:
>
>> Its using the sync producer without waiting for any broker to acknowledge
>> the write.  This explains the lack of errors you are seeing.
>>
>> —
>> Sent from Mailbox
>>
>> On Fri, Oct 17, 2014 at 3:15 PM, Mohit Anchlia 
>> wrote:
>>
>> > Little confused :) From one of the examples I am using property
>> > request.required.acks=0,
>> > I thought this sets the producer to be async?
>> > On Fri, Oct 17, 2014 at 11:59 AM, Gwen Shapira 
>> > wrote:
>> >> 0.8.1.1 producer is Sync by default, and you can set producer.type to
>> >> async if needed.
>> >>
>> >> On Fri, Oct 17, 2014 at 2:57 PM, Mohit Anchlia 
>> >> wrote:
>> >> > Thanks! How can I tell if I am using async producer? I thought all the
>> >> > sends are async in nature
>> >> > On Fri, Oct 17, 2014 at 11:44 AM, Gwen Shapira > >
>> >> > wrote:
>> >> >
>> >> >> If you have "auto.create.topics.enable" set to "true" (default),
>> >> >> producing to a topic creates it.
>> >> >>
>> >> >> Its a bit tricky because the "send" that creates the topic can fail
>> >> >> with "leader not found" or similar issue. retrying few times will
>> >> >> eventually succeed as the topic gets created and the leader gets
>> >> >> elected.
>> >> >>
>> >> >> Is it possible that you are not getting errors because you are using
>> >> >> async producer?
>> >> >>
>> >> >> Also "no messages are delivered" can have many causes. Check if the
>> >> >> topic exists using:
>> >> >> bin/kafka-topics.sh --list --zookeeper localhost:2181
>> >> >>
>> >> >> Perhaps the topic was created and the issue is elsewhere (the
>> consumer
>> >> >> is a usual suspect! perhaps look in the FAQ for tips with that issue)
>> >> >>
>> >> >> Gwen
>> >> >>
>> >> >> On Fri, Oct 17, 2014 at 12:56 PM, Mohit Anchlia <
>> mohitanch...@gmail.com
>> >> >
>> >> >> wrote:
>> >> >> > Is Kafka supposed to throw exception if topic doesn't exist? It
>> >> appears
>> >> >> > that there is no exception thrown even though no messages are
>> >> delivered
>> >> >> and
>> >> >> > there are errors logged in Kafka logs.
>> >> >>
>> >>
>>


Re: Topic doesn't exist exception

2014-10-17 Thread Gwen Shapira
If I understand correctly (and I'll be happy if someone who knows more
will jump in and correct me):

The sync/async part is not between the producer and the broker. It's
between you and the producer. The sync producer takes your message and
immediately contacts the broker, sends the message, either waits for
acks or not, and returns. The async producer takes your message and
immediately returns. The producer will send the message to the broker
at some later time, batching multiple requests for efficiency and
throughput.

So yeah, I think you got it mostly right. Just note that the producer
doesn't wait on .send; the producer executes the send, either
returning immediately (if async) or when it has managed to contact the
broker (if sync).
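
A toy model of that distinction, with no Kafka involved. FakeBroker,
SyncProducer and AsyncProducer are made-up classes for illustration, and the
batching rule (flush every batch_size messages) is an assumption rather than
the real producer's policy:

```python
import queue
import threading


class FakeBroker:
    def __init__(self):
        self.requests = []  # one entry per network request (a list of messages)

    def handle(self, messages):
        self.requests.append(list(messages))


class SyncProducer:
    def __init__(self, broker):
        self.broker = broker

    def send(self, message):
        # The caller's thread does the work and returns only when it is done.
        self.broker.handle([message])


class AsyncProducer:
    def __init__(self, broker, batch_size=3):
        self.broker = broker
        self.batch_size = batch_size
        self.pending = queue.Queue()
        self.worker = threading.Thread(target=self._drain, daemon=True)
        self.worker.start()

    def send(self, message):
        # Only enqueues; the background thread talks to the broker later.
        self.pending.put(message)

    def close(self):
        self.pending.put(None)  # sentinel: flush what is left and stop
        self.worker.join()

    def _drain(self):
        batch = []
        while True:
            item = self.pending.get()
            if item is None:
                break
            batch.append(item)
            if len(batch) == self.batch_size:
                self.broker.handle(batch)
                batch = []
        if batch:
            self.broker.handle(batch)
```

Four sends through SyncProducer produce four requests; the same four sends
through AsyncProducer(batch_size=3) produce two. Neither says anything about
acks, which is a separate setting.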

Gwen

On Fri, Oct 17, 2014 at 4:38 PM, Mohit Anchlia  wrote:
> My understanding of sync is that producer waits on .send until Kafka
> receives the message. And async means it just dispatches the message
> without any guarantees that the message is delivered. Did I get that part right?
> On Fri, Oct 17, 2014 at 1:28 PM, Gwen Shapira  wrote:
>
>> Sorry if I'm confusing you :)
>>
>> Kafka 0.8.1.1 has two producers sync and async. You are using the sync
>> producer without waiting for acks. I hope this helps?
>>
>> Regardless, did you check if the partition got created? are you able
>> to produce messages? are you able to consume them?
>>
>> Gwen
>>
>> On Fri, Oct 17, 2014 at 4:13 PM, Mohit Anchlia 
>> wrote:
>> > Still don't understand the difference. If it's not waiting for the ack
>> then
>> > doesn't it make async?
>> > On Fri, Oct 17, 2014 at 12:55 PM,  wrote:
>> >
>> >> Its using the sync producer without waiting for any broker to
>> acknowledge
>> >> the write.  This explains the lack of errors you are seeing.
>> >>
>> >> —
>> >> Sent from Mailbox
>> >>
>> >> On Fri, Oct 17, 2014 at 3:15 PM, Mohit Anchlia 
>> >> wrote:
>> >>
>> >> > Little confused :) From one of the examples I am using property
>> >> > request.required.acks=0,
>> >> > I thought this sets the producer to be async?
>> >> > On Fri, Oct 17, 2014 at 11:59 AM, Gwen Shapira > >
>> >> > wrote:
>> >> >> 0.8.1.1 producer is Sync by default, and you can set producer.type to
>> >> >> async if needed.
>> >> >>
>> >> >> On Fri, Oct 17, 2014 at 2:57 PM, Mohit Anchlia <
>> mohitanch...@gmail.com>
>> >> >> wrote:
>> >> >> > Thanks! How can I tell if I am using async producer? I thought all
>> the
>> >> >> > sends are async in nature
>> >> >> > On Fri, Oct 17, 2014 at 11:44 AM, Gwen Shapira <
>> gshap...@cloudera.com
>> >> >
>> >> >> > wrote:
>> >> >> >
>> >> >> >> If you have "auto.create.topics.enable" set to "true" (default),
>> >> >> >> producing to a topic creates it.
>> >> >> >>
>> >> >> >> Its a bit tricky because the "send" that creates the topic can
>> fail
>> >> >> >> with "leader not found" or similar issue. retrying few times will
>> >> >> >> eventually succeed as the topic gets created and the leader gets
>> >> >> >> elected.
>> >> >> >>
>> >> >> >> Is it possible that you are not getting errors because you are
>> using
>> >> >> >> async producer?
>> >> >> >>
>> >> >> >> Also "no messages are delivered" can have many causes. Check if
>> the
>> >> >> >> topic exists using:
>> >> >> >> bin/kafka-topics.sh --list --zookeeper localhost:2181
>> >> >> >>
>> >> >> >> Perhaps the topic was created and the issue is elsewhere (the
>> >> consumer
>> >> >> >> is a usual suspect! perhaps look in the FAQ for tips with that
>> issue)
>> >> >> >>
>> >> >> >> Gwen
>> >> >> >>
>> >> >> >> On Fri, Oct 17, 2014 at 12:56 PM, Mohit Anchlia <
>> >> mohitanch...@gmail.com
>> >> >> >
>> >> >> >> wrote:
>> >> >> >> > Is Kafka supposed to throw exception if topic doesn't exist? It
>> >> >> appears
>> >> >> >> > that there is no exception thrown even though no messages are
>> >> >> delivered
>> >> >> >> and
>> >> >> >> > there are errors logged in Kafka logs.
>> >> >> >>
>> >> >>
>> >>
>>


Re: Topic doesn't exist exception

2014-10-17 Thread Gwen Shapira
There's some good advice here: https://kafka.apache.org/081/ops.html

And you may enjoy this post too:
http://blog.liveramp.com/2013/04/08/kafka-0-8-producer-performance-2/



On Fri, Oct 17, 2014 at 5:52 PM, Mohit Anchlia  wrote:
> Thanks for the info. I see there are tons of parameters but is there a
> place that lists some important performance specific parameters?
> On Fri, Oct 17, 2014 at 2:43 PM, Gwen Shapira  wrote:
>
>> If I understand correctly (and I'll be happy if someone who knows more
>> will jump in and correct me):
>>
>> The Sync/Async part is not between the producer and the broker. Its
>> between you and the producer. The Sync producer takes your message and
>> immediately contacts the broker, sends the message, either wait for
>> acks or not and returns. The Async producer takes your message and
>> immediately returns. The producer will send the message to the broker
>> at some time later, batching multiple requests for efficiency and
>> throughput.
>>
>> So yeah, I think you got it mostly right. Just note that the producer
>> doesn't wait on .send, the producer executes the send - either
>> returning immediately (if async) or when it managed to contact the
>> broker (if sync).
>>
>> Gwen
>>
>> On Fri, Oct 17, 2014 at 4:38 PM, Mohit Anchlia 
>> wrote:
>> > My understanding of sync is that producer waits on .send until Kafka
>> > receives the message. And async means it just dispatches the message
>> > without any gurantees that message is delivered. Did I get that part
>> right?
>> > On Fri, Oct 17, 2014 at 1:28 PM, Gwen Shapira 
>> wrote:
>> >
>> >> Sorry if I'm confusing you :)
>> >>
>> >> Kafka 0.8.1.1 has two producers sync and async. You are using the sync
>> >> producer without waiting for acks. I hope this helps?
>> >>
>> >> Regardless, did you check if the partition got created? are you able
>> >> to produce messages? are you able to consume them?
>> >>
>> >> Gwen
>> >>
>> >> On Fri, Oct 17, 2014 at 4:13 PM, Mohit Anchlia 
>> >> wrote:
>> >> > Still don't understand the difference. If it's not waiting for the ack
>> >> then
>> >> > doesn't it make async?
>> >> > On Fri, Oct 17, 2014 at 12:55 PM,  wrote:
>> >> >
>> >> >> Its using the sync producer without waiting for any broker to
>> >> acknowledge
>> >> >> the write.  This explains the lack of errors you are seeing.
>> >> >>
>> >> >> —
>> >> >> Sent from Mailbox
>> >> >>
>> >> >> On Fri, Oct 17, 2014 at 3:15 PM, Mohit Anchlia <
>> mohitanch...@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Little confused :) From one of the examples I am using property
>> >> >> > request.required.acks=0,
>> >> >> > I thought this sets the producer to be async?
>> >> >> > On Fri, Oct 17, 2014 at 11:59 AM, Gwen Shapira <
>> gshap...@cloudera.com
>> >> >
>> >> >> > wrote:
>> >> >> >> 0.8.1.1 producer is Sync by default, and you can set
>> producer.type to
>> >> >> >> async if needed.
>> >> >> >>
>> >> >> >> On Fri, Oct 17, 2014 at 2:57 PM, Mohit Anchlia <
>> >> mohitanch...@gmail.com>
>> >> >> >> wrote:
>> >> >> >> > Thanks! How can I tell if I am using async producer? I thought
>> all
>> >> the
>> >> >> >> > sends are async in nature
>> >> >> >> > On Fri, Oct 17, 2014 at 11:44 AM, Gwen Shapira <
>> >> gshap...@cloudera.com
>> >> >> >
>> >> >> >> > wrote:
>> >> >> >> >
>> >> >> >> >> If you have "auto.create.topics.enable" set to "true"
>> (default),
>> >> >> >> >> producing to a topic creates it.
>> >> >> >> >>
>> >> >> >> >> Its a bit tricky because the "send" that creates the topic can
>> >> fail
>> >> >> >> >> with "leader not found" or similar issue. retrying few times
>> will
>> >> >> >> >> eventually succeed as the topic gets created and the leader
>> gets
>> >> >> >> >> elected.
>> >> >> >> >>
>> >> >> >> >> Is it possible that you are not getting errors because you are
>> >> using
>> >> >> >> >> async producer?
>> >> >> >> >>
>> >> >> >> >> Also "no messages are delivered" can have many causes. Check if
>> >> the
>> >> >> >> >> topic exists using:
>> >> >> >> >> bin/kafka-topics.sh --list --zookeeper localhost:2181
>> >> >> >> >>
>> >> >> >> >> Perhaps the topic was created and the issue is elsewhere (the
>> >> >> consumer
>> >> >> >> >> is a usual suspect! perhaps look in the FAQ for tips with that
>> >> issue)
>> >> >> >> >>
>> >> >> >> >> Gwen
>> >> >> >> >>
>> >> >> >> >> On Fri, Oct 17, 2014 at 12:56 PM, Mohit Anchlia <
>> >> >> mohitanch...@gmail.com
>> >> >> >> >
>> >> >> >> >> wrote:
>> >> >> >> >> > Is Kafka supposed to throw exception if topic doesn't exist?
>> It
>> >> >> >> appears
>> >> >> >> >> > that there is no exception thrown even though no messages are
>> >> >> >> delivered
>> >> >> >> >> and
>> >> >> >> >> > there are errors logged in Kafka logs.
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>>


Re: Achieving Consistency and Durability

2014-10-20 Thread Gwen Shapira
Hi Kyle,

I added new documentation, which will hopefully help. Please take a look here:
https://issues.apache.org/jira/browse/KAFKA-1555

I've heard rumors that you are very very good at documenting, so I'm
looking forward to your comments.

Note that I'm completely ignoring the acks>1 case since we are about
to remove it.
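
For readers following along, here is a small model of the acks behavior
discussed further down in this thread (acks of 0, 1 or -1, plus min.isr). It
is a sketch of the rules as described in these emails, not broker code, and
the names accept_write and NotEnoughReplicas are used only for illustration:

```python
class NotEnoughReplicas(Exception):
    """Raised when the in-sync replica set is smaller than min_isr."""


def accept_write(isr_size, acks, min_isr=1):
    """Return how many replica acknowledgements the write waits for.

    acks=0  -> no acknowledgement at all
    acks=1  -> the leader only
    acks=-1 -> every replica currently in the ISR, and the write is
               rejected if the ISR has shrunk below min_isr
    """
    if isr_size < 1:
        raise ValueError("the leader itself is always in the ISR")
    if acks == -1:
        if isr_size < min_isr:
            raise NotEnoughReplicas(
                "ISR has %d replica(s), min_isr is %d" % (isr_size, min_isr)
            )
        return isr_size
    if acks in (0, 1):
        return acks
    raise ValueError("acks > 1 is deliberately left out of this model")
```

With the default min_isr of 1, accept_write(1, -1) quietly succeeds with a
single replica, which is exactly the gap min.isr was added to close.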

Gwen

On Wed, Oct 15, 2014 at 1:21 PM, Kyle Banker  wrote:
> Thanks very much for these clarifications, Gwen.
>
> I'd recommend modifying the following phrase describing "acks=-1":
>
> "This option provides the best durability, we guarantee that no messages
> will be lost as long as at least one in sync replica remains."
>
> The "as long as at least one in sync replica remains" is such a huge
> caveat. It should be noted that "acks=-1" provides no actual durability
> guarantees unless min.isr is also used to specify a majority of replicas.
>
> In addition, I was curious if you might comment on my other recent posting
> "Consistency and Availability on Node Failures" and possibly add this
> scenario to the docs. With acks=-1 and min.isr=2 and a 3-replica topic in a
> 12-node Kafka cluster, there's a relatively high probability that losing 2
> nodes from this cluster will result in an inability to write to the cluster.
>
> On Tue, Oct 14, 2014 at 4:50 PM, Gwen Shapira  wrote:
>
>> ack = 2 *will* throw an exception when there's only one node in ISR.
>>
>> The problem with ack=2 is that if you have 3 replicas and you got acks
>> from 2 of them, the one replica which did not get the message can
>> still be in ISR and get elected as leader, leading to the loss of the
>> message. If you specify ack=3, you can't tolerate the failure of a
>> single replica. Not amazing either.
>>
>> To make things even worse, when specifying the number of acks you
>> want, you don't always know how many replicas the topic should have,
>> so it's difficult to pick the correct number.
>>
>> acks = -1 solves that problem (since all messages need to get acked by
>> all in-sync replicas), but introduces the new problem of not getting an
>> exception if ISR shrank to 1 replica.
>>
>> That's why the min.isr configuration was added.
>>
>> I hope this clarifies things :)
>> I'm planning to add this to the docs in a day or two, so let me know
>> if there are any additional explanations or scenarios you think we
>> need to include.
>>
>> Gwen
>>
>> On Tue, Oct 14, 2014 at 12:27 PM, Scott Reynolds 
>> wrote:
>> > A question about 0.8.1.1 and acks. I was under the impression that
>> setting
>> > acks to 2 will not throw an exception when there is only one node in ISR.
>> > Am I incorrect ? Thus the need for min_isr.
>> >
>> > On Tue, Oct 14, 2014 at 11:50 AM, Kyle Banker 
>> wrote:
>> >
>> >> It's quite difficult to infer from the docs the exact techniques
>> required
>> >> to ensure consistency and durability in Kafka. I propose that we add a
>> doc
>> >> section detailing these techniques. I would be happy to help with this.
>> >>
>> >> The basic question is this: assuming that I can afford to temporarily
>> halt
>> >> production to Kafka, how do I ensure that no message written to Kafka is
>> >> ever lost under typical failure scenarios (i.e., the loss of a single
>> >> broker)?
>> >>
>> >> Here's my understanding of this for Kafka v0.8.1.1:
>> >>
>> >> 1. Create a topic with a replication factor of 3.
>> >> 2. Use a sync producer and set acks to 2. (Setting acks to -1 may
>> >> successfully write even in a case where the data is written only to a
>> >> single node).
>> >>
>> >> Even with these two precautions, there's always the possibility of an
>> >> "unclean leader election." Can data loss still occur in this scenario?
>> Is
>> >> it possible to achieve this level of durability on v0.8.1.1?
>> >>
>> >> In Kafka v0.8.2, in addition to the above:
>> >>
>> >> 3. Ensure that the triple-replicated topic also disallows unclean leader
>> >> election (https://issues.apache.org/jira/browse/KAFKA-1028).
>> >>
>> >> 4. Set the min.isr value of the producer to 2 and acks to -1 (
>> >> https://issues.apache.org/jira/browse/KAFKA-1555). The producer will
>> then
>> >> throw an exception if data can't be written to 2 out of 3 nodes.
>> >>
>> >> In addition to producer configuration and usage, there are also
>> monitoring
>> >> and operations considerations for achieving durability and consistency.
>> As
>> >> those are rather nuanced, it'd probably be easiest to just start
>> iterating
>> >> on a document to flesh those out.
>> >>
>> >> If anyone has any advice on how to better specify this, or how to get
>> >> started on improving the docs, I'm happy to help out.
>> >>
>>


Re: frequent periods of ~1500 replicas not in sync

2014-10-21 Thread Gwen Shapira
Consumers always read from the leader replica, which is always in sync
by definition. So you are good there.
The concern would be if the leader crashes during this period.
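
A rough sketch of how I understand the 0.8.x in-sync check, based on the two
settings quoted below (replica.lag.max.messages and replica.lag.time.max.ms).
This is a simplified model for illustration, not the broker's code, and the
function name and argument layout are assumptions:

```python
def in_sync_replicas(leader_id, leader_end_offset, followers,
                     lag_max_messages, lag_time_max_ms):
    """Return the replica ids considered in sync.

    `followers` maps replica id -> (last fetched offset, ms since last fetch).
    A follower stays in the ISR only if it is within lag_max_messages of the
    leader and has fetched within lag_time_max_ms. The leader is always in.
    """
    isr = [leader_id]
    for replica_id, (offset, ms_since_fetch) in sorted(followers.items()):
        behind = leader_end_offset - offset
        if behind <= lag_max_messages and ms_since_fetch <= lag_time_max_ms:
            isr.append(replica_id)
    return isr
```

Under this model a short burst of writes can push a healthy follower past
the message threshold for a moment, which may be one reason replicas drop
out of and rejoin the ISR in waves.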



On Tue, Oct 21, 2014 at 2:56 PM, Neil Harkins  wrote:
> Hi. I've got a 5 node cluster running Kafka 0.8.1,
> with 4697 partitions (2 replicas each) across 564 topics.
> I'm sending it about 1% of our total messaging load now,
> and several times a day there is a period where 1~1500
> partitions have one replica not in sync. Is this normal?
> If a consumer is reading from a replica that gets deemed
> "not in sync", does it get redirected to the good replica?
> Is there a #partitions over which maintenance tasks
> become infeasible?
>
> Relevant config bits:
> auto.leader.rebalance.enable=true
> leader.imbalance.per.broker.percentage=20
> leader.imbalance.check.interval.seconds=30
> replica.lag.time.max.ms=1
> replica.lag.max.messages=4000
> num.replica.fetchers=4
> replica.fetch.max.bytes=10485760
>
> Not necessarily correlated to those periods,
> I see a lot of these errors in the logs:
>
> [2014-10-20 21:23:26,999] 21963614 [ReplicaFetcherThread-3-1] ERROR
> kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-3-1], Error
> in fetch Name: FetchRequest; Version: 0; CorrelationId: 77423;
> ClientId: ReplicaFetcherThread-3-1; ReplicaId: 2; MaxWait: 500 ms;
> MinBytes: 1 bytes; RequestInfo: ...
>
> And a few of these:
>
> [2014-10-20 21:23:39,555] 3467527 [kafka-scheduler-2] ERROR
> kafka.utils.ZkUtils$  - Conditional update of path
> /brokers/topics/foo.bar/partitions/3/state with data
> {"controller_epoch":11,"leader":3,"version":1,"leader_epoch":109,"isr":[3]}
> and expected version 197 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException:
> KeeperErrorCode = BadVersion for
> /brokers/topics/foo.bar/partitions/3/state
>
> And this one I assume is a client closing the connection non-gracefully,
> thus should probably be a warning, not an error?:
>
> [2014-10-20 21:54:15,599] 23812214 [kafka-processor-9092-3] ERROR
> kafka.network.Processor  - Closing socket for /10.31.0.224 because of
> error
>
> -neil


Re: Partition and Replica assignment for a Topic

2014-10-21 Thread Gwen Shapira
Anything missing in the output of:
kafka-topics.sh --describe --zookeeper localhost:2181
?
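
If the describe output has what you need, a small script can turn it into
the Topic / Partition / Replica / Broker rows from your table. This is a
sketch that assumes per-partition lines of the form
"Topic: t  Partition: n  Leader: b  Replicas: b1,b2,b3  Isr: ..."; check it
against the output of your Kafka version. The function name is made up:

```python
import re

# Matches per-partition lines only; the "PartitionCount:" summary line
# does not contain a "Partition:" field, so it is skipped.
ROW = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)"
)


def allocation_rows(describe_output):
    """Return (topic, partition, replica position, broker id) tuples."""
    rows = []
    for line in describe_output.splitlines():
        match = ROW.search(line)
        if not match:
            continue
        topic, partition, _leader, replicas = match.groups()
        for position, broker in enumerate(replicas.split(","), start=1):
            rows.append((topic, int(partition), position, int(broker)))
    return rows
```

The resulting tuples can be fed to whatever dashboard tool you use.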

On Tue, Oct 21, 2014 at 4:29 PM, Jonathan Creasy
 wrote:
> I'd like to be able to see a little more detail for a topic.
>
> What is the best way to get this information?
>
> Topic   Partition   Replica Broker
> topic1  1   1   3
> topic1  1   2   4
> topic1  1   3   1
> topic1  2   1   1
> topic1  2   2   3
> topic1  2   3   2
>
> I'd like to be able to create topic allocation dashboards, similar to the
> index allocation dashboards in the Elasticsearch plugin Marvel.
>
> Basically, translating index -> topic, shard -> partition, replica ->
> replica, node -> broker.
>
> -Jonathan
>
>


Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Gwen Shapira
RAID-10?
Interesting choice for a system where the data is already replicated
between nodes. Is it to avoid the cost of large replication over the
network? How large are these disks?

On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino  wrote:
> In fact there are many more than 4000 open files. Many of our brokers run
> with 28,000+ open files (regular file handles, not network connections). In
> our case, we're beefing up the disk performance as much as we can by
> running in a RAID-10 configuration with 14 disks.
>
> -Todd
>
> On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She  wrote:
>
>> Todd,
>>
>> Actually I'm wondering how Kafka handles so many partitions. With one
>> partition there is at least one file on disk, and with 4000 partitions
>> there will be at least 4000 files.
>>
>> When all these partitions receive write requests, how does Kafka keep the
>> writes to disk sequential (which is emphasized in the Kafka design
>> document) and make sure the disk access is efficient?
>>
>> Thank you for your reply.
>>
>> xiaobinshe
>>
>>
>>
>> 2014-10-22 5:10 GMT+08:00 Todd Palino :
>>
>> > As far as the number of partitions a single broker can handle, we've set
>> > our cap at 4000 partitions (including replicas). Above that we've seen
>> some
>> > performance and stability issues.
>> >
>> > -Todd
>> >
>> > On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
>> > wrote:
>> >
>> > > hello, everyone
>> > >
>> > > I'm new to Kafka. I'm wondering what the maximum number of partitions is
>> > > that a single machine can handle in Kafka?
>> > >
>> > > Is there a suggested number?
>> > >
>> > > Thanks.
>> > >
>> > > xiaobinshe
>> > >
>> >
>>


Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Gwen Shapira
Makes sense. Thanks :)

On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 wrote:
> There are various costs when a broker fails, including broker leader election 
> for each partition, etc., as well as exposing possible issues for in-flight 
> messages, and client rebalancing etc.
>
> So even though replication provides partition redundancy, RAID 10 on each
> broker is usually a good tradeoff: it guards against the most common cause of
> broker server failure (e.g. disk failure) and makes for smoother operation
> overall.
>
> Best Regards,
>
> -Jonathan
>
>
> On Oct 22, 2014, at 11:01 AM, Gwen Shapira  wrote:
>
>> RAID-10?
>> Interesting choice for a system where the data is already replicated
>> between nodes. Is it to avoid the cost of large replication over the
>> network? how large are these disks?
>>
>> On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino  wrote:
>>> In fact there are many more than 4000 open files. Many of our brokers run
>>> with 28,000+ open files (regular file handles, not network connections). In
>>> our case, we're beefing up the disk performance as much as we can by
>>> running in a RAID-10 configuration with 14 disks.
>>>
>>> -Todd
>>>
>>> On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She  wrote:
>>>
>>>> Todd,
>>>>
>>>> Actually I'm wondering how Kafka handles so many partitions. With one
>>>> partition there is at least one file on disk, and with 4000 partitions
>>>> there will be at least 4000 files.
>>>>
>>>> When all these partitions receive write requests, how does Kafka keep the
>>>> writes to disk sequential (which is emphasized in the Kafka design
>>>> document) and make sure the disk access is efficient?
>>>>
>>>> Thank you for your reply.
>>>>
>>>> xiaobinshe
>>>>
>>>>
>>>>
>>>> 2014-10-22 5:10 GMT+08:00 Todd Palino :
>>>>
>>>>> As far as the number of partitions a single broker can handle, we've set
>>>>> our cap at 4000 partitions (including replicas). Above that we've seen
>>>> some
>>>>> performance and stability issues.
>>>>>
>>>>> -Todd
>>>>>
>>>>> On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
>>>>> wrote:
>>>>>
>>>>>> hello, everyone
>>>>>>
>>>>>> I'm new to Kafka. I'm wondering what the maximum number of partitions is
>>>>>> that a single machine can handle in Kafka?
>>>>>>
>>>>>> Is there a suggested number?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> xiaobinshe
>>>>>>
>>>>>
>>>>
>


Re: Using Kafka for ETL from DW to Hadoop

2014-10-23 Thread Gwen Shapira
While I agree with Mark that testing the end-to-end pipeline is
critical, note that in terms of performance, whatever you write to
hook up Teradata to Kafka is unlikely to be as fast as the Teradata
connector for Sqoop (especially the newer one). Quite a lot of
optimization by Teradata engineers went into the connector.

Actually, unless you need very low latency (seconds to a few minutes)
or consumers other than Hadoop, I'd go with Sqoop incremental jobs and
leave Kafka out of the equation completely. If that fits your use
case, it will save you quite a bit of work on connecting Teradata to
Kafka.

Gwen

On Thu, Oct 23, 2014 at 9:48 AM, Mark Roberts  wrote:
> If you use Kafka for the first bulk load, you will test your new
> Teradata->Kafka->Hive pipeline, as well as have the ability to blow away
> the data in Hive and reflow it from Kafka without an expensive full
> re-export from Teradata.  As for whether Kafka can handle hundreds of GB of
> data: Yes, absolutely.
>
> -Mark
>
>
> On Thu, Oct 23, 2014 at 3:08 AM, Po Cheung 
> wrote:
>
>> Hello,
>>
>> We are planning to set up a data pipeline and send periodic, incremental
>> updates from Teradata to Hadoop via Kafka.  For a large DW table with
>> hundreds of GB of data, is it okay (in terms of performance) to use Kafka
>> for the initial bulk data load?  Or will Sqoop with Teradata connector be
>> more appropriate?
>>
>>
>> Thanks,
>> Po


Re: How many partition can one single machine handle in Kafka?

2014-10-24 Thread Gwen Shapira
Todd,

Did you load-test using SSDs?
Got numbers to share?

On Fri, Oct 24, 2014 at 10:40 AM, Todd Palino  wrote:
> Hmm, I haven't read the design doc lately, but I'm surprised that there's
> even a discussion of sequential disk access. I suppose for small subsets of
> the writes you can write larger blocks of sequential data, but that's about
> the extent of it. Maybe one of the developers can speak more to that aspect.
>
> As far as the number of files goes, it really doesn't matter that much
> whether you have a few or a lot. Once you have more than one, the disk
> access is random, so the performance is more like a cliff than a gentle
> slope. As I said, we've found issues once we go above 4000 partitions, and
> that's probably a combination of what the software can handle and the
> number of open files.
>
> -Todd
>
>
> On Thu, Oct 23, 2014 at 11:19 PM, Xiaobin She  wrote:
>
>> Todd,
>>
>> Thank you very much for your reply. My understanding of RAID 10 was wrong.
>>
>> I understand that one cannot get absolutely sequential disk access even on
>> a single disk. The reason I'm interested in this question is that the
>> Kafka design document emphasizes that Kafka takes advantage of
>> sequential disk access to improve disk performance, and I can't
>> understand how to achieve this with thousands of open files.
>>
>> I thought that, compared to one or a few files, thousands of open files would
>> make the disk access much more random and the disk performance much
>> worse.
>>
>> You mentioned that to increase overall IO capacity, one has to use
>> multiple spindles with sufficiently fast disks, but would that be more
>> effective with fewer files on the disk? Or is the number of files not an
>> important factor in the overall performance of Kafka?
>>
>> Thanks again.
>>
>> xiaobinshe
>>
>>
>>
>> 2014-10-23 22:01 GMT+08:00 Todd Palino :
>>
>> > Your understanding of RAID 10 is slightly off. Because it is a combination
>> > of striping and mirroring, trying to say that there are 4000 open files per
>> > pair of disks is not accurate. The disk, as far as the system is concerned,
>> > is the entire RAID. Files are striped across all mirrors, so any open file
>> > will cross all 7 mirror sets.
>> >
>> > Even if you were to operate on a single disk, you're never going to be able
>> > to ensure sequential disk access with Kafka. Even if you have a single
>> > partition on a disk, there will be multiple log files for that partition
>> > and you will have to seek to read older data. What you have to do is use
>> > multiple spindles, with sufficiently fast disk speeds, to increase your
>> > overall IO capacity. You can also tune to get a little more. For example,
>> > we use a 120 second commit on that mount point to reduce the frequency of
>> > flushing to disk.
>> >
>> > -Todd
>> >
>> >
>> > On Wed, Oct 22, 2014 at 10:09 PM, Xiaobin She 
>> > wrote:
>> >
>> > > Todd,
>> > >
>> > > Thank you for the information.
>> > >
>> > > With 28,000+ files and 14 disks, there are on average about 4000
>> > > open files per pair of disks (which is treated as one single disk), am I
>> > > right?
>> > >
>> > > How do you manage to make all the write operations to these 4000
>> > > open files sequential on the disk?
>> > >
>> > > As far as I know, write operations to different files on the same disk
>> > > cause random writes, which is not good for performance.
>> > >
>> > > xiaobinshe
>> > >
>> > >
>> > >
>> > >
>> > > 2014-10-23 1:00 GMT+08:00 Todd Palino :
>> > >
>> > > > In fact there are many more than 4000 open files. Many of our brokers run
>> > > > with 28,000+ open files (regular file handles, not network connections). In
>> > > > our case, we're beefing up the disk performance as much as we can by
>> > > > running in a RAID-10 configuration with 14 disks.
>> > > >
>> > > > -Todd
>> > > >
>> > > > On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She 
>> > > wrote:
>> > > >
>> > > > > Todd,
>> > > > >
>> > > > > Actually, I'm wondering how Kafka handles so many partitions. Each
>> > > > > partition has at least one file on disk, so with 4000 partitions
>> > > > > there will be at least 4000 files.
>> > > > >
>> > > > > When all these partitions receive write requests, how does Kafka make
>> > > > > the write operations on the disk sequential (which is emphasized in the
>> > > > > Kafka design document) and make sure the disk access is efficient?
>> > > > >
>> > > > > Thank you for your reply.
>> > > > >
>> > > > > xiaobinshe
>> > > > >
>> > > > >
>> > > > >
>> > > > > 2014-10-22 5:10 GMT+08:00 Todd Palino :
>> > > > >
>> > > > > > As far as the number of partitions a single broker can handle, we've set
>> > > > > > our cap at 4000 partitions (including replicas). Above that we've seen some
>> > > > > > performance and stability issues.
>> > > > > >
>> > > > > > -Todd
>> > > > > >
>> 

Re: broker down,the cluster can't work normal

2014-10-28 Thread Gwen Shapira
Note that --zookeeper is the location of the ZooKeeper server, not a Kafka broker.

Are you running ZooKeeper on both 192.168.100.91 and 192.168.100.92?

ZooKeeper is based on a simple majority, so you can't run it with
2 nodes (well, you can, but it will freeze if you lose one node); you
need either 1 or 3. In your case it makes sense to have two Kafka
brokers and one ZooKeeper node (which you already have on
192.168.100.91).
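
To make the majority rule concrete, here is a minimal Python sketch. The
function names are illustrative only and are not part of ZooKeeper; the
sketch just computes the majority size and how many node failures an
ensemble of a given size can survive.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    if ensemble_size < 1:
        raise ValueError("ensemble must have at least one node")
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """How many nodes can be lost while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


if __name__ == "__main__":
    # Print ensemble size, quorum size, and tolerated failures.
    for n in (1, 2, 3, 5):
        print(n, quorum_size(n), tolerated_failures(n))
```

With 2 nodes the majority is 2, so losing either node stalls the ensemble:
a 2-node setup tolerates no more failures than a single node does, while 3
nodes tolerate one.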

Gwen

On Tue, Oct 28, 2014 at 2:24 AM, 天天向上  wrote:
>
>
> hello,
> I have two Kafka servers, with IP addresses 192.168.100.91 and 192.168.100.92:
> broker 0: 192.168.100.91
> broker 1: 192.168.100.92
>   At broker 0:
> bin/kafka-topics.sh --create --zookeeper 192.168.100.91:2181 --topic qaz --partition 3 --replication-factor 2
> bin/kafka-topics.sh --describe --zookeeper 192.168.100.91:2181 --topic qaz
>   Topic:qaz  PartitionCount:3  ReplicationFactor:2  Configs:
>     Topic: qaz  Partition: 0  Leader: 1  Replicas: 1,0  Isr: 1,0
>     Topic: qaz  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
>     Topic: qaz  Partition: 2  Leader: 1  Replicas: 1,0  Isr: 1,0
>
>
> I can execute this:
>  bin/kafka-console-consumer.sh --zookeeper 192.168.100.91:2181 --topic qaz --from-beginning
> But I can't execute the next one, regardless of whether broker 0 is running normally:
>  bin/kafka-console-consumer.sh --zookeeper 192.168.100.92:2181 --topic qaz --from-beginning
>  It reports: kafka no brokers found.
>  At this time, broker 1 is running normally.
>
>
> Thanks!!!


Re: High Level Consumer and Close with Auto Commit On

2014-10-28 Thread Gwen Shapira
The high-level consumer commits before shutting down.

If you look at ZookeeperConsumerConnector.scala (currently the only
implementation of ConsumerConnector), you'll see that shutdown() includes
the following:

  if (config.autoCommitEnable)
    commitOffsets()
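
The effect of that check can be illustrated with a toy model in Python.
This is only a sketch: ToyConsumer and its methods are hypothetical and are
not Kafka's API, and the periodic commit is simplified to happen inside
next() rather than on a background timer.

```python
class ToyConsumer:
    """Toy model of a consumer with periodic auto-commit (not Kafka's API)."""

    def __init__(self, auto_commit_enable: bool, commit_interval_s: float):
        self.auto_commit_enable = auto_commit_enable
        self.commit_interval_s = commit_interval_s
        self.consumed_offset = 0    # next offset to hand to the application
        self.committed_offset = 0   # offset a restarted consumer resumes from
        self._last_commit_at = 0.0

    def next(self, now: float) -> int:
        """Consume one message; commit only if the interval has elapsed."""
        offset = self.consumed_offset
        self.consumed_offset += 1
        interval_elapsed = now - self._last_commit_at >= self.commit_interval_s
        if self.auto_commit_enable and interval_elapsed:
            self.commit_offsets(now)
        return offset

    def commit_offsets(self, now: float) -> None:
        """Record everything consumed so far as committed."""
        self.committed_offset = self.consumed_offset
        self._last_commit_at = now

    def shutdown(self, now: float) -> None:
        # Mirrors the quoted snippet: commit on shutdown when auto-commit is on.
        if self.auto_commit_enable:
            self.commit_offsets(now)
```

In this model, three messages consumed within the first 5 seconds leave the
committed offset at 0 until shutdown() is called, at which point it moves
to 3, so a restart would not replay them.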

Gwen

On Tue, Oct 28, 2014 at 11:44 AM, Bhavesh Mistry
 wrote:
> Hi Kafka Team,
>
> What is the expected behavior when you close *ConsumerConnector* and auto
> commit is on?  Basically, when the auto commit interval is set to 5 seconds
> and shutdown is called (before 5 seconds elapse), does ConsumerConnector
> commit the offsets of messages consumed by the next() method, or will the
> consumer get duplicate messages when it comes back online after restart?
>
> ConsumerConnector.shutdown();
>
> Thanks,
>
> Bhavesh


Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-10-31 Thread Gwen Shapira
The producer configuration should list the kafka brokers, not the zookeeper
quorum.
See here: http://kafka.apache.org/documentation.html#producerconfigs
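
As a rough illustration, a small Python check like the one below could flag
this mistake before running the tool. It is a sketch under assumptions: the
helper names are made up, and it assumes the 0.8 producer property is
metadata.broker.list (the key that appears later in this thread).

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_producer_config(text: str) -> list:
    """Return a list of human-readable problems found in a producer config."""
    props = parse_properties(text)
    problems = []
    if "metadata.broker.list" not in props:
        problems.append("metadata.broker.list is missing")
    if "zookeeper.connect" in props:
        problems.append("zookeeper.connect points at ZooKeeper, not at brokers")
    return problems
```

Feeding it the producer file from the original message (which only sets
zookeeper.connect and group.id) would report both problems.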

(and send my regards to Alex Gorbachev ;)

Gwen



On Fri, Oct 31, 2014 at 8:05 PM, Tomas Nunez  wrote:

> Hi
>
> I'm trying to upgrade a 0.7 kafka cluster, but I'm getting an error:
>
> I created the file migrationToolConsumer.properties with just the info to
> connect to the old cluster:
> _
> zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
> group.id=migration.prod
> __
>
>
> Then the producer file
> ___
>
> zookeeper.connect=zookeeper01_new:2181,zookeeper02_new:2181,zookeeper03_new:2181
> group.id=migration.prod
> ___
>
> And then I called the migrationtool, and get this error
>
> __
> $ kafka/bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool
> --kafka.07.jar migration-tool/kafka-0.7.2.jar --zkclient.01.jar
> migration-tool/zkclient-0.1.jar --num.producers 4
> --consumer.config=kafka/config/migrationToolConsumer.properties
> -producer.config=kafka/config/migrationToolProducer.properties
> --whitelist=.*
> Kafka migration tool failed due to:
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:200)
> Caused by: java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
> at kafka.utils.ZKConfig.(ZkUtils.scala:302)
> at kafka.consumer.ConsumerConfig.(ConsumerConfig.scala:44)
> ... 5 more
> Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> at java.lang.ClassLoader.findClass(ClassLoader.java:531)
> at
>
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$FindClassClassLoader.findClass(KafkaMigrationTool.java:440)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at
>
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$ChildURLClassLoader.findClass(KafkaMigrationTool.java:463)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 7 more
>
> [2014-11-01 02:51:35,124] ERROR Kafka migration tool failed:
> (kafka.tools.KafkaMigrationTool)
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:200)
> Caused by: java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
> at kafka.utils.ZKConfig.(ZkUtils.scala:302)
> at kafka.consumer.ConsumerConfig.(ConsumerConfig.scala:44)
> ... 5 more
> Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> at java.lang.ClassLoader.findClass(ClassLoader.java:531)
> at
>
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$FindClassClassLoader.findClass(KafkaMigrationTool.java:440)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at
>
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$ChildURLClassLoader.findClass(KafkaMigrationTool.java:463)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 7 more
> __
>
>
> The files kafka-0.7.2.jar and  zkclient-0.1.jar are the exact same ones
> that are being used in the old 0.7 cluster.
>
> I also tried downloading and building a new 0.7.2 jar file with scala, but
> it had the same error, too.
>
> What could I be doing wrong? Is there any way to get more output to see where
> the error is?
>
> Thanks in advance,
>
> --
> Tomàs Núñez
> Enterprise Infrastructure Consultant
> The Pythian Group - Love your data!
>
> Office (international):  +1 613 565 8696 x1501
>
> --
>
>
> --
>
>
>
>


Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-10-31 Thread Gwen Shapira
This is part of Scala, so it should be in the scala-library-...jar


On Fri, Oct 31, 2014 at 8:26 PM, Tomas Nunez  wrote:

> Well... I used strace and I found it was looking for some classes in a
> wrong path. I fixed most of them, but there's one that isn't anywhere,
> neither the new nor the old kafka servers:
>
> $ strace -o logfile -s 1000 -f kafka/bin/kafka-run-class.sh
> kafka.tools.KafkaMigrationTool --kafka.07.jar
> migration-tool/kafka-0.7.2.jar --zkclient.01.jar
> migration-tool/zkclient-0.1.jar
> --consumer.config=kafka/config/migrationToolConsumer.properties
> -producer.config=kafka/config/migrationToolProducer.properties
> --whitelist=.*
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/Tuple2$mcLL$sp
> at kafka.utils.Utils.stackTrace(Utils.scala)
> at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:268)
> Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 2 more
>
> $ grep ENOENT logfile|tail -1
> 16484
>
> stat("migration-tool/kafka07/core/target/scala_2.8.0/classes/scala/Tuple2$mcLL$sp.class",
> 0x7f59b27f4950) = -1 ENOENT (No such file or directory)
>
> I saw someone had a similar problem in this list, but it's not clear (at
> least to me) how it was fixed in the end
>
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201302.mbox/%3CCANEBM33rMTZ2UGp2dpaFj8xV=n4=rr9gp3six0sko++nhqv...@mail.gmail.com%3E
>
> The JIRA ticket is still untouched
> https://issues.apache.org/jira/browse/KAFKA-760
>
> Could different versions of Scala be the cause of this error? I just used
> "./sbt update" to make sure...
>
> If not, where should I find that class file?
>
> Thanks again!
>
> On Sat, Nov 1, 2014 at 4:05 AM, Tomas Nunez  wrote:
>
> > Hi
> >
> > I'm trying to upgrade a 0.7 kafka cluster, but I'm getting an error:
> >
> > I created the file migrationToolConsumer.properties with just the info to
> > connect to the old cluster:
> > _
> > zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
> > group.id=migration.prod
> > __
> >
> >
> > Then the producer file
> > ___
> >
> >
> zookeeper.connect=zookeeper01_new:2181,zookeeper02_new:2181,zookeeper03_new:2181
> > group.id=migration.prod
> > ___
> >
> > And then I called the migrationtool, and get this error
> >
> > __
> > $ kafka/bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool
> > --kafka.07.jar migration-tool/kafka-0.7.2.jar --zkclient.01.jar
> > migration-tool/zkclient-0.1.jar --num.producers 4
> > --consumer.config=kafka/config/migrationToolConsumer.properties
> > -producer.config=kafka/config/migrationToolProducer.properties
> > --whitelist=.*
> > Kafka migration tool failed due to:
> > java.lang.reflect.InvocationTargetException
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > Method)
> > at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> > at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at
> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > at
> kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:200)
> > Caused by: java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
> > at kafka.utils.ZKConfig.(ZkUtils.scala:302)
> > at kafka.consumer.ConsumerConfig.(ConsumerConfig.scala:44)
> > ... 5 more
> > Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> > at java.lang.ClassLoader.findClass(ClassLoader.java:531)
> > at
> >
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$FindClassClassLoader.findClass(KafkaMigrationTool.java:440)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > at
> >
> kafka.tools.KafkaMigrationTool$ParentLastURLClassLoader$ChildURLClassLoader.findClass(KafkaMigrationTool.java:463)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > ... 7 more
> >
> > [2014-11-01 02:51:35,124] ERROR Kafka migration tool failed:
> > (kafka.tools.KafkaMigrationTool)
> > java.lang.reflect.InvocationTargetException
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > Method)
> > at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAc

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-10-31 Thread Gwen Shapira
The file should be your Scala jar.
You should have it in your Kafka lib directory.
Run "jar tf" on it to make sure it actually contains the Tuple class.
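
If the jar tool isn't handy, the same check can be sketched in Python, since
a jar is a zip archive. The helper name and the jar path in the usage note
are illustrative assumptions.

```python
import zipfile


def jar_contains(jar_path: str, class_name: str) -> bool:
    """Return True if the jar has an entry for the given fully qualified class."""
    # scala.Tuple2$mcLL$sp -> scala/Tuple2$mcLL$sp.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

For example, jar_contains("libs/scala-library.jar", "scala.Tuple2$mcLL$sp")
(with the path adjusted to wherever your Scala jar lives) tells you whether
the class from the stack trace is actually in that jar.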

Gwen

On Fri, Oct 31, 2014 at 8:49 PM, Tomas Nunez  wrote:

> Thanks for pointing out the error. I fixed it, but I'm still getting the same
> error. The content of migrationToolProducer.properties is now:
> _
>
> metadata.broker.list=kafka_new01:9092,kafka_new02:9092,kafka_new03:9092,kafka_new04:9092,kafka_new05:9092,
> group.id=migration.prod
> _
>
> And the error still is:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/Tuple2$mcLL$sp
> at kafka.utils.Utils.stackTrace(Utils.scala)
> at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:268)
> Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 2 more
>
> I have no "Tuple2*" file anywhere on the server. I'm following
> https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8
> and I can't see anything there about downloading classes, and I can't find
> many people with the same problem, which leads me to think that I'm doing
> something wrong...
>
> Any idea on what to try?
>
> Thanks!
>
> PS: Of course, I'll tell Alex you say hi :)
>
> On Sat, Nov 1, 2014 at 4:32 AM, Gwen Shapira 
> wrote:
>
> > This is part of Scala, so it should be in the scala-library-...jar
> >
> >
> > On Fri, Oct 31, 2014 at 8:26 PM, Tomas Nunez  wrote:
> >
> > > Well... I used strace and I found it was looking for some classes in a
> > > wrong path. I fixed most of them, but there's one that isn't anywhere,
> > > neither the new nor the old kafka servers:
> > >
> > > $ strace -o logfile -s 1000 -f kafka/bin/kafka-run-class.sh
> > > kafka.tools.KafkaMigrationTool --kafka.07.jar
> > > migration-tool/kafka-0.7.2.jar --zkclient.01.jar
> > > migration-tool/zkclient-0.1.jar
> > > --consumer.config=kafka/config/migrationToolConsumer.properties
> > > -producer.config=kafka/config/migrationToolProducer.properties
> > > --whitelist=.*
> > > Exception in thread "main" java.lang.NoClassDefFoundError:
> > > scala/Tuple2$mcLL$sp
> > > at kafka.utils.Utils.stackTrace(Utils.scala)
> > > at
> > kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:268)
> > > Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > ... 2 more
> > >
> > > $ grep ENOENT logfile|tail -1
> > > 16484
> > >
> > >
> >
> stat("migration-tool/kafka07/core/target/scala_2.8.0/classes/scala/Tuple2$mcLL$sp.class",
> > > 0x7f59b27f4950) = -1 ENOENT (No such file or directory)
> > >
> > > I saw someone had a similar problem in this list, but it's not clear
> (at
> > > least to me) how it was fixed in the end
> > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201302.mbox/%3CCANEBM33rMTZ2UGp2dpaFj8xV=n4=rr9gp3six0sko++nhqv...@mail.gmail.com%3E
> > >
> > > The JIRA ticket is still untouched
> > > https://issues.apache.org/jira/browse/KAFKA-760
> > >
> > > May different versions of scala be the cause of this error? I just used
> > > "./sbt update" to make sure...
> > >
> > > If not, where should I find that class file?
> > >
> > > Thanks again!
> > >
> > > On Sat, Nov 1, 2014 at 4:05 AM, Tomas Nunez  wrote:
> > >
> > 

Re: Spark Kafka Performance

2014-11-03 Thread Gwen Shapira
Not sure about the throughput, but:

"I mean that the words counted in spark should grow up" - The spark
word-count example doesn't accumulate.
It gets an RDD every n seconds and counts the words in that RDD. So we
don't expect the count to go up.
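
The difference between the two behaviours can be sketched in plain Python.
This does not use the Spark API; the function names are illustrative, and
each inner list stands in for the lines received in one batch interval.

```python
from collections import Counter


def count_batch(lines):
    """Word counts for a single batch, as a stateless word count produces."""
    return Counter(word for line in lines for word in line.split())


def running_counts(batches):
    """Yield cumulative counts after each batch (what the question expected)."""
    total = Counter()
    for lines in batches:
        total.update(count_batch(lines))
        yield Counter(total)


# Two batches with identical content, as when the same file is resent.
batches = [["a b a"], ["a b a"]]
per_batch = [count_batch(b) for b in batches]
cumulative = list(running_counts(batches))
```

Here per_batch holds the same counts for both batches, while cumulative
grows from batch to batch. The example job behaves like the former.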



On Mon, Nov 3, 2014 at 6:57 AM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
> Could anyone explain to me how to use Kafka with Spark? I am using
> JavaKafkaWordCount.java as a test, and the command line is:
>
> ./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount
> spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3
>
> and as a producer I am using this command:
>
> rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt
> -l 100 -n 10
>
>
> rdkafka_cachesender is a program I developed that sends the content of
> output.txt to Kafka, where -l is the length of each send (upper
> bound) and -n is the number of lines to send in a row. Below is the throughput
> calculated by the program:
>
> File is 2235755 bytes
> throughput (b/s) = 699751388
> throughput (b/s) = 723542382
> throughput (b/s) = 662989745
> throughput (b/s) = 505028200
> throughput (b/s) = 471263416
> throughput (b/s) = 446837266
> throughput (b/s) = 409856716
> throughput (b/s) = 373994467
> throughput (b/s) = 366343097
> throughput (b/s) = 373240017
> throughput (b/s) = 386139016
> throughput (b/s) = 373802209
> throughput (b/s) = 369308515
> throughput (b/s) = 366935820
> throughput (b/s) = 365175388
> throughput (b/s) = 362175419
> throughput (b/s) = 358356633
> throughput (b/s) = 357219124
> throughput (b/s) = 352174125
> throughput (b/s) = 348313093
> throughput (b/s) = 355099099
> throughput (b/s) = 348069777
> throughput (b/s) = 348478302
> throughput (b/s) = 340404276
> throughput (b/s) = 339876031
> throughput (b/s) = 339175102
> throughput (b/s) = 327555252
> throughput (b/s) = 324272374
> throughput (b/s) = 322479222
> throughput (b/s) = 319544906
> throughput (b/s) = 317201853
> throughput (b/s) = 317351399
> throughput (b/s) = 315027978
> throughput (b/s) = 313831014
> throughput (b/s) = 310050384
> throughput (b/s) = 307654601
> throughput (b/s) = 305707061
> throughput (b/s) = 307961102
> throughput (b/s) = 296898200
> throughput (b/s) = 296409904
> throughput (b/s) = 294609332
> throughput (b/s) = 293397843
> throughput (b/s) = 293194876
> throughput (b/s) = 291724886
> throughput (b/s) = 290031314
> throughput (b/s) = 289747022
> throughput (b/s) = 289299632
>
> The throughput goes down after a few seconds and does not stay at the
> level of the initial values:
>
> throughput (b/s) = 699751388
> throughput (b/s) = 723542382
> throughput (b/s) = 662989745
>
> Another question is about Spark. About 15 seconds after I start the Spark
> command, Spark keeps repeating the same word counts, but my
> program continues to send words to Kafka, so I mean that the words counted
> in spark should grow up. I have attached the log from Spark.
>
> My case is:
>
> ComputerA (rdkafka_cachesender) -> ComputerB (Kafka brokers + ZooKeeper) ->
> ComputerC (Spark)
>
> If I haven't explained this well, please reply to me.
>
> Thanks Guys
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>


Re: Dynamically adding Kafka brokers

2014-11-03 Thread Gwen Shapira
+1
That's what we use to generate the broker id in automatic deployments.
This method makes troubleshooting easier (you know where each broker is
running), and doesn't require keeping extra files around.
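
A minimal sketch of one way to derive an id from the IP, in Python. The
exact scheme is an assumption (the thread only says the IP is "stripped"):
this version keeps the last octets, zero-padded, and assumes broker.id has
to fit in a 32-bit integer. The function name is hypothetical.

```python
import ipaddress


def broker_id_from_ip(ip: str, octets: int = 2) -> int:
    """Build a numeric broker id from the last `octets` octets of an IPv4 address.

    Each octet is zero-padded to three digits so the id stays readable,
    e.g. 192.168.100.91 -> "100" + "091" -> 100091.
    """
    if not 1 <= octets <= 3:
        raise ValueError("octets must be 1, 2 or 3 to fit a 32-bit integer")
    packed = ipaddress.IPv4Address(ip).packed  # also validates the address
    return int("".join(f"{b:03d}" for b in packed[-octets:]))
```

The ids are only unique as long as all brokers differ in the octets that are
kept, so the number of octets should match the size of the network the
brokers live in.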

On Mon, Nov 3, 2014 at 2:17 PM, Joe Stein  wrote:

> Most folks strip the IP and use that as the broker.id. KAFKA-1070 does not
> yet accommodate for that very widely used method. I think it would be bad
> if KAFKA-1070 only worked for new installations because that is how people
> use Kafka today (per
>
> https://issues.apache.org/jira/browse/KAFKA-1070?focusedCommentId=14085808&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14085808
> )
>
> On Mon, Nov 3, 2014 at 2:12 PM, Joel Koshy  wrote:
>
> > KAFKA-1070 will help with this and is pending a review.
> >
> > On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
> > > Hi,
> > >
> > > > How do people handle situations, and specifically the broker.id property,
> > > > where the Kafka (broker) cluster is not fully defined right away?
> > > >
> > > > Here's the use case we have at Sematext:
> > > > * Our software ships as a VM
> > > > * All components run in this single VM, including 1 Kafka broker
> > > > * Of course, this is just for a nice OOTB experience, but to scale one
> > > > needs to have more instances of this VM, including more Kafka brokers
> > > > * *One can clone our VM and launch N instances of it, but because we have a
> > > > single Kafka broker config with a single broker.id in
> > > > it, one can't just launch more of these VMs and expect to see more Kafka
> > > > brokers join the cluster.  One would have to change the broker.id
> > > > on each new VM instance.*
> > > >
> > > > How do others handle this in software that is packaged and shipped to
> > > > users and is not under your direct control to allow you to edit configs?
> > > >
> > > > Would it be best to have a script that connects to ZooKeeper to get the list
> > > > of all existing brokers and their IDs and then generates a new distinct ID +
> > > > config for the new Kafka broker?
> > > >
> > > > Or are there slicker ways to do this that people use?
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
>


Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
or directory)
> 13342 stat("/var/migration/kafka/utils/Log4jController.class",
> 0x7fc92913d7c0) = -1 ENOENT (No such file or directory)
> 13342 stat("/var/migration/kafka/utils/Log4jControllerMBean.class",
> 0x7fc92913b4e0) = -1 ENOENT (No such file or directory)
> 13342
> stat("/var/migration/org/apache/log4j/spi/ThrowableInformation.class",
> 0x7fc92913e860) = -1 ENOENT (No such file or directory)
>
> and after I cd to kafka-0.8.1.1-src/core/build/classes/main/, a lot of
> those ENOENT disappear, but I still get the same "Null Pointer error".
> Grepping "ENOENT" I see:
>
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/Environment.class",
> 0x7fee656ae750) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/Version.class",
> 0x7fee656ae680) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/version/Info.class",
> 0x7fee656ac3a0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/Environment$Entry.class",
> 0x7fee656ae8e0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/ZooKeeper$ZKWatchManager.class",
> 0x7fee656aede0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/ClientCnxn.class",
> 0x7fee656aede0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/proto/RequestHeader.class",
> 0x7fee656ae5f0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/zookeeper/ClientCnxn$1.class",
> 0x7fee656ae9a0) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/META-INF/services/java.nio.channels.spi.SelectorProvider",
> 0x7fee656aed20) = -1 ENOENT (No such file or directory)
> 13257
>
> stat("/var/migration/migration-tool/kafka-0.8.1.1-src/core/build/classes/main/org/apache/log4j/spi/ThrowableInformation.class",
> 0x7fee656af860) = -1 ENOENT (No such file or directory)
>
> I found, for instance ClientCnxn$1.class in zookeeper-3.3.4.jar
>
> $ jar tf ./core/build/dependant-libs-2.8.0/zookeeper-3.3.4.jar|grep Client
> org/apache/zookeeper/ClientCnxn$1.class
> org/apache/zookeeper/ClientCnxn$2.class
> org/apache/zookeeper/ClientCnxn$AuthData.class
> org/apache/zookeeper/ClientCnxn$EndOfStreamException.class
> org/apache/zookeeper/ClientCnxn$EventThread.class
> org/apache/zookeeper/ClientCnxn$Packet.class
> org/apache/zookeeper/ClientCnxn$SendThread.class
> org/apache/zookeeper/ClientCnxn$SessionExpiredException.class
> org/apache/zookeeper/ClientCnxn$SessionTimeoutException.class
> org/apache/zookeeper/ClientCnxn$WatcherSetEventPair.class
> org/apache/zookeeper/ClientCnxn.class
> org/apache/zookeeper/ClientWatchManager.class
> org/apache/zookeeper/ServerAdminClient.class
>
> And that jarfile is on the classpath, and I see on strace that it's finding
> it... Why is it not finding the class, then?
>
> I'm a bit lost... Is there any way I can run it with more verbosity? Strace
> is too verbose, "bash -x" is not verbose enough. Something in between?
>
>
>
>
> On Sat, Nov 1, 2014 at 4:53 AM, Gwen Shapira 
> wrote:
>
> > The file should be your Scala jar.
> > You should have it in your Kafka lib directory.
> > Do a "jar -t" to make sure it actually contains the Tuple class.
> >
> > Gwen
> >
> > On Fri, Oct 31, 2014 at 8:49 PM, Tomas Nunez  wrote:
> >
> > > Thanks for pointing the error. I fixed it, but I'm still getting the
> same
> > > error. The content of migrationToolProducer.properties is now:
> > > _
> > >
> > >
> >
> metadata.broker.list=kafka_new01:9092,kafka_new02:9092,kafka_new03:9092,kafka_new04:9092,kafka_new05:9092,
> > > group.id=migration.prod
> > > _
> > >
> > > And the error still is:
> > >
> > > Exception in thread "main" java.lang.NoClassDefFoundError:
> > > scala/Tuple2$mcLL$sp
> > >  

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
Regarding more information:
Maybe ltrace?

If I were you, I'd go to the MigrationTool code and start adding LOG lines,
because there aren't enough of those to troubleshoot.

On Wed, Nov 5, 2014 at 6:13 PM, Gwen Shapira  wrote:

> org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100%
> sure it eventually found the class.
>
> On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:
>
>> Ok, still fighting with the migrationTool here...
>>
>> That tuple wasn't in the scala-library.jar. It turns out I was using scala
>> 2.10 for kafka0.8 and scala 2.8 for kafka0.7, and the jar files were not
>> compatible. So, for the record, it seems that you need both the 0.7 jar
>> files and your 0.8 kafka compiled with the same scala version.
>>
>> After fixing that (downloading kafka 0.8 compiled with scala 2.8), I'm now
>> facing a different error, this time more cryptic:
>>
>> Kafka migration tool failed due to:
>> java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>> Caused by: java.lang.NullPointerException
>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>     at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>>     at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
>>     at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>>     ... 5 more
>>
>> [2014-11-06 01:32:44,362] ERROR Kafka migration tool failed:
>> (kafka.tools.KafkaMigrationTool)
>> java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>> Caused by: java.lang.NullPointerException
>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>     at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>>     at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
>>     at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>>     ... 5 more
>>
>> Using strace with it, I could see these errors again:
>>
>> 13342 stat("/var/migration/

Re: Error using migrationtool for upgrading 0.7 to 0.8

2014-11-05 Thread Gwen Shapira
Also, can you post your configs? Especially the "zookeeper.connect" one?

On Wed, Nov 5, 2014 at 6:15 PM, Gwen Shapira  wrote:

> Regarding more information:
> Maybe ltrace?
>
> If I were you, I'd go to MigrationTool code and start adding LOG lines.
> because there aren't enough of those to troubleshoot.
>
> On Wed, Nov 5, 2014 at 6:13 PM, Gwen Shapira 
> wrote:
>
>> org.apache.zookeeper.ClientCnxn is throwing the exception, so I'm 100%
>> sure it eventually found the class.
>>
>> On Wed, Nov 5, 2014 at 5:59 PM, Tomas Nunez  wrote:
>>
>>> Ok, still fighting with the migrationTool here...
>>>
>>> That tuple wasn't in the scala-library.jar. It turns out I was using
>>> scala
>>> 2.10 for kafka0.8 and scala 2.8 for kafka0.7, and the jar files were not
>>> compatible. So, for the record, it seems that you need both the 0.7 jar
>>> files and your 0.8 kafka compiled with the same Scala version.
>>>
>>> After fixing that (downloading kafka 0.8 compiled with scala 2.8), I'm
>>> now
>>> facing a different error, this time more cryptic:
>>>
>>> Kafka migration tool failed due to:
>>> java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>>     at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>>     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>>     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>>>     at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:88)
>>>     at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>>>     ... 5 more
>>>
>>> [2014-11-06 01:32:44,362] ERROR Kafka migration tool failed:
>>> (kafka.tools.KafkaMigrationTool)
>>> java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at kafka.tools.KafkaMigrationTool.main(KafkaMigrationTool.java:204)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:361)
>>>     at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
>>>     at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
>>>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>>>     at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>>>     at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>>>     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:152)
>>>     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)
>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(Zook

Re: Spark and Kafka

2014-11-06 Thread Gwen Shapira
What's the window size? If the window is around 10 seconds and you are
sending data at a very stable rate, this is expected.
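
To illustrate why a plateau is expected, here is a rough simulation in plain
Java. It is not Spark code: the class and method names are made up for this
sketch, and the 3-batch window and 6 words per batch are arbitrary
assumptions. With a constant input rate, a sliding-window sum grows until the
window is full and then stays flat, because every new batch added is offset
by an equally sized old batch dropping out.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class WindowPlateau {

    /**
     * Returns the windowed count observed after each batch, for a sliding
     * window that covers the last windowBatches batches.
     */
    static long[] windowedCounts(long perBatch, int windowBatches, int batches) {
        Deque<Long> window = new ArrayDeque<>();
        long sum = 0;
        long[] out = new long[batches];
        for (int i = 0; i < batches; i++) {
            // A new batch enters the window.
            window.addLast(perBatch);
            sum += perBatch;
            // Once the window is full, the oldest batch falls out.
            if (window.size() > windowBatches) {
                sum -= window.removeFirst();
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        // 6 occurrences of a word per batch, a window of 3 batches, 6 batches in total.
        System.out.println(Arrays.toString(windowedCounts(6, 3, 6)));
    }
}
```

The counts rise for the first three batches and then repeat, which has the
same shape as the output quoted below.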



On Thu, Nov 6, 2014 at 9:32 AM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
>
> I am doing some tests with Spark Streaming and Kafka, but I have seen
> something strange. I have modified JavaKafkaWordCount to use
> reduceByKeyAndWindow and to print the accumulated word counts to the
> screen. In the beginning Spark works very well: in each iteration the
> word counts increase. But after 12 to 13 seconds the results repeat
> continually.
>
> My producer program keeps sending words to Kafka.
>
> Does anyone have any idea about this?
>
>
> ---
> Time: 1415272266000 ms
> ---
> (accompanied
> them,6)
> (merrier,5)
> (it
> possessed,5)
> (the
> treacherous,5)
> (Quite,12)
> (offer,273)
> (rabble,58)
> (exchanging,16)
> (Genoa,18)
> (merchant,41)
> ...
> ---
> Time: 1415272267000 ms
> ---
> (accompanied
> them,12)
> (merrier,12)
> (it
> possessed,12)
> (the
> treacherous,11)
> (Quite,24)
> (offer,602)
> (rabble,132)
> (exchanging,35)
> (Genoa,36)
> (merchant,84)
> ...
> ---
> Time: 1415272268000 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
> ---
> Time: 1415272269000 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
>
> ---
> Time: 1415272270000 ms
> ---
> (accompanied
> them,17)
> (merrier,18)
> (it
> possessed,17)
> (the
> treacherous,17)
> (Quite,35)
> (offer,889)
> (rabble,192)
> (the
> bed,1)
> (exchanging,51)
> (Genoa,54)
> ...
>
>
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Gwen Shapira
+1 for dropping Java 6

On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker  wrote:

> Java 6 has been End of Life since Feb 2013.
> Java 7 (and 8, but unfortunately that's too new still) has very compelling
> features which can make development a lot easier.
>
> The sooner more projects drop Java 6 the better, in my opinion :)
>
> On Nov 5, 2014, at 7:45 PM, Worthy LaFollette  wrote:
>
> > Mostly converted now to 1.7, this would be welcomed to get any new
> > features.
> >
> > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein  wrote:
> >
> >> This has been coming up in a lot of projects and for other reasons too I
> >> wanted to kick off the discussion about if/when we end support for Java
> 6.
> >> Besides any API we may want to use in >= 7 we also compile our binaries
> for
> >> 6 for release currently.
> >>
> >> /***
> >> Joe Stein
> >> Founder, Principal Consultant
> >> Big Data Open Source Security LLC
> >> http://www.stealth.ly
> >> Twitter: @allthingshadoop 
> >> /
> >>
>
>


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Gwen Shapira
Java6 is supported on CDH4 but not CDH5.

On Thu, Nov 6, 2014 at 9:54 AM, Koert Kuipers  wrote:

> when is java 6 dropped by the hadoop distros?
>
> i am still aware of many clusters that are java 6 only at the moment.
>
>
>
> On Thu, Nov 6, 2014 at 12:44 PM, Gwen Shapira 
> wrote:
>
> > +1 for dropping Java 6
> >
> > On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker <
> > sschlans...@opentable.com
> > > wrote:
> >
> > > Java 6 has been End of Life since Feb 2013.
> > > Java 7 (and 8, but unfortunately that's too new still) has very
> > compelling
> > > features which can make development a lot easier.
> > >
> > > The sooner more projects drop Java 6 the better, in my opinion :)
> > >
> > > On Nov 5, 2014, at 7:45 PM, Worthy LaFollette 
> wrote:
> > >
> > > > Mostly converted now to 1.7, this would be welcomed to get any new
> > > > features.
> > > >
> > > > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein 
> > wrote:
> > > >
> > > >> This has been coming up in a lot of projects and for other reasons
> > too I
> > > >> wanted to kick off the discussion about if/when we end support for
> > Java
> > > 6.
> > > >> Besides any API we may want to use in >= 7 we also compile our
> > binaries
> > > for
> > > >> 6 for release currently.
> > > >>
> > > >> /***
> > > >> Joe Stein
> > > >> Founder, Principal Consultant
> > > >> Big Data Open Source Security LLC
> > > >> http://www.stealth.ly
> > > >> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > >> /
> > > >>
> > >
> > >
> >
>


Re: powered by kafka

2014-11-08 Thread Gwen Shapira
Done!

Thank you for using Kafka and letting us know :)

On Sat, Nov 8, 2014 at 2:15 AM, vipul jhawar  wrote:

> Exponential @exponentialinc is using kafka in production to power the
> events ingestion pipeline for real time analytics and log feed consumption.
>
> Please post on powered by kafka wiki -
> https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
>
> Thanks
> Vipul
> http://in.linkedin.com/in/vjhawar/
>


Re: powered by kafka

2014-11-09 Thread Gwen Shapira
Updated. Thanks!

On Sat, Nov 8, 2014 at 12:16 PM, Jimmy John  wrote:

> Livefyre (http://web.livefyre.com/) uses kafka for the real time
> notifications, analytics pipeline and as the primary mechanism for general
> pub/sub.
>
> thx...
> jim
>
> On Sat, Nov 8, 2014 at 7:41 AM, Gwen Shapira 
> wrote:
>
> > Done!
> >
> > Thank you for using Kafka and letting us know :)
> >
> > On Sat, Nov 8, 2014 at 2:15 AM, vipul jhawar 
> > wrote:
> >
> > > Exponential @exponentialinc is using kafka in production to power the
> > > events ingestion pipeline for real time analytics and log feed
> > consumption.
> > >
> > > Please post on powered by kafka wiki -
> > > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> > >
> > > Thanks
> > > Vipul
> > > http://in.linkedin.com/in/vjhawar/
> > >
> >
>


Re: Issues Running Kafka Producer Java example

2014-11-09 Thread Gwen Shapira
The producer code here looks fine. It may be an issue with the consumer, or
how the consumer is used.

If you are running the producer before starting a consumer, make sure you
get all messages by setting auto.offset.reset=smallest (in the console
consumer you can use --from-beginning)

Also, you can use the ConsumerOffsetChecker tool to see:
1. How much data you have in the topic
2. Whether the consumer is consuming anything at all
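
As a small sketch of the auto.offset.reset suggestion above: these are the
property names used by the 0.8 high-level consumer, while the ZooKeeper
address, the group id and the class name are placeholders invented for this
example. Note that auto.offset.reset only applies when the consumer group has
no committed offset yet, so a fresh group.id is the easy way to re-read from
the start.

```java
import java.util.Properties;

class ConsumerProps {

    /** Builds consumer properties that start from the earliest available offset. */
    static Properties fromBeginning(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // "smallest" = start at the oldest message when the group has no stored offset.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own ZooKeeper address and group id.
        Properties props = fromBeginning("localhost:2181", "page-visits-check");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

These properties would then be passed to kafka.consumer.ConsumerConfig when
creating the consumer connector.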


Gwen

On Sat, Nov 8, 2014 at 1:25 PM, Hardik Pandya 
wrote:

> Hello Champs,
>
> I am trying to run my first Java producer example.
>
> Upon running this example, the producer successfully sends the messages; at
> least it looks like it does, and there is no Java dump.
>
> But when I try to verify the messages on the consumer side, it does not
> return any data sent by the producer.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
>
> here's my Producer code - thanks in advance!
>
> package example.kafka;
>
> import java.util.Date;
> import java.util.Properties;
> import java.util.Random;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class TestProducer {
>
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         // TODO Auto-generated method stub
>         long events = Long.parseLong(args[0]);
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "10.0.2.15:9092,10.0.2.15:9093,10.0.2.15:9094,10.0.2.15:9095");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("partitioner.class", "example.kafka.SimplePartitioner");
>         props.put("request.required.acks", "1");
>         ProducerConfig config = new ProducerConfig(props);
>         Producer<String, String> producer = new Producer<String, String>(config);
>         Random rnd = new Random();
>         for (long event = 0; event < events; event++) {
>             Long runtime = new Date().getTime();
>             String msgKey = "192.168.2." + rnd.nextInt(255);
>             String msg = runtime + ",www.exmaple.com," + msgKey;
>             KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", msgKey, msg);
>             producer.send(data);
>             System.out.println("message sent");
>         }
>         producer.close();
>     }
>
> }
>


Re: powered by kafka

2014-11-09 Thread Gwen Shapira
I'm not Jay, but fixed it anyways ;)

Gwen

On Sun, Nov 9, 2014 at 10:34 AM, vipul jhawar 
wrote:

> Hi Jay
>
> Thanks for posting the update.
>
> However, i checked the page history and the hyperlink is pointing to the
> wrong domain.
> Exponential refers to www.exponential.com. I sent the twitter handle,
> should have sent the domain.
> Please correct.
>
> Thanks
>
> On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar 
> wrote:
>
> > Exponential @exponentialinc is using kafka in production to power the
> > events ingestion pipeline for real time analytics and log feed
> consumption.
> >
> > Please post on powered by kafka wiki -
> > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> >
> > Thanks
> > Vipul
> > http://in.linkedin.com/in/vjhawar/
> >
>


Re: Programmatic Kafka version detection/extraction?

2014-11-11 Thread Gwen Shapira
In Sqoop we do the following:

Maven runs a shell script, passing the version as a parameter.
The shell-script generates a small java class, which is then built with a
Maven plugin.
Our code references this generated class when we expose "getVersion()".

It's complex and ugly, so I'm kind of hoping that there's a better way to do
it :)

Gwen

On Tue, Nov 11, 2014 at 9:42 AM, Jun Rao  wrote:

> Currently, the version number is only stored in our build config file,
> gradle.properties. Not sure how we can automatically extract it and expose
> it in an mbean. How do other projects do this?
>
> Thanks,
>
> Jun
>
> On Tue, Nov 11, 2014 at 7:05 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Sounds good.  But is the version number stored anywhere from where it
> could
> > be gotten?
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Tue, Nov 11, 2014 at 12:45 AM, Jun Rao  wrote:
> >
> > > Otis,
> > >
> > > We don't have an api for that now. We can probably expose this as a JMX
> > as
> > > part of kafka-1481.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Is there a way to detect which version of Kafka one is running?
> > > > Is there an API for that, or a constant with this value, or maybe an
> > > MBean
> > > > or some other way to get to this info?
> > > >
> > > > Thanks,
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > > >
> > >
> >
>


Re: Programmatic Kafka version detection/extraction?

2014-11-11 Thread Gwen Shapira
So it looks like we can use Gradle to add properties to the manifest file and
then use getResourceAsStream to read the file and parse it.

The Gradle part would be something like:
jar.manifest {
    attributes('Implementation-Title': project.name,
               'Implementation-Version': project.version,
               'Built-By': System.getProperty('user.name'),
               'Built-JDK': System.getProperty('java.version'),
               'Built-Host': getHostname(),
               'Source-Compatibility': project.sourceCompatibility,
               'Target-Compatibility': project.targetCompatibility
    )
}

The code part would be:
this.getClass().getClassLoader().getResourceAsStream("META-INF/MANIFEST.MF")

Does that look like the right approach?
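
For illustration, the reading side could be sketched roughly as follows. The
class and method names are invented for this example, and the sample manifest
text in main stands in for a real jar. java.util.jar.Manifest does the
parsing. One detail worth noting: ClassLoader.getResourceAsStream expects the
resource name without a leading slash.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

class ManifestVersion {

    /** Returns the Implementation-Version main attribute, or null if it is absent. */
    static String implementationVersion(InputStream in) {
        try {
            Manifest manifest = new Manifest(in);
            return manifest.getMainAttributes().getValue("Implementation-Version");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Inside a real jar the stream would come from the class loader, e.g.
        // ManifestVersion.class.getClassLoader().getResourceAsStream("META-INF/MANIFEST.MF")
        // Here a hand-written manifest is used so the sketch runs on its own.
        String sample = "Manifest-Version: 1.0\r\nImplementation-Version: 0.8.2\r\n\r\n";
        InputStream in = new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8));
        System.out.println(implementationVersion(in));
    }
}
```

One caveat with this approach: when several jars are on the classpath, the
class loader returns the first META-INF/MANIFEST.MF it finds, which is not
necessarily the Kafka one, so the lookup may need to be tied to the jar that
actually contains the calling class.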

Gwen

On Tue, Nov 11, 2014 at 10:43 AM, Bhavesh Mistry  wrote:

> If it is a Maven artifact, then you will get the following pre-built property
> file from the Maven build, called pom.properties, at
> /META-INF/maven/groupid/artifactId/pom.properties.
>
> Here is sample:
> #Generated by Maven
> #Mon Oct 10 10:44:31 EDT 2011
> version=10.0.1
> groupId=com.google.guava
> artifactId=guava
>
> Thanks,
>
> Bhavesh
>
> On Tue, Nov 11, 2014 at 10:34 AM, Gwen Shapira 
> wrote:
>
> > In Sqoop we do the following:
> >
> > Maven runs a shell script, passing the version as a parameter.
> > The shell-script generates a small java class, which is then built with a
> > Maven plugin.
> > Our code references this generated class when we expose "getVersion()".
> >
> > Its complex and ugly, so I'm kind of hoping that there's a better way to
> do
> > it :)
> >
> > Gwen
> >
> > On Tue, Nov 11, 2014 at 9:42 AM, Jun Rao  wrote:
> >
> > > Currently, the version number is only stored in our build config file,
> > > gradle.properties. Not sure how we can automatically extract it and
> > expose
> > > it in an mbean. How do other projects do this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Nov 11, 2014 at 7:05 AM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Sounds good.  But is the version number stored anywhere from where it
> > > could
> > > > be gotten?
> > > >
> > > > Thanks,
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > > >
> > > >
> > > > On Tue, Nov 11, 2014 at 12:45 AM, Jun Rao  wrote:
> > > >
> > > > > Otis,
> > > > >
> > > > > We don't have an api for that now. We can probably expose this as a
> > JMX
> > > > as
> > > > > part of kafka-1481.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
> > > > > otis.gospodne...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Is there a way to detect which version of Kafka one is running?
> > > > > > Is there an API for that, or a constant with this value, or maybe
> > an
> > > > > MBean
> > > > > > or some other way to get to this info?
> > > > > >
> > > > > > Thanks,
> > > > > > Otis
> > > > > > --
> > > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > > Management
> > > > > > Solr & Elasticsearch Support * http://sematext.com/
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: No longer supporting Java 6, if? when?

2014-11-11 Thread Gwen Shapira
Perhaps relevant:

Hadoop is moving toward dropping Java6 in the next release.
https://issues.apache.org/jira/browse/HADOOP-10530


On Thu, Nov 6, 2014 at 11:03 AM, Jay Kreps  wrote:

> Yeah it is a little bit silly that people are still using Java 6.
>
> I guess this is a tradeoff--being more conservative in our java support
> means more people can use our software, whereas upgrading gives us
> developers a better experience since we aren't stuck with ancient stuff.
>
> Nonetheless I would argue for being a bit conservative here. Sadly a
> shocking number of people are still using Java 6. The Kafka clients get
> embedded in applications all over the place, and likely having even one
> application not yet upgraded would block adopting the new Kafka version
> that dropped java 6 support. So unless there is something in Java 7 we
> really really want I think it might be good to hold out a bit.
>
> As an example we dropped java 6 support in Samza and immediately had people
> blocked by that, and unlike the Kafka clients, Samza use is pretty
> centralized.
>
> -Jay
>
> On Wed, Nov 5, 2014 at 5:32 PM, Joe Stein  wrote:
>
> > This has been coming up in a lot of projects and for other reasons too I
> > wanted to kick off the discussion about if/when we end support for Java
> 6.
> > Besides any API we may want to use in >= 7 we also compile our binaries
> for
> > 6 for release currently.
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
>


Re: Security in 0.8.2 beta

2014-11-11 Thread Gwen Shapira
Nope.

Here's the JIRA where we are still actively working on security, targeting
0.9:
https://issues.apache.org/jira/browse/KAFKA-1682

Gwen

On Tue, Nov 11, 2014 at 7:37 PM, Kashyap Mhaisekar 
wrote:

> Hi,
> Is there a way to secure the topics created in Kafka 0.8.2 beta? The need
> is to ensure no one is able to read data from the topic without
> authorization.
>
> Regards
> Kashyap
>


Re: Programmatic Kafka version detection/extraction?

2014-11-12 Thread Gwen Shapira
Good question.

The server will need to expose this in the protocol, so Kafka clients will
know what they are talking to.

We may also want to expose this in the producer and consumer, so people who
use Kafka's built-in clients will know which version they have in the
environment.



On Wed, Nov 12, 2014 at 9:09 AM, Mark Roberts  wrote:

> Just to be clear: this is going to be exposed via some Api the clients can
> call at startup?
>
>
> > On Nov 12, 2014, at 08:59, Guozhang Wang  wrote:
> >
> > Sounds great, +1 on this.
> >
> >> On Tue, Nov 11, 2014 at 1:36 PM, Gwen Shapira 
> wrote:
> >>
> >> So it looks like we can use Gradle to add properties to manifest file
> and
> >> then use getResourceAsStream to read the file and parse it.
> >>
> >> The Gradle part would be something like:
> >> jar.manifest {
> >>attributes('Implementation-Title': project.name,
> >>'Implementation-Version': project.version,
> >>'Built-By': System.getProperty('user.name'),
> >>'Built-JDK': System.getProperty('java.version'),
> >>'Built-Host': getHostname(),
> >>'Source-Compatibility': project.sourceCompatibility,
> >>'Target-Compatibility': project.targetCompatibility
> >>)
> >>}
> >>
> >> The code part would be:
> >>
> >>
> this.getClass().getClassLoader().getResourceAsStream("/META-INF/MANIFEST.MF")
> >>
> >> Does that look like the right approach?
> >>
> >> Gwen
> >>
> >> On Tue, Nov 11, 2014 at 10:43 AM, Bhavesh Mistry <
> >> mistry.p.bhav...@gmail.com
> >>> wrote:
> >>
> >>> If is maven artifact then you will get following pre-build property
> file
> >>> from maven build called pom.properties under
> >>> /META-INF/maven/groupid/artifactId/pom.properties folder.
> >>>
> >>> Here is sample:
> >>> #Generated by Maven
> >>> #Mon Oct 10 10:44:31 EDT 2011
> >>> version=10.0.1
> >>> groupId=com.google.guava
> >>> artifactId=guava
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>> On Tue, Nov 11, 2014 at 10:34 AM, Gwen Shapira 
> >>> wrote:
> >>>
> >>>> In Sqoop we do the following:
> >>>>
> >>>> Maven runs a shell script, passing the version as a parameter.
> >>>> The shell-script generates a small java class, which is then built
> >> with a
> >>>> Maven plugin.
> >>>> Our code references this generated class when we expose
> "getVersion()".
> >>>>
> >>>> Its complex and ugly, so I'm kind of hoping that there's a better way
> >> to
> >>> do
> >>>> it :)
> >>>>
> >>>> Gwen
> >>>>
> >>>>> On Tue, Nov 11, 2014 at 9:42 AM, Jun Rao  wrote:
> >>>>>
> >>>>> Currently, the version number is only stored in our build config
> >> file,
> >>>>> gradle.properties. Not sure how we can automatically extract it and
> >>>> expose
> >>>>> it in an mbean. How do other projects do this?
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Tue, Nov 11, 2014 at 7:05 AM, Otis Gospodnetic <
> >>>>> otis.gospodne...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Jun,
> >>>>>>
> >>>>>> Sounds good.  But is the version number stored anywhere from where
> >> it
> >>>>> could
> >>>>>> be gotten?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Otis
> >>>>>> --
> >>>>>> Monitoring * Alerting * Anomaly Detection * Centralized Log
> >>> Management
> >>>>>> Solr & Elasticsearch Support * http://sematext.com/
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Nov 11, 2014 at 12:45 AM, Jun Rao 
> >> wrote:
> >>>>>>
> >>>>>>> Otis,
> >>>>>>>
> >>>>>>> We don't have an api for that now. We can probably expose this
> >> as a
> >>>> JMX
> >>>>>> as
> >>>>>>> part of kafka-1481.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
> >>>>>>> otis.gospodne...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Is there a way to detect which version of Kafka one is running?
> >>>>>>>> Is there an API for that, or a constant with this value, or
> >> maybe
> >>>> an
> >>>>>>> MBean
> >>>>>>>> or some other way to get to this info?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Otis
> >>>>>>>> --
> >>>>>>>> Monitoring * Alerting * Anomaly Detection * Centralized Log
> >>>>> Management
> >>>>>>>> Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> >
> > --
> > -- Guozhang
>


Re: Programmatic Kafka version detection/extraction?

2014-11-12 Thread Gwen Shapira
Actually, Jun suggested exposing this via JMX.

On Wed, Nov 12, 2014 at 9:31 AM, Gwen Shapira  wrote:

> Good question.
>
> The server will need to expose this in the protocol, so Kafka clients will
> know what they are talking to.
>
> We may also want to expose this in the producer and consumer, so people
> who use Kafka's built-in clients will know which version they have in the
> environment.
>
>
>
> On Wed, Nov 12, 2014 at 9:09 AM, Mark Roberts  wrote:
>
>> Just to be clear: this is going to be exposed via some Api the clients
>> can call at startup?
>>
>>
>> > On Nov 12, 2014, at 08:59, Guozhang Wang  wrote:
>> >
>> > Sounds great, +1 on this.
>> >
>> >> On Tue, Nov 11, 2014 at 1:36 PM, Gwen Shapira 
>> wrote:
>> >>
>> >> So it looks like we can use Gradle to add properties to manifest file
>> and
>> >> then use getResourceAsStream to read the file and parse it.
>> >>
>> >> The Gradle part would be something like:
>> >> jar.manifest {
>> >>attributes('Implementation-Title': project.name,
>> >>'Implementation-Version': project.version,
>> >>'Built-By': System.getProperty('user.name'),
>> >>'Built-JDK': System.getProperty('java.version'),
>> >>'Built-Host': getHostname(),
>> >>'Source-Compatibility': project.sourceCompatibility,
>> >>'Target-Compatibility': project.targetCompatibility
>> >>)
>> >>}
>> >>
>> >> The code part would be:
>> >>
>> >>
>> this.getClass().getClassLoader().getResourceAsStream("/META-INF/MANIFEST.MF")
>> >>
>> >> Does that look like the right approach?
>> >>
>> >> Gwen
>> >>
>> >> On Tue, Nov 11, 2014 at 10:43 AM, Bhavesh Mistry <
>> >> mistry.p.bhav...@gmail.com
>> >>> wrote:
>> >>
>> >>> If is maven artifact then you will get following pre-build property
>> file
>> >>> from maven build called pom.properties under
>> >>> /META-INF/maven/groupid/artifactId/pom.properties folder.
>> >>>
>> >>> Here is sample:
>> >>> #Generated by Maven
>> >>> #Mon Oct 10 10:44:31 EDT 2011
>> >>> version=10.0.1
>> >>> groupId=com.google.guava
>> >>> artifactId=guava
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Bhavesh
>> >>>
>> >>> On Tue, Nov 11, 2014 at 10:34 AM, Gwen Shapira > >
>> >>> wrote:
>> >>>
>> >>>> In Sqoop we do the following:
>> >>>>
>> >>>> Maven runs a shell script, passing the version as a parameter.
>> >>>> The shell-script generates a small java class, which is then built
>> >> with a
>> >>>> Maven plugin.
>> >>>> Our code references this generated class when we expose
>> "getVersion()".
>> >>>>
>> >>>> Its complex and ugly, so I'm kind of hoping that there's a better way
>> >> to
>> >>> do
>> >>>> it :)
>> >>>>
>> >>>> Gwen
>> >>>>
>> >>>>> On Tue, Nov 11, 2014 at 9:42 AM, Jun Rao  wrote:
>> >>>>>
>> >>>>> Currently, the version number is only stored in our build config
>> >> file,
>> >>>>> gradle.properties. Not sure how we can automatically extract it and
>> >>>> expose
>> >>>>> it in an mbean. How do other projects do this?
>> >>>>>
>> >>>>> Thanks,
>> >>>>>
>> >>>>> Jun
>> >>>>>
>> >>>>> On Tue, Nov 11, 2014 at 7:05 AM, Otis Gospodnetic <
>> >>>>> otis.gospodne...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Jun,
>> >>>>>>
>> >>>>>> Sounds good.  But is the version number stored anywhere from where
>> >> it
>> >>>>> could
>> >>>>>> be gotten?
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Otis
>> >>>>>> --
>> >>>>>> Monitoring * Alerting * Anomaly Detection * Centralized Log
>> >>> Management
>> >>>>>> Solr & Elasticsearch Support * http://sematext.com/
>> >>>>>>
>> >>>>>>
>> >>>>>> On Tue, Nov 11, 2014 at 12:45 AM, Jun Rao 
>> >> wrote:
>> >>>>>>
>> >>>>>>> Otis,
>> >>>>>>>
>> >>>>>>> We don't have an api for that now. We can probably expose this
>> >> as a
>> >>>> JMX
>> >>>>>> as
>> >>>>>>> part of kafka-1481.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Jun
>> >>>>>>>
>> >>>>>>> On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
>> >>>>>>> otis.gospodne...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> Is there a way to detect which version of Kafka one is running?
>> >>>>>>>> Is there an API for that, or a constant with this value, or
>> >> maybe
>> >>>> an
>> >>>>>>> MBean
>> >>>>>>>> or some other way to get to this info?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Otis
>> >>>>>>>> --
>> >>>>>>>> Monitoring * Alerting * Anomaly Detection * Centralized Log
>> >>>>> Management
>> >>>>>>>> Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>


Re: Programmatic Kafka version detection/extraction?

2014-11-14 Thread Gwen Shapira
I'm not sure there's a single "protocol version". Each
request/response has its own version ID embedded in the request
itself.

Broker version, broker ID and (as Joel suggested) git hash are all
reasonable. And +1 for adding this to API to support non-JVM clients.

Gwen
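
For readers wondering what the tagged (TLV) response proposed in the
quoted message might look like on the wire, here is a minimal sketch.
It is not part of the Kafka protocol. The tag and type numbers, the
16-bit tag/type fields and the 32-bit length are assumptions made for
illustration only; the proposal itself leaves the widths as "intX".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical encoding of the proposed InfoResponse: a sequence of
// (tag, type, len, payload) entries. All numeric values are made up.
class InfoTagSketch {
    static final short TAG_BROKER_VERSION = 1; // assumed value
    static final short TAG_BROKER_ID = 2;      // assumed value
    static final short TYPE_STR = 1;           // assumed value
    static final short TYPE_INT32 = 2;         // assumed value

    // Appends one string entry: int16 tag, int16 type, int32 len, bytes.
    static void putString(ByteBuffer buf, short tag, String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        buf.putShort(tag).putShort(TYPE_STR).putInt(payload.length).put(payload);
    }

    // Appends one int32 entry with a fixed 4-byte payload.
    static void putInt32(ByteBuffer buf, short tag, int value) {
        buf.putShort(tag).putShort(TYPE_INT32).putInt(4).putInt(value);
    }

    // Decodes all entries. An entry with an unknown type is skipped by
    // consuming len bytes, so older clients can ignore newer tags.
    static Map<Short, Object> decode(ByteBuffer buf) {
        Map<Short, Object> out = new LinkedHashMap<>();
        while (buf.remaining() >= 8) {
            short tag = buf.getShort();
            short type = buf.getShort();
            int len = buf.getInt();
            byte[] payload = new byte[len];
            buf.get(payload);
            if (type == TYPE_STR) {
                out.put(tag, new String(payload, StandardCharsets.UTF_8));
            } else if (type == TYPE_INT32) {
                out.put(tag, ByteBuffer.wrap(payload).getInt());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        putString(buf, TAG_BROKER_VERSION, "0.8.2"); // sample value
        putInt32(buf, TAG_BROKER_ID, 5);             // sample value
        buf.flip(); // switch the buffer from writing to reading
        System.out.println(decode(buf));
    }
}
```

The length field is what lets a client skip entries it does not
understand, which is the property that makes this layout extensible.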

On Fri, Nov 14, 2014 at 8:46 AM, Magnus Edenhill  wrote:
> Please let's not add dependencies on more third party protocols/software,
> the move away from Zookeeper dependence on the clients is very much welcomed
> and it'd be a pity to see a new foreign dependency added for such a trivial
> thing
> as propagating the version of a broker to its client.
>
> My suggestion is to add a new protocol request which returns:
>  - broker version
>  - protocol version
>  - broker id
>
> A generic future-proof solution would be the use of tags (or named TLVs):
> InfoResponse: [InfoTag]
> InfoTag:
>    intX tag     ( KAFKA_TAG_BROKER_VERSION, KAFKA_TAG_PROTO_VERSION, KAFKA_TAG_SSL, ... )
>    intX type    ( KAFKA_TYPE_STR, KAFKA_TYPE_INT32, KAFKA_TYPE_INT64, ... )
>    intX len
>    bytes payload
>
> Local site/vendor tags could be defined in the configuration file.
>
>
> This would also allow clients to enable/disable features based on protocol
> version,
> e.g., there is no point in trying offsetCommit on a 0.8.0 broker and the
> client library
> can inform the application about this early, rather than having its
> offsetCommit requests
> fail by connection teardown (which is not much of an error propagation).
>
>
> My two cents,
> Magnus
>
>
> 2014-11-12 20:11 GMT+01:00 Mark Roberts :
>
>> I haven't worked much with JMX before, but some quick googling (10-20
>> minutes) is very inconclusive as to how I would go about getting the server
>> version I'm connecting to from a Python client.  Can someone please
>> reassure me that it's relatively trivial for non Java clients to query JMX
>> for the server version of every server in the cluster? Is there a reason
>> not to include this in the API itself?
>>
>> -Mark
>>
>> On Wed, Nov 12, 2014 at 9:50 AM, Joel Koshy  wrote:
>>
>> > +1 on the JMX + gradle properties. Is there any (seamless) way of
>> > including the exact git hash? That would be extremely useful if users
>> > need help debugging and happen to be on an unreleased build (say, off
>> > trunk)
>> >
>> > On Wed, Nov 12, 2014 at 09:34:35AM -0800, Gwen Shapira wrote:
>> > > Actually, Jun suggested exposing this via JMX.
>> > >
>> > > On Wed, Nov 12, 2014 at 9:31 AM, Gwen Shapira 
>> > wrote:
>> > >
>> > > > Good question.
>> > > >
>> > > > The server will need to expose this in the protocol, so Kafka clients
>> > will
>> > > > know what they are talking to.
>> > > >
>> > > > We may also want to expose this in the producer and consumer, so
>> people
>> > > > who use Kafka's built-in clients will know which version they have in
>> > the
>> > > > environment.
>> > > >
>> > > >
>> > > >
>> > > > On Wed, Nov 12, 2014 at 9:09 AM, Mark Roberts 
>> > wrote:
>> > > >
>> > > >> Just to be clear: this is going to be exposed via some Api the
>> clients
>> > > >> can call at startup?
>> > > >>
>> > > >>
>> > > >> > On Nov 12, 2014, at 08:59, Guozhang Wang 
>> > wrote:
>> > > >> >
>> > > >> > Sounds great, +1 on this.
>> > > >> >
>> > > >> >> On Tue, Nov 11, 2014 at 1:36 PM, Gwen Shapira <
>> > gshap...@cloudera.com>
>> > > >> wrote:
>> > > >> >>
>> > > >> >> So it looks like we can use Gradle to add properties to manifest
>> > file
>> > > >> and
>> > > >> >> then use getResourceAsStream to read the file and parse it.
>> > > >> >>
>> > > >> >> The Gradle part would be something like:
>> > > >> >> jar.manifest {
>> > > >> >>attributes('Implementation-Title': project.name,
>> > > >> >>'Implementation-Version': project.version,
>> > > >> >>'Built-By': System.getProperty('user.name'),
>> > > >> >>'Built-JDK': S
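
The reading side of the approach quoted above (Gradle writes the
attributes into the jar manifest, the code reads them back through
getResourceAsStream) could look roughly like the sketch below. The class
name and the sample version string are made up for illustration. One
caveat, stated as an assumption about typical classpaths: with several
jars present, "/META-INF/MANIFEST.MF" may resolve to another jar's
manifest, so real code would need to locate the right one.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

// Hypothetical helper; not a class from the Kafka code base.
class ManifestVersionSketch {
    // Parses a manifest and returns Implementation-Version, or null if
    // the attribute is absent.
    static String versionFrom(InputStream in) {
        try {
            Manifest manifest = new Manifest(in);
            return manifest.getMainAttributes().getValue("Implementation-Version");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Looks the manifest up on the classpath; returns null if none is found.
    static String versionFromClasspath() {
        try (InputStream in =
                 ManifestVersionSketch.class.getResourceAsStream("/META-INF/MANIFEST.MF")) {
            return in == null ? null : versionFrom(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Sample manifest text standing in for what the build would write.
        String text = "Manifest-Version: 1.0\n"
                + "Implementation-Title: kafka\n"
                + "Implementation-Version: 0.8.2-SNAPSHOT\n\n";
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        System.out.println(versionFrom(in));
    }
}
```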

Re: Create topic creates extra partitions

2014-11-21 Thread Gwen Shapira
I think the issue is that you are:
" running the above snippet for every broker ... I am assuming that
item.partitionsMetadata() only returns PartitionMetadata for the partitions
this broker is responsible for "

This is inaccurate. Each broker will check ZooKeeper for PartitionMetadata
and return information about all partitions.
You only need to check one broker. You can try other brokers if the first
broker returns an error.

Gwen
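
To make the arithmetic concrete: if every broker returns metadata for
all 1024 partitions, then summing the responses of 3 brokers gives
3 * 1024 = 3072. The sketch below models that with plain Java objects
rather than the Kafka client classes. PartitionInfo and the round-robin
leader assignment are stand-ins invented for the example; with the real
0.8 API you would presumably compare the broker behind part.leader(),
as the SimpleConsumer example does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a topic metadata response; not Kafka classes.
class PartitionCountSketch {
    static final class PartitionInfo {
        final int partitionId;
        final int leaderBrokerId;
        PartitionInfo(int partitionId, int leaderBrokerId) {
            this.partitionId = partitionId;
            this.leaderBrokerId = leaderBrokerId;
        }
    }

    // Builds the response any single broker would return: every partition
    // of the topic. Leaders are spread round-robin (an assumption).
    static List<PartitionInfo> simulatedResponse(int partitions, int brokers) {
        List<PartitionInfo> response = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            response.add(new PartitionInfo(p, p % brokers));
        }
        return response;
    }

    // One response is enough to count the partitions of the topic.
    static int partitionCount(List<PartitionInfo> oneResponse) {
        return oneResponse.size();
    }

    // Filters a single response down to the partitions a broker leads.
    static List<Integer> partitionsLedBy(List<PartitionInfo> oneResponse, int brokerId) {
        List<Integer> led = new ArrayList<>();
        for (PartitionInfo part : oneResponse) {
            if (part.leaderBrokerId == brokerId) {
                led.add(part.partitionId);
            }
        }
        return led;
    }

    public static void main(String[] args) {
        int brokers = 3;
        int summedOverBrokers = 0;
        for (int b = 0; b < brokers; b++) {
            // Each broker answers with the same full list.
            summedOverBrokers += partitionCount(simulatedResponse(1024, brokers));
        }
        System.out.println("summed over all brokers: " + summedOverBrokers);
        List<PartitionInfo> one = simulatedResponse(1024, brokers);
        System.out.println("from one broker: " + partitionCount(one));
        System.out.println("led by broker 0: " + partitionsLedBy(one, 0).size());
    }
}
```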

On Fri, Nov 21, 2014 at 10:14 PM, Rajiv Kurian  wrote:

> I have used the kafka-topics.sh script to create a topic with a number of
> partitions.
>
> bin/kafka-topics.sh --zookeeper myZKPath --create --topic myTopic
>  --partitions 1024 --replication-factor 3
>
> I was expecting this topic to be created with 1024 total partitions across
> 3 of my kafka brokers. Instead it seems like it ended up creating 1024 * 3
> = 3072 partitions. Is this a bug?
>
> I got the number of partitions running the code from the SimpleConsumer
> example at
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> specifically the findLeader(...) function. I just count the number of
> partitionsMetadata objects received. That seems to give me the number
> 3072.  The relevant snippet looks like:
>
> List<TopicMetadata> metaData = resp.topicsMetadata();
> for (TopicMetadata item : metaData) {
>   for (PartitionMetadata part : item.partitionsMetadata()) {
>     numPartitions++;
>   }
> }
>
>
> Given I am running the above snippet for every broker, should I not expect
> to see the numPartitions as 1024? I am assuming that
> item.partitionsMetadata() only returns PartitionMetadata for the partitions
> this broker is responsible for? If not then how do I exactly figure out
> what partitions this broker is the leader for?
>
> I am using the SimpleConsumer and my logic is getting completely muddled
> because of this problem.
>
> Also is there a way through the kafka console scripts to find out the
> number of partitions for a given topic?
>
>
> Thanks,
> Rajiv
>


Re: Two Kafka Question

2014-11-24 Thread Gwen Shapira
Hi Casey,

1. There's some limit based on size of zookeeper nodes, not sure exactly
where it is though. We've seen 30 node clusters running in production.

2. For your scenario to work, the new broker will need to have the same
broker id as the old one - or you'll need to manually re-assign partitions.

Gwen

On Mon, Nov 24, 2014 at 11:15 AM, Sybrandy, Casey <
casey.sybra...@six3systems.com> wrote:

> Hello,
>
> First, is there a limit to how many Kafka brokers you can have?
>
> Second, if a Kafka broker node fails and I start a new broker on a new
> node, is it correct to assume that the cluster will copy data to that node
> to satisfy the replication factor specified for a given topic?  In other
> words, let's assume that I have a 3 node cluster and a topic with a
> replication factor of 3.  If one node fails and I start up a new node, will
> the new node have existing messages replicated to it?
>
> Thanks.
>
> Casey
>


Re: rule to set number of brokers for each server

2014-11-28 Thread Gwen Shapira
I don't see any advantage to running more than one broker per server. In my
experience a single broker is capable of saturating the network link,
so I can't see how a second or third broker would provide any
benefit.

Gwen

On Fri, Nov 28, 2014 at 9:24 AM, Sa Li  wrote:
> Dear all
>
> I am provisioning a production Kafka cluster, which has 3 servers, and I am
> wondering how many brokers I should run on each server. I set 3 brokers
> in dev clusters, but I really don't know what the advantage is of running
> more than 1 broker per server. What about 1 broker per server, 3 brokers
> in total, instead of 9 brokers?
>
> thanks
>
>
> --
>
> Alec Li


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-04 Thread Gwen Shapira
Can you elaborate a bit on what an object API wrapper will look like?

Since the serialization API already exists today, it's very easy to
know how I'll use the new producer with serialization - exactly the
same way I use the existing one.
If we are proposing a change that will require significant changes in
how we serialize / deserialize, I'd like to see the API so I can
estimate the impact.

Gwen
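
As a starting point for that discussion, one possible shape of an
object API wrapper over a bytes-only producer is sketched below. None of
these types exist in Kafka: Serializer, BytesSender and TypedProducer
are hypothetical names for this example, and BytesSender merely stands
in for whatever the byte-array producer exposes. The point of the sketch
is the compile-time typing mentioned in the quoted message: the key and
value types are fixed by the serializers passed to the constructor.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper types; not the actual Kafka producer API.
class TypedProducerSketch {
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a producer that only understands byte arrays.
    interface BytesSender {
        void send(String topic, byte[] key, byte[] value);
    }

    static final class TypedProducer<K, V> {
        private final BytesSender sender;
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;

        TypedProducer(BytesSender sender, Serializer<K> keySerializer,
                      Serializer<V> valueSerializer) {
            this.sender = sender;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        // Serialization happens outside the bytes-only producer.
        void send(String topic, K key, V value) {
            sender.send(topic, keySerializer.serialize(key),
                        valueSerializer.serialize(value));
        }
    }

    static final Serializer<String> UTF8 = s -> s.getBytes(StandardCharsets.UTF_8);
    static final Serializer<Integer> INT32 = i -> ByteBuffer.allocate(4).putInt(i).array();

    public static void main(String[] args) {
        BytesSender printer = (topic, key, value) ->
            System.out.println(topic + ": key=" + key.length + "B value=" + value.length + "B");
        TypedProducer<String, Integer> producer = new TypedProducer<>(printer, UTF8, INT32);
        producer.send("myTopic", "user-1", 42);
        // producer.send("myTopic", 42, "user-1"); // would not compile: wrong types
    }
}
```

A mismatched serializer or record type is rejected by the compiler
here, whereas a serializer class named in a config string can only fail
at runtime.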

On Thu, Dec 4, 2014 at 10:19 AM, Guozhang Wang  wrote:
> I would prefer making the kafka producer as is and wrap the object API on
> top rather than wiring the serializer configs into producers. Some thoughts:
>
> 1. For code sharing, I think it may only be effective for those simple
> functions such as string serialization, etc. For Avro / Thrift / PB, the
> serialization logic would be quite hard to share across organizations:
> imagine some people want to use Avro 1.7 while others are still staying
> with 1.4, which is not API compatible, while some people use a schema
> registry server for clients to communicate while others compile the schemas
> into source code, etc. So I think in the end having those simple object
> serialization code into kafka.api package and letting applications write
> their own complicated serialization wrapper would be as beneficial as this
> approach.
>
> 2. For code simplicity I do not see a huge difference between a wired
> serializer, which will call serializer.encode() inside the producer, with a
> wrapper, which will call the same outside the producer, or a typed record,
> which will call record.encode() inside the producer.
>
> 3. For less error-proneness, people always mess with the config settings
> especially when they use hierarchical / nested wiring of configs, and such
> mistakes will only be detected at runtime, not at compile time. In the
> past we have seen a lot of such cases with the old producer APIs that
> wire in the serializer class. If we move this to a SerDe interface, for
> example KafkaProducer(KeySer, ValueSer), such errors will be
> detected at compilation.
>
> 4. For data type flexibility, the current approach binds one producer
> instance to a fixed record type. This may be OK in most cases as people
> usually only use a single data type, but there are some cases where we would
> like to have a single producer be able to send multiple typed messages,
> like control messages, or even with a single serialization like Avro we
> would sometimes want to have GenericRecord and IndexedRecord for some
> specific types.
>
>
> Guozhang
>
> On Wed, Dec 3, 2014 at 2:54 PM, Jun Rao  wrote:
>
>> Jan, Jason,
>>
>> First, within a Kafka cluster, it's unlikely that each topic has a
>> different type serializer. Like Jason mentioned, Square standardizes on
>> protocol. Many other places such as LinkedIn standardize on Avro.
>>
>> Second, dealing with bytes only has limited use cases. Other than copying
>> bytes around, there isn't much else that one can do. Even for the case of
>> copying data from Kafka into HDFS, often you will need to (1) extract the
>> timestamp so that you can partition the data properly; (2) extract
>> individual fields if you want to put the data in a column-oriented storage
>> format. So, most interesting clients likely need to deal with objects
>> instead of bytes.
>>
>> Finally, the generic api doesn't prevent one from using just the bytes. The
>> additional overhead is just a method call, which the old clients are
>> already paying. Having both a raw bytes and a generic api is probably going
>> to confuse the users more.
>>
>> Thanks,
>>
>> Jun
>>
>>
>>
>> On Tue, Dec 2, 2014 at 6:50 PM, Jan Filipiak 
>> wrote:
>>
>> > Hello Everyone,
>> >
>> > I would very much appreciate if someone could provide me a real world
>> > example where it is more convenient to implement the serializers instead
>> of
>> > just making sure to provide bytearrays.
>> >
>> > The code we came up with explicitly avoids the serializer api. I think it
>> > is common understanding that if you want to transport data you need to
>> have
>> > it as a bytearray.
>> >
>> > If at all I personally would like to have a serializer interface that
>> > takes the same types as the producer
>> >
>> > public interface Serializer<K, V> extends Configurable {
>> > public byte[] serializeKey(K data);
>> > public byte[] serializeValue(V data);
>> > public void close();
>> > }
>> >
>> > this would avoid long serialize implementations with branches like
>> > "switch(topic)" or "if(isKey)". Further serializer per topic makes more
>> > sense in my opinion. It feels natural to have a one to one relationship
>> > from types to topics or at least only a few partition per type. But as we
>> > inherit the type from the producer we would have to create many
>> producers.
>> > This would create additional unnecessary connections to the brokers. With
>> > the serializers we create a one type to all topics relationship and the
>> > only type that satisfies that is the bytearray or Object. Am

Re: Producer can writes to a follower during preferred lead election?

2014-12-07 Thread Gwen Shapira
If you write to a non-leader partition, I'd expect you'd get
NotLeaderForPartitionException (thrown by
Partition.appendMessagesToLeader).
This will get sent to the producer as error code 6.

I don't see anything special on the producer side to handle this
specific error (although I'd expect a forced metadata refresh and then a
re-send).

Gwen
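
The behaviour I would expect (refresh metadata on error code 6, then
re-send) can be sketched as follows. This is a simulation, not the
producer's actual code: FakeCluster and Producer are invented for the
example, and the only value taken from the message above is the error
code 6 for NotLeaderForPartition.

```java
// Hypothetical simulation of "refresh metadata and retry"; not Kafka code.
class LeaderRetrySketch {
    static final short NO_ERROR = 0;
    static final short NOT_LEADER_FOR_PARTITION = 6; // code quoted above

    // Simulated cluster: accepts a write only on the current leader.
    static final class FakeCluster {
        int leader;
        int accepted = 0;

        FakeCluster(int leader) {
            this.leader = leader;
        }

        short produce(int brokerId) {
            if (brokerId != leader) {
                return NOT_LEADER_FOR_PARTITION;
            }
            accepted++;
            return NO_ERROR;
        }

        // Stands in for a topic metadata request.
        int currentLeader() {
            return leader;
        }
    }

    static final class Producer {
        final FakeCluster cluster;
        int cachedLeader;
        int metadataRefreshes = 0;

        Producer(FakeCluster cluster) {
            this.cluster = cluster;
            this.cachedLeader = cluster.currentLeader();
        }

        // Sends once, then retries up to maxRetries times. A
        // NotLeaderForPartition error forces a metadata refresh first.
        boolean send(int maxRetries) {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                short error = cluster.produce(cachedLeader);
                if (error == NO_ERROR) {
                    return true;
                }
                if (error == NOT_LEADER_FOR_PARTITION) {
                    cachedLeader = cluster.currentLeader();
                    metadataRefreshes++;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster(2); // broker 2 plays "B"
        Producer producer = new Producer(cluster);
        cluster.leader = 1;                       // election moves leadership to "A"
        System.out.println("sent: " + producer.send(3));
        System.out.println("refreshes: " + producer.metadataRefreshes);
    }
}
```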

On Sat, Dec 6, 2014 at 6:46 PM, Xiaoyu Wang  wrote:
> Hello,
>
> I am looking at producer code and found that producer updates its
> broker/partition info under the two conditions
>
>1. has reached the topicMetadataRefreshInterval
>2. failed sending message, before retry
>
> So, assume we have brokers A and B, B is the current leader and A is the
> preferred leader, and a producer is publishing to B. If someone executes the
> preferred leader election command now, A will become the new leader and the
> producer won't know the leader is now A and will still write to B until the
> metadata refresh interval has been reached. Is this correct? Or did I
> miss anything?
>
>
> Thanks.


Re: Producer can writes to a follower during preferred lead election?

2014-12-08 Thread Gwen Shapira
I think that A will not be able to become a follower until B becomes a leader.

On Sun, Dec 7, 2014 at 11:07 AM, Xiaoyu Wang  wrote:
> On preferred replica election, the controller sends LeaderAndIsr requests to
> brokers. A broker handles the LeaderAndIsr request by either becoming a
> leader or becoming a follower.
>
> In the previous case, when A receives the call, it will try to become the
> leader and stop fetching from B; when B receives the call, it will try to
> become a follower and stop receiving new requests. Is it possible that A
> stops fetching before B stops receiving new requests? If this is possible,
> there may still be messages that go to B but not A, right?
>
> On Sun, Dec 7, 2014 at 7:20 AM, Thunder Stumpges 
> wrote:
>
>> In this case B will return "not leader for partition" error as soon as the
>> leader is re-elected and I imagine the producer will correct itself.
>>
>> -Thunder
>>
>>
>> -Original Message-
>> From: Xiaoyu Wang [xw...@rocketfuel.com]
>> Received: Saturday, 06 Dec 2014, 6:49PM
>> To: users@kafka.apache.org [users@kafka.apache.org]
>> Subject: Producer can writes to a follower during preferred lead election?
>>
>> Hello,
>>
>> I am looking at producer code and found that producer updates its
>> broker/partition info under the two conditions
>>
>>1. has reached the topicMetadataRefreshInterval
>>2. failed sending message, before retry
>>
>> So, assume we have brokers A and B, B is the current leader and A is the
>> preferred leader, and a producer is publishing to B. If someone executes the
>> preferred leader election command now, A will become the new leader and the
>> producer won't know the leader is now A and will still write to B until the
>> metadata refresh interval has been reached. Is this correct? Or did I
>> miss anything?
>>
>>
>> Thanks.
>>


Re: leaderless topicparts after single node failure: how to repair?

2014-12-10 Thread Gwen Shapira
It looks like none of your replicas are in-sync. Did you enable unclean
leader election?
This will allow one of the un-synced replicas to become leader, leading to
data loss but maintaining availability of the topic.

Gwen


On Tue, Dec 9, 2014 at 8:43 AM, Neil Harkins  wrote:

> Hi. We've suffered a single node HW failure (broker_id 4)
> with at least 2 replicas of each topic partition, but some
> topic parts are now leaderless (all were across 4,5):
>
> Topic: topic.with.two.replicas Partition: 0Leader: -1
> Replicas: 4,5   Isr:
>
> on broker 5, we see warnings like this in the logs:
>
> /var/log/kafka/kafka.log.2:[2014-12-05 05:21:28,216] 19186668
> [kafka-request-handler-4] WARN  kafka.server.ReplicaManager  -
> [Replica Manager on Broker 5]: While recording the follower position,
> the partition [topic.with.two.replicas,0] hasn't been created, skip
> updating leader HW
>
> /var/log/kafka/kafka.log.2:[2014-12-05 05:21:28,219] 19186671
> [kafka-request-handler-4] WARN  kafka.server.KafkaApis  - [KafkaApi-5]
> Fetch request with correlation id 36397 from client
> ReplicaFetcherThread-1-5 on partition [topic.with.two.replicas,0]
> failed due to Topic topic.with.two.replicas either doesn't exist or is
> in the process of being deleted
>
> We also have some topics which had 3 replicas also now leaderless:
>
> Topic:topic.with.three.replicas PartitionCount:6 ReplicationFactor:3
> Configs:
> Topic: topic.with.three.replicas Partition: 0 Leader: none Replicas: 3,1,2
> Isr:
>
> whose 'state' in zookeeper apparently disappeared:
> '/brokers/topics/topic.with.three.replicas/partitions/3/state':
> NoNodeError((), {})
>
> Our versions are:
> kafka 0.8.1
> zookeeper 3.4.5
>
> From searching archives of this list, the recommended "fix"
> is to blow away the topic(s) and recreate. At this point in time,
> that's an option, but it's not really acceptable for a reliable
> data pipeline. Are there options to repair specific partitions?
>
> -neil
>


Re: OutOfMemoryException when starting replacement node.

2014-12-10 Thread Gwen Shapira
There is a parameter called replica.fetch.max.bytes that controls the
size of the message buffer a broker will attempt to consume at once.
It defaults to 1MB, and has to be at least message.max.bytes (so at
least one message can be sent).

If you try to support really large messages and increase these values,
you may run into OOM issues.

Gwen

On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon  wrote:
> I just wanted to bump this issue to see if anyone has thoughts. Based on
> the error message it seems like the broker is attempting to consume nearly
> 2GB of data in a single fetch. Is this expected behavior?
>
> Please let us know if more details would be helpful or if it would be
> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>
> Thanks,
> Solon
>
> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov  wrote:
>
>> Hi,
>>
>> We were recently trying to replace a broker instance and were getting an
>> OutOfMemoryException when the new node was coming up. The issue happened
>> during the log replication phase. We were able to circumvent this issue by
>> copying over all of the logs to the new node before starting it.
>>
>> Details:
>>
>> - The heap size on the old and new node was 8GB.
>> - There was about 50GB of log data to transfer.
>> - There were 1548 partitions across 11 topics
>> - We recently increased our num.replica.fetchers to solve the problem
>> described here: https://issues.apache.org/jira/browse/KAFKA-1196. However,
>> this process worked when we first changed that value.
>>
>> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
>> BoundedByteBufferReceive)
>> java.lang.OutOfMemoryError: Java heap space
>>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> BoundedByteBufferReceive.scala:80)
>>   at kafka.network.BoundedByteBufferReceive.readFrom(
>> BoundedByteBufferReceive.scala:63)
>>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> BoundedByteBufferReceive.scala:29)
>>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> $sendRequest(SimpleConsumer.scala:71)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:108)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> AbstractFetcherThread.scala:96)
>>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> 88)
>>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>
>> Thank you
>>


Re: OutOfMemoryException when starting replacement node.

2014-12-10 Thread Gwen Shapira
If you have replica.fetch.max.bytes set to 10MB, I would not expect
2GB allocation in BoundedByteBufferReceive when doing a fetch.

Sorry, out of ideas on why this happens...

On Wed, Dec 10, 2014 at 8:41 AM, Solon Gordon  wrote:
> Thanks for your help. We do have replica.fetch.max.bytes set to 10MB to
> allow larger messages, so perhaps that's related. But should that really be
> big enough to cause OOMs on an 8GB heap? Are there other broker settings we
> can tune to avoid this issue?
>
> On Wed, Dec 10, 2014 at 11:05 AM, Gwen Shapira 
> wrote:
>
>> There is a parameter called replica.fetch.max.bytes that controls the
>> size of the messages buffer a broker will attempt to consume at once.
>> It defaults to 1MB, and has to be at least message.max.bytes (so at
>> least one message can be sent).
>>
>> If you try to support really large messages and increase these values,
>> you may run into OOM issues.
>>
>> Gwen
>>
>> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon  wrote:
>> > I just wanted to bump this issue to see if anyone has thoughts. Based on
>> > the error message it seems like the broker is attempting to consume
>> nearly
>> > 2GB of data in a single fetch. Is this expected behavior?
>> >
>> > Please let us know if more details would be helpful or if it would be
>> > better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>> >
>> > Thanks,
>> > Solon
>> >
>> > On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov 
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> We were recently trying to replace a broker instance and were getting an
>> >> OutOfMemoryException when the new node was coming up. The issue happened
>> >> during the log replication phase. We were able to circumvent this issue
>> by
>> >> copying over all of the logs to the new node before starting it.
>> >>
>> >> Details:
>> >>
>> >> - The heap size on the old and new node was 8GB.
>> >> - There was about 50GB of log data to transfer.
>> >> - There were 1548 partitions across 11 topics
>> >> - We recently increased our num.replica.fetchers to solve the problem
>> >> described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> However,
>> >> this process worked when we first changed that value.
>> >>
>> >> [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
>> (kafka.network.
>> >> BoundedByteBufferReceive)
>> >> java.lang.OutOfMemoryError: Java heap space
>> >>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> >> BoundedByteBufferReceive.scala:80)
>> >>   at kafka.network.BoundedByteBufferReceive.readFrom(
>> >> BoundedByteBufferReceive.scala:63)
>> >>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >>   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> >> BoundedByteBufferReceive.scala:29)
>> >>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >>   at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> >> $sendRequest(SimpleConsumer.scala:71)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> SimpleConsumer.scala:108)
>> >>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> >> AbstractFetcherThread.scala:96)
>> >>   at
>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> >> 88)
>> >>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >>
>> >> Thank you
>> >>
>>


Re: OutOfMemoryException when starting replacement node.

2014-12-10 Thread Gwen Shapira
Ah, found where we actually size the request as partitions * fetch size.

Thanks for the correction, Jay and sorry for the mix-up, Solon.
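
For anyone sizing heaps, the partitions * fetch size bound is easy to
compute. The sketch below plugs in the numbers from this thread (1548
partitions, 8GB heap, replica.fetch.max.bytes of 10MB) and assumes
"10MB" means 10 * 1024 * 1024 bytes. It is an upper bound only: a real
fetch covers just the partitions assigned to one fetcher for one source
broker, and a response is no larger than the data available.

```java
// Back-of-the-envelope sizing; the helper names are made up.
class FetchSizeSketch {
    // Upper bound on one fetch response if every partition in the request
    // returns a full fetch-size chunk.
    static long worstCaseFetchBytes(int partitions, long fetchMaxBytes) {
        return (long) partitions * fetchMaxBytes;
    }

    // Number of partitions whose worst-case fetch fits into a memory budget.
    static long maxPartitionsForBudget(long budgetBytes, long fetchMaxBytes) {
        return budgetBytes / fetchMaxBytes;
    }

    public static void main(String[] args) {
        long fetchMaxBytes = 10L * 1024 * 1024;  // assumed meaning of "10MB"
        long heapBytes = 8L * 1024 * 1024 * 1024; // 8GB heap from the report
        int partitions = 1548;                    // partition count from the report

        System.out.println("worst case bytes: "
            + worstCaseFetchBytes(partitions, fetchMaxBytes));
        System.out.println("partitions fitting in the heap: "
            + maxPartitionsForBudget(heapBytes, fetchMaxBytes));
    }
}
```

With these inputs the worst case is roughly twice the 8GB heap, which
is why either a larger heap or a smaller fetch size is needed.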

On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps  wrote:
> Hey Solon,
>
> The 10MB size is per-partition. The rationale for this is that the fetch
> size per-partition is effectively a max message size. However with so many
> partitions on one machine this will lead to a very large fetch size. We
> don't do a great job of scheduling these to stay under a memory bound
> today. Ideally the broker and consumer should do something intelligent to
> stay under a fixed memory budget, this is something we'd like to address as
> part of the new consumer.
>
> For now you need to either bump up your heap or decrease your fetch size.
>
> -jay
>
> On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon  wrote:
>
>> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> the error message it seems like the broker is attempting to consume nearly
>> 2GB of data in a single fetch. Is this expected behavior?
>>
>> Please let us know if more details would be helpful or if it would be
>> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>>
>> Thanks,
>> Solon
>>
>> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov 
>> wrote:
>>
>> > Hi,
>> >
>> > We were recently trying to replace a broker instance and were getting an
>> > OutOfMemoryException when the new node was coming up. The issue happened
>> > during the log replication phase. We were able to circumvent this issue
>> by
>> > copying over all of the logs to the new node before starting it.
>> >
>> > Details:
>> >
>> > - The heap size on the old and new node was 8GB.
>> > - There was about 50GB of log data to transfer.
>> > - There were 1548 partitions across 11 topics
>> > - We recently increased our num.replica.fetchers to solve the problem
>> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> However,
>> > this process worked when we first changed that value.
>> >
>> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.
>> > BoundedByteBufferReceive)
>> > java.lang.OutOfMemoryError: Java heap space
>> >   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> > BoundedByteBufferReceive.scala:80)
>> >   at kafka.network.BoundedByteBufferReceive.readFrom(
>> > BoundedByteBufferReceive.scala:63)
>> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> > BoundedByteBufferReceive.scala:29)
>> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> > $sendRequest(SimpleConsumer.scala:71)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> > SimpleConsumer.scala:108)
>> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> > AbstractFetcherThread.scala:96)
>> >   at
>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> > 88)
>> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >
>> > Thank you
>> >
>>


Re: OutOfMemoryException when starting replacement node.

2014-12-11 Thread Gwen Shapira
Agree that the docs can be better. Perhaps you want to open a JIRA (at
issues.apache.org) with this suggestion?

On Wed, Dec 10, 2014 at 4:03 PM, Solon Gordon  wrote:
> I see, thank you for the explanation. You might consider being more
> explicit about this in your documentation. We didn't realize we needed to
> take the (partitions * fetch size) calculation into account when choosing
> partition counts for our topics, so this is a bit of a rude surprise.
>
> On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira  wrote:
>
>> Ah, found where we actually size the request as partitions * fetch size.
>>
>> Thanks for the correction, Jay and sorry for the mix-up, Solon.
>>
>> On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps  wrote:
>> > Hey Solon,
>> >
>> > The 10MB size is per-partition. The rationale for this is that the fetch
>> > size per-partition is effectively a max message size. However with so
>> many
>> > partitions on one machine this will lead to a very large fetch size. We
>> > don't do a great job of scheduling these to stay under a memory bound
>> > today. Ideally the broker and consumer should do something intelligent to
>> > stay under a fixed memory budget, this is something we'd like to address
>> as
>> > part of the new consumer.
>> >
>> > For now you need to either bump up your heap or decrease your fetch size.
>> >
>> > -jay
>> >
>> > On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon  wrote:
>> >
>> >> I just wanted to bump this issue to see if anyone has thoughts. Based on
>> >> the error message it seems like the broker is attempting to consume
>> nearly
>> >> 2GB of data in a single fetch. Is this expected behavior?
>> >>
>> >> Please let us know if more details would be helpful or if it would be
>> >> better for us to file a JIRA issue. We're using Kafka 0.8.1.1.
>> >>
>> >> Thanks,
>> >> Solon
>> >>
>> >> On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov 
>> >> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > We were recently trying to replace a broker instance and were getting
>> an
>> >> > OutOfMemoryException when the new node was coming up. The issue
>> happened
>> >> > during the log replication phase. We were able to circumvent this
>> issue
>> >> by
>> >> > copying over all of the logs to the new node before starting it.
>> >> >
>> >> > Details:
>> >> >
>> >> > - The heap size on the old and new node was 8GB.
>> >> > - There was about 50GB of log data to transfer.
>> >> > - There were 1548 partitions across 11 topics
>> >> > - We recently increased our num.replica.fetchers to solve the problem
>> >> > described here: https://issues.apache.org/jira/browse/KAFKA-1196.
>> >> However,
>> >> > this process worked when we first changed that value.
>> >> >
>> >> > [2014-12-04 12:10:22,746] ERROR OOME with size 1867671283
>> (kafka.network.
>> >> > BoundedByteBufferReceive)
>> >> > java.lang.OutOfMemoryError: Java heap space
>> >> >   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>> >> >   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> >> >   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(
>> >> > BoundedByteBufferReceive.scala:80)
>> >> >   at kafka.network.BoundedByteBufferReceive.readFrom(
>> >> > BoundedByteBufferReceive.scala:63)
>> >> >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >> >   at kafka.network.BoundedByteBufferReceive.readCompletely(
>> >> > BoundedByteBufferReceive.scala:29)
>> >> >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >> >   at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >> >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> >> > $sendRequest(SimpleConsumer.scala:71)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> >> > apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> >> > SimpleConsumer.scala:108)
>> >> >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >> >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >> >   at kafka.server.AbstractFetcherThread.processFetchRequest(
>> >> > AbstractFetcherThread.scala:96)
>> >> >   at
>> >> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:
>> >> > 88)
>> >> >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >> >
>> >> > Thank you
>> >> >
>> >>
>>
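
The (partitions * fetch size) calculation discussed in this thread can be sketched as a small worked example. The numbers are taken from the messages above (1548 partitions, a 10MB per-partition fetch size, an 8GB heap, and the failed allocation of 1867671283 bytes). The class and method names are hypothetical, and the sketch assumes every partition in a request returns a full fetch size, which is the worst case rather than the typical one.

```java
// Hypothetical helper, not part of Kafka: worst-case sizing of a replica fetch.
final class FetchSizing {

    /** Upper bound on one fetch response if every partition returns a full fetch size. */
    static long worstCaseFetchBytes(int partitionsInRequest, long fetchBytesPerPartition) {
        return Math.multiplyExact((long) partitionsInRequest, fetchBytesPerPartition);
    }

    /** Number of full per-partition chunks that a response of the given size corresponds to. */
    static long impliedPartitions(long responseBytes, long fetchBytesPerPartition) {
        return responseBytes / fetchBytesPerPartition;
    }

    public static void main(String[] args) {
        long fetchBytes = 10L * 1024 * 1024;       // 10MB per partition, as stated in the thread
        long heapBytes = 8L * 1024 * 1024 * 1024;  // 8GB heap, as stated in the thread

        long worst = worstCaseFetchBytes(1548, fetchBytes);
        System.out.println("worst case bytes for 1548 partitions: " + worst);
        System.out.println("exceeds heap: " + (worst > heapBytes));
        System.out.println("partitions implied by the failed allocation: "
                + impliedPartitions(1867671283L, fetchBytes));
    }
}
```

Under these assumptions the worst case for all 1548 partitions is 16,231,956,480 bytes (about 15.1 GiB), well above the 8GB heap, and the failed 1867671283-byte allocation corresponds to roughly 178 partitions in a single request. How many partitions actually end up in one request depends on the number of source brokers and fetcher threads, which the thread does not spell out.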


Re: Kafka design pattern question - multiple user ids

2014-12-15 Thread Gwen Shapira
When you send messages to Kafka you send a <key, value> pair. The key
can include the user id.

Here's how:

KeyedMessage<String, String> data = new KeyedMessage<String, String>
  (topic, user_id, event);

producer.send(data);

Hope this helps,
Gwen

On Mon, Dec 15, 2014 at 10:29 AM, Harold Nguyen  wrote:
> Hello Kafka Experts!
>
> Sorry if this has been answered before - I was hoping for a quick response
> to a naive question from a newbie like myself!
>
> If I have multiple users, how do I split the streams so that they
> correspond with different user ids ?
>
> Suppose I have tens of thousands of user ids that I want to keep track of.
> Is there a way to write to Kafka and associate a "key" with it ? (The key
> being the user id?) Or is there a better way to do this ?
>
> Thanks so much for your time!
>
> Harold


Re: Kafka design pattern question - multiple user ids

2014-12-15 Thread Gwen Shapira
AFAIK, you can have as many keys as you want - but if you are looking
to have a separate partition for each key, you are more limited. I
can't give an exact limit since it depends on multiple factors, but
probably not over 10,000 (and even 1000 for a single topic can be
"pushing it" in some cases).

I recommend using hash partitioning to place multiple user_ids in one
partition while making sure that all messages for a given user go to
the same partition.
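
A minimal sketch of the idea, assuming the partition is derived from the hash of the key modulo the partition count. The class and method names are hypothetical; this illustrates the technique and is not a copy of Kafka's own partitioner.

```java
// Hypothetical illustration of hash partitioning by user id.
final class UserPartitioner {

    /** Maps a user id to a partition in [0, numPartitions). The same id always maps to the same partition. */
    static int partitionFor(String userId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask off the sign bit so the result is never negative.
        return (userId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8; // assumed partition count for the example
        for (String user : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(user + " -> partition " + partitionFor(user, numPartitions));
        }
    }
}
```

With this scheme tens of thousands of user ids share a small, fixed number of partitions, and ordering is still preserved per user because a given id never moves between partitions as long as the partition count stays the same.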

On Mon, Dec 15, 2014 at 10:48 AM, Harold Nguyen  wrote:
> Hi Gwen,
>
> Thanks for the great and fast reply! How many different keys can Kafka
> support ?
>
> Harold
>
> On Mon, Dec 15, 2014 at 10:46 AM, Gwen Shapira 
> wrote:
>>
>> When you send messages to Kafka you send a <key, value> pair. The key
>> can include the user id.
>>
>> Here's how:
>>
>> KeyedMessage<String, String> data = new KeyedMessage<String, String>
>>   (topic, user_id, event);
>>
>> producer.send(data);
>>
>> Hope this helps,
>> Gwen
>>
>> On Mon, Dec 15, 2014 at 10:29 AM, Harold Nguyen 
>> wrote:
>> > Hello Kafka Experts!
>> >
>> > Sorry if this has been answered before - I was hoping for a quick
>> response
>> > to a naive question from a newbie like myself!
>> >
>> > If I have multiple users, how do I split the streams so that they
>> > correspond with different user ids ?
>> >
>> > Suppose I have tens of thousands of user ids that I want to keep track
>> of.
>> > Is there a way to write to Kafka and associate a "key" with it ? (The key
>> > being the user id?) Or is there a better way to do this ?
>> >
>> > Thanks so much for your time!
>> >
>> > Harold
>>


Re: Number of Consumers Connected

2014-12-15 Thread Gwen Shapira
Currently you can find the number of consumer groups through ZooKeeper:

connect to ZK and run
ls /consumers

and count the number of results
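
As a rough sketch of the counting step: the ZooKeeper shell prints the children of a node as a bracketed, comma-separated list, so the count can be taken from that line. The helper below is hypothetical and only parses text that has already been copied from the shell; it does not talk to ZooKeeper. The same approach should work for the consumer instances of one group, assuming they are registered under /consumers/<group>/ids.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: parses one line of "ls" output from the ZooKeeper shell.
final class ZkLsOutput {

    /** Turns "[group-a, group-b]" into a list of child names; "[]" yields an empty list. */
    static List<String> parseChildren(String lsLine) {
        String s = lsLine.trim();
        if (!s.startsWith("[") || !s.endsWith("]")) {
            throw new IllegalArgumentException("expected a bracketed list, got: " + lsLine);
        }
        String inner = s.substring(1, s.length() - 1).trim();
        List<String> children = new ArrayList<>();
        if (inner.isEmpty()) {
            return children;
        }
        for (String part : inner.split(",")) {
            children.add(part.trim());
        }
        return children;
    }

    public static void main(String[] args) {
        // Example input; the group names are made up.
        List<String> groups = parseChildren("[group-a, group-b, console-consumer-1234]");
        System.out.println("consumer groups: " + groups.size());
    }
}
```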

On Mon, Dec 15, 2014 at 11:34 AM, nitin sharma
 wrote:
> Hi Team,
>
> Is it possible to know how many consumer groups are connected to the Kafka
> brokers, as well as how many instances within a group are fetching messages
> from the brokers?
>
> Regards,
> Nitin Kumar Sharma.


Re: Number of Consumers Connected

2014-12-15 Thread Gwen Shapira
Hi Nitin,

Go to where you installed zookeeper and run:

bin/zkCli.sh -server 127.0.0.1:2181

On Mon, Dec 15, 2014 at 6:09 PM, nitin sharma
 wrote:
> Thanks Neha and Gwen for your responses..
>
> @Gwen -- Kindly explain how to perform the steps you have mentioned. How
> should I connect to ZooKeeper?
>
> Regards,
> Nitin Kumar Sharma.
>
>
> On Mon, Dec 15, 2014 at 6:36 PM, Neha Narkhede  wrote:
>>
>> In addition to Gwen's suggestion, we actually don't have jmx metrics that
>> give you a list of actively consuming processes.
>>
>> On Mon, Dec 15, 2014 at 12:59 PM, Gwen Shapira 
>> wrote:
>> >
>> > Currently you can find the number of consumer groups through ZooKeeper:
>> >
>> > connect to ZK and run
>> > ls /consumers
>> >
>> > and count the number of results
>> >
>> > On Mon, Dec 15, 2014 at 11:34 AM, nitin sharma
>> >  wrote:
>> > > Hi Team,
>> > >
>> > > Is it possible to know how many consumer groups are connected to the
>> > > Kafka brokers, as well as how many instances within a group are
>> > > fetching messages from the brokers?
>> > >
>> > > Regards,
>> > > Nitin Kumar Sharma.
>> >
>>
>>
>> --
>> Thanks,
>> Neha
>>


Re: consumer groups

2014-12-16 Thread Gwen Shapira
" If all
the consumers stop listening how long will Kafka continue to store messages
for that group?"

Kafka retains data for a set amount of time, regardless of whether
anyone is listening or not. This amount of time is configurable.
Because Kafka performance is generally constant with respect to the
amount of data it stores, storing terabytes of data for months is not
uncommon.

Gwen

On Tue, Dec 16, 2014 at 1:09 PM, Greg Lloyd  wrote:
> Hi,
>
> I am planning to use kafka for a work queue type use case with multiple
> consumers. I also plan to use it with multiple consumer groups. What is not
> clear to me from the documentation is how to define+manage consumer groups
> or if that is possible at all. It appears to me that a consumer group is
> created just by connecting with a new group-id. What is not clear to me is
> how to manage this: if I no longer want a consumer group how do I remove
> it? What happens when there are no consumers of a group listening? If all
> the consumers stop listening how long will Kafka continue to store messages
> for that group?
>
> Thanks for any clarification,
>
> Greg Lloyd


Re: consumer groups

2014-12-17 Thread Gwen Shapira
When you add a new consumer group, you can choose where it will start reading.

With the high-level consumer you can set auto.offset.reset to "largest"
(new messages only) or "smallest" (all messages); with the simple
consumer you can pick specific offsets.
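
A minimal sketch of the high-level consumer settings for a brand-new group, assuming the 0.8-era property names zookeeper.connect, group.id and auto.offset.reset. The class name, the ZooKeeper address and the group name are made up for illustration. Note that auto.offset.reset should only matter when the group has no usable committed offset, which is the case for a group that has never run before.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties for a group that has never consumed before.
final class NewGroupConfig {

    static Properties forNewGroup(String zkConnect, String groupId, boolean newMessagesOnly) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // "largest": start at the end of the log (skip history).
        // "smallest": start at the oldest retained message.
        props.setProperty("auto.offset.reset", newMessagesOnly ? "largest" : "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Address and group name are placeholders.
        Properties props = forNewGroup("localhost:2181", "reporting-group", true);
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object would then be handed to the consumer's configuration class when the consumer is created.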


On Wed, Dec 17, 2014 at 2:33 PM, Greg Lloyd  wrote:
> Thanks for the reply,
>
> So if I wanted to add a new group of consumers 6 months into the lifespan
> of my implementation and I didn't want that new group to process all of the
> last six months of messages, is there a way to manage this?
>
>
>
> On Tue, Dec 16, 2014 at 9:48 PM, Gwen Shapira  wrote:
>>
>> " If all
>> the consumers stop listening how long will Kafka continue to store messages
>> for that group?"
>>
>> Kafka retains data for a set amount of time, regardless of whether
>> anyone is listening or not. This amount of time is configurable.
>> Because Kafka performance is generally constant with respect to the
>> amount of data it stores, storing terabytes of data for months is not
>> uncommon.
>>
>> Gwen
>>
>> On Tue, Dec 16, 2014 at 1:09 PM, Greg Lloyd  wrote:
>> > Hi,
>> >
>> > I am planning to use kafka for a work queue type use case with multiple
>> > consumers. I also plan to use it with multiple consumer groups. What is
>> not
>> > clear to me from the documentation is how to define+manage consumer
>> groups
>> > or if that is possible at all. It appears to me that a consumer group is
>> > created just by connecting with a new group-id. What is not clear to me
>> is
>> > how to manage this: if I no longer want a consumer group how do I remove
>> > it? What happens when there are no consumers of a group listening? If all
>> > the consumers stop listening how long will Kafka continue to store
>> messages
>> > for that group?
>> >
>> > Thanks for any clarification,
>> >
>> > Greg Lloyd
>>


Re: can't produce message in kafka production

2014-12-18 Thread Gwen Shapira
Looks like you can't connect to: 10.100.98.100:9092

I'd validate that this is the issue using telnet and then check the
firewall / ipfilters settings.
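
For hosts where telnet is not installed, the same reachability check can be sketched with a plain TCP connect. The class name and the timeout are arbitrary choices for this example. A successful connect only rules out a blocked port; it says nothing about errors on the broker side.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
final class PortCheck {

    /** Returns true if a TCP connection is established within the timeout, false otherwise. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, unresolvable host, or filtered by a firewall.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck <host> <port>; defaults are placeholders.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```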

On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
> Dear all
>
> We just built a Kafka production cluster. I can create topics in Kafka
> production from another host. But when I send a very simple message as a
> producer, it generates these errors:
>
> root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
> 10.100.98.100:9092 --topic my-replicated-topic-production
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> my test message 1
> [2014-12-18 21:44:25,830] WARN Failed to send producer request with
> correlation id 2 to broker 101 with data for partitions
> [my-replicated-topic-production,1]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:256)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:99)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2014-12-18 21:44:25,948] WARN Failed to send producer request with
> correlation id 5 to broker 101 with data for partitions
> [my-replicated-topic-production,1]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply

Re: can't produce message in kafka production

2014-12-18 Thread Gwen Shapira
Perhaps you have the logs from the broker? They may show other errors
that can help us troubleshoot.

On Thu, Dec 18, 2014 at 4:11 PM, Sa Li  wrote:
> Thanks, Gwen, I telnet it,
> root@precise64:/etc/kafka# telnet 10.100.98.100 9092
> Trying 10.100.98.100...
> Connected to 10.100.98.100.
> Escape character is '^]'.
>
> It seems to connect, and I checked with the system operations people: netstat
> shows 9092 is listening. I was assuming this is a connection issue, since
> I can run the same command against my dev cluster, which is
> 10.100.70.128:9092, with no problem at all.
>
> Just in case, is it possibly caused by other types of issues?
>
> thanks
>
> Alec
>
> On Thu, Dec 18, 2014 at 2:33 PM, Gwen Shapira  wrote:
>>
>> Looks like you can't connect to: 10.100.98.100:9092
>>
>> I'd validate that this is the issue using telnet and then check the
>> firewall / ipfilters settings.
>>
>> On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
>> > Dear all
>> >
>> > We just built a Kafka production cluster. I can create topics in Kafka
>> > production from another host. But when I send a very simple message as a
>> > producer, it generates these errors:
>> >
>> > root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
>> > 10.100.98.100:9092 --topic my-replicated-topic-production
>> > SLF4J: Class path contains multiple SLF4J bindings.
>> > SLF4J: Found binding in
>> >
>> [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> > SLF4J: Found binding in
>> >
>> [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> > explanation.
>> > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> > my test message 1
>> > [2014-12-18 21:44:25,830] WARN Failed to send producer request with
>> > correlation id 2 to broker 101 with data for partitions
>> > [my-replicated-topic-production,1]
>> > (kafka.producer.async.DefaultEventHandler)
>> > java.nio.channels.ClosedChannelException
>> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>> > at
>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>> > at
>> >
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>> > at
>> >
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>> > at
>> >
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>> > at
>> >
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> > at
>> >
>> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>> > at
>> > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>> > at
>> > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>> > at
>> >
>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:256)
>> > at
>> >
>> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
>> > at
>> >
>> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:99)
>> > at
>> >
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> > at
>> >
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> > at
>> >
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> > at
>> >
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> > at
>> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> > 

Re: Uneven disk usage in Kafka 0.8.1.1

2014-12-25 Thread Gwen Shapira
Hi,

LogManager.nextLogDir() has the logic for choosing which directory to use.
The documentation of the method says:
  /**
   * Choose the next directory in which to create a log. Currently this is done
   * by calculating the number of partitions in each directory and then choosing the
   * data directory with the fewest partitions.
   */

A possible explanation for what you are seeing is that some of the
directories were added recently and simply did not fill up yet. Kafka
doesn't attempt to rebalance logs across disks or directories (although
this could be a nice feature request, at least for a utility).
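
The selection rule described in that comment can be sketched as follows. This is an illustration in Java with hypothetical names, not the LogManager code itself: it takes a count of partitions per directory and returns the directory with the fewest. Because only the partition count is considered, not the bytes on disk, directories can end up with very different amounts of data.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of "pick the data directory with the fewest partitions".
final class LogDirChooser {

    /** Returns the directory whose partition count is smallest. */
    static String nextLogDir(Map<String, Integer> partitionsPerDir) {
        if (partitionsPerDir.isEmpty()) {
            throw new IllegalArgumentException("no log directories configured");
        }
        return Collections.min(partitionsPerDir.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        // Directory names and counts are made up for the example.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("/data/disk1", 6);
        counts.put("/data/disk2", 1);
        counts.put("/data/disk3", 3);
        System.out.println("next log dir: " + nextLogDir(counts));
    }
}
```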

Gwen




On Thu, Dec 25, 2014 at 1:30 AM, Yury Ruchin  wrote:

> I also noticed that some disks do not contain any partitions which is even
> more strange. Why does broker not use them, while other disks are
> overloaded?
>
> 2014-12-25 12:07 GMT+03:00 Yury Ruchin :
>
> > Hi,
> >
> > With Kafka 0.8.1.1, I'm continuously running into the issue with no disk
> > space remaining. I'm observing that on the problematic brokers partitions
> > are distributed across several identical disks very unevenly. For
> example,
> > provided the fact that partitions have similar size, I see 6 partitions
> > being written to one disk, while the other disks get only 1-3 partitions
> > each.
> >
> > What could cause that? Is that possible to somehow affect the partition
> > distribution?
> >
> > Thanks,
> > Yury
> >
>


Re: Kafka 0.8.2 release - before Santa Claus?

2014-12-25 Thread Gwen Shapira
IMO:
KAFKA-1790 - can be pushed out (or even marked as "won't fix")
KAFKA-1782 - can be pushed out (not really a blocker)

The rest look like actual blockers to me.

Gwen

On Tue, Dec 23, 2014 at 1:32 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> I see 16 open issues for 0.8.2 at
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20updated%20DESC%2C%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> - some with patches, some blockers, some blockers without patches.
>
> Are all issues listed as blockers truly blockers for 0.8.2?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Dec 1, 2014 at 8:13 PM, Joe Stein  wrote:
>
> > If we can have a build ready by Dec 26th I think that is feasible. I
> could
> > prepare and post that if we think we have the votes and a stable version.
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop
> > /
> > On Dec 1, 2014 7:56 PM, "Neha Narkhede"  wrote:
> >
> > > +1 for doing a 0.8.2 final before the December break.
> > >
> > > On Mon, Dec 1, 2014 at 8:40 AM, Jun Rao  wrote:
> > >
> > > > We are currently discussing a last-minute API change to the new java
> > > > producer. We have also accumulated a few more 0.8.2 blockers.
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/KAFKA-1642?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.8.2
> > > >
> > > > So, we likely will have another 0.8.2 release in Dec. However, I am
> not
> > > > sure if that's beta2 or final.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Nov 26, 2014 at 12:22 PM, Otis Gospodnetic <
> > > > otis.gospodne...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > People using SPM to monitor Kafka have been anxiously asking us
> about
> > > the
> > > > > 0.8.2 release and we've been telling them December.  Is that still
> > the
> > > > > plan?
> > > > >
> > > > > Thanks,
> > > > > Otis
> > > > > --
> > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > Management
> > > > > Solr & Elasticsearch Support * http://sematext.com/
> > > > >
> > > >
> > >
> >
>


Re: Kafka 0.8.2 release - before Santa Claus?

2014-12-26 Thread Gwen Shapira
Actually, KAFKA-1785 <https://issues.apache.org/jira/browse/KAFKA-1785> can
also wait - since it is likely to be part of a larger patch.

On Thu, Dec 25, 2014 at 10:39 AM, Gwen Shapira 
wrote:

> IMO:
> KAFKA-1790 - can be pushed out (or even marked as "won't fix")
> KAFKA-1782 - can be pushed out (not really a blocker)
>
> The rest look like actual blockers to me.
>
> Gwen
>
> On Tue, Dec 23, 2014 at 1:32 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
>> Hi,
>>
>> I see 16 open issues for 0.8.2 at
>>
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20updated%20DESC%2C%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
>> - some with patches, some blockers, some blockers without patches.
>>
>> Are all issues listed as blockers truly blockers for 0.8.2?
>>
>> Thanks,
>> Otis
>> --
>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> Solr & Elasticsearch Support * http://sematext.com/
>>
>>
>> On Mon, Dec 1, 2014 at 8:13 PM, Joe Stein  wrote:
>>
>> > If we can have a build ready by Dec 26th I think that is feasible. I
>> could
>> > prepare and post that if we think we have the votes and a stable
>> version.
>> >
>> > /***
>> > Joe Stein
>> > Founder, Principal Consultant
>> > Big Data Open Source Security LLC
>> > http://www.stealth.ly
>> > Twitter: @allthingshadoop
>> > /
>> > On Dec 1, 2014 7:56 PM, "Neha Narkhede" 
>> wrote:
>> >
>> > > +1 for doing a 0.8.2 final before the December break.
>> > >
>> > > On Mon, Dec 1, 2014 at 8:40 AM, Jun Rao  wrote:
>> > >
>> > > > We are currently discussing a last-minute API change to the new java
>> > > > producer. We have also accumulated a few more 0.8.2 blockers.
>> > > >
>> > > >
>> > > >
>> > >
>> >
>> https://issues.apache.org/jira/browse/KAFKA-1642?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.8.2
>> > > >
>> > > > So, we likely will have another 0.8.2 release in Dec. However, I am
>> not
>> > > > sure if that's beta2 or final.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Wed, Nov 26, 2014 at 12:22 PM, Otis Gospodnetic <
>> > > > otis.gospodne...@gmail.com> wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > People using SPM to monitor Kafka have been anxiously asking us
>> about
>> > > the
>> > > > > 0.8.2 release and we've been telling them December.  Is that still
>> > the
>> > > > > plan?
>> > > > >
>> > > > > Thanks,
>> > > > > Otis
>> > > > > --
>> > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
>> > Management
>> > > > > Solr & Elasticsearch Support * http://sematext.com/
>> > > > >
>> > > >
>> > >
>> >
>>
>
>


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String
consumerId, Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
> Hi Joel,
>
> I'm looking more closely at the OffsetCommitRequest wire protocol change
> you mentioned below, and I cannot figure out how to explicitly construct a
> request with the earlier version.  Should the api version be different for
> requests that do not include it and/or servers that do not support the
> timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
> and used api version 0.  But 0.8.2-beta seems to now require timestamps
> even when I explicitly encode OffsetCommitRequest api version 0 (server
> logs a BufferUnderflowException).
>
> Is this the expected server behavior?  Can you provide any tips on how
> third-party clients should manage the wire-protocol change for this api
> method (I'm working on kafka-python)?
>
> Thanks,
>
> -Dana
>
> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:
>
>> Yes it should be backwards compatible. So for e.g., you should be able
>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> not upgrade your clients until after the brokers have been upgraded.
>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> explicitly construct an OffsetCommitRequest with the earlier version.
>>
>> Thanks,
>>
>> Joel
>>
>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>> > Hi Joel,
>> >
>> > Thanks for all the clarifications!  Just another question on this: will
>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>> > Generally speaking, would there be any concerns with using the 0.8.2
>> > consumer with a 0.8.1 broker, for instance?
>> >
>> > Marius
>> >
>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:
>> >
>> > > Inline..
>> > >
>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>> > > > Hello everyone,
>> > > >
>> > > > I have a few questions about the current status and future of the
>> Kafka
>> > > > consumers.
>> > > >
>> > > > We have been working to adding Kafka support in Spring XD [1],
>> currently
>> > > > using the high level consumer via Spring Integration Kafka [2]. We
>> are
>> > > > working on adding features such as:
>> > > > - the ability to control offsets/replay topics;
>> > > > - the ability to control partition allocation across multiple
>> consumers;
>> > > >
>> > > > We are currently at version 0.8.1.1, so using the simple consumer is
>> a
>> > > > pretty straightforward choice right now. However, in the light of the
>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
>> > > >
>> > > > 1) With respect to the consumer redesign for 0.9, what is the future
>> of
>> > > the
>> > > > Simple Consumer and High Level Consumer? To my best understanding,
>> the
>> > > > existing high level consumer API will be deprecated in favour of the
>> new
>> > > > consumer API. What is the future of the Simple Consumer, in this
>> case? it
>> > > > will continue to exist as a low-level API implementing the Kafka
>> protocol
>> > > > [3] and providing the building blocks for the new consumer, or will
>> it be
>> > > > deprecated as well?
>> > >
>> > > The new consumer will subsume both use-cases (simple and high-level).
>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>> > > protocol for fetch and other requests will still be supported.
>> > >
>> > > >
>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
>> > > > implementation of it, but since this a feature scheduled only for
>> 0.9,
>> > > what
>> > > > is its status as well? Is it included only as a future reference and
>> for
>> > > > stabilizing the API?
>> > >
>> > > It is a WIP so you cannot really use it.
>> > >
>> > > > 3) Obviously, offset management is a concern if using the simple
>> > > consumer,
>> > > > so - wondering about the Offset Management API as well. The Kafka
>> > > protocol
>> > > > document specifically indicates that it will be fully functional in
>> 0.8.2
>> > > > [4] - however, a functional implementation is already available in
>> > > 0.8.1.1
>> > > > (accessible via the SimpleConsumer API but not documented in [5]).
>> Again,
>> > > > trying to understand the extent of what 0.8.1.1 already supports
>> > > > (ostensibly, the offset manager support seems to have been added
>> only in
>> > > > 0.8.2 - please correct me if I am wrong), and whether if it is
>> > > recommended
>> > > > for use in production in any form (with the caveats that acco

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ah, I see :)

The readFrom function basically tries to read two extra fields if you
are on version 1:

if (versionId == 1) {
  groupGenerationId = buffer.getInt
  consumerId = readShortString(buffer)
}

The rest looks identical in version 0 and 1, and still no timestamp in sight...
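
To make the version check concrete, here is a simplified sketch of version-dependent decoding with a ByteBuffer. It is written in Java with hypothetical names and covers only the group id plus the two extra version 1 fields; the real request contains more fields before and after these, and the exact layout used here is an assumption for illustration. Strings are assumed to be encoded as a 2-byte length followed by UTF-8 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of a versioned request header. Not the Kafka class.
final class OffsetCommitHeaderSketch {
    final String groupId;
    final int generationId;   // -1 when the request is version 0
    final String consumerId;  // empty when the request is version 0

    private OffsetCommitHeaderSketch(String groupId, int generationId, String consumerId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.consumerId = consumerId;
    }

    /** Reads a string encoded as a 2-byte length followed by UTF-8 bytes. */
    static String readShortString(ByteBuffer buffer) {
        short length = buffer.getShort();
        if (length < 0) {
            return null; // a negative length is treated as "no string"
        }
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static void writeShortString(ByteBuffer buffer, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
    }

    /** Encodes the header; the two extra fields are written only for version 1. */
    static ByteBuffer encode(short versionId, String groupId, int generationId, String consumerId) {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        writeShortString(buffer, groupId);
        if (versionId == 1) {
            buffer.putInt(generationId);
            writeShortString(buffer, consumerId);
        }
        buffer.flip();
        return buffer;
    }

    /** Decodes the header; the two extra fields are read only for version 1. */
    static OffsetCommitHeaderSketch readFrom(short versionId, ByteBuffer buffer) {
        String groupId = readShortString(buffer);
        int generationId = -1;
        String consumerId = "";
        if (versionId == 1) {
            generationId = buffer.getInt();
            consumerId = readShortString(buffer);
        }
        return new OffsetCommitHeaderSketch(groupId, generationId, consumerId);
    }
}
```

In this sketch, decoding bytes that were encoded as version 0 while claiming version 1 runs off the end of the buffer and throws java.nio.BufferUnderflowException. That shows one general way such an exception can arise when the encoder and decoder disagree about which fields are present; whether it is the cause of the error reported in this thread is not established here.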

Gwen

On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
> Hi Gwen, I am using/writing kafka-python to construct api requests and have
> not dug too deeply into the server source code.  But I believe it is
> kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> used to decode the wire protocol.
>
> -Dana
> OffsetCommitRequest has two constructors now:
>
> For version 0:
>  OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
>
> And version 1:
> OffsetCommitRequest(String groupId, int generationId, String
> consumerId, Map<TopicPartition, PartitionData> offsetData)
>
> None of them seem to require timestamps... so I'm not sure where you
> see that this is required. Can you share an example?
>
> Gwen
>
> On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
>> Hi Joel,
>>
>> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> you mentioned below, and I cannot figure out how to explicitly construct a
>> request with the earlier version.  Should the api version be different for
>> requests that do not include it and/or servers that do not support the
>> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
> field
>> and used api version 0.  But 0.8.2-beta seems to now require timestamps
>> even when I explicitly encode OffsetCommitRequest api version 0 (server
>> logs a BufferUnderflowException).
>>
>> Is this the expected server behavior?  Can you provide any tips on how
>> third-party clients should manage the wire-protocol change for this api
>> method (I'm working on kafka-python)?
>>
>> Thanks,
>>
>> -Dana
>>
>> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:
>>
>>> Yes it should be backwards compatible. So for e.g., you should be able
>>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>>> not upgrade your clients until after the brokers have been upgraded.
>>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>>> explicitly construct an OffsetCommitRequest with the earlier version.
>>>
>>> Thanks,
>>>
>>> Joel
>>>
>>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>>> > Hi Joel,
>>> >
>>> > Thanks for all the clarifications!  Just another question on this: will
>>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>>> > Generally speaking, would there be any concerns with using the 0.8.2
>>> > consumer with a 0.8.1 broker, for instance?
>>> >
>>> > Marius
>>> >
>>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy 
> wrote:
>>> >
>>> > > Inline..
>>> > >
>>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>>> > > > Hello everyone,
>>> > > >
>>> > > > I have a few questions about the current status and future of the
>>> Kafka
>>> > > > consumers.
>>> > > >
>>> > > > We have been working on adding Kafka support in Spring XD [1],
>>> currently
>>> > > > using the high level consumer via Spring Integration Kafka [2]. We
>>> are
>>> > > > working on adding features such as:
>>> > > > - the ability to control offsets/replay topics;
>>> > > > - the ability to control partition allocation across multiple
>>> consumers;
>>> > > >
>>> > > > We are currently at version 0.8.1.1, so using the simple consumer
> is
>>> a
>>> > > > pretty straightforward choice right now. However, in the light of
> the
>>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few
> questions:
>>> > > >
>>> > > > 1) With respect to the consumer redesign for 0.9, what is the
> future
>>> of
>>> > > the
>>> > > > Simple Consumer and High Level Consumer? To my best understanding,
>>> the
>>> > > > existing high level consumer API will be deprecated in favour of
> the
>>> new
>>> > > > consumer API. What is the future of the Simple Consumer, in this
>>> case? Will it
>>> > > > continue to exist as a low-level API implementing the Kafka
>>> protocol
>>> > > > [3] and providing the building blocks for the new consumer, or will
>>> it be
>>> > > > deprecated as well?
>>> > >
>>> > > The new consumer will subsume both use-cases (simple and high-level).
>>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>>> > > protocol for fetch and other requests will still be supported.
>>> > >
>>> > > >
>>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an
> early
>>> > > > implementation of it, but since this is a feature scheduled only for
>>> 0.9,
>>> > > what
>>> > > > is its status as well? Is it included only as a future reference
> and
>>> for
>>> > > > stabilizing the API?
>>> > >
>>> > > It is a WIP so you cannot really u

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.

This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.

This looks like a bug and I'd even mark it as blocker for 0.8.2 since
it may prevent rolling upgrades.
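
To illustrate why a version 0 request trips the newer decoder, here is a minimal Python sketch. It assumes the per-partition layouts quoted below (partition id, offset, an optional int64 timestamp, then metadata as a short string); `encode_partition` and `decode_partition` are hypothetical names used only for this example.

```python
import struct


def encode_partition(partition, offset, metadata, timestamp=None):
    """Encode one partition entry; the int64 timestamp is written only if given."""
    out = struct.pack(">iq", partition, offset)
    if timestamp is not None:
        out += struct.pack(">q", timestamp)
    meta = metadata.encode("utf-8")
    return out + struct.pack(">h", len(meta)) + meta


def decode_partition(buf, expect_timestamp):
    """Decode one entry; raises struct.error when the buffer is too short."""
    partition, offset = struct.unpack_from(">iq", buf, 0)
    pos = 12
    timestamp = None
    if expect_timestamp:
        (timestamp,) = struct.unpack_from(">q", buf, pos)
        pos += 8
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if pos + length > len(buf):
        raise struct.error("metadata runs past the end of the buffer")
    metadata = buf[pos:pos + length].decode("utf-8")
    return partition, offset, timestamp, metadata


old_entry = encode_partition(0, 42, "")                # 0.8.1-style entry
new_entry = encode_partition(0, 42, "", timestamp=-1)  # trunk-style entry

print(len(old_entry), len(new_entry))
print(decode_partition(new_entry, expect_timestamp=True))

try:
    # A decoder that always expects the timestamp runs out of bytes
    # on the shorter entry, much like the server-side underflow.
    decode_partition(old_entry, expect_timestamp=True)
except struct.error as exc:
    print("underflow:", exc)
```

A version check around the timestamp read (and write) would let both layouts coexist, which is what Dana's question below is getting at.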

Are you opening the JIRA?

Gwen

On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers  wrote:
> specifically comparing 0.8.1 --
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
> metadata))
> })
> ```
>
> to trunk --
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val timestamp = {
>     val given = buffer.getLong
>     if (given == -1L) now else given
>   }
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> metadata, timestamp))
> })
> ```
>
> should the `timestamp` buffer read be wrapped in an api version check?
>
>
> Dana Powers
> Rdio, Inc.
> dana.pow...@rd.io
> rdio.com/people/dpkp/
>
> On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira  wrote:
>
>> Ah, I see :)
>>
>> The readFrom function basically tries to read two extra fields if you
>> are on version 1:
>>
>> if (versionId == 1) {
>>   groupGenerationId = buffer.getInt
>>   consumerId = readShortString(buffer)
>> }
>>
>> The rest looks identical in version 0 and 1, and still no timestamp in
>> sight...
>>
>> Gwen
>>
>> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
>> > Hi Gwen, I am using/writing kafka-python to construct api requests and
>> have
>> > not dug too deeply into the server source code.  But I believe it is
>> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
>> > used to decode the wire protocol.
>> >
>> > -Dana
>> > OffsetCommitRequest has two constructors now:
>> >
>> > For version 0:
>> >  OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
>> >
>> > And version 1:
>> > OffsetCommitRequest(String groupId, int generationId, String
>> > consumerId, Map<TopicPartition, PartitionData> offsetData)
>> >
>> > None of them seem to require timestamps... so I'm not sure where you
>> > see that this is required. Can you share an example?
>> >
>> > Gwen
>> >
>> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
>> >> Hi Joel,
>> >>
>> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> >> you mentioned below, and I cannot figure out how to explicitly
>> construct a
>> >> request with the earlier version.  Should the api version be different
>> for
>> >> requests that do not include it and/or servers that do not support the
>> >> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
>> > field
>> >> and used api version 0.  But 0.8.2-beta seems to now require timestamps
>> >> even when I explicitly encode OffsetCommitRequest api version 0 (server
>> >> logs a BufferUnderflowException).
>> >>
>> >> Is this the expected server behavior?  Can you provide any tips on how
>> >> third-party clients should manage the wire-protocol change for this api
>> >> method (I'm working on kafka-python)?
>> >>
>> >> Thanks,
>> >>
>> >> -Dana
>> >>
>> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy 
>> wrote:
>> >>
>> >>> Yes it should be backwards compatible. So for e.g., you should be able
>> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> >>> not upgrade your clients until after the brokers have been upgraded.
>> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> >>> explicitly construct an Off

Re: Current vote - 0.8.2.0-RC1 or 0.8.2.0?

2015-01-14 Thread Gwen Shapira
The Apache process is that you vote for an RC, and if the vote passes
(i.e. three +1 from PMC and no -1) the same artifacts will be released
(without RC).
If issues are discovered, there may be another RC.

Note that the RC is published on Jun's directory, not an official
Kafka repository.

You can see more details here:
http://www.apache.org/dev/release-publishing
http://www.apache.org/dev/release.html

On Wed, Jan 14, 2015 at 9:12 AM, Stevo Slavić  wrote:
> Hello Apache Kafka community,
>
> Is the currently active vote for 0.8.2.0-RC1 or 0.8.2.0?
>
> If the vote is for 0.8.2.0-RC1 why isn't that reflected in artifact
> metadata? Version should be 0.8.2.0-RC1, 0.8.2-RC1 or something similar
> (0.8.2 beta release had "-beta" and no ".0" suffix - see
> http://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2-beta/ )
> If it stays like this, and final gets released later with same 0.8.2.0
> version, but different content - repositories, both local and remote will
> get polluted with junk.
>
> If the vote is for 0.8.2.0 final GA release, why call the vote candidate 1?
>
> Also, version related - none of the previous 0.8.x releases had ".0"
> release i.e. 0.8.x.0. Is this change in version numbering intentional?
>
> Kind regards,
> Stevo Slavic.


Re: Delete topic

2015-01-14 Thread Gwen Shapira
At the moment, the best way would be:

* Wait about two weeks
* Upgrade to 0.8.2
* Use kafka-topics.sh --delete

:)

2015-01-14 9:26 GMT-08:00 Armando Martinez Briones :
> Hi.
>
> What is the best way to delete a topic in a production environment?
>
> --
> José Armando Martínez Briones
> *Software Architect*
> Tralix México
> Tel: +52 442 161 1002 ext. 2920
> www.tralix.com.mx
>
> *TRALIX MÉXICO S. DE R.L. DE C.V*, as the party responsible for processing
> your personal data, informs you that the information obtained through this
> channel is treated as strictly confidential, and that we will collect and
> process your data in accordance with the guidelines of our Privacy Notice.
> The full version is available on the Privacy Notice page. By reading this
> Privacy Notice and not expressing your opposition to the processing of your
> Personal Data, you are understood to have read the full version and to
> accept its terms, always in accordance with the Federal Law on Protection
> of Personal Data Held by Private Parties.


Re: Delete topic

2015-01-14 Thread Gwen Shapira
Yep, you need to set delete.topic.enable=true.

Forgot that step :)

2015-01-14 10:16 GMT-08:00 Jayesh Thakrar :
> Does one also need to set the config parameter "delete.topic.enable" to true?
> I am using 0.8.2 beta and I had to set it to true to enable topic deletion.
>   From: Armando Martinez Briones 
>  To: users@kafka.apache.org
>  Sent: Wednesday, January 14, 2015 11:33 AM
>  Subject: Re: Delete topic
>
> thanks Gwen Shapira ;)
>
> On January 14, 2015, 11:31, Gwen Shapira
> wrote:
>
>> At the moment, the best way would be:
>>
>> * Wait about two weeks
>> * Upgrade to 0.8.2
>> * Use kafka-topics.sh --delete
>>
>> :)
>>
>> 2015-01-14 9:26 GMT-08:00 Armando Martinez Briones :
>> > Hi.
>> >
>> > What is the best way to delete a topic in a production environment?
>> >


Re: "java.io.IOException: Too many open files" error

2015-01-15 Thread Gwen Shapira
You may find this article useful for troubleshooting and modifying TIME_WAIT:
http://www.linuxbrigade.com/reduce-time_wait-socket-connections/

The line you have for increasing file limit is fine, but you may also
need to increase the limit system wide:
insert "fs.file-max = 10" in /etc/sysctl.conf
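
Before changing limits, it may help to measure. The following is a rough Python sketch, assuming a Linux host: it reports the open-file limits of the current process and counts TIME_WAIT sockets from the text of /proc/net/tcp (state code 06). The function names are made up for this example, and the sample text is synthetic.

```python
import resource

TIME_WAIT = "06"  # TCP state code for TIME_WAIT in /proc/net/tcp


def count_time_wait(proc_net_tcp_text):
    """Count sockets in TIME_WAIT given the text of /proc/net/tcp."""
    count = 0
    for line in proc_net_tcp_text.splitlines()[1:]:  # skip the header row
        fields = line.split()
        if len(fields) > 3 and fields[3] == TIME_WAIT:
            count += 1
    return count


def nofile_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


# Synthetic sample in the /proc/net/tcp column order: sl, local, remote, state.
sample = """  sl  local_address rem_address   st tx_queue rx_queue
   0: 0100007F:2384 00000000:0000 0A 00000000:00000000
   1: 0100007F:2384 0100007F:C350 06 00000000:00000000
   2: 0100007F:2384 0100007F:C351 06 00000000:00000000
"""

print("TIME_WAIT sockets in sample:", count_time_wait(sample))
print("nofile (soft, hard):", nofile_limits())
```

On a real broker host you would pass `open("/proc/net/tcp").read()` instead of the sample. Note that `nofile_limits()` describes the process running the script, not the broker; the limits of the running broker are in /proc/<pid>/limits.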

Gwen

On Thu, Jan 15, 2015 at 12:30 PM, Sa Li  wrote:
> Hi, all
>
> We are testing our production Kafka and getting this error:
>
> [2015-01-15 19:03:45,057] ERROR Error in acceptor (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:241)
> at kafka.network.Acceptor.accept(SocketServer.scala:200)
> at kafka.network.Acceptor.run(SocketServer.scala:154)
> at java.lang.Thread.run(Thread.java:745)
>
> I noticed some other developers had similar issues, one suggestion was "
>
> Without knowing the intricacies of Kafka, i think the default open file
> descriptors is 1024 on unix. This can be changed by setting a higher ulimit
> value ( typically 8192 but sometimes even 10 ).
> Before modifying the ulimit I would recommend you check the number of
> sockets stuck in TIME_WAIT mode. In this case, it looks like the broker has
> too many open sockets. This could be because you have a rogue client
> connecting and disconnecting repeatedly.
> You might have to reduce the TIME_WAIT state to 30 seconds or lower.
>
> "
>
> We increased the open file handle limit by doing this:
>
> insert "kafka - nofile 10" in /etc/security/limits.conf
>
> Is that the right way to change the open file descriptor limit? In addition,
> it says to reduce TIME_WAIT; where do I change this setting? Or is there any
> other solution for this issue?
>
> thanks
>
>
>
> --
>
> Alec Li


Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-15 Thread Gwen Shapira
Would make sense to enable it after we have authorization feature and
admins can control who can delete what.

On Thu, Jan 15, 2015 at 6:32 PM, Jun Rao  wrote:
> Yes, I agree it's probably better not to enable "delete.topic.enable" by
> default.
>
> Thanks,
>
> Jun
>
> On Thu, Jan 15, 2015 at 6:29 PM, Joe Stein  wrote:
>
>> I think that is a change of behavior that organizations may get burned on.
>> Right now there is no delete data feature. If an operations team upgrades
>> to 0.8.2 and someone decides to delete a topic then there will be data
>> loss. The organization may not have wanted that to happen. I would argue to
>> not have a way to "by default" delete data. There is something actionable
>> about consciously turning on a feature that allows anyone with access to
>> kafka-topics (or zookeeper for that matter) to delete Kafka data. If folks
>> want that feature then flip the switch prior to upgrade or after and
>> rolling restart and have at it. By not setting it as default they will know
>> they have to turn it on and figure out what they need to do from a security
>> perspective (until Kafka gives them that) to protect their data (through
>> network or other type of measures).
>>
>> On Thu, Jan 15, 2015 at 8:24 PM, Manikumar Reddy 
>> wrote:
>>
>> > Also can we remove "delete.topic.enable" config property and enable topic
>> > deletion by default?
>> > On Jan 15, 2015 10:07 PM, "Jun Rao"  wrote:
>> >
>> > > Thanks for reporting this. I will remove that option in RC2.
>> > >
>> > > Jun
>> > >
>> > > On Thu, Jan 15, 2015 at 5:21 AM, Jaikiran Pai <
>> jai.forums2...@gmail.com>
>> > > wrote:
>> > >
>> > > > I just downloaded the Kafka binary and am trying this on my 32 bit JVM
>> > > > (Java 7). Trying to start Zookeeper or Kafka server keeps failing with
>> > > > "Unrecognized VM option 'UseCompressedOops'":
>> > > >
>> > > > ./zookeeper-server-start.sh ../config/zookeeper.properties
>> > > > Unrecognized VM option 'UseCompressedOops'
>> > > > Error: Could not create the Java Virtual Machine.
>> > > > Error: A fatal exception has occurred. Program will exit.
>> > > >
>> > > > Same with the Kafka server startup scripts. My Java version is:
>> > > >
>> > > > java version "1.7.0_71"
>> > > > Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
>> > > > Java HotSpot(TM) Server VM (build 24.71-b01, mixed mode)
>> > > >
>> > > > Should there be a check in the script, before adding this option?
>> > > >
>> > > > -Jaikiran
>> > > >
>> > > > On Wednesday 14 January 2015 10:08 PM, Jun Rao wrote:
>> > > >
>> > > >> + users mailing list. It would be great if people can test this out
>> > and
>> > > >> report any blocker issues.
>> > > >>
>> > > >> Thanks,
>> > > >>
>> > > >> Jun
>> > > >>
>> > > >> On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao  wrote:
>> > > >>
>> > > >>> This is the first candidate for release of Apache Kafka 0.8.2.0. There
>> > > >>> have been some changes since the 0.8.2 beta release, especially in the
>> > > >>> new java producer api and jmx mbean names. It would be great if people
>> > > >>> can test this out thoroughly. We are giving people 10 days for testing
>> > > >>> and voting.
>> > > >>>
>> > > >>> Release Notes for the 0.8.2.0 release
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html
>> > > >>>
>> > > >>> *** Please download, test and vote by Friday, Jan 23rd, 7pm PT
>> > > >>>
>> > > >>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/KEYS in
>> > > >>> addition to the md5, sha1 and sha2 (SHA256) checksum.
>> > > >>>
>> > > >>> * Release artifacts to be voted upon (source and binary):
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/
>> > > >>>
>> > > >>> * Maven artifacts to be voted upon prior to release:
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/
>> > > >>>
>> > > >>> * scala-doc
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package
>> > > >>>
>> > > >>> * java-doc
>> > > >>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/
>> > > >>>
>> > > >>> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0
>> tag
>> > > >>> *https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
>> > > >>> b0c7d579f8ae

Re: kafka brokers going down within 24 hrs

2015-01-16 Thread Gwen Shapira
Those errors are expected - if broker 10.0.0.11 went down, it will
reset the connection and the other broker will close the socket.
However, it looks like 10.0.0.11 crashes every two minutes?

Do you have the logs from 10.0.0.11?

On Thu, Jan 15, 2015 at 9:51 PM, Tousif  wrote:
> I'm using Kafka 2.9.2-0.8.1.1 and ZooKeeper 3.4.6.
> I noticed that only one broker is going down.
> My message size is less than 3 KB, with KAFKA_HEAP_OPTS="-Xmx512M"
> and KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops
> -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
> -Djava.awt.headless=true".
>
> Do you mean that a Kafka broker never goes down? Does a broker restart
> automatically after failing?
> I see only these errors on both brokers.
>
> 10.0.0.11 is the broker which is going down.
>
> ERROR Closing socket for /10.0.0.11 because of error
> (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:662)
> [2015-01-16 11:01:48,173] INFO Closing socket connection to /10.0.0.11.
> (kafka.network.Processor)
> [2015-01-16 11:03:08,164] ERROR Closing socket for /10.0.0.11 because of
> error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:662)
> [2015-01-16 11:03:08,280] INFO Closing socket connection to /10.0.0.11.
> (kafka.network.Processor)
> [2015-01-16 11:03:48,369] ERROR Closing socket for /10.0.0.11 because of
> error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:662)
>
>
>
> On Thu, Jan 15, 2015 at 7:49 PM, Harsha  wrote:
>
>> Tousif,
>>Which version of kafka and zookeeper are you using and whats your
>>message size and jvm size that you allocated for kafka brokers.
>> There is only 1 zookeeper node , if its a production cluster I recommend
>> you to have quorum of zookeeper nodes. Both kafka & storm are heavy
>> users of zookeeper. Also supervisord is recommended for storm I am not
>> sure you need to have it for kafka, for storm its the fail-fast nature
>> of workers that requires supervisord to restart.
>> When Kafka goes down the first time, i.e. before supervisord restarts it,
>> do you see the same OOM error? Check the logs to see why it's going down
>> for the first time.
>> -Harsha
>>
>>
>>
>> On Wed, Jan 14, 2015, at 10:50 PM, Tousif wrote:
>> > Hello Chia-Chun Shih,
>> >
>> > There are multiple issues,
>> > First, I don't see an out of memory error every time; the OOM happens
>> > after supervisord keeps retrying to start Kafka.
>> > It goes down when it tries to add a partition fetcher
>> >
>> > it starts with
>> >
>> > *conflict in /controller data:
>> > {"version":1,"brokerid":0,"timestamp":"1421296052741"} stored data:
>> > {"version":1,"brokerid":1,"timestamp":"1421291998088"}
>> > (kafka.utils.ZkUtils$)*
>> >
>> >
>> > ERROR Conditional update of path
>> > /brokers/topics/realtimestreaming/partitions/1/state with data
>> >
>> {"controller_epoch":34,"leader":0,"version":1,"leader_epoch":54,"isr":[0]}
>> > and expected version 90 failed due to
>> > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode
>> > =
>> > BadVersion for /brokers/topics/realtimestreaming/partitions/1/state
>> > (kafka.utils.ZkUtils$)
>> >
>> > and th

Re: Kafka Out of Memory error

2015-01-19 Thread Gwen Shapira
Two things:
1. The OOM happened on the consumer, right? So the memory that matters
is the RAM on the consumer machine, not on the Kafka cluster nodes.

2. If the consumers belong to the same consumer group, each will
consume a subset of the partitions and will only need to allocate
memory for those partitions.

So, assuming all your consumers belong to the same group:
2 consumers  -> each has 500 partitions -> each uses 500MB.

The total remains 1GB no matter how many consumers you have, as long
as they are all in the same group.

If the consumers belong to different groups (i.e. they read copies of
the same messages from the same partitions), then yes, you are limited
to 8 per server (probably fewer, because there is other stuff running on
the server).
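
The arithmetic above can be written down as a back-of-the-envelope Python sketch. It assumes partitions are spread evenly across the consumers in a group and that each owned partition may need one fetch buffer of the configured size (1 MB here); `per_consumer_fetch_memory_mb` is a hypothetical name, and real memory use depends on the client implementation.

```python
def per_consumer_fetch_memory_mb(partitions, fetch_mb, consumers_in_group):
    """Rough upper bound: partitions owned by one consumer times fetch size."""
    owned = -(-partitions // consumers_in_group)  # ceiling division
    return owned * fetch_mb


# 1000 partitions, 1 MB fetch size, growing group size.
for consumers in (1, 2, 4):
    per_consumer = per_consumer_fetch_memory_mb(1000, 1, consumers)
    print(consumers, "consumer(s):", per_consumer, "MB each")
```

Consumers in different groups each own all 1000 partitions, so for them the single-consumer figure applies per consumer.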

Gwen

On Mon, Jan 19, 2015 at 3:06 PM, Pranay Agarwal
 wrote:
> Thanks a lot Natty.
>
> I am using this Ruby gem on the client side with all the default config
> https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb
> and the value fetch.message.max.bytes is set to 1MB.
>
> Currently I only have 3 nodes set up in the Kafka cluster (with 8 GB RAM),
> and if 1 consumer is going to take 1000 partitions x 1 MB ~ 1 GB, does it
> mean 1 Kafka node can support at most 8 consumers? Also, when I run
> top/free on the Kafka cluster nodes (both ZooKeeper and Kafka are deployed
> on each of the 3 nodes of the cluster) I don't see a lot of memory being
> used on the machine. Also, even with this calculation, I shouldn't be facing
> any issue with only 1 consumer, as I have 8 GB of JVM space given to the
> Kafka nodes, right?
>
> Thanks
> -Pranay
>
> On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins 
> wrote:
>
>> The fetch.message.max.bytes is actually a client-side configuration. With
>> regard to increasing the number of threads, I think the calculation may be
>> a little more subtle than what you're proposing, and frankly, it's unlikely
>> that your servers can handle allocating 200MB x 1000 threads = 200GB of
>> memory at a single time.
>>
>> I believe that if you have every partition on a single broker, and all of
>> your consumer threads are requesting data simultaneously, then yes, the
>> broker would attempt to allocate 200GB of heap, and probably you'll hit an
>> OOME. However, since each consumer is only reading from one partition,
>> those 1000 threads should be making requests that are spread out over the
>> entire Kafka cluster. Depending on the memory on your servers, you may need
>> to increase the number of brokers in your cluster to support the 1000
>> threads. For example, I would expect that you can support this with 10
>> brokers if each broker has something north of 20GB of heap allocated.
>>
>> Some of this is a little bit of guess work on my part, and I'm not super
>> confident of my numbers...Can anybody else on the list validate my math?
>>
>> Thanks,
>> Natty
>>
>> Jonathan "Natty" Natkins
>> StreamSets | Customer Engagement Engineer
>> mobile: 609.577.1600 | linkedin 
>>
>>
>> On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal 
>> wrote:
>>
>> > Thanks Natty.
>> >
>> > Is there any config which I need to change on the client side as well?
>> > Also, currently I am trying with only 1 consumer thread. Does the
>> > equation change to
>> > (#partitions)*(fetchsize)*(#consumer_threads) in case I try to read with
>> > 1000 threads from topic2 (1000 partitions)?
>> >
>> > -Pranay
>> >
>> > On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins 
>> > wrote:
>> >
>> > > Hi Pranay,
>> > >
>> > > I think the JIRA you're referencing is a bit orthogonal to the OOME
>> that
>> > > you're experiencing. Based on the stacktrace, it looks like your OOME
>> is
>> > > coming from a consumer request, which is attempting to allocate 200MB.
>> > > There was a thread (relatively recently) that discussed what I think is
>> > > your issue:
>> > >
>> > >
>> > >
>> >
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
>> > >
>> > > I suspect that the takeaway is that the way Kafka determines the
>> required
>> > > memory for a consumer request is (#partitions in the topic) x
>> > > (replica.fetch.max.bytes), and seemingly you don't have enough memory
>> > > allocated to handle that request. The solution is likely to increase
>> the
>> > > heap size on your brokers or to decrease your max fetch size.
>> > >
>> > > Thanks,
>> > > Natty
>> > >
>> > > Jonathan "Natty" Natkins
>> > > StreamSets | Customer Engagement Engineer
>> > > mobile: 609.577.1600 | linkedin 
>> > >
>> > >
>> > > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal <
>> > agarwalpran...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I have a kafka cluster setup which has 2 topics
>> > > >
>> > > > topic1 with 10 partitions
>> > > > topic2 with 1000 partitions.
>> > > >
>> > > > While, I am able to consume messages from topic1 ju

Re: Backups

2015-01-19 Thread Gwen Shapira
Hi,

As a former DBA, I hear you on backups :)

Technically, you could copy all log.dir files somewhere safe
occasionally. I'm pretty sure we don't guarantee the consistency or
safety of this copy. You could find yourself with a corrupt "backup"
by copying files that are either in the middle of getting written or
are inconsistent in time with other files. Kafka doesn't have a good
way to stop writing to files for long enough to allow copying them
safely.

Unlike traditional backups, there's no transaction log that can be
rolled to move a disk copy forward in time (or that can be used when
data files are locked for backups). In Kafka, the files *are* the
transaction log and you roll back in time by deciding which offsets to
read.

DR is possible using MirrorMaker though, since the only thing better
than replication is... more replication!
So you could create a non-corrupt file copy by stopping a MirrorMaker
replica occasionally and copying all files somewhere safe.

If it helps you sleep better at night :)
Typically having kafka nodes on multiple racks and a DR in another
data center is considered pretty safe.

Gwen

On Wed, Jan 14, 2015 at 9:22 AM, Gene Robichaux
 wrote:
> Does anyone have any thoughts on Kafka broker backups?
>
> All of our topics have a replication factor of 3. However I just want to know 
> if anyone does anything about traditional backups. My background is Ops DBA, 
> so I have a special place in my heart for backups.
>
>
> Gene Robichaux
> Manager, Database Operations
> Match.com
> 8300 Douglas Avenue I Suite 800 I Dallas, TX  75225
>


Re: Backups

2015-01-20 Thread Gwen Shapira
Interesting question.
I think you'll need to sync it for the exact same time across the
entire cluster, otherwise you'll recover from an inconsistent state.
Not sure if this is feasible, or how Kafka handles starting from
inconsistent state.

If I were the sysadmin, I'd go with the good old MySQL method:
MirrorMaker to a replica, once a day stop MirrorMaker, stop the
replica and take a cold backup. Move copy to tape / NAS / offsite
storage.

I'll be curious to hear what the LinkedIn team is doing.

Gwen



On Tue, Jan 20, 2015 at 8:48 AM, Otis Gospodnetic
 wrote:
> Could one use ZFS or BTRFS snapshot functionality for this?
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Tue, Jan 20, 2015 at 1:39 AM, Gwen Shapira  wrote:
>
>> Hi,
>>
>> As a former DBA, I hear you on backups :)
>>
>> Technically, you could copy all log.dir files somewhere safe
>> occasionally. I'm pretty sure we don't guarantee the consistency or
>> safety of this copy. You could find yourself with a corrupt "backup"
>> by copying files that are either in the middle of getting written or
>> are inconsistent in time with other files. Kafka doesn't have a good
>> way to stop writing to files for long enough to allow copying them
>> safely.
>>
>> Unlike traditional backups, there's no transaction log that can be
>> rolled to move a disk copy forward in time (or that can be used when
>> data files are locked for backups). In Kafka, the files *are* the
>> transaction log and you roll back in time by deciding which offsets to
>> read.
>>
>> DR is possible using MirrorMaker though, since the only thing better
>> than replication is... more replication!
>> So you could create a non-corrupt file copy by stopping a MirrorMaker
>> replica occasionally and copying all files somewhere safe.
>>
>> If it helps you sleep better at night :)
>> Typically having kafka nodes on multiple racks and a DR in another
>> data center is considered pretty safe.
>>
>> Gwen
>>
>> On Wed, Jan 14, 2015 at 9:22 AM, Gene Robichaux
>>  wrote:
>> > Does anyone have any thoughts on Kafka broker backups?
>> >
>> > All of our topics have a replication factor of 3. However I just want to
>> know if anyone does anything about traditional backups. My background is
>> Ops DBA, so I have a special place in my heart for backups.
>> >
>> >
>> > Gene Robichaux
>> > Manager, Database Operations
>> > Match.com
>> > 8300 Douglas Avenue I Suite 800 I Dallas, TX  75225
>> >
>>


Re: Backups

2015-01-20 Thread Gwen Shapira
Nice idea!
Console consumer won't necessarily work, since it doesn't parallelize,
but using something like Camus to back up to HDFS can be pretty cool.



On Tue, Jan 20, 2015 at 10:15 AM, Jayesh Thakrar
 wrote:
> Another option is to copy data from each topic (of interest/concern) to a 
> "flat file on a periodic basis". E.g. say you had a queue that only contained 
> "textual data". Periodically I would run the bundled console-consumer to read 
> data from the queue and dump it to a file/directory, then back it up 
> without any worry, and then move the file/directory out once the backup is 
> complete. This can serve as a point-in-time incremental snapshot.
> And if you are itching to exercise your DBA roots (full backup followed by 
> incremental backups), then you could use the "read from the beginning" option 
> of the console-consumer every once in a while. Although note that the "full" 
> is constrained by the retention period of the data (controlled at the 
> queue/cluster level).
>   From: Gwen Shapira 
>  To: "users@kafka.apache.org" 
>  Sent: Tuesday, January 20, 2015 12:07 PM
>  Subject: Re: Backups
>
> Interesting question.
> I think you'll need to sync it for the exact same time across the
> entire cluster, otherwise you'll recover from an inconsistent state.
> Not sure if this is feasible, or how Kafka handles starting from
> inconsistent state.
>
> If I were the sysadmin, I'd go with the good old MySQL method:
> MirrorMaker to a replica, once a day stop MirrorMaker, stop the
> replica and take a cold backup. Move copy to tape / NAS / offsite
> storage.
>
> I'll be curious to hear what the LinkedIn team is doing.
>
> Gwen
>
>
>
>
>
> On Tue, Jan 20, 2015 at 8:48 AM, Otis Gospodnetic
>  wrote:
>> Could one use ZFS or BTRFS snapshot functionality for this?
>>
>> Otis
>> --
>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> Solr & Elasticsearch Support * http://sematext.com/
>>
>>
>> On Tue, Jan 20, 2015 at 1:39 AM, Gwen Shapira  wrote:
>>
>>> Hi,
>>>
>>> As a former DBA, I hear you on backups :)
>>>
>>> Technically, you could copy all log.dir files somewhere safe
>>> occasionally. I'm pretty sure we don't guarantee the consistency or
>>> safety of this copy. You could find yourself with a corrupt "backup"
>>> by copying files that are either in the middle of getting written or
>>> are inconsistent in time with other files. Kafka doesn't have a good
>>> way to stop writing to files for long enough to allow copying them
>>> safely.
>>>
>>> Unlike traditional backups, there's no transaction log that can be
>>> rolled to move a disk copy forward in time (or that can be used when
>>> data files are locked for backups). In Kafka, the files *are* the
>>> transaction log and you roll back in time by deciding which offsets to
>>> read.
>>>
>>> DR is possible using MirrorMaker though, since the only thing better
>>> than replication is... more replication!
>>> So you could create a non-corrupt file copy by stopping a MirrorMaker
>>> replica occasionally and copying all files somewhere safe.
>>>
>>> If it helps you sleep better at night :)
>>> Typically having kafka nodes on multiple racks and a DR in another
>>> data center is considered pretty safe.
>>>
>>> Gwen
>>>
>>> On Wed, Jan 14, 2015 at 9:22 AM, Gene Robichaux
>>>  wrote:
>>> > Does anyone have any thoughts on Kafka broker backups?
>>> >
>>> > All of our topics have a replication factor of 3. However I just want to
>>> know if anyone does anything about traditional backups. My background is
>>> Ops DBA, so I have a special place in my heart for backups.
>>> >
>>> >
>>> > Gene Robichaux
>>> > Manager, Database Operations
>>> > Match.com
>>> > 8300 Douglas Avenue I Suite 800 I Dallas, TX  75225
>>> >
>>>
>
>
>


Re: Help: Kafka LeaderNotAvailableException

2015-01-22 Thread Gwen Shapira
It sounds like you have two zookeepers, one for HDP and one for Kafka.
Did you move Kafka from one zookeeper to another?

Perhaps Kafka finds the topics (logs) on disk, but they do not exist
in ZK because you are using a different zookeeper now.

Gwen

On Thu, Jan 22, 2015 at 6:38 PM, Jun Rao  wrote:
> Any error in the controller and the broker log?
>
> Thanks,
>
> Jun
>
> On Thu, Jan 22, 2015 at 1:33 AM,  wrote:
>
>> Hi,
>> Here is an overview of the issue I am facing when producing messages to
>> Kafka:
>> I have Hortonworks HDP-2.1 installed, and alongside it we have Kafka on
>> another node.
>>
>> * On kafka node:
>> Start Zookeeper
>> Start Kafka Broker service
>> Send message/producer
>> Consume message - Works (Note: here we start Zookeeper locally on kafka01
>> node)
>>
>> * Issue side:
>> >Now in HDP-2.1 we have Zookeeper built in & we have the Zookeeper service
>> running on the master node
>> >I go to Kafka server & Started Kafka Broker
>> (In config\server.properties file I have added zookeeper.connect with
>> maasternode:2181)
>> Then I start producer & Send message... after that we got error like
>> kafka.common.LeaderNotAvailableException
>>
>> [2015-01-17 05:54:09,465] WARN Error while fetching metadata
>> [{TopicMetadata for topic fred ->
>> No partition metadata for topic fred due to
>> kafka.common.LeaderNotAvailableException}] for topic [fred]: class
>> kafka.common.LeaderNotAvailableException
>> (kafka.producer.BrokerPartitionInfo)
>> [2015-01-17 05:54:09,659] WARN Error while fetching metadata
>> [{TopicMetadata for topic fred ->
>> No partition metadata for topic fred due to
>> kafka.common.LeaderNotAvailableException}] for topic [fred]: class
>> kafka.common.LeaderNotAvailableException
>> (kafka.producer.BrokerPartitionInfo)
>> [2015-01-17 05:54:09,659] ERROR Failed to collate messages by topic,
>> partition due to: Failed to fetch topic metadata for topic: fred
>> (kafka.producer.async.DefaultEventHandler)
>> [2015-01-17 05:54:09,802] WARN Error while fetching metadata
>> [{TopicMetadata for topic fred ->
>> No partition metadata for topic fred due to
>> kafka.common.LeaderNotAvailableException}] for topic [fred]: class
>> kafka.common.LeaderNotAvailableException
>> (kafka.producer.BrokerPartitionInfo)
>> [2015-01-17 05:54:09,820] ERROR Failed to send requests for topics fred
>> with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
>> [2015-01-17 05:54:09,822] ERROR Error in handling batch of 1 events
>> (kafka.producer.async.ProducerSendThread)
>> kafka.common.FailedToSendMessageException: Failed to send messages after 3
>> tries.
>> at
>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>> at
>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>> at
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>>
>> Can someone suggest what is going wrong.
>> Thanks.
>>
>>
>>
>>
>>
>> -Vishal
>>
>>
>> Regards,
>> Vishal
>> Software Dev Staff Engineer
>> Dell | Bangalore
>> Ext : 79268
>>
>>


Re: Can't create a topic; can't delete it either

2015-01-27 Thread Gwen Shapira
Also, do you have delete.topic.enable=true on all brokers?

The automatic topic creation can fail if the default number of
replicas is greater than the number of available brokers. Check the
default.replication.factor parameter.
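
As a rough illustration, the broker settings involved would sit in
server.properties along these lines. The values are assumptions for a
single-broker test setup, not recommendations:

```properties
# server.properties (broker side); illustrative values only
# allow topics that are marked for deletion to actually be removed
delete.topic.enable=true
# let a produce/fetch request for an unknown topic create it
auto.create.topics.enable=true
# must not exceed the number of live brokers, or auto-creation fails
default.replication.factor=1
```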

Gwen

On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy  wrote:
> Which version of the broker are you using?
>
> On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
>> While running kafka in production I found an issue where a topic wasn't
>> getting created even with auto topic enabled. I then went ahead and created
>> the topic manually (from the command line). I then delete the topic, again
>> manually. Now my broker won't allow me to either create *the* topic or
>> delete *the* topic. (other topic creation and deletion is working fine).
>>
>> The topic is in "marked for deletion" stage for more than 3 hours.
>>
>> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka  --list
>> --topic GRIFFIN-TldAdFormat.csv-1422321736886
>> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
>>
>> If this is a known issue, is there a workaround?
>>
>> Sumit
>


Re: Resilient Producer

2015-01-28 Thread Gwen Shapira
It sounds like you are describing Flume, with SpoolingDirectory source
(or exec source running tail) and Kafka channel.

On Wed, Jan 28, 2015 at 10:39 AM, Fernando O.  wrote:
> Hi all,
> I'm evaluating using Kafka.
>
> I liked this thing of Facebook scribe that you log to your own machine and
> then there's a separate process that forwards messages to the central
> logger.
>
> With Kafka it seems that I have to embed the publisher in my app, and deal
> with any communication problem managing that on the producer side.
>
> I googled quite a bit trying to find a project that would basically use a
> daemon that parses a log file and sends the lines to the Kafka cluster
> (something like a tail file.log but instead of redirecting the output to
> the console: send it to kafka)
>
> Does anyone know about something like that?
>
>
> Thanks!
> Fernando.


Re: create topic does not really executed successfully

2015-02-02 Thread Gwen Shapira
IIRC, the directory is only created after you send data to the topic.

Do you get errors when your producer sends data?

Another common issue is that you specify replication-factor 3 when you
have fewer than 3 brokers.

Gwen

On Mon, Feb 2, 2015 at 2:34 AM, Xinyi Su  wrote:
> Hi,
>
> I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topics.sh to create a
> topic, I sometimes observe that the topic is not really created, even though
> the console output reports success.
>
> Below is my command line:
>
> # bin/kafka-topics.sh  --zookeeper :2181 --create --topic zerg.hydra
> --partitions 3 --replication-factor 3
>
> The command prints "created topic xxx", but the local storage directory used
> for this topic under "log.dirs" is not created at all. Normally, there
> should be some folders like zerg.hydra-0, zerg.hydra-1... named
> according to partition id and assignment policy.
>
> I have come across this issue about four times; the disk is not full and the
> directory access permissions are fine. Do you know what causes this
> issue?
>
> Thanks.
>
> Xinyi


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
If you want to emulate the old sync producer behavior, you need to set
the batch size to 1 (in producer config) and wait on the future you
get from send() (i.e. future.get()).

I can't think of good reasons to do so, though.
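
For what it's worth, the blocking pattern could look roughly like the
sketch below. It uses only java.util.concurrent so it runs on its own:
the "send" function is a hypothetical stand-in for the producer's
send(), which likewise returns a java.util.concurrent.Future, and
sendAndWait is an illustrative name, not part of any Kafka API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch only: "send" stands in for an async producer send that returns a Future.
final class SyncSendSketch {

    /** Blocks until the send completes; true means acknowledged, false means failed. */
    static <R, M> boolean sendAndWait(Function<R, Future<M>> send, R record) {
        try {
            send.apply(record).get();            // block here, like the old sync producer
            return true;
        } catch (ExecutionException e) {
            return false;                        // send failed; e.getCause() holds the reason
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            return false;
        }
    }
}
```

The caller only learns success or failure after get() returns, which is
the same point at which the old sync producer would have returned.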

Gwen


On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
 wrote:
> Hi,
>
> Is the plan for New Producer to have ONLY async mode?  I'm asking because
> of this info from the Wiki:
>
>
>- The producer will always attempt to batch data and will always
>immediately return a SendResponse which acts as a Future to allow the
>client to await the completion of the request.
>
>
> The word "always" makes me think there will be no sync mode.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
Can Y have a callback that will handle the notification to X?
In this case, perhaps Y can be async and X can buffer the data until
the callback triggers and says "all good" (or resend if the callback
indicates an error)
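
A minimal sketch of that bookkeeping, assuming a callback that receives
null on success and the error otherwise. AsyncSender is a hypothetical
stand-in for the producer's send-with-callback call, and the class and
method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Sketch only: keep each payload buffered until its callback reports the outcome.
final class AckTrackerSketch {

    /** Hypothetical async send: the callback gets null on success, else the error. */
    interface AsyncSender {
        void send(String payload, Consumer<Exception> onCompletion);
    }

    private final Map<Long, String> pending = new ConcurrentHashMap<>();
    private final List<String> toResend = Collections.synchronizedList(new ArrayList<>());
    private final AtomicLong nextId = new AtomicLong();

    void submit(AsyncSender sender, String payload) {
        long id = nextId.getAndIncrement();
        pending.put(id, payload);                        // buffer until acknowledged
        sender.send(payload, error -> {
            String original = pending.remove(id);        // outcome known: stop buffering
            if (error != null) toResend.add(original);   // failed: queue for a later resend
        });
    }

    int pendingCount() { return pending.size(); }

    List<String> resendQueue() { return new ArrayList<>(toResend); }
}
```

Whatever is still in the pending map has not been confirmed yet, and
whatever is in the resend queue is known to have failed.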

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
 wrote:
> Hi,
>
> Thanks for the info.  Here's the use case.  We have something up stream
> sending data, say a log shipper called X.  It sends it to some remote
> component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
> needs to send a reply to X and tell it whether it successfully put all its
> data into Kafka.  If it did not, Y wants to tell X to buffer data locally
> and resend it later.
>
> If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
> need to wait for the Future to come back and only then send the response
> back to X?  If so, I'm guessing the delay would be more or less the same as
> if the Producer was using SYNC mode?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:
>
>> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> configuration which does a lot of what async did in terms of allowing
>> batching:
>>
>> batch.size - This is the target amount of data per partition the server
>> will attempt to batch together.
>> linger.ms - This is the time the producer will wait for more data to be
>> sent to better batch up writes. The default is 0 (send immediately). So if
>> you set this to 50 ms the client will send immediately if it has already
>> filled up its batch, otherwise it will wait to accumulate the number of
>> bytes given by batch.size.
>>
>> To send asynchronously you do
>>     producer.send(record);
>> whereas to block on a response you do
>>     producer.send(record).get();
>> which will wait for acknowledgement from the server.
>>
>> One advantage of this model is that the client will do its best to batch
>> under the covers even if linger.ms=0. It will do this by batching any data
>> that arrives while another send is in progress into a single
>> request--giving a kind of "group commit" effect.
>>
>> The hope is that this will be both simpler to understand (a single api that
>> always works the same) and more powerful (you always get a response with
>> error and offset information whether or not you choose to use it).
>>
>> -Jay
>>
>>
>> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
>> wrote:
>>
>> > If you want to emulate the old sync producer behavior, you need to set
>> > the batch size to 1  (in producer config) and wait on the future you
>> > get from Send (i.e. future.get)
>> >
>> > I can't think of good reasons to do so, though.
>> >
>> > Gwen
>> >
>> >
>> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>> >  wrote:
>> > > Hi,
>> > >
>> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
>> because
>> > > of this info from the Wiki:
>> > >
>> > >
>> > >- The producer will always attempt to batch data and will always
>> > >immediately return a SendResponse which acts as a Future to allow
>> the
>> > >client to await the completion of the request.
>> > >
>> > >
>> > > The word "always" makes me think there will be no sync mode.
>> > >
>> > > Thanks,
>> > > Otis
>> > > --
>> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > > Solr & Elasticsearch Support * http://sematext.com/
>> >
>>


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
If I understood the code and Jay correctly - if you wait for the
future it will be a similar delay to that of the old sync producer.

Put another way, if you test it out and see longer delays than the
sync producer had, we need to find out why and fix it.

Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
 wrote:
> Hi,
>
> Nope, unfortunately it can't do that.  X is a remote app, doesn't listen to
> anything external, calls Y via HTTPS.  So X has to decide what to do with
> its data based on Y's synchronous response.  It has to block until Y
> responds.  And it wouldn't be pretty, I think, because nobody wants to run
> apps that talk to remote servers and hang on to connections more than they
> have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
> guessing the delay would be more or less the same as if the Producer was
> using SYNC mode?" is YES, in which case the connection from X to Y would be
> open for just as long as with a SYNC producer running in Y?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira  wrote:
>
>> Can Y have a callback that will handle the notification to X?
>> In this case, perhaps Y can be async and X can buffer the data until
>> the callback triggers and says "all good" (or resend if the callback
>> indicates an error)
>>
>> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>>  wrote:
>> > Hi,
>> >
>> > Thanks for the info.  Here's the use case.  We have something up stream
>> > sending data, say a log shipper called X.  It sends it to some remote
>> > component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
>> > needs to send a reply to X and tell it whether it successfully put all
>> its
>> > data into Kafka.  If it did not, Y wants to tell X to buffer data locally
>> > and resend it later.
>> >
>> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
>> > need to wait for the Future to come back and only then send the response
>> > back to X?  If so, I'm guessing the delay would be more or less the same
>> as
>> > if the Producer was using SYNC mode?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:
>> >
>> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> >> configuration which does a lot of what async did in terms of allowing
>> >> batching:
>> >>
>> >> batch.size - This is the target amount of data per partition the server
>> >> will attempt to batch together.
>> >> linger.ms - This is the time the producer will wait for more data to be
>> >> sent to better batch up writes. The default is 0 (send immediately). So
>> if
>> >> you set this to 50 ms the client will send immediately if it has already
>> >> filled up its batch, otherwise it will wait to accumulate the number of
>> >> bytes given by batch.size.
>> >>
>> >> To send asynchronously you do
>> >>producer.send(record)
>> >> whereas to block on a response you do
>> >>producer.send(record).get();
>> >> which will wait for acknowledgement from the server.
>> >>
>> >> One advantage of this model is that the client will do its best to
>> batch
>> >> under the covers even if linger.ms=0. It will do this by batching any
>> data
>> >> that arrives while another send is in progress into a single
>> >> request--giving a kind of "group commit" effect.
>> >>
>> >> The hope is that this will be both simpler to understand (a single api
>> that
>> >> always works the same) and more powerful (you always get a response with
>> >> error and offset information whether or not you choose to use it).
>> >>
>> >> -Jay
>> >>
>> >>
>> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
>> >> wrote:
>> >>
>> >> > If you want to emulate the old sync producer behavior, you need to set
>> >> > the batch size to 1  (in producer config) and wait on the future you
>> >> 

Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
I've been thinking about that too, since both Flume and Sqoop rely on
the send(List) call of the old API.

I'd like to see this API come back, but I'm debating how we'd handle
errors. IIRC, the old API would fail an entire batch on a single
error, which can lead to duplicates. Having N callbacks lets me retry
/ save / whatever just the messages that had issues.
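
One way to get a single batch-level notification out of N per-record
callbacks is to count them down and report only the failures at the
end. This is a sketch under assumptions: AsyncSender is a hypothetical
stand-in for a send-with-callback call (null error means success), and
sendBatch is not an existing producer method.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch only: fold N per-record callbacks into one callback for the whole batch.
final class BatchCallbackSketch {

    /** Hypothetical async send: the callback gets null on success, else the error. */
    interface AsyncSender<R> {
        void send(R record, Consumer<Exception> onCompletion);
    }

    /** Sends every record; calls onBatchDone exactly once with the records that failed. */
    static <R> void sendBatch(AsyncSender<R> sender, List<R> batch,
                              Consumer<List<R>> onBatchDone) {
        List<R> failed = new CopyOnWriteArrayList<>();
        if (batch.isEmpty()) {
            onBatchDone.accept(failed);
            return;
        }
        AtomicInteger remaining = new AtomicInteger(batch.size());
        for (R record : batch) {
            sender.send(record, error -> {
                if (error != null) failed.add(record);    // remember just the bad ones
                if (remaining.decrementAndGet() == 0) {    // last callback closes the batch
                    onBatchDone.accept(failed);
                }
            });
        }
    }
}
```

The caller can then retry or save only the returned records instead of
failing the entire batch on a single error.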

If messages had identifiers from the producer side, we could have the
API call the callback with a list of message-ids and their status. But
they don't :)

Any thoughts on how you'd like it to work?

Gwen


On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota  wrote:
> This is a great question Otis. Like Gwen said, you can accomplish Sync mode
> by setting the batch size to 1. But this does highlight a shortcoming of
> the new producer API.
>
> I really like the design of the new API and it has really great properties
> and I'm enjoying working with it. However, one API that I think we're
> lacking is a "batch" API. Currently, I have to iterate over a batch and
> call .send() on each record, which returns n callbacks instead of 1
> callback for the whole batch. This significantly complicates recovery logic
> where we need to commit a batch as opposed to 1 record at a time.
>
> Do you guys have any plans to add better semantics around batches?
>
> On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira  wrote:
>
>> If I understood the code and Jay correctly - if you wait for the
>> future it will be a similar delay to that of the old sync producer.
>>
>> Put another way, if you test it out and see longer delays than the
>> sync producer had, we need to find out why and fix it.
>>
>> Gwen
>>
>> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>>  wrote:
>> > Hi,
>> >
>> > Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
>> to
>> > anything external, calls Y via HTTPS.  So X has to decide what to do with
>> > its data based on Y's synchronous response.  It has to block until Y
>> > responds.  And it wouldn't be pretty, I think, because nobody wants to
>> run
>> > apps that talk to remote servers and hang on to connections more than
>> they
>> > have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
>> > guessing the delay would be more or less the same as if the Producer was
>> > using SYNC mode?" is YES, in which case the connection from X to Y would
>> be
>> > open for just as long as with a SYNC producer running in Y?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira 
>> wrote:
>> >
>> >> Can Y have a callback that will handle the notification to X?
>> >> In this case, perhaps Y can be async and X can buffer the data until
>> >> the callback triggers and says "all good" (or resend if the callback
>> >> indicates an error)
>> >>
>> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>> >>  wrote:
>> >> > Hi,
>> >> >
>> >> > Thanks for the info.  Here's the use case.  We have something up
>> stream
>> >> > sending data, say a log shipper called X.  It sends it to some remote
>> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.
>> But Y
>> >> > needs to send a reply to X and tell it whether it successfully put all
>> >> its
>> >> > data into Kafka.  If it did not, Y wants to tell X to buffer data
>> locally
>> >> > and resend it later.
>> >> >
>> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would
>> just
>> >> > need to wait for the Future to come back and only then send the
>> response
>> >> > back to X?  If so, I'm guessing the delay would be more or less the
>> same
>> >> as
>> >> > if the Producer was using SYNC mode?
>> >> >
>> >> > Thanks,
>> >> > Otis
>> >> > --
>> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> > Solr & Elasticsearch Support * http://sematext.com/
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps 
>> wrote:
>> 

Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-03 Thread Gwen Shapira
When's the party?
:)

On Mon, Feb 2, 2015 at 8:13 PM, Jay Kreps  wrote:
> Yay!
>
> -Jay
>
> On Mon, Feb 2, 2015 at 2:23 PM, Neha Narkhede  wrote:
>>
>> Great! Thanks Jun for helping with the release and everyone involved for
>> your contributions.
>>
>> On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein  wrote:
>>
>> > Huzzah!
>> >
>> > Thanks Jun for preparing the release candidates and getting this out to
>> > the
>> > community.
>> >
>> > - Joe Stein
>> >
>> > On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao  wrote:
>> >
>> > > The following are the results of the votes.
>> > >
>> > > +1 binding = 3 votes
>> > > +1 non-binding = 1 votes
>> > > -1 = 0 votes
>> > > 0 = 0 votes
>> > >
>> > > The vote passes.
>> > >
>> > > I will release artifacts to maven central, update the dist svn and
>> > download
>> > > site. Will send out an announce after that.
>> > >
>> > > Thanks everyone that contributed to the work in 0.8.2.0!
>> > >
>> > > Jun
>> > >
>> > > On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao  wrote:
>> > >
>> > >> This is the third candidate for release of Apache Kafka 0.8.2.0.
>> > >>
>> > >> Release Notes for the 0.8.2.0 release
>> > >>
>> > >>
>> >
>> > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>> > >>
>> > >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>> > >>
>> > >> Kafka's KEYS file containing PGP keys we use to sign the release:
>> > >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> > >> (SHA256) checksum.
>> > >>
>> > >> * Release artifacts to be voted upon (source and binary):
>> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>> > >>
>> > >> * Maven artifacts to be voted upon prior to release:
>> > >> https://repository.apache.org/content/groups/staging/
>> > >>
>> > >> * scala-doc
>> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>> > >>
>> > >> * java-doc
>> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>> > >>
>> > >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>> > >>
>> > >>
>> >
>> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
>> > >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>> > >>
>> > >> /***
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jun
>> > >>
>> > >>
>> > >  --
>> > > You received this message because you are subscribed to the Google
>> > > Groups
>> > > "kafka-clients" group.
>> > > To unsubscribe from this group and stop receiving emails from it, send
>> > > an
>> > > email to kafka-clients+unsubscr...@googlegroups.com.
>> > > To post to this group, send email to kafka-clie...@googlegroups.com.
>> > > Visit this group at http://groups.google.com/group/kafka-clients.
>> > > To view this discussion on the web visit
>> > >
>> >
>> > https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com
>> > > <
>> >
>> > https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com?utm_medium=email&utm_source=footer
>> > >
>> > > .
>> > >
>> > > For more options, visit https://groups.google.com/d/optout.
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Neha
>


Re: New Producer - ONLY sync mode?

2015-02-04 Thread Gwen Shapira
I thought Jay Kreps had the right recipes:

To mimic the old Sync producer:
 producer.send(record).get();

To mimic old batches:
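// queue every record first, then block on each response in turn
List<Future<RecordMetadata>> responses = new ArrayList<>();
for (ProducerRecord<K, V> input : recordBatch)
    responses.add(producer.send(input));
for (Future<RecordMetadata> response : responses)
    response.get();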

Perhaps we need to add this to the FAQ?
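
If this does go into the FAQ, a fuller version of the batch recipe that
also reports which records failed might look like the sketch below. It
is written against plain java.util.concurrent so it stands alone:
"send" is a hypothetical stand-in for the producer's send(), and sendAll
is an illustrative name rather than an existing method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch only: "send" stands in for an async producer send that returns a Future.
final class BatchSendSketch {

    /** Sends all records first, then waits on each; returns the records whose send failed. */
    static <R, M> List<R> sendAll(Function<R, Future<M>> send, List<R> batch) {
        List<Future<M>> responses = new ArrayList<>();
        for (R record : batch) {
            responses.add(send.apply(record));   // queue everything before blocking
        }
        List<R> failed = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            try {
                responses.get(i).get();          // wait for this record's outcome
            } catch (ExecutionException e) {
                failed.add(batch.get(i));        // this record's send failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failed.add(batch.get(i));        // outcome unknown, treat as failed
            }
        }
        return failed;
    }
}
```

Because every send is issued before the first get(), the client is
still free to batch the records under the covers.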

Gwen


On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin  wrote:

> Looking at this thread, I would ideally want at least the right
> recipe to mimic sync behavior like Otis is talking about.
>
> In the second case, I would like to be able to know individually whether
> messages have failed, regardless of whether they are in separate batches,
> sort of like what Kinesis does as Pradeep mentioned.
> -Steve
>
> On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps  wrote:
>
> > Yeah totally. Using a callback is, of course, the Right Thing for this
> kind
> > of stuff. But I have found that kind of asynchronous thinking can be hard
> > for people. Even if you get out of the pre-java 8 syntactic pain that
> > anonymous inner classes inflict just dealing with multiple threads of
> > control without creating async spaghetti can be a challenge for complex
> > stuff. That is really the only reason for the futures in the api, they
> are
> > strictly less powerful than the callbacks, but at least using them you
> can
> > just call .get() and pretend it is blocking.
> >
> > -Jay
> >
> > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein  wrote:
> >
> > > Now that 0.8.2.0 is in the wild I look forward to working with more and
> > > seeing what folks start to do with this function
> > >
> > > https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)
> > > and keeping it fully non-blocking.
> > >
> > > One sprint I know of coming up is going to have the new producer as a
> > > component in their reactive calls and handling bookkeeping and retries
> > > through that type of call back approach. Should work well (haven't
> tried
> > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc,
> etc
> > in
> > > functional languages and frameworks.
> > >
> > > I think as JDK 8 starts to get out in the wild too more (may after jdk7
> > > eol) the use of .get will be reduced (imho) and folks will be thinking
> > more
> > > about non-blocking vs blocking and not as so much sync vs async but my
> > > crystal ball just back from the shop so well see =8^)
> > >
> > > /***
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > /
> > >
> > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I guess the question is whether it really matters how many underlying
> > > > network requests occur? It is very hard for an application to depend on
> > > > this even in the old producer since it depends on the partitions placement
> > > > (a send to two partitions may go to either one machine or two and so it
> > > > will send either one or two requests). So when you send a batch in one call
> > > > you may feel that is "all at once", but that is only actually guaranteed if
> > > > all messages have the same partition.
> > > >
> > > > The challenge is allowing even this in the presence of bounded request
> > > > sizes which we have in the new producer. The user sends a list of objects
> > > > and the serialized size that will result is not very apparent to them. If
> > > > you break it up into multiple requests then that is kind of further ruining
> > > > the illusion of a single send. If you don't then you have to just error out
> > > > which is equally annoying to have to handle.
> > > >
> > > > But I'm not sure if from your description you are saying you actually care
> > > > how many physical requests are issued. I think it is more like it is just
> > > > syntact

Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Gwen Shapira
Thanks Jon. I updated the FAQ with your procedure:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2?
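
For reference, the three phases of the procedure quoted below can be written out as settings. This is a rough sketch, assuming `offsets.storage` and `dual.commit.enabled` are set in the configuration of the 0.8.2 high-level consumer; the class, enum and method names (`OffsetMigrationSketch`, `Phase`, `offsetSettings`) are hypothetical and only illustrate which values go together in each phase.

```java
import java.util.Properties;

class OffsetMigrationSketch {

    // Hypothetical labels for the three phases of the quoted procedure.
    enum Phase { ZOOKEEPER_ONLY, DUAL_COMMIT, KAFKA_ONLY }

    /** Returns only the two offset-related settings for the given phase. */
    static Properties offsetSettings(Phase phase) {
        Properties props = new Properties();
        switch (phase) {
            case ZOOKEEPER_ONLY:   // step 1: commit offsets to ZooKeeper only
                props.setProperty("offsets.storage", "zookeeper");
                props.setProperty("dual.commit.enabled", "false");
                break;
            case DUAL_COMMIT:      // step 2: commit to both ZooKeeper and Kafka
                props.setProperty("offsets.storage", "kafka");
                props.setProperty("dual.commit.enabled", "true");
                break;
            case KAFKA_ONLY:       // step 3: commit offsets to Kafka only
                props.setProperty("offsets.storage", "kafka");
                props.setProperty("dual.commit.enabled", "false");
                break;
        }
        return props;
    }

    public static void main(String[] args) {
        for (Phase phase : Phase.values()) {
            Properties p = offsetSettings(phase);
            System.out.println(phase
                    + ": offsets.storage=" + p.getProperty("offsets.storage")
                    + ", dual.commit.enabled=" + p.getProperty("dual.commit.enabled"));
        }
    }
}
```

Each change takes effect only after a restart, as the procedure notes, so the phases are meant to be rolled out one at a time rather than skipped.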

On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst <
jbringhu...@linkedin.com.invalid> wrote:

> There should probably be a wiki page started for this so we have the
> details in one place. The same question was asked on Freenode IRC a few
> minutes ago. :)
>
> A summary of the migration procedure is:
>
> 1) Upgrade your brokers and set dual.commit.enabled=false and
> offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
> 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
> (Commit offsets to Zookeeper and Kafka).
> 3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
> (Commit offsets to Kafka only).
>
> -Jon
>
> On Feb 5, 2015, at 9:03 AM, Jason Rosenberg  wrote:
>
> > Hi,
> >
> > For 0.8.2, one of the features listed is:
> >  - Kafka-based offset storage.
> >
> > Is there documentation on this (I've heard discussion of it of course)?
> >
> > Also, is it something that will be used by existing consumers when they
> > migrate up to 0.8.2?  What is the migration process?
> >
> > Thanks,
> >
> > Jason
>
>


Re: Kafka Architecture diagram

2015-02-05 Thread Gwen Shapira
The Kafka documentation has several good diagrams. Did you check it out?

http://kafka.apache.org/documentation.html

On Thu, Feb 5, 2015 at 6:31 AM, Ankur Jain  wrote:

> Hi Team,
>
> I am looking for high- and low-level architecture diagrams of Kafka with
> Zookeeper, but haven't found a good one showing concepts like
> replication, high availability, etc.
>
> Please do let me know if there is any...
>
> Thank you
> Ankur
>

