Network failure leads to split brain in brokers

2014-06-18 Thread Abhinav Anand
Hi Guys,
 We have a 6-node cluster. Due to a network failure yesterday (the network
was back up within a few minutes), a few of the brokers were not able to
talk to each other or to ZooKeeper. While investigating an issue in our
Camus job, we identified that broker-2 was misbehaving. The controller logs
report that broker-2 is not alive, yet broker-2 kept serving all
metadata/consumer requests and its own logs don't show any serious errors.

As the Camus logs below show, any metadata request sent to broker-2 returned
broker-2 as the leader, even though list-topic showed (1,3) as the ISR and 3
as the leader. This state is disturbing because it is difficult to detect
and is a state the cluster should never be in.

 *Log analysis *
For brevity, I have attached my full analysis in the Kafka_split_brain.log
file and summarized the key points in this mail.

Network failure occurred somewhere around 2014-06-16 20:30 hrs
*###Controller Logs (Time ordered)*

1. Controller fails to send request to broker 2

[2014-06-16 20:23:34,197] WARN [Controller-3-to-broker-2-send-thread],
Controller 3 fails to send a request to broker 2
(kafka.controller.RequestSendThread)
2. NoReplicaOnlineException shows that broker 2 is not live

[2014-06-16 21:40:14,351] ERROR Controller 3 epoch 63 initiated state
change for partition [same-topic-name,40] from OfflinePartition to
OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition
[same-topic-name,40] is alive. Live brokers are: [Set(5, 1, 6, 3, 4)],
Assigned replicas are: [List(2)]
3. After restarting the broker today, the controller log shows broker 2 being
elected leader even though no broker in the ISR is alive.

[2014-06-17 21:26:14,109] WARN [OfflinePartitionLeaderSelector]: No broker
in ISR is alive for [bfd-logstream_9092-PROD,40]. Elect leader 2 from live
brokers 2. There's potential data loss.
(kafka.controller.OfflinePartitionLeaderSelector)

*###Broker-2 Logs*

1. When the controller sends the LeaderAndIsr request, broker 2 warns that it
is invalid

- [2014-06-16 20:23:31,482] WARN Broker 2 received invalid LeaderAndIsr
  request with correlation id 0 from controller 3 epoch 63 with an older
  leader epoch 41 for partition [cse-sauron,1], current leader epoch is
  41 (state.change.logger)

2. On restart today

- [2014-06-17 21:26:14,135] ERROR Broker 2 aborted the become-follower
  state change with correlation id 32 from controller 3 epoch 63 for
  partition [bfd-logstream_9092-PROD,82] new leader -1
(state.change.logger)



*###Where it all Started (Camus logs)*

We have a Camus job which runs every half hour; during some of yesterday's
runs, duplicate data crept in. On investigation, we found that whenever the
metadata request was sent to broker 2, broker 2 declared itself the leader,
while requests sent to any other broker always returned broker 3 as the
leader. list-topic showed brokers (1,3) as the ISR and 3 as the leader. Yet
the Camus log from yesterday shows broker-2 as the leader for one of its
runs, as in the excerpts below (see also the metadata cross-check sketch
after the excerpts).


   - Job Run at 2014-06-17 00:06:34
  - 2014-06-17 00:06:34 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  Fetching metadata from broker broker-1 with client id cse-sauron for 0
  topic(s) []
  2014-06-17 00:06:34 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  cse-sauron uri:tcp://broker-3-url leader:3 partition:0 offset:10693174
  latest_offset:10693174
   - Job Run at 2014-06-17 00:24:58
  - 2014-06-17 00:24:58 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  Fetching metadata from broker 2 with client id cse-sauron for 0
topic(s) []
  2014-06-17 00:24:58 ERROR: com.linkedin.camus.etl.kafka.CamusJob -
  The current offset was found to be more than the latest offset
  - 2014-06-17 00:24:58 ERROR: com.linkedin.camus.etl.kafka.CamusJob -
  Moving to the earliest offset available
  - 2014-06-17 00:24:58 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  cse-sauron uri:tcp://broker-2-url leader:2 partition:0 offset:9747106
  latest_offset:10674802
   - Job Run at 2014-06-17 01:01:54:
  - 2014-06-17 01:01:54 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  Fetching metadata from broker dare-broker02:9092 with client id
cse-sauron
  for 0 topic(s) []
  2014-06-17 01:01:54 INFO: com.linkedin.camus.etl.kafka.CamusJob -
  cse-sauron uri:tcp://dare-broker02.sv.walmartlabs.com:9092 leader:3
  partition:0 offset:10253311 latest_offset:10697656
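
(For anyone who wants to reproduce the check, a minimal metadata cross-check
sketch, assuming the 0.8 java SimpleConsumer API: it asks each broker for its
own view of the topic metadata. The broker addresses, topic and client id are
placeholders, not values from our cluster.)

import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCrossCheck {
    public static void main(String[] args) {
        String[] brokers = {"broker-1:9092", "broker-2:9092", "broker-3:9092"};
        String topic = "some-topic";
        for (String broker : brokers) {
            String host = broker.split(":")[0];
            int port = Integer.parseInt(broker.split(":")[1]);
            // connect to this specific broker, not "the cluster", so we see its view
            SimpleConsumer consumer =
                new SimpleConsumer(host, port, 10000, 64 * 1024, "leader-check");
            try {
                TopicMetadataResponse response = consumer.send(
                    new TopicMetadataRequest(Collections.singletonList(topic)));
                for (TopicMetadata tm : response.topicsMetadata()) {
                    for (PartitionMetadata pm : tm.partitionsMetadata()) {
                        System.out.println("asked " + broker + " -> partition "
                            + pm.partitionId() + " leader "
                            + (pm.leader() == null ? "none" : pm.leader().id()));
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
}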

 Let me know if you need any more details

-- 

Abhinav Anand


Re: ISR not updating

2014-06-18 Thread Bongyeon Kim
Yes.
It is set in my server.properties file:


auto.leader.rebalance.enable=true


On Wed, Jun 18, 2014 at 12:44 PM, Jun Rao  wrote:

> Did you have auto leader balancing enabled?
>
> Thanks,
>
> Jun
>
>
> On Tue, Jun 17, 2014 at 5:06 PM, Bongyeon Kim 
> wrote:
>
> > There is some error log about failing leader election like that.
> >
> >
> > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 encountered error
> > while electing leader for partition [topicDEBUG,5] due to: Preferred
> > replica 1 for partition [topicDEBUG,5] is either not alive or not in the
> > isr. Current leader and ISR: [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > (state.change.logger)
> > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 initiated state
> change
> > for partition [topicDEBUG,5] from OnlinePartition to OnlinePartition
> failed
> > (state.change.logger)
> > kafka.common.StateChangeFailedException: encountered error while electing
> > leader for partition [topicDEBUG,5] due to: Preferred replica 1 for
> > partition [topicDEBUG,5] is either not alive or not in the isr. Current
> > leader and ISR: [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > at
> >
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:360)
> > at
> >
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:187)
> > at
> >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125)
> > at
> >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124)
> > at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
> > at
> >
> >
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124)
> > at
> >
> >
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:618)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1118)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > at kafka.utils.Utils$.inLock(Utils.scala:538)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1107)
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> > at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> > at
> >
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1107)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1086)
> > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > at
> >
> >
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1086)
> > at
> >
> >
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:324)
> > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:744)
> > Caused by: 

SimpleConsumer kafka.api.OffsetRequest.LatestTime() not working ?

2014-06-18 Thread rafeeq s
SimpleConsumer is *not processing* messages when its offset is set to
kafka.api.OffsetRequest.*LatestTime()*.

If I use the *EarliestTime()* offset method, it processes from the start,
which is as expected.

But when I try the *LatestTime()* offset, there is no reaction from the
simple consumer. I am following the SimpleConsumer example below:

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
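
(For reference, a minimal sketch of how the linked example resolves the
starting offset; the broker, topic, partition and client id are placeholders.
Note that LatestTime() resolves to the offset just past the last message
written, so a fetch from that offset returns nothing until new data arrives.)

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LatestOffsetCheck {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "latest-check");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // LatestTime() = the offset of the next message to be produced
        info.put(tp, new PartitionOffsetRequestInfo(
            kafka.api.OffsetRequest.LatestTime(), 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            info, kafka.api.OffsetRequest.CurrentVersion(), "latest-check");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        System.out.println("latest offset = " + response.offsets("my-topic", 0)[0]);
        consumer.close();
    }
}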


Regards,
Rafeeq S
*(“What you do is what matters, not what you think or say or plan.” )*


Kafka latency measures

2014-06-18 Thread Supun Kamburugamuva
Hi,

We are trying to evaluate Kafka for a real-time application. We are sending
50 Kb messages at a fixed rate. Normal messages have reasonable latency, but
there are outliers that take an unpredictable amount of time, which causes
the average latency to increase dramatically. We are running with basically
the default configuration. Any suggestions for improving the latency?

Thanks in advance,
Supun..

-- 
Supun Kamburugamuva
Member, Apache Software Foundation; http://www.apache.org
E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
Blog: http://supunk.blogspot.com


delete topic ?

2014-06-18 Thread Shlomi Hazan
Hi,

Doing some evaluation testing, I accidentally created a topic with the wrong
replication factor.

Trying to delete as in:

kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --delete
--topic replicated-topic

Yielded:

Command must include exactly one action: --list, --describe, --create or
--alter

Even though this page (https://kafka.apache.org/documentation.html) says:

 

And finally deleting a topic:

 > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
my_topic_name

WARNING: Delete topic functionality is beta in 0.8.1. Please report any bugs
that you encounter on the  mailing list or
 JIRA.

Kafka does not currently support reducing the number of partitions for a
topic or changing the replication factor.

What should I do?

Shlomi



Re: MirrorMaker documentation suggestions

2014-06-18 Thread Jun Rao
I just granted you the wiki permission. Could you give it a try?

Thanks,

Jun


On Tue, Jun 17, 2014 at 11:30 PM, Daniel Compton 
wrote:

> Hi Guozhang
>
> I feel pretty silly for asking this but how do I edit the Confluence Wiki?
> I have an account and it says there is no edit restrictions defined for the
> page but I can’t see an edit button anywhere. Do I need permission to edit
> the wiki?
>
> Daniel.
>
>
> On Wednesday, 18 June 2014 at 5:39 pm, Guozhang Wang wrote:
>
> > Thanks Daniel for the findings, please feel free to update the wiki.
> >
> > Guozhang
> >
> >
> > On Tue, Jun 17, 2014 at 9:56 PM, Daniel Compton  (mailto:d...@danielcompton.net)>
> > wrote:
> >
> > > Hi
> > >
> > > I was following the instructions for Kafka mirroring and had two
> > > suggestions for improving the documentation at
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330:
> > > 1. Move "Note that the --zkconnect argument should point to the source
> > > cluster's ZooKeeper...” above the console output section, I missed this
> > > last line on the page and spent some time scratching my head when I was
> > > connecting to the wrong zookeeper.
> > > 2. The output from ConsumerOffsetChecker has changed since May 2012
> when
> > > this part was written. The new format is:
> > >
> > > Group                    Topic      Pid  Offset  logSize  Lag  Owner
> > > test-consumer-group-dc1  log-batch  0    5       5        0    none
> > > test-consumer-group-dc1  log-batch  1    3       3        0    none
> > > test-consumer-group-dc1  log-batch  2    6       9        3    none
> > >
> > > I’m happy to update the docs if it’s not too tricky to give me access
> to
> > > it.
> > >
> > > Thanks, Daniel.
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
>
>
>


Re: ISR not updating

2014-06-18 Thread Jun Rao
Auto leader balancing has some known issues when used together with
controlled shutdown, so we don't recommend turning it on in 0.8.1.1.

Thanks,

Jun


On Wed, Jun 18, 2014 at 1:41 AM, Bongyeon Kim 
wrote:

> Yes.
> it contain my server.properties file.
>
>
> auto.leader.rebalance.enable=true
>
>
> On Wed, Jun 18, 2014 at 12:44 PM, Jun Rao  wrote:
>
> > Did you have auto leader balancing enabled?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jun 17, 2014 at 5:06 PM, Bongyeon Kim 
> > wrote:
> >
> > > There is some error log about failing leader election like that.
> > >
> > >
> > > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 encountered error
> > > while electing leader for partition [topicDEBUG,5] due to: Preferred
> > > replica 1 for partition [topicDEBUG,5] is either not alive or not in
> the
> > > isr. Current leader and ISR:
> [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > > (state.change.logger)
> > > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 initiated state
> > change
> > > for partition [topicDEBUG,5] from OnlinePartition to OnlinePartition
> > failed
> > > (state.change.logger)
> > > kafka.common.StateChangeFailedException: encountered error while
> electing
> > > leader for partition [topicDEBUG,5] due to: Preferred replica 1 for
> > > partition [topicDEBUG,5] is either not alive or not in the isr. Current
> > > leader and ISR: [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > > at
> > >
> > >
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:360)
> > > at
> > >
> > >
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:187)
> > > at
> > >
> > >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125)
> > > at
> > >
> > >
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124)
> > > at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
> > > at
> > >
> > >
> >
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:618)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1118)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > > at kafka.utils.Utils$.inLock(Utils.scala:538)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1107)
> > > at
> > >
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > > at
> > >
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> > > at
> > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> > > at
> > >
> >
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> > > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1107)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1086)
> > > at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > > at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1086)
> > > at
> > >
> > >
> >
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:324)
> > > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> > > at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> > > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> > > at
> > >
> > >
> >

Re: Kafka latency measures

2014-06-18 Thread Magnus Edenhill
Hi,

do these spikes happen to correlate with log.flush.interval.messages or
log.flush.interval.ms?
If so, it's the file system sync blockage you are seeing.
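
(For reference, a sketch of the broker settings in question in
server.properties; the values are illustrative, not recommendations:)

# flush to disk after this many messages have accumulated on a log...
log.flush.interval.messages=10000
# ...or after this many milliseconds, whichever comes first
log.flush.interval.ms=1000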

/Magnus


2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :

> Hi,
>
> We are trying to evaluate Kafka for a real time application. We are sending
> 50 Kb messages at a fixed rate. The normal messages have a reasonable
> latency. But then there are these outliers that takes unpredictable amount
> of time. This causes the average latency to increase dramatically. We are
> running with basically the default configuration. Any suggestions for
> improving the latency?
>
> Thanks in advance,
> Supun..
>
> --
> Supun Kamburugamuva
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> Blog: http://supunk.blogspot.com
>


Rolling update Kafka 0.8 -> Kafka 0.8.1.1 in detail

2014-06-18 Thread Yury Ruchin
Hi folks,

In my project, we want to update our active Kafka 0.8 cluster to Kafka
0.8.1.1 without downtime or losing any data. The process (after reading
http://kafka.apache.org/documentation.html#upgrade) looks to me like this.
For each broker in turn:

1. Bring the broker down.
2. Update Kafka to 0.8.1.1 on the broker node.
3. Start the broker.
4. Run the preferred-replica-election script to restore the broker's
leadership for its partitions (see the sketch below).
5. Wait for the preferred replica election to complete.

I deem step #5 necessary since preferred replica election is an asynchronous
process. There is a slim chance that bringing other brokers down before the
election completes would leave some partitions with no replicas up, so a
portion of the incoming data stream would be lost. Is my understanding
of the process correct?
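
For concreteness, step 4 with the stock 0.8.1.1 tools would be roughly the
sketch below (the ZooKeeper address is a placeholder; run without a JSON file,
the script triggers the election for all partitions):

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:2181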


Re: Rolling update Kafka 0.8 -> Kafka 0.8.1.1 in detail

2014-06-18 Thread Neha Narkhede
You don't gain much by running #4 between broker bounces. Running it after
the cluster is upgraded will be sufficient.

Thanks,
Neha


On Wed, Jun 18, 2014 at 8:33 AM, Yury Ruchin  wrote:

> Hi folks,
>
> In my project, we want to perform to update our active Kafka 0.8 cluster to
> Kafka 0.8.1.1 without downtime and losing any data. The process (after
> reading http://kafka.apache.org/documentation.html#upgrade) looks to me
> like this. For each broker in turn:
>
> 1. Bring the broker down.
> 2. Update Kafka to 0.8.1.1 on the broker node.
> 3. Start the broker.
> 4. Run preferred-replica-election script to restore broker's leadership for
> respective partitions.
> 5. Wait for the the preferred replica election to complete.
>
> I deem step#5 necessary since preferred replica election is an asynchronous
> process. There is a slim chance that bringing other brokers down before the
> election is complete would result in all replicas down for some partitions,
> so a portion of the incoming data stream would be lost. Is my understanding
> of the process correct?
>


Re: Kafka latency measures

2014-06-18 Thread Neha Narkhede
Which version of Kafka did you use? When you say latency, do you mean the
latency between the producer and consumer? If so, are you using a timestamp
within the message to compute this latency?


On Wed, Jun 18, 2014 at 8:15 AM, Magnus Edenhill  wrote:

> Hi,
>
> do these spikes happen to correlate with log.flush.interval.messages or
> log.flush.interval.ms?
> If so it's the file system sync blockage you are seeing.
>
> /Magnus
>
>
> 2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :
>
> > Hi,
> >
> > We are trying to evaluate Kafka for a real time application. We are
> sending
> > 50 Kb messages at a fixed rate. The normal messages have a reasonable
> > latency. But then there are these outliers that takes unpredictable
> amount
> > of time. This causes the average latency to increase dramatically. We are
> > running with basically the default configuration. Any suggestions for
> > improving the latency?
> >
> > Thanks in advance,
> > Supun..
> >
> > --
> > Supun Kamburugamuva
> > Member, Apache Software Foundation; http://www.apache.org
> > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > Blog: http://supunk.blogspot.com
> >
>


Re: SimpleConsumer kafka.api.OffsetRequest.LatestTime() not working ?

2014-06-18 Thread Neha Narkhede
Is that because you have no more data produced to the topic/partition after
the last offset returned by OffsetRequest.LatestTime() ?


On Wed, Jun 18, 2014 at 6:19 AM, rafeeq s  wrote:

> Simple Consumer is *not processing* messages when it's offset in
> kafka.api.OffsetRequest.*LatestTime()* .
>
> If i use *EarlierTime()  *offset method it is processing from start, which
> is as expected.
>
> But when I try for *LatestTime()* offset, no reaction from simple consumer
> and I am referring below simpleconsumer example:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
>
> Regards,
> Rafeeq S
> *(“What you do is what matters, not what you think or say or plan.” )*
>


Re: delete topic ?

2014-06-18 Thread Neha Narkhede
Kafka allows increasing the replication factor of a topic; you can read
about it in the documentation. We do not support reducing the number of
partitions, so you either have to create a new topic or delete the existing
one. We fixed a number of bugs in delete topic, but those fixes are on
trunk. You can give it a spin.
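
For example, a sketch of bumping the replication factor with the partition
reassignment tool (the topic name, broker ids and ZooKeeper address are
placeholders; the JSON lists the desired replica set per partition):

# increase-replication.json:
# {"version":1,
#  "partitions":[{"topic":"replicated-topic","partition":0,"replicas":[1,2]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication.json --execute

# later, to check that the reassignment completed:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication.json --verify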


On Wed, Jun 18, 2014 at 4:06 AM, Shlomi Hazan  wrote:

> Hi,
>
> Doing some evaluation testing, and accidently create a queue with wrong
> replication factor.
>
> Trying to delete as in:
>
> kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --delete
> --topic replicated-topic
>
> Yeilded:
>
> Command must include exactly one action: --list, --describe, --create or
> -alter
>
> Event though this page (https://kafka.apache.org/documentation.html) says:
>
>
>
> And finally deleting a topic:
>
>  > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
> my_topic_name
>
> WARNING: Delete topic functionality is beta in 0.8.1. Please report any
> bugs
> that you encounter on the  mailing list
> or
>  JIRA.
>
> Kafka does not currently support reducing the number of partitions for a
> topic or changing the replication factor.
>
> What should I do?
>
> Shlomi
>
>


Re: delete topic ?

2014-06-18 Thread Mark Roberts
When we were in testing phase, we would either create a new topic with the 
correct details or shut the cluster down and hard kill the topic in zookeeper + 
local disk.  In prod we have the cluster configured via configuration 
management and auto create turned off.

The ability to delete a topic in a live, running kafka cluster is tricky, and 
the implementations of it have been subtly incorrect (and therefore dangerous). 
I know that there is work happening around that, but haven't kept up with the 
status of it. Maybe in 0.8.2? It sounds conceptually simpler to implement with
the new metadata API.

-Mark

> On Jun 18, 2014, at 4:06, "Shlomi Hazan"  wrote:
> 
> Hi,
> 
> Doing some evaluation testing, and accidently create a queue with wrong
> replication factor.
> 
> Trying to delete as in:
> 
> kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --delete
> --topic replicated-topic
> 
> Yeilded:
> 
> Command must include exactly one action: --list, --describe, --create or
> -alter
> 
> Event though this page (https://kafka.apache.org/documentation.html) says:
> 
> 
> 
> And finally deleting a topic:
> 
>> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
> my_topic_name
> 
> WARNING: Delete topic functionality is beta in 0.8.1. Please report any bugs
> that you encounter on the  mailing list or
>  JIRA.
> 
> Kafka does not currently support reducing the number of partitions for a
> topic or changing the replication factor.
> 
> What should I do?
> 
> Shlomi
> 


Re: Building Kafka on Mac OS X

2014-06-18 Thread Jorge Marizan

I will let you know once I come back from my job.

Jorge

On mié 18 jun 2014 00:35:02 AST, Timothy Chen wrote:

So do you have the build folder generated in core/client and
classes/jars compiled in them?

Kafka-server-start.sh also sets the CLASSPATH to load the jar and
kafka.Kafka as well, so you want to make sure they're there.

Tim

On Tue, Jun 17, 2014 at 9:18 PM, Jorge Marizan  wrote:

Now when I try to run, it fails finding the kafka.Kafka class:

kafka-server-start.sh /usr/local/etc/kafka/server.properties
Error: Could not find or load main class kafka.Kafka

  Jorge


On Jun 17, 2014, at 11:54 PM, Jorge Marizan  wrote:


Not at all,  I verified with ps aux and there is no Gradle processes left 
behind when I cancel the compile job.

Jorge.

On Jun 17, 2014, at 11:45 PM, Timothy Chen  wrote:


Not sure what's wrong but I'm guessing there probably can be a gradle lock 
somewhere.

Is there other gradle processes that is hanging around?

Tim

Sent from my iPhone


On Jun 17, 2014, at 8:35 PM, Jorge Marizan  wrote:

:core:compileScala

Jorge.


On Jun 17, 2014, at 8:54 PM, Timothy Chen  wrote:

What's the last line it's stuck on with debug flag on?

Tim


On Tue, Jun 17, 2014 at 4:46 PM, Jorge Marizan  wrote:
I almost got it to work but still stuck compiling scala

Any idea?

Jorge.

On Jun 17, 2014, at 7:22 AM, Jorge Marizan  wrote:

It got stuck on this:

./gradlew jar
The TaskContainer.add() method has been deprecated and is scheduled to be 
removed in Gradle 2.0. Please use the create() method instead.
Building project 'core' with Scala version 2.8.0
Building project 'perf' with Scala version 2.8.0
:clients:compileJava
:clients:processResources UP-TO-DATE
:clients:classes
:clients:jar
:contrib:compileJava UP-TO-DATE
:contrib:processResources UP-TO-DATE
:contrib:classes UP-TO-DATE
:contrib:jar
:core:compileJava UP-TO-DATE
:core:compileScala


On Jun 17, 2014, at 12:46 AM, Steve Morin  wrote:

Have seen if you have a write with zero data it will hang


On Jun 16, 2014, at 21:02, Timothy Chen  wrote:

Can you try running it in debug mode? (./gradlew jar -d)

Tim


On Mon, Jun 16, 2014 at 8:44 PM, Jorge Marizan  wrote:
It just hangs there without any output at all.

Jorge.


On Jun 16, 2014, at 11:27 PM, Timothy Chen  wrote:

What output was it stuck on?

Tim


On Mon, Jun 16, 2014 at 6:39 PM, Jorge Marizan  wrote:
Hi team, I’m a newcomer to Kafka, but I’m having some troubles trying to get it 
to run on OS X.
Basically building Kafka on OS X with 'gradlew jar’  gets stuck forever without 
any progress (Indeed I tried to leave it building all night with no avail).

Any advices will be greatly appreciated. Thanks in advance.








Re: Kafka latency measures

2014-06-18 Thread Supun Kamburugamuva
The spikes happen without any correlation with log.flush.interval.messages;
they happen more frequently than that.

I'm using the latest version. I'm sending the messages to Kafka; a message
receiver then sends the same messages back through Kafka to the original
sender, and the round-trip latency is measured.
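
(For concreteness, a minimal sketch of the sender side of such a round-trip
test, assuming the 0.8 producer API; the broker list, topic names, message
count and send rate are placeholders, and the 50 Kb payload padding is
omitted:)

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RoundTripSender {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker-1:9092,broker-2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        for (int i = 0; i < 1000; i++) {
            // each message carries its own send time so the echoed copy can be
            // used for a round-trip measurement without synchronized clocks
            String payload = System.nanoTime() + "|" + i;
            producer.send(new KeyedMessage<String, String>("latency-out", payload));
            Thread.sleep(10);   // fixed send rate
        }
        producer.close();
    }
}

// On receiving the echoed copy back on the return topic:
//   long sentNs = Long.parseLong(echoedPayload.split("\\|")[0]);
//   long rttMs  = (System.nanoTime() - sentNs) / 1000000L;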

Thanks,
Supun..


On Wed, Jun 18, 2014 at 12:02 PM, Neha Narkhede 
wrote:

> Which version of Kafka did you use? When you say latency, do you mean the
> latency between the producer and consumer? If so, are you using a timestamp
> within the message to compute this latency?
>
>
> On Wed, Jun 18, 2014 at 8:15 AM, Magnus Edenhill 
> wrote:
>
> > Hi,
> >
> > do these spikes happen to correlate with log.flush.interval.messages or
> > log.flush.interval.ms?
> > If so it's the file system sync blockage you are seeing.
> >
> > /Magnus
> >
> >
> > 2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :
> >
> > > Hi,
> > >
> > > We are trying to evaluate Kafka for a real time application. We are
> > sending
> > > 50 Kb messages at a fixed rate. The normal messages have a reasonable
> > > latency. But then there are these outliers that takes unpredictable
> > amount
> > > of time. This causes the average latency to increase dramatically. We
> are
> > > running with basically the default configuration. Any suggestions for
> > > improving the latency?
> > >
> > > Thanks in advance,
> > > Supun..
> > >
> > > --
> > > Supun Kamburugamuva
> > > Member, Apache Software Foundation; http://www.apache.org
> > > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > > Blog: http://supunk.blogspot.com
> > >
> >
>



-- 
Supun Kamburugamuva
Member, Apache Software Foundation; http://www.apache.org
E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
Blog: http://supunk.blogspot.com


Re: SimpleConsumer kafka.api.OffsetRequest.LatestTime() not working ?

2014-06-18 Thread Supun Kamburugamuva
Hi Rafeeq,

I'm using the LatestTime() and it is working.

Thanks,
Supun..


On Wed, Jun 18, 2014 at 12:07 PM, Neha Narkhede 
wrote:

> Is that because you have no more data produced to the topic/partition after
> the last offset returned by OffsetRequest.LatestTime() ?
>
>
> On Wed, Jun 18, 2014 at 6:19 AM, rafeeq s  wrote:
>
> > Simple Consumer is *not processing* messages when it's offset in
> > kafka.api.OffsetRequest.*LatestTime()* .
> >
> > If i use *EarlierTime()  *offset method it is processing from start,
> which
> > is as expected.
> >
> > But when I try for *LatestTime()* offset, no reaction from simple
> consumer
> > and I am referring below simpleconsumer example:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> >
> >
> > Regards,
> > Rafeeq S
> > *(“What you do is what matters, not what you think or say or plan.” )*
> >
>



-- 
Supun Kamburugamuva
Member, Apache Software Foundation; http://www.apache.org
E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
Blog: http://supunk.blogspot.com


Re: Kafka latency measures

2014-06-18 Thread Supun Kamburugamuva
I've found this performance test:

http://blog.liveramp.com/2013/04/08/kafka-0-8-producer-performance-2/

This performance test mentions the same issue at the end.

Thanks,
Supun..


On Wed, Jun 18, 2014 at 12:43 PM, Supun Kamburugamuva 
wrote:

> The spikes happens without any correlation with the  
> log.flush.interval.message.
> They happen more frequently.
>
> I'm using the latest version. I'm sending the messages to Kafka, then
> there is a message receiver, it sends the same messages back through kafka
> to original sender. The round trip latency is measured.
>
> Thanks,
> Supun..
>
>
> On Wed, Jun 18, 2014 at 12:02 PM, Neha Narkhede 
> wrote:
>
>> Which version of Kafka did you use? When you say latency, do you mean the
>> latency between the producer and consumer? If so, are you using a
>> timestamp
>> within the message to compute this latency?
>>
>>
>> On Wed, Jun 18, 2014 at 8:15 AM, Magnus Edenhill 
>> wrote:
>>
>> > Hi,
>> >
>> > do these spikes happen to correlate with log.flush.interval.messages or
>> > log.flush.interval.ms?
>> > If so it's the file system sync blockage you are seeing.
>> >
>> > /Magnus
>> >
>> >
>> > 2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :
>> >
>> > > Hi,
>> > >
>> > > We are trying to evaluate Kafka for a real time application. We are
>> > sending
>> > > 50 Kb messages at a fixed rate. The normal messages have a reasonable
>> > > latency. But then there are these outliers that takes unpredictable
>> > amount
>> > > of time. This causes the average latency to increase dramatically. We
>> are
>> > > running with basically the default configuration. Any suggestions for
>> > > improving the latency?
>> > >
>> > > Thanks in advance,
>> > > Supun..
>> > >
>> > > --
>> > > Supun Kamburugamuva
>> > > Member, Apache Software Foundation; http://www.apache.org
>> > > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
>> > > Blog: http://supunk.blogspot.com
>> > >
>> >
>>
>
>
>
> --
> Supun Kamburugamuva
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> Blog: http://supunk.blogspot.com
>
>


-- 
Supun Kamburugamuva
Member, Apache Software Foundation; http://www.apache.org
E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
Blog: http://supunk.blogspot.com


Running mirrormaker in HA?

2014-06-18 Thread Prakash Gowri Shankor
Hi,

I've looked at the documentation for MirrorMaker. It doesn't appear that
there is a notion of HA (high availability) for the MirrorMaker tool
itself. Ideally, I would like to have some topics flowing to another
datacenter with a 2-node MirrorMaker in between, where one MirrorMaker is
always active and the other passive until it needs to become primary. The
failover from passive to active also needs to happen with offset awareness
(i.e., the previously committed offsets should be picked up by the new
primary). Is anything like this planned for future releases?

Thanks,
Prakash


Re: Network failure leads to split brain in brokers

2014-06-18 Thread Guozhang Wang
Hello Abhinav,

Which Kafka version are you using?

Guozhang


On Wed, Jun 18, 2014 at 1:40 AM, Abhinav Anand  wrote:

> Hi Guys,
>  We have a 6 node cluster. Due to network failure yesterday(network was up
> and running in few minutes), few of the brokers were not able to talk to
> each other, including the zookeeper. During an issue in our camus job, we
> identified that broker-2 was misbehaving. The controller logs reflect that
> the broker-2 is not alive, though broker-2 was serving all
> metadata/consumer requests and its log don't show nasty errors.
>
> If you see the camus logs below, any request to broker-2 would return
> broker-2 as the leader.
> Even though list-topic would show (1,3) as ISR and 3 as the leader. This
> state is pretty disturbing as its difficult to detect and inherently not
> supposed to state.
>
>  *Log analysis *
> For brevity i have attached my analysis in Kafka_split_brain.log file,
> reflected the key points in the mail
>
> Network failure occurred somewhere around 2014-06-16 20:30 hrs
> *###Controller Logs (Time ordered)*
>
> 1. Controller fails to send request to broker 2
>
> [2014-06-16 20:23:34,197] WARN [Controller-3-to-broker-2-send-thread],
> Controller 3 fails to send a request to broker 2
> (kafka.controller.RequestSendThread)
> 2. NoReplicaOnlineException shows broker 2 to be not live
>
> [2014-06-16 21:40:14,351] ERROR Controller 3 epoch 63 initiated state
> change for partition [same-topic-name,40] from OfflinePartition to
> OnlinePartition failed (state.change.logger)
> kafka.common.NoReplicaOnlineException: No replica for partition
> [same-topic-name,40] is alive. Live brokers are: [Set(5, 1, 6, 3, 4)],
> Assigned replicas are: [List(2)]
> 3. After restarting the broker today, the controller logs selects 2 as
> ISR.
>
> [2014-06-17 21:26:14,109] WARN [OfflinePartitionLeaderSelector]: No
> broker in ISR is alive for [bfd-logstream_9092-PROD,40]. Elect leader 2
> from live brokers 2. There's potential data loss.
> (kafka.controller.OfflinePartitionLeaderSelector)
>
> *###Broker-2 Logs*
>
> 1. When controller sends replica-request, warning invalid LeaderAndIsr
>
> - [2014-06-16 20:23:31,482] WARN Broker 2 received invalid
>   LeaderAndIsr request with correlation id 0 from controller 3 epoch 63 
> with
>   an older leader epoch 41 for partition [cse-sauron,1], current
>   leader epoch is 41 (state.change.logger)
>
> 2. On restart today
>
> - [2014-06-17 21:26:14,135] ERROR Broker 2 aborted the
>   become-follower state change with correlation id 32 from controller 3 
> epoch
>   63 for partition [bfd-logstream_9092-PROD,82] new leader -1
>   (state.change.logger)
>
>
>
> *###Where it all Started (Camus logs)*
>
> We have a camus job which runs every half-hour, during some of the
> yesterdays run we had duplicate data creeping in. On investigation, we
> found out whenever the metadata request was sent to broker-id 2, broker-id
> 2 used to declare itself as leader. If the request was sent to other
> brokers, the leader was always broker 3. On list topic, broker (1,3) were
> ISR and 3 was the leader.  Though if we see the camus log for yesterday’s
> run, it finds broker-2 as the leader for one of its run.
>
>
>- Job Run at 2014-06-17 00:06:34
>   - 2014-06-17 00:06:34 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   Fetching metadata from broker broker-1 with client id cse-sauron for 0
>   topic(s) []
>   2014-06-17 00:06:34 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   cse-sauron uri:tcp://broker-3-url leader:3 partition:0
>   offset:10693174 latest_offset:10693174
>- Job Run at 2014-06-17 00:24:58
>   - 2014-06-17 00:24:58 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   Fetching metadata from broker 2 with client id cse-sauron for 0 
> topic(s) []
>   2014-06-17 00:24:58 ERROR: com.linkedin.camus.etl.kafka.CamusJob -
>   The current offset was found to be more than the latest offset
>   - 2014-06-17 00:24:58 ERROR: com.linkedin.camus.etl.kafka.CamusJob
>   - Moving to the earliest offset available
>   - 2014-06-17 00:24:58 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   cse-sauron uri:tcp://broker-2-url leader:2 partition:0
>   offset:9747106 latest_offset:10674802
>- Job Run at 2014-06-17 01:01:54:
>   - 2014-06-17 01:01:54 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   Fetching metadata from broker dare-broker02:9092 with client id 
> cse-sauron
>   for 0 topic(s) []
>   2014-06-17 01:01:54 INFO: com.linkedin.camus.etl.kafka.CamusJob -
>   cse-sauron uri:tcp://dare-broker02.sv.walmartlabs.com:9092 leader:3
>   partition:0 offset:10253311 latest_offset:10697656
>
>  Let me know if you need any more details
>
> --
>
> Abhinav Anand
>



-- 
-- Guozhang


Re: delete topic ?

2014-06-18 Thread hsy...@gmail.com
I'm using 0.8.1.1.
I use DeleteTopicCommand to delete a topic:

    String[] args = new String[4];
    args[0] = "--topic";
    args[1] = topicToDelete;      // the topic you want to delete
    args[2] = "--zookeeper";
    args[3] = kafkaZookeepers;    // e.g. "localhost:2181"
    kafka.admin.DeleteTopicCommand.main(args);

You can write your own script to delete the topic, I guess. And I think it
only deletes the entry in zookeeper
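
(An equivalent shell invocation, assuming the kafka.admin.DeleteTopicCommand
class above is on the classpath of your distribution; the topic and ZooKeeper
address are placeholders:)

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand \
  --topic replicated-topic --zookeeper localhost:2181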

Best,
Siyuan



On Wed, Jun 18, 2014 at 9:13 AM, Mark Roberts  wrote:

> When we were in testing phase, we would either create a new topic with the
> correct details or shut the cluster down and hard kill the topic in
> zookeeper + local disk.  In prod we have the cluster configured via
> configuration management and auto create turned off.
>
> The ability to delete a topic in a live, running kafka cluster is tricky,
> and the implementations of it have been subtly incorrect (and therefore
> dangerous). I know that there is work happening around that, but haven't
> kept up with the status of it.  Maybe in 8.2? It sounds conceptually
> simpler to implement with the new metadata API.
>
> -Mark
>
> > On Jun 18, 2014, at 4:06, "Shlomi Hazan"  wrote:
> >
> > Hi,
> >
> > Doing some evaluation testing, and accidently create a queue with wrong
> > replication factor.
> >
> > Trying to delete as in:
> >
> > kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181
> --delete
> > --topic replicated-topic
> >
> > Yeilded:
> >
> > Command must include exactly one action: --list, --describe, --create or
> > -alter
> >
> > Event though this page (https://kafka.apache.org/documentation.html)
> says:
> >
> >
> >
> > And finally deleting a topic:
> >
> >> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
> > my_topic_name
> >
> > WARNING: Delete topic functionality is beta in 0.8.1. Please report any
> bugs
> > that you encounter on the  mailing
> list or
> >  JIRA.
> >
> > Kafka does not currently support reducing the number of partitions for a
> > topic or changing the replication factor.
> >
> > What should I do?
> >
> > Shlomi
> >
>


Re: Running mirrormaker in HA?

2014-06-18 Thread Guozhang Wang
Hi Prakash,

Actually MM is already HA in the sense that it uses a consumer group to
fetch data from the source DC and a producer to push it to the destination
DC. So if there are two instances of MM, as in your case, each instance will
mirror half of the traffic to the other DC since their consumers are in the
same group. When one of them dies, the consumer rebalance will automatically
move all of its traffic to the other instance.
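
(A minimal sketch of that setup, assuming the 0.8 MirrorMaker tool; the hosts,
file names and whitelist are placeholders. The key point is that both
instances share the same group.id:)

# consumer.properties (identical on both MM nodes):
#   zookeeper.connect=source-zk:2181
#   group.id=mm-dc1-to-dc2
# producer.properties:
#   metadata.broker.list=dest-broker1:9092,dest-broker2:9092

# run on each of the two MM nodes:
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist='.*' --num.streams=2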

Guozhang


On Wed, Jun 18, 2014 at 1:10 PM, Prakash Gowri Shankor <
prakash.shan...@gmail.com> wrote:

> Hi,
>
> I've looked at the documentation for mirrormaker. It doesnt appear that
> there is a notion of HA ( high availability ) for the mirrormaker tool
> itself. Ideally, I would like to have some topics flowing to another
> datacenter with a 2 node mirrormaker in between, where one mirrormaker is
> always active  and the other passive until it needs to become primary. The
> transfer from passive to active also needs to happen with offset awareness
> ( ie previously written topic offset should be picked up by the new
> primary) . Is anything like this planned in future releases ?
>
> Thanks,
> Prakash
>



-- 
-- Guozhang


Re: delete topic ?

2014-06-18 Thread Timothy Chen
Yes, the existing delete topic command just cleans up the topic entry in ZK,
but does not really delete the topic from the cluster.

I have a patch that enables kafka-topics.sh to delete topics, but I'm not
sure if it's merged to trunk.

Tim

> On Jun 18, 2014, at 1:39 PM, "hsy...@gmail.com"  wrote:
> 
> I'm using 0.8.1.1
> I use DeleteTopicCommand to delete topic
>args[0] = "--topic";
>args[1] = the topic you want to delete
>args[2] = "--zookeeper";
>args[3] = kafkaZookeepers;
>DeleteTopicCommand.main(args);
> 
> You can write your own script to delete the topic, I guess. And I think it
> only deletes the entry in zookeeper
> 
> Best,
> Siyuan
> 
> 
> 
>> On Wed, Jun 18, 2014 at 9:13 AM, Mark Roberts  wrote:
>> 
>> When we were in testing phase, we would either create a new topic with the
>> correct details or shut the cluster down and hard kill the topic in
>> zookeeper + local disk.  In prod we have the cluster configured via
>> configuration management and auto create turned off.
>> 
>> The ability to delete a topic in a live, running kafka cluster is tricky,
>> and the implementations of it have been subtly incorrect (and therefore
>> dangerous). I know that there is work happening around that, but haven't
>> kept up with the status of it.  Maybe in 8.2? It sounds conceptually
>> simpler to implement with the new metadata API.
>> 
>> -Mark
>> 
>>> On Jun 18, 2014, at 4:06, "Shlomi Hazan"  wrote:
>>> 
>>> Hi,
>>> 
>>> Doing some evaluation testing, and accidently create a queue with wrong
>>> replication factor.
>>> 
>>> Trying to delete as in:
>>> 
>>> kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181
>> --delete
>>> --topic replicated-topic
>>> 
>>> Yeilded:
>>> 
>>> Command must include exactly one action: --list, --describe, --create or
>>> -alter
>>> 
>>> Event though this page (https://kafka.apache.org/documentation.html)
>> says:
>>> 
>>> 
>>> 
>>> And finally deleting a topic:
>>> 
 bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
>>> my_topic_name
>>> 
>>> WARNING: Delete topic functionality is beta in 0.8.1. Please report any
>> bugs
>>> that you encounter on the  mailing
>> list or
>>>  JIRA.
>>> 
>>> Kafka does not currently support reducing the number of partitions for a
>>> topic or changing the replication factor.
>>> 
>>> What should I do?
>>> 
>>> Shlomi
>> 


Re: Running mirrormaker in HA?

2014-06-18 Thread Prakash Gowri Shankor
 I see what you mean by the implicit HA. Thanks, Guozhang.


On Wed, Jun 18, 2014 at 1:45 PM, Guozhang Wang  wrote:

> Hi Prakash,
>
> Actually the MM is HA such that it used a consumer group fetching data from
> the source DC and another producer pushing them to the destination DC. So
> suppose there are two instances of MM as in your case, each of the instance
> will get half of the traffic mirroring to the other DC since they have
> consumers in the same group. When one of them dies, the load balance will
> automatically moves all its traffic to the other instance.
>
> Guozhang
>
>
> On Wed, Jun 18, 2014 at 1:10 PM, Prakash Gowri Shankor <
> prakash.shan...@gmail.com> wrote:
>
> > Hi,
> >
> > I've looked at the documentation for mirrormaker. It doesnt appear that
> > there is a notion of HA ( high availability ) for the mirrormaker tool
> > itself. Ideally, I would like to have some topics flowing to another
> > datacenter with a 2 node mirrormaker in between, where one mirrormaker is
> > always active  and the other passive until it needs to become primary.
> The
> > transfer from passive to active also needs to happen with offset
> awareness
> > ( ie previously written topic offset should be picked up by the new
> > primary) . Is anything like this planned in future releases ?
> >
> > Thanks,
> > Prakash
> >
>
>
>
> --
> -- Guozhang
>


Re: Reliable Message Commits

2014-06-18 Thread Kyle Banker
I think I've discovered the answer to my second question: according to the
code in ZookeeperConsumerConnector.scala, a rebalance derives its offsets
from what's already in Zookeeper. Therefore, uncommitted but consumed
messages from a given partition will be replayed when the partition is
reassigned.
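
For what it's worth, a minimal sketch of the consume -> publish -> commit
pattern described in the quoted mail below, assuming the 0.8.1 high-level
consumer; the ZooKeeper address, group, topic and publishToOtherSystem() are
placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumePublishCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181");
        props.put("group.id", "reliable-publisher");
        props.put("auto.commit.enable", "false");   // commit only after publishing

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic");

        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        while (it.hasNext()) {
            byte[] message = it.next().message();
            publishToOtherSystem(message);   // placeholder downstream publish
            // commits offsets for ALL partitions owned by this connector,
            // hence the per-thread barrier discussed in the quoted mail
            connector.commitOffsets();
        }
    }

    private static void publishToOtherSystem(byte[] message) {
        // stand-in for the real downstream system
    }
}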


On Fri, Jun 13, 2014 at 3:01 PM, Kyle Banker  wrote:

> I'm using Kafka 0.8.1.1.
>
> I have a simple goal: use the high-level consumer to consume a message
> from Kafka, publish the message to a different system, and then commit the
> message in Kafka. Based on my reading of the docs and the mailing list, it
> seems like this isn't so easy to achieve. Here is my current understanding:
>
> First, I have to disable auto-commit. If the consumer automatically
> commits, then I may lose messages if, for example, my process dies after
> consuming but before publishing my message.
>
> Next, if my app is multi-threaded, I need to either
>
> a) use a separate consumer per thread (memory-intensive, hard on
> Zookeeper) or
> b) use a single consumer and assign a KafkaStream to each thread. Then,
> when I want to commit, first synchronize all threads using a barrier.
>
> First question: is this correct so far?
>
>
> Still, it appears that rebalancing may be a problem. In particular, this
> sequence of events:
>
> 1. I'm consuming from a stream tied to two partitions, A and B.
> 2. I consume a message, M, from partition A.
> 3. Partition A gets assigned to a different consumer.
> 4. I choose not to commit M or my process fails.
>
> Second question: When the partition is reassigned, will the message that I
> consumed be automatically committed? If so, then there's no way to get the
> reliability I want.
>
>
> Third question: How do the folks at LinkedIn handle this overall use case?
> What about other users?
>
> It seems to me that a lot of the complexity here could be easily addressed
> by changing the way in which a partition's message pointer is advanced.
> That is, when I consume message M, advance the pointer to message (M - 1)
> rather than to M. In other words, calling iterator.next() would imply that
> the previously consumed message may be safely committed. If this were the
> case, I could simply enable auto-commit and be happy.
>


Re: MirrorMaker documentation suggestions

2014-06-18 Thread Daniel Compton
Hi Jun

Thanks, I was able to update the wiki.  

Daniel.


On Thursday, 19 June 2014 at 2:45 am, Jun Rao wrote:

> I just granted you the wiki permission. Could you give it a try?
>  
> Thanks,
>  
> Jun
>  
>  
> On Tue, Jun 17, 2014 at 11:30 PM, Daniel Compton  (mailto:d...@danielcompton.net)>
> wrote:
>  
> > Hi Guozhang
> >  
> > I feel pretty silly for asking this but how do I edit the Confluence Wiki?
> > I have an account and it says there is no edit restrictions defined for the
> > page but I can’t see an edit button anywhere. Do I need permission to edit
> > the wiki?
> >  
> > Daniel.
> >  
> >  
> > On Wednesday, 18 June 2014 at 5:39 pm, Guozhang Wang wrote:
> >  
> > > Thanks Daniel for the findings, please feel free to update the wiki.
> > >  
> > > Guozhang
> > >  
> > >  
> > > On Tue, Jun 17, 2014 at 9:56 PM, Daniel Compton  > > (mailto:d...@danielcompton.net)
> > (mailto:d...@danielcompton.net)>
> > > wrote:
> > >  
> > > > Hi
> > > >  
> > > > I was following the instructions for Kafka mirroring and had two
> > > > suggestions for improving the documentation at
> > > >  
> > >  
> > >  
> >  
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330:
> > > > 1. Move "Note that the --zkconnect argument should point to the source
> > > > cluster's ZooKeeper...” above the console output section, I missed this
> > > > last line on the page and spent some time scratching my head when I was
> > > > connecting to the wrong zookeeper.
> > > > 2. The output from ConsumerOffsetChecker has changed since May 2012
> > > >  
> > >  
> >  
> > when
> > > > this part was written. The new format is:
> > > >  
> > > > Group                    Topic      Pid  Offset  logSize  Lag  Owner
> > > > test-consumer-group-dc1  log-batch  0    5       5        0    none
> > > > test-consumer-group-dc1  log-batch  1    3       3        0    none
> > > > test-consumer-group-dc1  log-batch  2    6       9        3    none
> > > >  
> > > > I’m happy to update the docs if it’s not too tricky to give me access
> > to
> > > > it.
> > > >  
> > > > Thanks, Daniel.
> > >  
> > >  
> > >  
> > > --
> > > -- Guozhang
> > >  
> >  
> >  
>  
>  
>  




Re: Building Kafka on Mac OS X

2014-06-18 Thread Jorge Marizan
Everything ran smoothly now; I will start running tests.

Thanks.

Jorge
On Jun 18, 2014, at 11:44 AM, Jorge Marizan  wrote:

> I will let you know once I come back from my job.
> 
> Jorge
> 
> On mié 18 jun 2014 00:35:02 AST, Timothy Chen wrote:
>> So do you have the build folder generated in core/client and
>> classes/jars compiled in them?
>> 
>> Kafka-server-start.sh also sets the CLASSPATH to load the jar and
>> kafka.Kafka as well, so you want to make sure they're there.
>> 
>> Tim
>> 
>> On Tue, Jun 17, 2014 at 9:18 PM, Jorge Marizan  
>> wrote:
>>> Now when I try to run, it fails finding the kafka.Kafka class:
>>> 
>>> kafka-server-start.sh /usr/local/etc/kafka/server.properties
>>> Error: Could not find or load main class kafka.Kafka
>>> 
>>>  Jorge
>>> 
>>> 
>>> On Jun 17, 2014, at 11:54 PM, Jorge Marizan  wrote:
>>> 
 Not at all,  I verified with ps aux and there is no Gradle processes left 
 behind when I cancel the compile job.
 
 Jorge.
 
 On Jun 17, 2014, at 11:45 PM, Timothy Chen  wrote:
 
> Not sure what's wrong but I'm guessing there probably can be a gradle 
> lock somewhere.
> 
> Is there other gradle processes that is hanging around?
> 
> Tim
> 
> Sent from my iPhone
> 
>> On Jun 17, 2014, at 8:35 PM, Jorge Marizan  
>> wrote:
>> 
>> :core:compileScala
>> 
>> Jorge.
>> 
>>> On Jun 17, 2014, at 8:54 PM, Timothy Chen  wrote:
>>> 
>>> What's the last line it's stuck on with debug flag on?
>>> 
>>> Tim
>>> 
 On Tue, Jun 17, 2014 at 4:46 PM, Jorge Marizan 
  wrote:
 I almost got it to work but still stuck compiling scala
 
 Any idea?
 
 Jorge.
> On Jun 17, 2014, at 7:22 AM, Jorge Marizan  
> wrote:
> 
> It got stuck on this:
> 
> ./gradlew jar
> The TaskContainer.add() method has been deprecated and is scheduled 
> to be removed in Gradle 2.0. Please use the create() method instead.
> Building project 'core' with Scala version 2.8.0
> Building project 'perf' with Scala version 2.8.0
> :clients:compileJava
> :clients:processResources UP-TO-DATE
> :clients:classes
> :clients:jar
> :contrib:compileJava UP-TO-DATE
> :contrib:processResources UP-TO-DATE
> :contrib:classes UP-TO-DATE
> :contrib:jar
> :core:compileJava UP-TO-DATE
> :core:compileScala
> 
>> On Jun 17, 2014, at 12:46 AM, Steve Morin  
>> wrote:
>> 
>> Have seen if you have a write with zero data it will hang
>> 
>>> On Jun 16, 2014, at 21:02, Timothy Chen  wrote:
>>> 
>>> Can you try running it in debug mode? (./gradlew jar -d)
>>> 
>>> Tim
>>> 
 On Mon, Jun 16, 2014 at 8:44 PM, Jorge Marizan 
  wrote:
 It just hangs there without any output at all.
 
 Jorge.
 
> On Jun 16, 2014, at 11:27 PM, Timothy Chen  
> wrote:
> 
> What output was it stuck on?
> 
> Tim
> 
>> On Mon, Jun 16, 2014 at 6:39 PM, Jorge Marizan 
>>  wrote:
>> Hi team, I’m a newcomer to Kafka, but I’m having some troubles 
>> trying to get it to run on OS X.
>> Basically building Kafka on OS X with 'gradlew jar’  gets stuck 
>> forever without any progress (Indeed I tried to leave it 
>> building all night with no avail).
>> 
>> Any advices will be greatly appreciated. Thanks in advance.
>> 
 
>>> 



Re: ISR not updating

2014-06-18 Thread Bongyeon Kim
I haven't enabled the controlled shutdown property. Anyway, I'd better
disable auto leader rebalancing.
I would like to see that information in the documentation.
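
(For reference, a sketch of the two server.properties settings being
discussed, both left disabled as above:)

auto.leader.rebalance.enable=false
controlled.shutdown.enable=false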

Thanks, Jun.




On Wed, Jun 18, 2014 at 11:47 PM, Jun Rao  wrote:

> Auto leader balancing has some know issues when using together with
> controlled shutdown. So, we don't recommend it to be turned on in 0.8.1.1
>
> Thanks,
>
> Jun
>
>
> On Wed, Jun 18, 2014 at 1:41 AM, Bongyeon Kim 
> wrote:
>
> > Yes.
> > it contain my server.properties file.
> >
> >
> > auto.leader.rebalance.enable=true
> >
> >
> > On Wed, Jun 18, 2014 at 12:44 PM, Jun Rao  wrote:
> >
> > > Did you have auto leader balancing enabled?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Jun 17, 2014 at 5:06 PM, Bongyeon Kim 
> > > wrote:
> > >
> > > > There is some error log about failing leader election like that.
> > > >
> > > >
> > > > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 encountered error
> > > > while electing leader for partition [topicDEBUG,5] due to: Preferred
> > > > replica 1 for partition [topicDEBUG,5] is either not alive or not in the
> > > > isr. Current leader and ISR: [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > > > (state.change.logger)
> > > > [2014-06-18 08:59:21,014] ERROR Controller 7 epoch 4 initiated state change
> > > > for partition [topicDEBUG,5] from OnlinePartition to OnlinePartition failed
> > > > (state.change.logger)
> > > > kafka.common.StateChangeFailedException: encountered error while electing
> > > > leader for partition [topicDEBUG,5] due to: Preferred replica 1 for
> > > > partition [topicDEBUG,5] is either not alive or not in the isr. Current
> > > > leader and ISR: [{"leader":8,"leader_epoch":6,"isr":[8,2]}].
> > > > at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:360)
> > > > at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:187)
> > > > at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125)
> > > > at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124)
> > > > at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
> > > > at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124)
> > > > at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:618)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1118)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1112)
> > > > at kafka.utils.Utils$.inLock(Utils.scala:538)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1107)
> > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> > > > at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> > > > at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> > > > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> > > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1107)
> > > > at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1086)
> > > > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > > > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > > > at

Re: Kafka latency measures

2014-06-18 Thread Neha Narkhede
What are the latency numbers you observed, average as well as worst case?
Here is a blog post we published recently that reflects the latest
performance metrics for latency:
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines


On Wed, Jun 18, 2014 at 11:01 AM, Supun Kamburugamuva 
wrote:

> I've found this performance test.
>
> http://blog.liveramp.com/2013/04/08/kafka-0-8-producer-performance-2/
>
> This performance test mentions the same issue at the end.
>
> Thanks,
> Supun..
>
>
> On Wed, Jun 18, 2014 at 12:43 PM, Supun Kamburugamuva 
> wrote:
>
> > The spikes happen without any correlation with log.flush.interval.messages.
> > They happen more frequently than that.
> >
> > I'm using the latest version. I'm sending the messages to Kafka, then
> > a message receiver sends the same messages back through Kafka to the
> > original sender. The round-trip latency is measured.
> >
> > Thanks,
> > Supun..
> >
> >
> > On Wed, Jun 18, 2014 at 12:02 PM, Neha Narkhede  >
> > wrote:
> >
> >> Which version of Kafka did you use? When you say latency, do you mean
> >> the latency between the producer and consumer? If so, are you using a
> >> timestamp within the message to compute this latency?
> >>
> >>
> >> On Wed, Jun 18, 2014 at 8:15 AM, Magnus Edenhill 
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > Do these spikes happen to correlate with log.flush.interval.messages
> >> > or log.flush.interval.ms?
> >> > If so, it's the file system sync blockage you are seeing.
> >> >
> >> > /Magnus
> >> >
> >> >
> >> > 2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :
> >> >
> >> > > Hi,
> >> > >
> >> > > We are trying to evaluate Kafka for a real-time application. We are
> >> > > sending 50 Kb messages at a fixed rate. The normal messages have a
> >> > > reasonable latency, but then there are these outliers that take an
> >> > > unpredictable amount of time. This causes the average latency to
> >> > > increase dramatically. We are running with basically the default
> >> > > configuration. Any suggestions for improving the latency?
> >> > >
> >> > > Thanks in advance,
> >> > > Supun..
> >> > >
> >> > > --
> >> > > Supun Kamburugamuva
> >> > > Member, Apache Software Foundation; http://www.apache.org
> >> > > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> >> > > Blog: http://supunk.blogspot.com
> >> > >
> >> >
> >>
> >
> >
> >
> > --
> > Supun Kamburugamuva
> > Member, Apache Software Foundation; http://www.apache.org
> > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > Blog: http://supunk.blogspot.com
> >
> >
>
>
> --
> Supun Kamburugamuva
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> Blog: http://supunk.blogspot.com
>


Re: Kafka latency measures

2014-06-18 Thread Supun Kamburugamuva
My machine configuration is not very high-end. The average one-way latency we
observe is around 10 to 15 ms for the 50 Kb messages. The outliers don't occur
for small messages, for which we observe around 6 ms latency.

Thanks,
Supun..
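
For concreteness, a minimal sketch of the timestamp-in-payload approach Neha
asks about in the quoted messages below, using the 0.8 producer API. The
broker address, topic name, and payload format are illustrative assumptions,
not anything from this thread:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class LatencyProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker-1:9092");  // hypothetical broker address
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            // Embed the send time in the payload; the receiving side parses the
            // prefix and computes System.currentTimeMillis() - sentTs on arrival.
            String payload = System.currentTimeMillis() + "|" + "test-body";
            producer.send(new KeyedMessage<String, String>("latency-test", payload));
            producer.close();
        }
    }

Note that a one-way measurement like this assumes the sender and receiver
clocks are in sync; the round-trip setup described in the quoted messages
sidesteps that.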


On Wed, Jun 18, 2014 at 10:18 PM, Neha Narkhede 
wrote:

> what are the latency numbers you observed, avg as well as worst case? Here
> is a blog that we did recently which should reflect latest performance
> metrics for latency -
>
> http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
>
> On Wed, Jun 18, 2014 at 11:01 AM, Supun Kamburugamuva 
> wrote:
>
> > I've found this performance test.
> >
> > http://blog.liveramp.com/2013/04/08/kafka-0-8-producer-performance-2/
> >
> > This performance test has mentioned about the same issue at the end.
> >
> > Thanks,
> > Supun..
> >
> >
> > On Wed, Jun 18, 2014 at 12:43 PM, Supun Kamburugamuva  >
> > wrote:
> >
> > > The spikes happens without any correlation with the
> >  log.flush.interval.message.
> > > They happen more frequently.
> > >
> > > I'm using the latest version. I'm sending the messages to Kafka, then
> > > there is a message receiver, it sends the same messages back through
> > kafka
> > > to original sender. The round trip latency is measured.
> > >
> > > Thanks,
> > > Supun..
> > >
> > >
> > > On Wed, Jun 18, 2014 at 12:02 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > >
> > > wrote:
> > >
> > >> Which version of Kafka did you use? When you say latency, do you mean
> > the
> > >> latency between the producer and consumer? If so, are you using a
> > >> timestamp
> > >> within the message to compute this latency?
> > >>
> > >>
> > >> On Wed, Jun 18, 2014 at 8:15 AM, Magnus Edenhill 
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > do these spikes happen to correlate with log.flush.interval.messages
> > or
> > >> > log.flush.interval.ms?
> > >> > If so it's the file system sync blockage you are seeing.
> > >> >
> > >> > /Magnus
> > >> >
> > >> >
> > >> > 2014-06-18 16:31 GMT+02:00 Supun Kamburugamuva :
> > >> >
> > >> > > Hi,
> > >> > >
> > >> > > We are trying to evaluate Kafka for a real time application. We
> are
> > >> > sending
> > >> > > 50 Kb messages at a fixed rate. The normal messages have a
> > reasonable
> > >> > > latency. But then there are these outliers that takes
> unpredictable
> > >> > amount
> > >> > > of time. This causes the average latency to increase dramatically.
> > We
> > >> are
> > >> > > running with basically the default configuration. Any suggestions
> > for
> > >> > > improving the latency?
> > >> > >
> > >> > > Thanks in advance,
> > >> > > Supun..
> > >> > >
> > >> > > --
> > >> > > Supun Kamburugamuva
> > >> > > Member, Apache Software Foundation; http://www.apache.org
> > >> > > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > >> > > Blog: http://supunk.blogspot.com
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > Supun Kamburugamuva
> > > Member, Apache Software Foundation; http://www.apache.org
> > > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > > Blog: http://supunk.blogspot.com
> > >
> > >
> >
> >
> > --
> > Supun Kamburugamuva
> > Member, Apache Software Foundation; http://www.apache.org
> > E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> > Blog: http://supunk.blogspot.com
> >
>



-- 
Supun Kamburugamuva
Member, Apache Software Foundation; http://www.apache.org
E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
Blog: http://supunk.blogspot.com


Re: Reliable Message Commits

2014-06-18 Thread Jagbir
Hi Kyle,

Thanks for the update. I'm wondering if you found an answer to your N-1 commit
question. If auto-commit happens only at iterator.next(), and only for the N-1
message, then client code can be much simpler and more reliable, as you
mentioned. I'm also looking forward to any post in this regard.

Jagbir
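
For concreteness, a minimal sketch of the consume-publish-commit loop Kyle
describes below, using the 0.8.1 high-level consumer with auto-commit
disabled. The ZooKeeper address, group id, topic name, and the
publishToOtherSystem() helper are illustrative assumptions:

    import java.util.Collections;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class ReliablePublisher {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk-1:2181");   // hypothetical ZooKeeper address
            props.put("group.id", "reliable-publisher");   // hypothetical group id
            props.put("auto.commit.enable", "false");      // commit only after the external publish

            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            KafkaStream<byte[], byte[]> stream = consumer
                    .createMessageStreams(Collections.singletonMap("some-topic", 1))
                    .get("some-topic").get(0);             // single stream, single thread

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> record = it.next();
                publishToOtherSystem(record.message());    // hypothetical downstream publish
                // Commits the offsets of everything consumed so far by this connector,
                // which is why multi-threaded use needs the barrier Kyle mentions.
                consumer.commitOffsets();
            }
        }

        private static void publishToOtherSystem(byte[] message) {
            // placeholder for the real downstream publish
        }
    }

If the process dies between the external publish and commitOffsets(), the
messages since the last commit are replayed after the rebalance, which matches
the at-least-once behaviour Kyle describes from ZookeeperConsumerConnector.scala.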

On June 18, 2014 3:17:25 PM PDT, Kyle Banker  wrote:
>I think I've discovered the answer to my second question: according to
>the
>code in ZookeeperConsumerConnector.scala, a rebalance derives its
>offsets
>from what's already in Zookeeper. Therefore, uncommitted but consumed
>messages from a given partition will be replayed when the partition is
>reassigned.
>
>
>On Fri, Jun 13, 2014 at 3:01 PM, Kyle Banker 
>wrote:
>
>> I'm using Kafka 0.8.1.1.
>>
>> I have a simple goal: use the high-level consumer to consume a
>message
>> from Kafka, publish the message to a different system, and then
>commit the
>> message in Kafka. Based on my reading of the docs and the mailing
>list, it
>> seems like this isn't so easy to achieve. Here is my current
>understanding:
>>
>> First, I have to disable auto-commit. If the consumer automatically
>> commits, then I may lose messages if, for example, my process dies
>after
>> consuming but before publishing my message.
>>
>> Next, if my app is multi-threaded, I need to either
>>
>> a) use a separate consumer per thread (memory-intensive, hard on
>> Zookeeper) or
>> b) use a single consumer and assign a KafkaStream to each thread.
>Then,
>> when I want to commit, first synchronize all threads using a barrier.
>>
>> First question: is this correct so far?
>>
>>
>> Still, it appears that rebalancing may be a problem. In particular,
>this
>> sequence of events:
>>
>> 1. I'm consuming from a stream tied to two partitions, A and B.
>> 2. I consume a message, M, from partition A.
>> 3. Partition A gets assigned to a different consumer.
>> 4. I choose not to commit M or my process fails.
>>
>> Second question: When the partition is reassigned, will the message
>that I
>> consumed be automatically committed? If so, then there's no way to
>get the
>> reliability I want.
>>
>>
>> Third question: How do the folks at LinkedIn handle this overall use
>case?
>> What about other users?
>>
>> It seems to me that a lot of the complexity here could be easily
>addressed
>> by changing the way in which a partition's message pointer is
>advanced.
>> That is, when I consume message M, advance the pointer to message (M
>- 1)
>> rather than to M. In other words, calling iterator.next() would imply
>that
>> the previously consumed message may be safely committed. If this were
>the
>> case, I could simply enable auto-commit and be happy.
>>

-- 
Sent from my Android phone with K-9 Mail. Please excuse my brevity.

Re: SimpleConsumer kafka.api.OffsetRequest.LatestTime() not working ?

2014-06-18 Thread rafeeq s
Thanks to all.

LatestTime() is working as expected.



Regards,

Rafeeq S
*(“What you do is what matters, not what you think or say or plan.” )*
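
For anyone hitting the same confusion, a minimal sketch of the offset lookup
from the SimpleConsumer example linked in the quoted messages below.
LatestTime() returns the offset of the next message to be produced, so a
consumer that starts fetching there sees nothing until new data arrives, as
Neha points out. The host, port, topic, and client id are illustrative
assumptions:

    import java.util.Collections;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LatestOffsetLookup {
        public static void main(String[] args) {
            // host, port, topic, and client id below are placeholders
            SimpleConsumer consumer =
                    new SimpleConsumer("broker-1", 9092, 100000, 64 * 1024, "offset-lookup");
            TopicAndPartition tp = new TopicAndPartition("some-topic", 0);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                    Collections.singletonMap(
                            tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            // Offset of the *next* message to be produced; fetching from here
            // returns nothing until a producer writes more data to the partition.
            long latestOffset = response.offsets("some-topic", 0)[0];
            System.out.println("latest offset: " + latestOffset);
            consumer.close();
        }
    }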



On Wed, Jun 18, 2014 at 8:40 PM, Supun Kamburugamuva 
wrote:

> Hi Refeeq,
>
> I'm using the LatestTime() and it is working.
>
> Thanks,
> Supun..
>
>
> On Wed, Jun 18, 2014 at 12:07 PM, Neha Narkhede 
> wrote:
>
> > Is that because you have no more data produced to the topic/partition
> after
> > the last offset returned by OffsetRequest.LatestTime() ?
> >
> >
> > On Wed, Jun 18, 2014 at 6:19 AM, rafeeq s 
> wrote:
> >
> > > The SimpleConsumer is *not processing* messages when its offset is set
> > > with kafka.api.OffsetRequest.*LatestTime()*.
> > >
> > > If I use the *EarliestTime()* offset method, it processes from the start,
> > > which is as expected.
> > >
> > > But when I try the *LatestTime()* offset, there is no reaction from the
> > > simple consumer. I am referring to the SimpleConsumer example below:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > >
> > >
> > > Regards,
> > > Rafeeq S
> > > *(“What you do is what matters, not what you think or say or plan.” )*
> > >
> >
>
>
>
> --
> Supun Kamburugamuva
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supu...@gmail.com;  Mobile: +1 812 369 6762
> Blog: http://supunk.blogspot.com
>