Re: Why fetching meta-data for topic is done three times?

2015-04-29 Thread Madhukar Bharti
Hi Zakee,

>message.send.max.retries is 1
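
For reference, a minimal sketch (assuming the 0.8.1 Scala producer used in this
thread; the values are illustrative, not a recommendation) of a config that
fails fast instead of letting the metadata fetch retry three times:

    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList); // brokerList is a placeholder
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    // Keep a send() call from blocking for much more than one timeout.
    props.put("request.timeout.ms", "1000");
    props.put("message.send.max.retries", "1");
    props.put("retry.backoff.ms", "100");
    ProducerConfig config = new ProducerConfig(props);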

Regards,
Madhukar

On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti 
wrote:

> Hi Zakee,
>
> Thanks for your reply.
>
> >message.send.max.retries
> 3
>
> >retry.backoff.ms
> 100
>
> >topic.metadata.refresh.interval.ms
> 600*1000
>
> This is my properties.
>
> Regards,
> Madhukar
>
> On Tue, Apr 28, 2015 at 3:26 AM, Zakee  wrote:
>
>> What values do you have for below properties? Or are these set to
>> defaults?
>>
>> message.send.max.retries
>> retry.backoff.ms
>> topic.metadata.refresh.interval.ms
>>
>> Thanks
>> Zakee
>>
>>
>>
>> > On Apr 23, 2015, at 11:48 PM, Madhukar Bharti 
>> wrote:
>> >
>> > Hi All,
>> >
>> > Having gone through the code, I found that when the Producer starts it does
>> > three things:
>> >
>> > 1. Sends a metadata request
>> > 2. Sends the message to the broker (fetching the broker list)
>> > 3. If the number of messages still to be produced is greater than 0, it
>> > tries again to refresh metadata for the outstanding produce requests.
>> >
>> > Each of these requests takes the configured timeout before moving on to the
>> > next, and only after all of them are done does it throw an Exception (if
>> > step 3 also fails).
>> >
>> > The problem is that if we set the timeout to 1 sec, it takes 3 sec to throw
>> > an exception, so the user request hangs for 3 sec. That is comparatively
>> > high for our response time, and if all threads are blocked in producer
>> > send, the whole application is blocked for 3 sec. So we want to reduce the
>> > overall time to throw the Exception to 1 sec.
>> >
>> > What is the possible way to do this?
>> >
>> > Thanks
>> > Madhukar
>> >
>> > On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti <
>> bhartimadhu...@gmail.com>
>> > wrote:
>> >
>> >> Hi All,
>> >>
>> >> I came across a problem: if we use a broker IP that is not reachable or is
>> >> outside the network, it takes more than the configured time
>> >> (request.timeout.ms) to fail. After checking the log I found that it tries
>> >> to fetch topic metadata three times, changing the correlation id each
>> >> time. Because of this, even though I set request.timeout.ms=1000, it takes
>> >> 3 sec to throw an Exception. I am using Kafka 0.8.1.1 with the patch
>> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
>> >>
>> >>
>> >> I have attached the log. Please check it and clarify why it behaves this
>> >> way, and whether this is by design or some other property also has to be
>> >> set.
>> >>
>> >>
>> >>
>> >> Regards
>> >> Madhukar
>> >>
>> >>
>> >>
>> > 
>>
>>
>
>


Horizontally Scaling Kafka Consumers

2015-04-29 Thread Nimi Wariboko Jr
Hi,

I was wondering what options there are for horizontally scaling kafka
consumers? Basically if I have 100 partitions and 10 consumers, and want to
temporarily scale up to 50 consumers, what options do I have?

So far I've thought of just simply tracking consumer membership somehow
(either through Raft or zookeeper's znodes) on the consumers.


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread David Corley
If the 100 partitions are all for the same topic, you can have up to 100
consumers working as part of a single consumer group for that topic.
You cannot have more consumers than there are partitions within a given
consumer group.

On 29 April 2015 at 08:41, Nimi Wariboko Jr  wrote:

> Hi,
>
> I was wondering what options there are for horizontally scaling kafka
> consumers? Basically if I have 100 partitions and 10 consumers, and want to
> temporarily scale up to 50 consumers, what options do I have?
>
> So far I've thought of just simply tracking consumer membership somehow
> (either through Raft or zookeeper's znodes) on the consumers.
>


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Stevo Slavić
Please correct me if I'm wrong, but I think it is not really a hard constraint
that one cannot have more consumers (from the same group) than partitions on a
single topic - the surplus consumers will simply not be assigned any partition,
but they can remain connected, and as soon as one active consumer from the
group goes offline (its connection to ZK is dropped), the group is rebalanced
so that a passively waiting consumer becomes active.
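
As an illustration (a sketch assuming the 0.8.x high-level consumer; zkConnect,
the topic name and process() are placeholders), a surplus instance just joins
the same group and blocks until a rebalance hands it a partition:

    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", "my-group"); // same group.id on every instance
    ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("my-topic", 1);
    KafkaStream<byte[], byte[]> stream =
            connector.createMessageStreams(topicCountMap).get("my-topic").get(0);

    // On an instance with no assigned partition this loop simply waits; after
    // a rebalance it starts receiving messages without any code change.
    for (MessageAndMetadata<byte[], byte[]> msg : stream) {
        process(msg.message());
    }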

Kind regards,
Stevo Slavic.

On Wed, Apr 29, 2015 at 2:25 PM, David Corley  wrote:

> If the 100 partitions are all for the same topic, you can have up to 100
> consumers working as part of a single consumer group for that topic.
> You cannot have more consumers than there are partitions within a given
> consumer group.
>
> On 29 April 2015 at 08:41, Nimi Wariboko Jr  wrote:
>
> > Hi,
> >
> > I was wondering what options there are for horizontally scaling kafka
> > consumers? Basically if I have 100 partitions and 10 consumers, and want
> to
> > temporarily scale up to 50 consumers, what options do I have?
> >
> > So far I've thought of just simply tracking consumer membership somehow
> > (either through Raft or zookeeper's znodes) on the consumers.
> >
>


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread David Corley
You're right Stevo, I should re-phrase to say that there can be no more
_active_ consumers than there are partitions (within a single consumer
group).
I'm guessing that's what Nimi is asking about, but perhaps he can elaborate on
whether he's using consumer groups and/or whether the 100 partitions are all
for a single topic or spread across multiple topics.

On 29 April 2015 at 13:38, Stevo Slavić  wrote:

> Please correct me if wrong, but I think it is really not hard constraint
> that one cannot have more consumers (from same group) than partitions on
> single topic - all the surplus consumers will not be assigned to consume
> any partition, but they can be there and as soon as one active consumer
> from same group goes offline (its connection to ZK is dropped), consumers
> from the group will be rebalanced so one passively waiting consumer will
> become active.
>
> Kind regards,
> Stevo Slavic.
>
> On Wed, Apr 29, 2015 at 2:25 PM, David Corley 
> wrote:
>
> > If the 100 partitions are all for the same topic, you can have up to 100
> > consumers working as part of a single consumer group for that topic.
> > You cannot have more consumers than there are partitions within a given
> > consumer group.
> >
> > On 29 April 2015 at 08:41, Nimi Wariboko Jr 
> wrote:
> >
> > > Hi,
> > >
> > > I was wondering what options there are for horizontally scaling kafka
> > > consumers? Basically if I have 100 partitions and 10 consumers, and
> want
> > to
> > > temporarily scale up to 50 consumers, what options do I have?
> > >
> > > So far I've thought of just simply tracking consumer membership somehow
> > > (either through Raft or zookeeper's znodes) on the consumers.
> > >
> >
>


MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Hi All,

I am trying to get a multi-threaded HL consumer working against a 2-broker
Kafka cluster with a 4-partition, 2-replica topic.

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely, feeding
events into the topic. (I excluded the while loop in the paste below.)

What I see is that the threads eventually all exit, even though the producer is
still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are pushed
to the topic?

I suspect I am configuring something wrong here, but am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*

Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

// Pipe-delimited event: timestamp|truckId|driverId|eventType|latLong
finalEvent = new Timestamp(new Date().getTime()) + "|"
        + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)]
        + "|" + getLatLong(arrayroute17[i]);

try {
    KeyedMessage<String, String> data =
            new KeyedMessage<String, String>(TOPIC, finalEvent);
    LOG.info("Sending Message #: " + routeName[0] + ": " + i + ", msg:" + finalEvent);
    producer.send(data);
    Thread.sleep(1000);
} catch (Exception e) {
    e.printStackTrace();
}


*Consumer Code:*

public class ConsumerTest implements Runnable {

    private KafkaStream<byte[], byte[]> m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        // Blocks on hasNext() until a message arrives (consumer.timeout.ms is -1).
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

public class ConsumerGroupExample {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // One consumer thread per stream (and so per partition).
        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "-1");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(

Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
example.shutdown(); in ConsumerGroupExample closes all consumer connections
to Kafka. Remove this line and the consumer threads will run forever.
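
For example, a sketch of a main() that keeps the consumers running (it assumes
the original main(), cut off in the paste above, sleeps briefly and then calls
example.shutdown(); reading the thread count from args[3] is also an assumption):

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        final ConsumerGroupExample example =
                new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        // Instead of sleeping and then calling example.shutdown(), only shut
        // down when the JVM itself is asked to exit.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                example.shutdown();
            }
        });
    }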

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm  wrote:

> Hi All,
>
> I am trying to get a multi threaded HL consumer working against a 2 broker
> Kafka cluster with a 4 partition 2 replica  topic.
>
> The consumer code is set to run with 4 threads, one for each partition.
>
> The producer code uses the default partitioner and loops indefinitely
> feeding events into the topic.(I excluded the while loop in the paste
> below)
>
> What I see is the threads eventually all exit, even thought the producer is
> still sending events into the topic.
>
> My understanding is that the consumer thread per partition is the correct
> setup.
>
> Any ideas why this code doesn't continue to consume events at they are
> pushed to the topic?
>
> I suspect I am configuring something wrong here, but am not sure what.
>
> Thanks,
>
> Chris
>
>
> *T**opic Configuration*
>
> Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
>
> Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
>
>  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> *Producer Code:*
>
>  Properties props = new Properties();
>
> props.put("metadata.broker.list", args[0]);
>
> props.put("zk.connect", args[1]);
>
> props.put("serializer.class", "kafka.serializer.StringEncoder");
>
> props.put("request.required.acks", "1");
>
> String TOPIC = args[2];
>
> ProducerConfig config = new ProducerConfig(props);
>
> Producer producer = new Producer(
> config);
>
> finalEvent = new Timestamp(new Date().getTime()) + "|"
>
> + truckIds[0] + "|" + driverIds[0] + "|" +
> events[random
> .nextInt(evtCnt)]
>
> + "|" + getLatLong(arrayroute17[i]);
>
> try {
>
> KeyedMessage data = new
> KeyedMessage(TOPIC, finalEvent);
>
> LOG.info("Sending Messge #: " + routeName[0] + ": " + i +",
> msg:" + finalEvent);
>
> producer.send(data);
>
> Thread.sleep(1000);
>
> } catch (Exception e) {
>
> e.printStackTrace();
>
> }
>
>
> *Consumer Code:*
>
> public class ConsumerTest implements Runnable {
>
>private KafkaStream m_stream;
>
>private int m_threadNumber;
>
>public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
>
>m_threadNumber = a_threadNumber;
>
>m_stream = a_stream;
>
>}
>
>public void run() {
>
>ConsumerIterator it = m_stream.iterator();
>
>while (it.hasNext()){
>
>System.out.println("Thread " + m_threadNumber + ": " + new
> String(it.next().message()));
>
>try {
>
>  Thread.sleep(1000);
>
> }catch (InterruptedException e) {
>
>  e.printStackTrace();
>
>  }
>
>}
>
>System.out.println("Shutting down Thread: " + m_threadNumber);
>
>}
>
> }
>
> public class ConsumerGroupExample {
>
> private final ConsumerConnector consumer;
>
> private final String topic;
>
> private  ExecutorService executor;
>
>
>
> public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> String a_topic) {
>
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>
> createConsumerConfig(a_zookeeper, a_groupId));
>
> this.topic = a_topic;
>
> }
>
>
>
> public void shutdown() {
>
> if (consumer != null) consumer.shutdown();
>
> if (executor != null) executor.shutdown();
>
> try {
>
> if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
>
> System.out.println("Timed out waiting for consumer threads
> to shut down, exiting uncleanly");
>
> }
>
> } catch (InterruptedException e) {
>
> System.out.println("Interrupted during shutdown, exiting
> uncleanly");
>
> }
>
>}
>
>
>
> public void run(int a_numThreads) {
>
> Map topicCountMap = new HashMap Integer>();
>
> topicCountMap.put(topic, new Integer(a_numThreads));
>
> Map>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>
> List> streams = consumerMap.get(topic);
>
> executor = Executors.newFixedThreadPool(a_numThreads);
>
> int threadNumber = 0;
>
> for (final KafkaStream stream : streams) {
>
> executor.submit(new ConsumerTest(stream, threadNumber));
>
> threadNumber++;
>
> }
>
> }
>
>
>
> private static ConsumerConfig createConsumerConfig(String a_zookeeper,
> String a_groupId) {
>
> Properties props = new Properties();
>
> props.put("zookeepe

Re: zookeeper restart fatal error

2015-04-29 Thread David Corley
Unfortunately sounds like a Zookeeper data corruption issue on the node in
question:
https://issues.apache.org/jira/browse/ZOOKEEPER-1546

The fix from the Jira is to clean out the Zookeeper data on the affected
node (if that's possible)

On 28 April 2015 at 16:59, Emley, Andrew  wrote:

> Hi
>
> I have had zk and kafka(2_8.0-0.8.1) set up nicely running for a week or
> so, I decided to stop the zk and the kafka brokers and re-start them, since
> stopping zk I can't start it again!! It gives me this fatal exception that
> is related to one of my test topics "multinode1partition4reptopic"!?
>
> Can anyone give me any pointers on how to resolve?
>
> Many thanks
> andy
>
>
> [2015-04-28 16:31:05,282] ERROR Failed to increment parent cversion for:
> /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
> (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for
> /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
> at
> org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
> at
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
> at
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
> at
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at
> org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
> at
> org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
> at
> org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
> at
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
> at
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
> [2015-04-28 16:31:05,289] FATAL Unexpected exception, exiting abnormally
> (org.apache.zookeeper.server.ZooKeeperServerMain)
> java.io.IOException: Failed to process transaction type: 2 error:
> KeeperErrorCode = NoNode for
> /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
> at
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
> at
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at
> org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
> at
> org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
> at
> org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
> at
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
> at
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
> at
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
>
>
>
>
>


Re: Kafka client - 0.9

2015-04-29 Thread Bharath Srinivasan
Any pointers on this feature?

Thanks.

On Thu, Apr 23, 2015 at 9:57 PM, Bharath Srinivasan 
wrote:

> Thanks Gwen.
>
> I'm specifically looking for the consumer rewrite API (
> org.apache.kafka.clients.consumer.KafkaConsumer). Based on the wiki, this
> feature is available only in 0.9.
>
> The specific use case is that, I wanted to use the high level consumer but
> with the ability to rollback the offset in case of any exceptions. Based on
> the documentation, it seems like the current high level consumer API does
> not seem to be supporting it, atleast not in a straight forward fashion.
>
> Appreciate any alternate solutions.
>
> On Thu, Apr 23, 2015 at 8:08 PM, Gwen Shapira 
> wrote:
>
>> We don't normally plan dates for releases, when we are done with
>> features we want in the next release and happy with quality, we
>> release. Many Apache communities are like that.
>>
>> If you need firmer roadmaps and specific release dates, there are few
>> vendors selling Kafka distributions and support.
>>
>> Are there any specific features you are waiting for?
>>
>> Gwen
>>
>> On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan
>>  wrote:
>> > Hi,
>> >
>> > I'm looking for the 0.9 client release plan.
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>> >
>> > Is there a planned date for this release?
>> >
>> > Thanks,
>> > Bharath
>>
>
>


Re: Kafka client - 0.9

2015-04-29 Thread Gwen Shapira
In the current high-level consumer, you can still manually control when you
commit offsets (see this blog for details:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
)

While you can't explicitly roll back a commit, you can simply avoid committing
when you have an exception. (It's a tiny bit more complex than that, because
the iterator stores a buffer and a position in the buffer, so you need to
maintain your own collection of events to retry; this is also explained in the
blog above.)
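
A rough sketch of that pattern with the high-level consumer (assuming
auto.commit.enable=false; consumer is the ConsumerConnector, stream one of its
KafkaStreams, process() a placeholder, and the buffering/retry bookkeeping from
the blog is omitted):

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> record = it.next();
        try {
            process(record.message());
            // Only record progress once the message was handled successfully.
            consumer.commitOffsets();
        } catch (Exception e) {
            // Skip the commit and keep the event around to retry later.
        }
    }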

Hope this helps.

Gwen

On Wed, Apr 29, 2015 at 9:50 AM, Bharath Srinivasan 
wrote:

> Any pointers on this feature?
>
> Thanks.
>
> On Thu, Apr 23, 2015 at 9:57 PM, Bharath Srinivasan 
> wrote:
>
> > Thanks Gwen.
> >
> > I'm specifically looking for the consumer rewrite API (
> > org.apache.kafka.clients.consumer.KafkaConsumer). Based on the wiki, this
> > feature is available only in 0.9.
> >
> > The specific use case is that, I wanted to use the high level consumer
> but
> > with the ability to rollback the offset in case of any exceptions. Based
> on
> > the documentation, it seems like the current high level consumer API does
> > not seem to be supporting it, atleast not in a straight forward fashion.
> >
> > Appreciate any alternate solutions.
> >
> > On Thu, Apr 23, 2015 at 8:08 PM, Gwen Shapira 
> > wrote:
> >
> >> We don't normally plan dates for releases, when we are done with
> >> features we want in the next release and happy with quality, we
> >> release. Many Apache communities are like that.
> >>
> >> If you need firmer roadmaps and specific release dates, there are few
> >> vendors selling Kafka distributions and support.
> >>
> >> Are there any specific features you are waiting for?
> >>
> >> Gwen
> >>
> >> On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan
> >>  wrote:
> >> > Hi,
> >> >
> >> > I'm looking for the 0.9 client release plan.
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >> >
> >> > Is there a planned date for this release?
> >> >
> >> > Thanks,
> >> > Bharath
> >>
> >
> >
>


Consuming keyed messages with null value

2015-04-29 Thread Warren Henning
I have an application producing Avro-encoded keyed messages (Martin
Kleppmann's new Bottled Water project).

It encodes a delete as a keyed message with an id as a key and a null
payload. I have log compaction turned on.

The Avro console consumer correctly displays this as "null" in my terminal,
but when I try to consume it using the high-level consumer in Java, the
message is never consumed. Subsequent non-null messages that were produced
after that null also aren't consumed.

Do I need to do something so that the iterator's hasNext() method (my code is
pretty much exactly what appears in
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example )
does not treat a null value as an invalid message? Or am I misunderstanding
what's going on and need to do something different?
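
One defensive pattern to try (a sketch, not a confirmed fix): a delete marker
arrives with message() == null, so passing it straight to new String(...) or a
decoder throws and can kill the consuming thread. Guarding against null
payloads looks roughly like this (handleDelete and handleUpsert are
placeholders):

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> record = it.next();
        byte[] key = record.key();
        byte[] value = record.message();
        if (value == null) {
            // Tombstone under log compaction: the key was deleted.
            handleDelete(key);
        } else {
            handleUpsert(key, value);
        }
    }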

Thanks!


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Commenting out the example shutdown did not seem to make a difference; I added
the print statement below to highlight that.

The other threads still shut down, and only one thread lives on; eventually
that one dies after a few minutes as well.

Could this be because the producer's default partitioner isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.99021500035|40.6636611

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao  wrote:

> example.shutdown(); in ConsumerGroupExample closes all consumer connections
> to Kafka. remove this line the consumer threads will run forever
>
> On Wed, Apr 29, 2015 at 9:42 PM, christopher palm 
> wrote:
>
> > Hi All,
> >
> > I am trying to get a multi threaded HL consumer working against a 2
> broker
> > Kafka cluster with a 4 partition 2 replica  topic.
> >
> > The consumer code is set to run with 4 threads, one for each partition.
> >
> > The producer code uses the default partitioner and loops indefinitely
> > feeding events into the topic.(I excluded the while loop in the paste
> > below)
> >
> > What I see is the threads eventually all exit, even thought the producer
> is
> > still sending events into the topic.
> >
> > My understanding is that the consumer thread per partition is the correct
> > setup.
> >
> > Any ideas why this code doesn't continue to consume events at they are
> > pushed to the topic?
> >
> > I suspect I am configuring something wrong here, but am not sure what.
> >
> > Thanks,
> >
> > Chris
> >
> >
> > *T**opic Configuration*
> >
> > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> >
> > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> >  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > *Producer Code:*
> >
> >  Properties props = new Properties();
> >
> > props.put("metadata.broker.list", args[0]);
> >
> > props.put("zk.connect", args[1]);
> >
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> >
> > props.put("request.required.acks", "1");
> >
> > String TOPIC = args[2];
> >
> > ProducerConfig config = new ProducerConfig(props);
> >
> > Producer producer = new Producer(
> > config);
> >
> > finalEvent = new Timestamp(new Date().getTime()) + "|"
> >
> > + truckIds[0] + "|" + driverIds[0] + "|" +
> > events[random
> > .nextInt(evtCnt)]
> >
> > + "|" + getLatLong(arrayroute17[i]);
> >
> > try {
> >
> > KeyedMessage data = new
> > KeyedMessage(TOPIC, finalEvent);
> >
> > LOG.info("Sending Messge #: " + routeName[0] + ": " + i
> +",
> > msg:" + finalEvent);
> >
> > producer.send(data);
> >
> > Thread.sleep(1000);
> >
> > } catch (Exception e) {
> >
> > e.printStackTrace();
> >
> > }
> >
> >
> > *Consumer Code:*
> >
> > public class Consume

Re: Kafka 0.8.2 consumer offset checker throwing kafka.common.NotCoordinatorForConsumerException

2015-04-29 Thread Kartheek Karra
Update here: we resolved this by deleting the kafka-data directory on that
host (which, per last week's 'fsck' run log, had file inconsistencies in the
kafka-data dir) and restarting Kafka. Note we also never reimaged this host
(that was a different host; we had confused the two).

Thanks,
Kartheek

On Tue, Apr 28, 2015 at 6:36 PM, Kartheek Karra 
wrote:

> We recently upgraded kafka in our production environment cluster of 5
> brokers from 0.8.0 to 0.8.2. Since then the consumerOffsetChecker script is
> unable to fetch offset due to
> kafka.common.NotCoordinatorForConsumerException.
> Note I'm able to run the 'consumerOffsetChecker' from an older version
> 0.8.0 successfully without any exceptions against the same upgraded
> cluster. Also we haven't migrated to kafka for offsets and are still using
> the default zookeeper. Another piece of information here is that kafka is
> always picking the same host as coordinator for fetching offset and we had
> to reimage that host just after the upgrade. Haven't been able to reproduce
> this in our test environments yet.
> Any clue what might be wrong here ? Let me know if more details are needed
> anywhere.
>
>
> Thanks,
> Kartheek
>


Horizontally Scaling Kafka Consumers

2015-04-29 Thread Nimi Wariboko Jr
Hi,

I was wondering what options there are/what other people are doing for
horizontally scaling kafka consumers? Basically if I have 100 partitions
and 10 consumers, and want to temporarily scale up to 50 consumers, what
can I do?

So far I've thought of just simply tracking consumer membership somehow
(either through zookeeper's ephemeral nodes or maybe using gossip) on the
consumers to achieve consensus on who consumes what. Another option would
be having a router, possibly using something like nsq (I understand that
they are similar pieces of software, but what we are going for is a
persistent distributed queue (sharding) which is why I'm looking into
Kafka)?


Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
I am using Kafka 0.8.2 with Kafka-based storage for offsets. Whenever I
restart a consumer (high-level consumer API), it does not consume messages
that were posted while the consumer was down.

I am using the following consumer properties

Properties props = new Properties();

props.put("zookeeper.connect", zooKeeper);

props.put("group.id", consumerName);

props.put("zookeeper.session.timeout.ms", "6000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.enable", "false");

props.put("offsets.storage", "kafka");

props.put("dual.commit.enabled", "false");

props.put("auto.offset.reset", "largest");


My offset manager is here
https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why the
consumer is behaving weird. Please share any updates if you have.



Thanks & Regards,


Re: Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Jiangjie Qin
OK, so you turned off auto.commit.enable and set auto.offset.reset to largest.

That means when you consume:
1. If you did not commit offsets manually, no offsets will be committed to
Kafka.
2. If you do not have an offset stored in Kafka, you will start from the log
end and ignore the existing messages in the topic.
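
If the goal is to resume from the last processed message, the usual pattern (a
sketch assuming the 0.8.2 high-level consumer with the properties shown above;
connector is the ConsumerConnector and process() is a placeholder) is to commit
explicitly after processing, since auto commit is disabled:

    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        process(it.next().message());
        // With offsets.storage=kafka this writes the position to the
        // __consumer_offsets topic, so the next restart resumes from here.
        connector.commitOffsets();
    }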

Another thing to check: are you using the same group id all the time?

Jiangjie (Becket) Qin

On 4/29/15, 3:17 PM, "Gomathivinayagam Muthuvinayagam"
 wrote:

>I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
>Whenever I restart a consumer (high level consumer api) it is not
>consuming
>messages whichever were posted when the consumer was down.
>
>I am using the following consumer properties
>
>Properties props = new Properties();
>
>props.put("zookeeper.connect", zooKeeper);
>
>props.put("group.id", consumerName);
>
>props.put("zookeeper.session.timeout.ms", "6000");
>
>props.put("zookeeper.sync.time.ms", "200");
>
>props.put("auto.commit.enable", "false");
>
>props.put("offsets.storage", "kafka");
>
>props.put("dual.commit.enabled", "false");
>
>props.put("auto.offset.reset", "largest");
>
>
>My offset manager is here
>https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
>the
>consumer is behaving weird. Please share any updates if you have.
>
>
>
>Thanks & Regards,



Re: Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
Thank you, I am using the same groupId all the time.

I printed OffsetsMessageFormatter output for the consumer, and the output
is showing as

[async_force_consumers,force_msgs,9]::OffsetAndMetadata[2,associated
metadata,1430277791077]

But if I restart the consumer, it starts consuming messages from offset 1 for
partition 9, even though I have stored the offset as 2. I am not sure what I
am missing here.



Thanks & Regards,



On Wed, Apr 29, 2015 at 5:17 PM, Jiangjie Qin 
wrote:

> OK, so you turned off auto.offset.commit, and set the auto.offset.reset to
> largest.
>
> That means when you consume,
> 1. If you did not commit offsets manually, no offsets will be committed to
> Kafka.
> 2. If you do not have an offset stored in Kafka, you will start from the
> log end and ignore the existing messages in the topic.
>
> Another thing you want to check is that are you using the group Id all the
> time?
>
> Jiangjie (Becket) Qin
>
> On 4/29/15, 3:17 PM, "Gomathivinayagam Muthuvinayagam"
>  wrote:
>
> >I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
> >Whenever I restart a consumer (high level consumer api) it is not
> >consuming
> >messages whichever were posted when the consumer was down.
> >
> >I am using the following consumer properties
> >
> >Properties props = new Properties();
> >
> >props.put("zookeeper.connect", zooKeeper);
> >
> >props.put("group.id", consumerName);
> >
> >props.put("zookeeper.session.timeout.ms", "6000");
> >
> >props.put("zookeeper.sync.time.ms", "200");
> >
> >props.put("auto.commit.enable", "false");
> >
> >props.put("offsets.storage", "kafka");
> >
> >props.put("dual.commit.enabled", "false");
> >
> >props.put("auto.offset.reset", "largest");
> >
> >
> >My offset manager is here
> >https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
> >the
> >consumer is behaving weird. Please share any updates if you have.
> >
> >
> >
> >Thanks & Regards,
>
>


Re: New Producer API - batched sync mode support

2015-04-29 Thread Gwen Shapira
I'm starting to think that the old adage "If two people say you are drunk,
lie down" applies here :)

Current API seems perfectly clear, useful and logical to everyone who wrote
it... but we are getting multiple users asking for the old batch behavior
back.
One reason to get it back is to make upgrades easier - people won't need to
rethink their existing logic if they get an API with the same behavior in
the new producer. The other reason is what Ewen mentioned earlier - if
everyone re-implements Joel's logic, we can provide something for that.

How about getting the old batch send behavior back by adding a new API with:
public void batchSend(List<ProducerRecord<K, V>> records)

With this implementation (mixes the old behavior with Joel's snippet; see the
sketch after this list):
* send records one by one
* flush
* iterate on futures and "get" them
* log a detailed message on each error
* throw an exception if any send failed.
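
A rough sketch of what that could look like on top of the new producer
(assuming the flush() call discussed in this thread; the producer and log
fields and the exception type are illustrative):

    public void batchSend(List<ProducerRecord<K, V>> records) {
        List<Future<RecordMetadata>> futures =
                new ArrayList<Future<RecordMetadata>>(records.size());
        for (ProducerRecord<K, V> record : records) {
            futures.add(producer.send(record)); // queue everything first
        }
        producer.flush(); // block until all outstanding sends have completed
        boolean anyFailed = false;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(); // surfaces the per-record error, if any
            } catch (Exception e) {
                anyFailed = true;
                log.error("Failed to send record " + i + " to topic "
                        + records.get(i).topic(), e);
            }
        }
        if (anyFailed) {
            throw new KafkaException("One or more records in the batch failed to send");
        }
    }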

It reproduces the old behavior - which apparently everyone really liked,
and I don't think it is overly weird. It is very limited, but anyone who
needs more control over his sends already have plenty of options.

Thoughts?

Gwen




On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps  wrote:

> Hey guys,
>
> The locking argument is correct for very small records (< 50 bytes),
> batching will help here because for small records locking becomes the big
> bottleneck. I think these use cases are rare but not unreasonable.
>
> Overall I'd emphasize that the new producer is way faster at virtually all
> use cases. If there is a use case where that isn't true, let's look at it
> in a data driven way by comparing the old producer to the new producer and
> looking for any areas where things got worse.
>
> I suspect the "reducing allocations" argument to be not a big thing. We do
> a number of small per-message allocations and it didn't seem to have much
> impact. I do think there are a couple of big producer memory optimizations
> we could do by reusing the arrays in the accumulator in the serialization
> of the request but I don't think this is one of them.
>
> I'd be skeptical of any api that was too weird--i.e. introduces a new way
> of partitioning, gives back errors on a per-partition rather than per
> message basis (given that partitioning is transparent this is really hard
> to think about), etc. Bad apis end up causing a ton of churn and just don't
> end up being a good long term commitment as we change how the underlying
> code works over time (i.e. we hyper optimize for something then have to
> maintain some super weird api as it becomes hyper unoptimized for the
> client over time).
>
> Roshan--Flush works as you would hope, it blocks on the completion of all
> outstanding requests. Calling get on the future for the request gives you
> the associated error code back. Flush doesn't throw any exceptions because
> waiting for requests to complete doesn't error, the individual requests
> fail or succeed which is always reported with each request.
>
> Ivan--The batches you send in the scala producer today actually aren't
> truely atomic, they just get sent in a single request.
>
> One tricky problem to solve when user's do batching is size limits on
> requests. This can be very hard to manage since predicting the serialized
> size of a bunch of java objects is not always obvious. This was repeatedly
> a problem before.
>
> -Jay
>
> On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov 
> wrote:
>
> > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > and easy to use for atomic batching as old sync batch api. Also, it's
> fast.
> > Coupled with a separate instance of producer per
> > broker:port:topic:partition it works very well. I would be glad if it
> finds
> > its way into new producer api.
> >
> > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > fetchSize must be set at least as batch bytes (before or after
> > compression), otherwise client risks not getting any messages?
> >
>


Re: Unclaimed partitions

2015-04-29 Thread Dave Hamilton
Hi, would anyone be able to help me with this issue? Thanks.

- Dave



On Tue, Apr 28, 2015 at 1:32 PM -0700, "Dave Hamilton" <dhamil...@nanigans.com> wrote:

1. We’re using version 0.8.1.1.
2. No failures in the consumer logs
3. We’re using the ConsumerOffsetChecker to see what partitions are assigned to
the consumer group and what their offsets are. 8 of the 12 processes have each
been assigned two partitions and they’re keeping up with the topic. The other 4
do not get assigned partitions, and no consumers in the group are consuming
those 8 partitions.

Thanks for your help,
Dave



On 4/28/15, 1:40 PM, "Aditya Auradkar"  wrote:

>Couple of questions:
>- What version of the consumer API are you using?
>- Are you seeing any rebalance failures in the consumer logs?
>- How do you determine that some partitions are unassigned? Just confirming 
>that you have partitions that are not being consumed from as opposed to 
>consumer threads that aren't assigned any partitions.
>
>Aditya
>
>
>From: Dave Hamilton [dhamil...@nanigans.com]
>Sent: Tuesday, April 28, 2015 10:19 AM
>To: users@kafka.apache.org
>Subject: Re: Unclaimed partitions
>
>I’m sorry, I forgot to specify that these processes are in the same consumer 
>group.
>
>Thanks,
>Dave
>
>
>
>
>
>On 4/28/15, 1:15 PM, "Aditya Auradkar"  wrote:
>
>>Hi Dave,
>>
>>The simple consumer doesn't do any state management across consumer 
>>instances. So I'm not sure how you are assigning partitions in your 
>>application code. Did you mean to say that you are using the high level 
>>consumer API?
>>
>>Thanks,
>>Aditya
>>
>>
>>From: Dave Hamilton [dhamil...@nanigans.com]
>>Sent: Tuesday, April 28, 2015 7:58 AM
>>To: users@kafka.apache.org
>>Subject: Unclaimed partitions
>>
>>Hi, I am trying to consume a 24-partition topic across 12 processes. Each 
>>process is using the simple consumer API, and each is being assigned two 
>>consumer threads. I have noticed when starting these processes that sometimes 
>>some of my processes are not being assigned any partitions, and no rebalance 
>>seems to ever be triggered, leaving some of the partitions unclaimed.
>>
>>When I first tried deploying this yesterday, I noticed 8 of the 24 
>>partitions, for 4 of the consumer processes, went unclaimed. Redeploying 
>>shortly later corrected this issue. I tried deploying again today, and now I 
>>see a different set of 4 processes not getting assigned partitions. The 
>>processes otherwise appear to be running normally, they are currently running 
>>in production and we are working to get the consumers quietly running before 
>>enabling them to do any work. I’m not sure if we might be looking at some 
>>sort of timing issue.
>>
>>Does anyone know what might be causing the issues we’re observing?
>>
>>Thanks,
>>Dave


RE: Unclaimed partitions

2015-04-29 Thread Aditya Auradkar
Hey Dave,

It's hard to say why this is happening without more information. Even if there 
are no errors in the log, is there anything to indicate that the rebalance 
process on those hosts even started? Does this happen occasionally or every 
time you start the consumer group? Can you paste the output of 
ConsumerOffsetChecker and describe topic?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Wednesday, April 29, 2015 6:46 PM
To: users@kafka.apache.org; users@kafka.apache.org
Subject: Re: Unclaimed partitions

Hi, would anyone be able to help me with this issue? Thanks.

- Dave



On Tue, Apr 28, 2015 at 1:32 PM -0700, "Dave Hamilton" <dhamil...@nanigans.com> wrote:

1. We’re using version 0.8.1.1.
2. No failures in the consumer logs
3. We’re using the ConsumerOffsetChecker to see what partitions are assigned to 
the consumer group and what their offsets are. 8 of the 12 process each have 
been assigned two partitions and they’re keeping up with the topic. The other 4 
do not get assigned partitions and no consumers in the group are consuming 
those 8 partitions.

Thanks for your help,
Dave



On 4/28/15, 1:40 PM, "Aditya Auradkar"  wrote:

>Couple of questions:
>- What version of the consumer API are you using?
>- Are you seeing any rebalance failures in the consumer logs?
>- How do you determine that some partitions are unassigned? Just confirming 
>that you have partitions that are not being consumed from as opposed to 
>consumer threads that aren't assigned any partitions.
>
>Aditya
>
>
>From: Dave Hamilton [dhamil...@nanigans.com]
>Sent: Tuesday, April 28, 2015 10:19 AM
>To: users@kafka.apache.org
>Subject: Re: Unclaimed partitions
>
>I’m sorry, I forgot to specify that these processes are in the same consumer 
>group.
>
>Thanks,
>Dave
>
>
>
>
>
>On 4/28/15, 1:15 PM, "Aditya Auradkar"  wrote:
>
>>Hi Dave,
>>
>>The simple consumer doesn't do any state management across consumer 
>>instances. So I'm not sure how you are assigning partitions in your 
>>application code. Did you mean to say that you are using the high level 
>>consumer API?
>>
>>Thanks,
>>Aditya
>>
>>
>>From: Dave Hamilton [dhamil...@nanigans.com]
>>Sent: Tuesday, April 28, 2015 7:58 AM
>>To: users@kafka.apache.org
>>Subject: Unclaimed partitions
>>
>>Hi, I am trying to consume a 24-partition topic across 12 processes. Each 
>>process is using the simple consumer API, and each is being assigned two 
>>consumer threads. I have noticed when starting these processes that sometimes 
>>some of my processes are not being assigned any partitions, and no rebalance 
>>seems to ever be triggered, leaving some of the partitions unclaimed.
>>
>>When I first tried deploying this yesterday, I noticed 8 of the 24 
>>partitions, for 4 of the consumer processes, went unclaimed. Redeploying 
>>shortly later corrected this issue. I tried deploying again today, and now I 
>>see a different set of 4 processes not getting assigned partitions. The 
>>processes otherwise appear to be running normally, they are currently running 
>>in production and we are working to get the consumers quietly running before 
>>enabling them to do any work. I’m not sure if we might be looking at some 
>>sort of timing issue.
>>
>>Does anyone know what might be causing the issues we’re observing?
>>
>>Thanks,
>>Dave


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please ensure consumer.shutdown() and executor.shutdown() are not called
during the course of your program.
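
On the partitioning question raised earlier in this thread: with the 0.8.x
Scala producer, unkeyed messages can stick to a single partition for long
stretches, so one thing to try (a sketch, not a confirmed diagnosis) is to send
keyed messages so the default partitioner hashes them across all four
partitions; i here is the loop counter from the producer snippet:

    // A varying key (here the loop counter) makes the default partitioner hash
    // it, spreading events over the partitions instead of pinning them to one.
    KeyedMessage<String, String> data =
            new KeyedMessage<String, String>(TOPIC, String.valueOf(i), finalEvent);
    producer.send(data);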

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm  wrote:

> Commenting out Example shutdown did not seem to make a difference, I added
> the print statement below to highlight the fact.
>
> The other threads still shut down, and only one thread lives on, eventually
> that dies after a few minutes as well
>
> Could this be that the producer default partitioner is isn't balancing data
> across all partitions?
>
> Thanks,
> Chris
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> scheduler
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping all fetchers
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-consumergroup], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-] All connections stopped
>
> 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> thread.
>
> Shutting down Thread: 2
>
> Shutting down Thread: 1
>
> Shutting down Thread: 3
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], ZKConsumerConnector shut down completed
>
> Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> distance|-73.99021500035|40.6636611
>
> 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], stopping watcher executor thread for consumer
> consumergroup
>
> Thread 0: 2015-04-29
> 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009
>
> On Wed, Apr 29, 2015 at 10:11 AM, tao xiao  wrote:
>
> > example.shutdown(); in ConsumerGroupExample closes all consumer
> connections
> > to Kafka. remove this line the consumer threads will run forever
> >
> > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm 
> > wrote:
> >
> > > Hi All,
> > >
> > > I am trying to get a multi threaded HL consumer working against a 2
> > broker
> > > Kafka cluster with a 4 partition 2 replica  topic.
> > >
> > > The consumer code is set to run with 4 threads, one for each partition.
> > >
> > > The producer code uses the default partitioner and loops indefinitely
> > > feeding events into the topic.(I excluded the while loop in the paste
> > > below)
> > >
> > > What I see is the threads eventually all exit, even thought the
> producer
> > is
> > > still sending events into the topic.
> > >
> > > My understanding is that the consumer thread per partition is the
> correct
> > > setup.
> > >
> > > Any ideas why this code doesn't continue to consume events at they are
> > > pushed to the topic?
> > >
> > > I suspect I am configuring something wrong here, but am not sure what.
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> > >
> > > *T**opic Configuration*
> > >
> > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
> 1,2
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
> 1,2
> > >
> > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
> 1,2
> > >
> > >  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
> 1,2
> > >
> > > *Producer Code:*
> > >
> > >  Properties props = new Properties();
> > >
> > > props.put("metadata.broker.list", args[0]);
> > >
> > > props.put("zk.connect", args[1]);
> > >
> > > props.put("serializer.class",
> "kafka.serializer.StringEncoder");
> > >
> > > props.put("request.required.acks", "1");
> > >
> > > String TOPIC = args[2];
> > >
> > > ProducerConfig config = new ProducerConfig(props);
> > >
> > > Producer producer = new Producer String>(
> > >

Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Joe Stein
You can do this with the existing Kafka consumer
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
and probably any other Kafka client too (maybe with minor/major rework to do
the offset management).

The new consumer approach is more transparent on "Subscribing To Specific
Partitions"
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
.

Here is a Docker file (** pull request pending **) for wrapping kafka
consumers (doesn't have to be the go client, need to abstract that out some
more after more testing)
https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile


Also a VM (** pull request pending **) to build container, push to local
docker repository and launch on Apache Mesos
https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
as working example how-to-do.

All of this could be done without the Docker container and still work on
Mesos ... or even without Mesos and on YARN.

You might also want to checkout how Samza integrates with Execution
Frameworks
http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375 and
built in YARN support.

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Wed, Apr 29, 2015 at 8:56 AM, David Corley  wrote:

> You're right Stevo, I should re-phrase to say that there can be no more
> _active_ consumers than there are partitions (within a single consumer
> group).
> I'm guessing that's what Nimi is alluding to asking, but perhaps he can
> elaborate on whether he's using consumer groups and/or whether the 100
> partitions are all for a single topic, or multiple topics.
>
> On 29 April 2015 at 13:38, Stevo Slavić  wrote:
>
> > Please correct me if wrong, but I think it is really not hard constraint
> > that one cannot have more consumers (from same group) than partitions on
> > single topic - all the surplus consumers will not be assigned to consume
> > any partition, but they can be there and as soon as one active consumer
> > from same group goes offline (its connection to ZK is dropped), consumers
> > from the group will be rebalanced so one passively waiting consumer will
> > become active.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Wed, Apr 29, 2015 at 2:25 PM, David Corley 
> > wrote:
> >
> > > If the 100 partitions are all for the same topic, you can have up to
> 100
> > > consumers working as part of a single consumer group for that topic.
> > > You cannot have more consumers than there are partitions within a given
> > > consumer group.
> > >
> > > On 29 April 2015 at 08:41, Nimi Wariboko Jr 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I was wondering what options there are for horizontally scaling kafka
> > > > consumers? Basically if I have 100 partitions and 10 consumers, and
> > want
> > > to
> > > > temporarily scale up to 50 consumers, what options do I have?
> > > >
> > > > So far I've thought of just simply tracking consumer membership
> somehow
> > > > (either through Raft or zookeeper's znodes) on the consumers.
> > > >
> > >
> >
>


Kafka 0.8.2 beta - release

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
I see a lot of interesting features in the Kafka 0.8.2 beta. I am just
wondering when it will be released. Is there any timeline for that?

Thanks & Regards,


Re: New Producer API - batched sync mode support

2015-04-29 Thread Ewen Cheslack-Postava
On Wed, Apr 29, 2015 at 6:08 PM, Gwen Shapira  wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over his sends already have plenty of options.
>

First, I'll just say that I actually prefer a smaller API -- most wrappers
are trivial and if someone reuses them *that* often, they can always package
them into a wrapper API. Implementing everything people repeat can be a
slippery slope towards including every pattern under the sun. I'm
definitely skeptical of adding to the core API if there isn't something
that requires a specialized implementation. If it works as a trivial
wrapper that people want in Kafka proper, then it'd probably be better
placed in a contrib package or implemented as a separate utility wrapper
class that guarantees isolation from the main producer implementation.

Second, one of the reasons I mentioned alternative APIs is that I think the
behavior you'd get with this API is fairly confusing and hard to work with
given the behavior I *think* people are actually looking for. I suspect the
reason they want synchronous batch behavior is to get atomic batch writes.
They aren't going to get this anyway (as Jay pointed out) given the current
broker implementation, but you could get a close approximation if you have
max 1 in flight request and retry indefinitely (with the retries going to
the front of the queue to avoid out-of-order writes). That has the drawback
that failures never bubble up. Alternatively, you need to be able to
immediately clear the buffered records upon a failure.

I mentioned the fwrite()-like API because it lets you expose semantics like
this in a way I think can be pretty clean -- give the producer a list of
records and it will accept as many as it can while a) not violating the buffer
restrictions and b) ensuring that all the accepted records will be
included in the same request to the broker. You're guaranteed that there
are no "extra" records in the accumulator if something fails, which lets
you get an error back without the possibility of message reordering,
allowing you to handle the issue however you like. (This still has a ton of
issues if you introduce client-side timeouts, which is why
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer would
be the real solution.)
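A purely hypothetical shape for such an API, just to make the semantics
concrete (nothing like this exists in the producer today):

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface BatchingProducer<K, V> {
    /**
     * Accept as many records from the front of the list as currently fit,
     * guaranteeing that all accepted records go out in the same broker request.
     * Returns the number of records accepted; the caller re-submits the rest,
     * so a failure never leaves "extra" buffered records behind.
     */
    int trySend(List<ProducerRecord<K, V>> records);
}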

I was also sensitive to the possible performance issues because I was
recently bitten by an issue where some code was unexpectedly processing
characters one at a time with really awful performance as a result, and a
small bit of batching solved the problem :) But here, I agree with Jay that
there's enough other stuff going on under the hood that this alone isn't
likely to have a huge impact. Then again, I'd love to see some profiler
numbers from a semi-realistic workload!

On the clarity and ease of use of APIs, and coming back to the behavior I
think people are looking for -- sometimes the challenge isn't just
documenting the API you've concluded is the right one, but also explaining
why the API everyone seems to want can't work/was broken/doesn't provide
the guarantees they thought it did.


> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps  wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes);
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument is not a big thing. We do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by re

Re: Kafka 0.8.2 beta - release

2015-04-29 Thread Ewen Cheslack-Postava
It has already been released, including a minor revision to fix some
critical bugs. The latest release is 0.8.2.1. The downloads page has links
and release notes: http://kafka.apache.org/downloads.html

On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam <
sankarm...@gmail.com> wrote:

> I see lot of interesting features with Kafka 0.8.2 beta. I am just
> wondering when that will be released. Is there any timeline for that?
>
> Thanks & Regards,
>



-- 
Thanks,
Ewen


Re: Kafka 0.8.2 beta - release

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
Thank you,

It seems the following methods are not supported in KafkaConsumer. Do you
know when they will be supported?

public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
    throw new UnsupportedOperationException();
}

Thanks & Regards,



On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava 
wrote:

> It has already been released, including a minor revision to fix some
> critical bugs. The latest release is 0.8.2.1. The downloads page has links
> and release notes: http://kafka.apache.org/downloads.html
>
> On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam <
> sankarm...@gmail.com> wrote:
>
> > I see lot of interesting features with Kafka 0.8.2 beta. I am just
> > wondering when that will be released. Is there any timeline for that?
> >
> > Thanks & Regards,
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Nimi Wariboko Jr
My mistake, it seems the Java drivers are a lot more advanced than
Shopify's Kafka driver (or I am missing something) - and I haven't used
Kafka before.

With the Go driver - it seems you have to manage offsets and partitions
within the application code, while with the Scala driver it seems you have the
option of simply subscribing to a topic, and someone else will manage that
part.
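For reference, a minimal sketch of that "just subscribe to a topic" style
using the 0.8 high-level (ZooKeeper-based) consumer; the topic name, group id,
and ZooKeeper address here are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "my-consumer-group");        // membership/rebalancing handled via ZK
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for one stream for the topic; partition assignment is managed for us.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
        while (it.hasNext()) {
            byte[] message = it.next().message();           // offsets committed automatically by default
            // process message ...
        }
    }
}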

After digging around a bit more, I found there is another library -
https://github.com/wvanbergen/kafka - that speaks the consumergroup API and
accomplishes what I was looking for; I assume it is implemented by keeping
track of memberships with Zookeeper.

Thank you for the information - it really helped clear up what I was failing
to understand with Kafka.

Nimi

On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein  wrote:

> You can do this with the existing Kafka Consumer
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
> and probably any other Kafka client too (maybe with minor/major rework
> to do the offset management).
>
> The new consumer approach is more transparent on "Subscribing To Specific
> Partitions"
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
> .
>
> Here is a Docker file (** pull request pending **) for wrapping kafka
> consumers (doesn't have to be the go client, need to abstract that out some
> more after more testing)
>
> https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile
>
>
> Also a VM (** pull request pending **) to build container, push to local
> docker repository and launch on Apache Mesos
>
> https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
> as a working example of how to do it.
>
> All of this could be done without the Docker container and still work on
> Mesos ... or even without Mesos and on YARN.
>
> You might also want to check out how Samza integrates with Execution
> Frameworks
>
> http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
> which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375
> and
> built in YARN support.
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Wed, Apr 29, 2015 at 8:56 AM, David Corley 
> wrote:
>
> > You're right Stevo, I should re-phrase to say that there can be no more
> > _active_ consumers than there are partitions (within a single consumer
> > group).
> > I'm guessing that's what Nimi is asking about, but perhaps he can
> > elaborate on whether he's using consumer groups and/or whether the 100
> > partitions are all for a single topic, or multiple topics.
> >
> > On 29 April 2015 at 13:38, Stevo Slavić  wrote:
> >
> > > Please correct me if I'm wrong, but I think it is really not a hard constraint
> > > that one cannot have more consumers (from the same group) than partitions on
> > > a single topic - all the surplus consumers will not be assigned to
> consume
> > > any partition, but they can be there and as soon as one active consumer
> > > from same group goes offline (its connection to ZK is dropped),
> consumers
> > > from the group will be rebalanced so one passively waiting consumer
> will
> > > become active.
> > >
> > > Kind regards,
> > > Stevo Slavic.
> > >
> > > On Wed, Apr 29, 2015 at 2:25 PM, David Corley 
> > > wrote:
> > >
> > > > If the 100 partitions are all for the same topic, you can have up to
> > 100
> > > > consumers working as part of a single consumer group for that topic.
> > > > You cannot have more consumers than there are partitions within a
> given
> > > > consumer group.
> > > >
> > > > On 29 April 2015 at 08:41, Nimi Wariboko Jr 
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I was wondering what options there are for horizontally scaling
> kafka
> > > > > consumers? Basically if I have 100 partitions and 10 consumers, and
> > > want
> > > > to
> > > > > temporarily scale up to 50 consumers, what options do I have?
> > > > >
> > > > > So far I've thought of just simply tracking consumer membership
> > somehow
> > > > > (either through Raft or zookeeper's znodes) on the consumers.
> > > > >
> > > >
> > >
> >
>


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Joe Stein
The Go Kafka Client also supports offset storage in ZK and Kafka
https://github.com/stealthly/go_kafka_client/blob/master/docs/offset_storage.md
and has two other strategies for partition ownership with a consensus
server (it currently uses Zookeeper; Consul support will be implemented in the
near future).

~ Joestein

On Thu, Apr 30, 2015 at 2:15 AM, Nimi Wariboko Jr 
wrote:

> My mistake, it seems the Java drivers are a lot more advanced than the
> Shopify's Kafka driver (or I am missing something) - and I haven't used
> Kafka before.
>
> With the Go driver - it seems you have to manage offsets and partitions
> within the application code, while in Scala driver it seems you have the
> option of simply subscribing to a topic, and someone else will manage that
> part.
>
> After digging around a bit more, I found there is another library -
> https://github.com/wvanbergen/kafka - that speaks the consumergroup API
> and
> accomplishes what I was looking for and I assume is implemented by keeping
> track of memberships w/ Zookeeper.
>
> Thank you for the information - it really helped clear up what I failing to
> understand with kafka.
>
> Nimi
>
> On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein  wrote:
>
> > You can do this with the existing Kafka Consumer
> >
> >
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
> > and probably any other Kafka client too (maybe with minor/major rework
> > to-do the offset management).
> >
> > The new consumer approach is more transparent on "Subscribing To Specific
> > Partitions"
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
> > .
> >
> > Here is a Docker file (** pull request pending **) for wrapping kafka
> > consumers (doesn't have to be the go client, need to abstract that out
> some
> > more after more testing)
> >
> >
> https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile
> >
> >
> > Also a VM (** pull request pending **) to build container, push to local
> > docker repository and launch on Apache Mesos
> >
> >
> https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
> > as working example how-to-do.
> >
> > All of this could be done without the Docker container and still work on
> > Mesos ... or even without Mesos and on YARN.
> >
> > You might also want to checkout how Samza integrates with Execution
> > Frameworks
> >
> >
> http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
> > which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375
> > and
> > built in YARN support.
> >
> > ~ Joe Stein
> > - - - - - - - - - - - - - - - - -
> >
> >   http://www.stealth.ly
> > - - - - - - - - - - - - - - - - -
> >
> > On Wed, Apr 29, 2015 at 8:56 AM, David Corley 
> > wrote:
> >
> > > You're right Stevo, I should re-phrase to say that there can be no more
> > > _active_ consumers than there are partitions (within a single consumer
> > > group).
> > > I'm guessing that's what Nimi is alluding to asking, but perhaps he can
> > > elaborate on whether he's using consumer groups and/or whether the 100
> > > partitions are all for a single topic, or multiple topics.
> > >
> > > On 29 April 2015 at 13:38, Stevo Slavić  wrote:
> > >
> > > > Please correct me if wrong, but I think it is really not hard
> > constraint
> > > > that one cannot have more consumers (from same group) than partitions
> > on
> > > > single topic - all the surplus consumers will not be assigned to
> > consume
> > > > any partition, but they can be there and as soon as one active
> consumer
> > > > from same group goes offline (its connection to ZK is dropped),
> > consumers
> > > > from the group will be rebalanced so one passively waiting consumer
> > will
> > > > become active.
> > > >
> > > > Kind regards,
> > > > Stevo Slavic.
> > > >
> > > > On Wed, Apr 29, 2015 at 2:25 PM, David Corley  >
> > > > wrote:
> > > >
> > > > > If the 100 partitions are all for the same topic, you can have up
> to
> > > 100
> > > > > consumers working as part of a single consumer group for that
> topic.
> > > > > You cannot have more consumers than there are partitions within a
> > given
> > > > > consumer group.
> > > > >
> > > > > On 29 April 2015 at 08:41, Nimi Wariboko Jr  >
> > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I was wondering what options there are for horizontally scaling
> > kafka
> > > > > > consumers? Basically if I have 100 partitions and 10 consumers,
> and
> > > > want
> > > > > to
> > > > > > temporarily scale up to 50 consumers, what options do I have?
> > > > > >
> > > > > > So far I've thought of just simply tracking consumer membership
> > > somehow
> > > > > > (either through Raft or zookeeper's znodes) on the consumers.
> > > > > >
> > > > >
> > > >
> > >
> >
>