Create topic programmatically

2014-10-13 Thread hsy...@gmail.com
Hi guys,

Besides TopicCommand, which I believe is not meant for creating topics
programmatically, is there any other way to automate topic creation in
code? Thanks!

Best,
Siyuan


log4j dir?

2014-11-13 Thread hsy...@gmail.com
Hi guys,

Just noticed that kafka.logs.dir in log4j.properties doesn't take effect.

It's always set to $base_dir/logs in kafka-run-class.sh:

LOG_DIR=$base_dir/logs
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

Best,
Siyuan


Re: log4j dir?

2014-11-14 Thread hsy...@gmail.com
Does anyone have any idea how I can configure the log4j file dir?

On Thu, Nov 13, 2014 at 4:58 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> Just notice kafka.logs.dir in log4j.properties doesn't take effect
>
> It's always set to *$base_dir/logs* in kafka-run-class.sh
>
> LOG_DIR=$base_dir/logs
> KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
>
> Best,
> Siyuan
>


Re: log4j dir?

2014-11-14 Thread hsy...@gmail.com
I think there is no way to specify a different log location without modifying
the shell script.
The kafka.logs.dir setting in the log4j.properties file is misleading.

On Fri, Nov 14, 2014 at 1:24 PM, Ben Drees  wrote:

> Hi,
>
> I had trouble with this as well.  The version of Kafka I'm running insists
> on using 'kafka/logs', so I create a soft link from there to the desired
> destination directory:
>
> # kafka scripts hard-code the logs dir, so point that path to where we want
> the logs to be.
> ln -s $STREAM_BUFFER_LOGS_DIR kafka/logs
>
> -Ben
>
>
> On Fri, Nov 14, 2014 at 11:17 AM, hsy...@gmail.com 
> wrote:
>
> > Anyone has any idea how do I config the log4j file dir?
> >
> > On Thu, Nov 13, 2014 at 4:58 PM, hsy...@gmail.com 
> > wrote:
> >
> > > Hi guys,
> > >
> > > Just notice kafka.logs.dir in log4j.properties doesn't take effect
> > >
> > > It's always set to *$base_dir/logs* in kafka-run-class.sh
> > >
> > > LOG_DIR=$base_dir/logs
> > > KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
> > >
> > > Best,
> > > Siyuan
> > >
> >
>


Is there a plan to build a ubiquitous web service API to manage the kafka cluster

2014-11-24 Thread hsy...@gmail.com
Hi guys,

Nowadays, all kafka administration work (adding and tearing down nodes, topic
management, throughput monitoring) is done by various tools talking to the
brokers, zookeeper, etc. Is there a plan for the core team to build a central,
universal server providing a web service API to do all the admin work?

Best,
Siyuan


Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Hi guys,

I'm interested in the new Consumer API.
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

I have a couple of questions.
1. In this doc it says the kafka consumer will automatically do load balancing.
Is it based on throughput, or is it the same as what we have now, balancing the
partition cardinality among all consumers in the same ConsumerGroup? In a real case,
different partitions could have different peak times.
2. In the API, there is a subscribe(partition...) method described as not using
group management; does it mean the group.id property will be discarded and the
developer has full control of distributing partitions to consumers?
3. Is the new API compatible with old brokers?
4. Will the simple consumer API and high-level consumer API still be supported?

Thanks!

Best,
Siyuan


Re: Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Thanks Neha. Another question: if offsets are stored under group.id,
does it mean that within one group there should be at most one subscriber for each
topic partition?

Best,
Siyuan

On Tue, Dec 2, 2014 at 12:55 PM, Neha Narkhede 
wrote:

> 1. In this doc it says kafka consumer will automatically do load balance.
> Is it based on throughtput or same as what we have now balance the
> cardinality among all consumers in same ConsumerGroup? In a real case
> different partitions could have different peak time.
>
> Load balancing is still based on # of partitions for the subscribed topics
> and
> ensuring that each partition has exactly one consumer as the owner.
>
> 2. In the API, threre is subscribe(partition...) method saying not using
> group management, does it mean the group.id property will be discarded and
> developer has full control of distributing partitions to consumers?
>
> group.id is also required for offset management, if the user chooses to
> use
> Kafka based offset management. The user will have full control over
> distribution
> of partitions to consumers.
>
> 3. Is new API compatible with old broker?
>
> Yes, it will.
>
> 4. Will simple consumer api and high-level consumer api still be supported?
>
> Over time, we will phase out the current high-level and simple consumer
> since the
> 0.9 API supports both.
>
> Thanks,
> Neha
>
> On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com 
> wrote:
>
> > Hi guys,
> >
> > I'm interested in the new Consumer API.
> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
> >
> > I have couple of question.
> > 1. In this doc it says kafka consumer will automatically do load balance.
> > Is it based on throughtput or same as what we have now balance the
> > cardinality among all consumers in same ConsumerGroup? In a real case
> > different partitions could have different peak time.
> > 2. In the API, threre is subscribe(partition...) method saying not using
> > group management, does it mean the group.id property will be discarded
> and
> > developer has full control of distributing partitions to consumers?
> > 3. Is new API compatible with old broker?
> > 4. Will simple consumer api and high-level consumer api still be
> supported?
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>


Given the brokers, is there a way to know all the zookeepers that the brokers connect to

2014-12-11 Thread hsy...@gmail.com
Hi Guys,

If I know the brokers, is there a way to find the zookeeper hosts from the
broker list?

Thanks!
Siyuan


Is there a way to pull out kafka metadata from zookeeper?

2013-10-10 Thread hsy...@gmail.com
Hi guys,

I'm trying to maintain a bunch of simple kafka consumers to consume messages
from brokers. I know there is a way to send a TopicMetadataRequest to a broker
and get the response from it, but you have to specify the broker list to query,
and a broker might not be available because of some failure. My question is:
is there any API I can call to query broker metadata for a topic/partition
directly from zookeeper? I know I can query that information using the
zookeeper API, but that's not a friendly data structure like
TopicMetadata/PartitionMetadata.  Thank you!
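
For reference, a rough sketch of the broker-side metadata round-trip mentioned
above, assuming the 0.8 kafka.javaapi classes; the host, port, and topic name
are placeholders:

import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataLookup {
    public static void main(String[] args) {
        // Connect to any known broker; it can answer metadata for the whole cluster.
        SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "metadataLookup");
        try {
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList("my-topic"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    System.out.println("partition " + partition.partitionId()
                            + " leader " + (partition.leader() == null ? "none" : partition.leader().host()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}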

Best,
Siyuan


Question about auto-rebalancing

2013-10-10 Thread hsy...@gmail.com
Hi guys,

Here is a case I observed. I have a single-node cluster with 3 broker instances.
I created 1 topic with 2 partitions and 2 replicas for each partition. The
initial distribution is like this: topic1/partition0 -> (broker0, broker2),
topic1/partition1 -> (broker1, broker2). So broker0 is the leader broker for
partition0 and broker1 is the leader broker for partition1.  I then kill
broker0 and broker2 becomes the leader broker for partition0; then I kill broker2
and broker1 becomes the leader broker of both partition0 and partition1, which is
fine.  But when I restart broker0 and broker2, after they have synced with broker1,
they are just replica brokers for partition0 and partition1. So my
consumers (simple consumers) really don't know which broker they should read
from. I found a command that will force a re-balance after failover, but
isn't there an automatic way to rebalance the leader broker?

Best regards,
Siyuan


Re: Question about auto-rebalancing

2013-10-11 Thread hsy...@gmail.com
Hi Jun,

Thanks for your reply, but in a real cluster one broker could serve
different topics and different partitions. The simple consumer only has
knowledge of which brokers are available, but it has no way to decide
which broker is best to consume messages from. If you don't choose
carefully, multiple simple consumers might end up reading from the same
node, which is definitely not good for performance.
The interesting thing is I found there is a
command, kafka-preferred-replica-election.sh, which will try to distribute
the leadership equally among the brokers. This is good because I can
always let my simple consumer read from the leader broker (even if it fails,
a replica will take over as leader, which is fine).  But why doesn't the kafka
cluster run this command automatically when there is a broker change (up/down) in
the cluster, so that the leadership is always equally distributed among the
brokers as soon as possible?  I think that would be very good for a simple consumer
deciding which broker to read from.

Another question: I'm also curious how the high-level consumer is balanced. I
assume each high-level consumer knows which brokers the other consumers (in the same group)
read messages from, and it can try to avoid those brokers
and pick up a free one?  Is there a document for the balancing rule
among high-level consumers? Does it always guarantee that, after several
leadership changes or temporary broker failures, the reads are equally distributed
among the brokers? Basically I think it would be nice to have an API to
let devs know which consumer reads from which broker; otherwise I don't know
anything about what happens behind the high-level consumer.

Thanks!

Best,
Siyuan


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
Thanks guys!
But I feel weird. Assume I have 20 brokers for 10 different topics, with 2
partitions and 2 replicas for each. For each consumer consuming a different
topic/replica I have to specify those 20 brokers and go over all of them to
know which broker is alive. And even worse, what if I dynamically add a new
broker into the cluster and remove an old one? I think it would be nice to have a
way to get the metadata directly from zookeeper (the centralized coordinator?).

Best,
Siyuan


On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede wrote:

> If, for some reason, you don't have access to a virtual IP or load
> balancer, you need to round robin once through all the brokers before
> failing a TopicMetadataRequest. So unless all the brokers in your cluster
> are down, this should not be a problem.
>
> Thanks,
> Neha
>
>
> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> wrote:
>
> > Hi guys,
> >
> > I'm trying to maintain a bunch of simple kafka consumer to consume
> messages
> > from brokers. I know there is a way to send TopicMetadataRequest to
> broker
> > and get the response from the broker. But you have to specify the broker
> > list to query the information. But broker might not be available because
> of
> > some failure. My question is is there any api I can call and query broker
> > metadata for topic/partition directly from zookeeper? I know I can query
> > that information using zookeeper API. But that's not friendly
> datastructure
> >  like the TopicMetadata/PartitionMetadata.  Thank you!
> >
> > Best,
> > Siyuan
> >
>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
What if the broker it tries to get the TopicMetadata from is down?
I assume a broker is more likely to be down than a zookeeper node.

Thanks,
Siyuan


On Fri, Oct 11, 2013 at 2:39 PM, Neha Narkhede wrote:

> >> For each consumer consumes different
> topic/replica I have to specify those 20 brokers and go over all of them to
> know which broker is alive. And even worse how about I dynamically add new
> broker into the cluster and remove the old one
>
> TopicMetadataRequest is a batch API and you can get metadata information
> for either a list of all topics or all topics in the cluster, if you
> specify an empty list of topics. Adding a broker is not a problem since the
> metadata request also returns the list of brokers in a cluster. The reason
> this is better than reading from zookeeper is because the same operation
> would require multiple zookeeper roundtrips, instead of a single
> TopicMetadataRequest roundtrip to some kafka broker.
>
> Thanks,
> Neha
>
>
> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com 
> wrote:
>
> > Thanks guys!
> > But I feel weird. Assume I have 20 brokers for 10 different topics with 2
> > partitions and  2 replicas for each. For each consumer consumes different
> > topic/replica I have to specify those 20 brokers and go over all of them
> to
> > know which broker is alive. And even worse how about I dynamically add
> new
> > broker into the cluster and remove the old one. I think it's nice to
> have a
> > way to get metadata from zookeeper(centralized coordinator?) directly.
> >
> > Best,
> > Siyuan
> >
> >
> > On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede  > >wrote:
> >
> > > If, for some reason, you don't have access to a virtual IP or load
> > > balancer, you need to round robin once through all the brokers before
> > > failing a TopicMetadataRequest. So unless all the brokers in your
> cluster
> > > are down, this should not be a problem.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> > > wrote:
> > >
> > > > Hi guys,
> > > >
> > > > I'm trying to maintain a bunch of simple kafka consumer to consume
> > > messages
> > > > from brokers. I know there is a way to send TopicMetadataRequest to
> > > broker
> > > > and get the response from the broker. But you have to specify the
> > broker
> > > > list to query the information. But broker might not be available
> > because
> > > of
> > > > some failure. My question is is there any api I can call and query
> > broker
> > > > metadata for topic/partition directly from zookeeper? I know I can
> > query
> > > > that information using zookeeper API. But that's not friendly
> > > datastructure
> > > >  like the TopicMetadata/PartitionMetadata.  Thank you!
> > > >
> > > > Best,
> > > > Siyuan
> > > >
> > >
> >
>


Re: Is there a way to pull out kafka metadata from zookeeper?

2013-10-11 Thread hsy...@gmail.com
That's why I'm asking; I would like to see a kafka zookeeper client API to
get TopicMetadata instead of my own hacky way of querying zookeeper.

Thanks!
Best,
Siyuan


On Fri, Oct 11, 2013 at 4:00 PM, Bruno D. Rodrigues <
bruno.rodrig...@litux.org> wrote:

> Why not ask zookeeper for the list of brokers and then ask a random
> broker for the metadata (and repeat if the broker is down), even if
> it's two calls.
>
> Heck it already does unnecessary connections. It connects to a broker,
> gets the metadata, disconnects, and then connects again for the data.
> If it's already assumed a producer or consumer will take some seconds
> until ready, what is another call gonna prejudice the flow.
>
> Then producers and consumers would then be consistently configured. Or
> allow the producers to also go to a broker instead of zookeeper.
>
> This way the consumer needs to know and hardcode at least one node.
> The node can fail. It can be changed.
>
> I thought zookeeper served to abstract this kind of complexity
>
>
>
>
>
>
>
> --
> Bruno Rodrigues
> Sent from my iPhone
>
> On 11/10/2013, at 22:40, Neha Narkhede 
> wrote:
>
> >>> For each consumer consumes different
> > topic/replica I have to specify those 20 brokers and go over all of them
> to
> > know which broker is alive. And even worse how about I dynamically add
> new
> > broker into the cluster and remove the old one
> >
> > TopicMetadataRequest is a batch API and you can get metadata information
> > for either a list of all topics or all topics in the cluster, if you
> > specify an empty list of topics. Adding a broker is not a problem since
> the
> > metadata request also returns the list of brokers in a cluster. The
> reason
> > this is better than reading from zookeeper is because the same operation
> > would require multiple zookeeper roundtrips, instead of a single
> > TopicMetadataRequest roundtrip to some kafka broker.
> >
> > Thanks,
> > Neha
> >
> >
> >> On Fri, Oct 11, 2013 at 11:30 AM, hsy...@gmail.com 
> wrote:
> >>
> >> Thanks guys!
> >> But I feel weird. Assume I have 20 brokers for 10 different topics with
> 2
> >> partitions and  2 replicas for each. For each consumer consumes
> different
> >> topic/replica I have to specify those 20 brokers and go over all of
> them to
> >> know which broker is alive. And even worse how about I dynamically add
> new
> >> broker into the cluster and remove the old one. I think it's nice to
> have a
> >> way to get metadata from zookeeper(centralized coordinator?) directly.
> >>
> >> Best,
> >> Siyuan
> >>
> >>
> >> On Fri, Oct 11, 2013 at 9:12 AM, Neha Narkhede  >>> wrote:
> >>
> >>> If, for some reason, you don't have access to a virtual IP or load
> >>> balancer, you need to round robin once through all the brokers before
> >>> failing a TopicMetadataRequest. So unless all the brokers in your
> cluster
> >>> are down, this should not be a problem.
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>> On Thu, Oct 10, 2013 at 10:50 PM, hsy...@gmail.com 
> >>> wrote:
> >>>
> >>>> Hi guys,
> >>>>
> >>>> I'm trying to maintain a bunch of simple kafka consumer to consume
> >>> messages
> >>>> from brokers. I know there is a way to send TopicMetadataRequest to
> >>> broker
> >>>> and get the response from the broker. But you have to specify the
> >> broker
> >>>> list to query the information. But broker might not be available
> >> because
> >>> of
> >>>> some failure. My question is is there any api I can call and query
> >> broker
> >>>> metadata for topic/partition directly from zookeeper? I know I can
> >> query
> >>>> that information using zookeeper API. But that's not friendly
> >>> datastructure
> >>>> like the TopicMetadata/PartitionMetadata.  Thank you!
> >>>>
> >>>> Best,
> >>>> Siyuan
> >>
>


Re: Question about auto-rebalancing

2013-10-11 Thread hsy...@gmail.com
Oh, Sriram, Thank you very much!



On Fri, Oct 11, 2013 at 5:44 PM, Sriram Subramanian <
srsubraman...@linkedin.com> wrote:

> We already have a JIRA for auto rebalance. I would be working on this soon.
>
> KAFKA-930 <https://issues.apache.org/jira/browse/KAFKA-930>
>
>
>
> On 10/11/13 5:39 PM, "Guozhang Wang"  wrote:
>
> >Hello Siyuan,
> >
> >For the automatic leader re-election, yes we are considering to make it
> >work. Could you file a JIRA for this issue?
> >
> >For the high-level consumer's rebalancing logic, you can find it at
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredictthere
> >sultsoftheconsumerrebabalance%3F
> >
> >Guozhang
> >
> >
> >On Fri, Oct 11, 2013 at 11:06 AM, hsy...@gmail.com 
> >wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for your reply, but in a real cluster, one broker could serve
> >> different topics and different partitions, the simple consumer only has
> >> knowledge of brokers that are available but it has no knowledge to
> >>decide
> >> which broker is best to pick up to consume messages.  If you don't
> >>choose
> >> carefully, multiple simple consumer might end up with reading from same
> >> node which is definitely not good for performance.
> >> Interesting thing is I find out there is
> >> command kafka-preferred-replica-election.sh which will try to equally
> >> distribute the leadership among different brokers, this is good that I
> >>can
> >> always let my simple consumer reads from leader broker(even it fails,
> >>the
> >> replica will pick up as leader which is fine).  But why don't kafka
> >>cluster
> >> run this command automatically when there is a broker change(up/down) in
> >> the cluster so that the leadership can always be equally distributed
> >>among
> >> different brokers ASAP?  I think it's very good for simple consumer to
> >> decide which broker is good to read from.
> >>
> >> Another question is I'm also curious how high-level consumer is
> >>balanced. I
> >> assume each high-level consumer know other consumers(int the same group)
> >> which broker they read message from and it can try to avoid those
> >>brokers
> >> and to pick up a free one?  Is there a document for the balancing rule
> >> among high-level consumer. Does it always guarantee that after several
> >> leadership change/temporary broker fail, it can always equally
> >>distribute
> >> the read among the brokers. Basically I think it's nice to have a API to
> >> let dev know which consumer reads from which broker otherwise I don't
> >>know
> >> anything behind the high-level consumer
> >>
> >> Thanks!
> >>
> >> Best,
> >> Siyuan
> >>
> >
> >
> >
> >--
> >-- Guozhang
>
>


KafkaStream bug?

2013-10-14 Thread hsy...@gmail.com
I found some weird behavior.
I followed the exact code example for the high-level consumer

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#

but added one debug line here:
"
public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        // my line here:
        System.out.println("from the stream " + m_stream); // This line is blocked. Is KafkaStream.toString() a blocking method?
        // end of my line
        System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
    }
    System.out.println("Shutting down Thread: " + m_threadNumber);
}
"


Is there a programmatic way to create topic

2013-10-14 Thread hsy...@gmail.com
Hi kafka,

Is there a programmatic way to create a topic?
http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api/18480684#18480684
is too hacky, plus it's not a sync call.
I'm asking this because I'm writing a test case which starts a kafka
server, creates a topic, and produces and consumes some messages (in separate threads).
But it sometimes fails because the producer starts producing before the topic
is created. The auto topic creation property won't work either, because the
consumer may request the TopicMetadata before the topic is created.
Thanks!

Best,
Siyuan


Re: Is there a programmatic way to create topic

2013-10-14 Thread hsy...@gmail.com
Hi Neha,

Thank you, but can you give an example of CreateTopicCommand.createTopic(),
and is it sync or async?

Best,
Siyuan


On Mon, Oct 14, 2013 at 4:45 PM, Neha Narkhede wrote:

> In 0.8, the only programmatic way, other than create topic, is to use
> CreateTopicCommand.createTopic(). This is probably something we can improve
> in the forthcoming releases.
>
> Thanks,
> Neha
>
>
> On Mon, Oct 14, 2013 at 3:02 PM, hsy...@gmail.com 
> wrote:
>
> > Hi kafka,
> >
> > Is there a programmatic way to create topic.
> >
> >
> http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api/18480684#18480684
> > is too hacky, plus it's not a sync function.
> > I'm asking this because I'm writing a test case which will start kafka
> > server, create topic, produce and consume some message(in separate
> thread).
> >  But it fails some time because producer start to produce before the
> topic
> > is created. The auto topic creation property won't work either because
> the
> > consumer may start get TopicMeta data before the topic is created.
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
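
For reference, a minimal sketch of what a CreateTopicCommand.createTopic() call
can look like from Java, assuming the 0.8.0 kafka.admin.CreateTopicCommand
object and the ZkClient/ZKStringSerializer it expects; the ZooKeeper address
and topic name are placeholders. The call returns once the topic registration
is written to ZooKeeper, so it is not fully synchronous with respect to the
brokers creating the partitions; a caller may still need to wait (for example
by polling topic metadata) before producing.

import kafka.admin.CreateTopicCommand;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class CreateTopicExample {
    public static void main(String[] args) {
        // Kafka expects znodes written with its own ZKStringSerializer.
        ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
        try {
            // topic name, number of partitions, replication factor,
            // manual replica assignment (empty string = let Kafka assign replicas)
            CreateTopicCommand.createTopic(zkClient, "my-test-topic", 2, 1, "");
        } finally {
            zkClient.close();
        }
    }
}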


Re: partition reassignment

2013-10-16 Thread hsy...@gmail.com
There is a ticket for auto-rebalancing, hopefully they'll do auto
redistribution soon
https://issues.apache.org/jira/browse/KAFKA-930


On Wed, Oct 16, 2013 at 12:29 AM, Kane Kane  wrote:

> Yes, thanks, looks like that's what i need, do you know why it tends to
> choose the leader for all partitions on the single broker, despite I have
> 3?
>
>
> On Wed, Oct 16, 2013 at 12:19 AM, Joel Koshy  wrote:
>
> > Did the reassignment complete? If the assigned replicas are in ISR and
> > the preferred replicas for the partitions are evenly distributed
> > across the brokers (which seems to be a case on a cursory glance of
> > your assignment) you can use this tool:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-2.PreferredReplicaLeaderElectionTool
> >
> > On Wed, Oct 16, 2013 at 12:15 AM, Kane Kane 
> wrote:
> > > Oh i see, what is the better way to initiate the leader change? As I
> told
> > > somehow all my partitions have the same leader for some reason. I have
> 3
> > > brokers and all partitions have leader on single one.
> > >
> > >
> > > On Wed, Oct 16, 2013 at 12:04 AM, Joel Koshy 
> > wrote:
> > >
> > >> For a leader change yes, but this is partition reassignment which
> > >> completes when all the reassigned replicas are in sync with the
> > >> original replica(s). You can check the status of the command using the
> > >> option I mentioned earlier.
> > >>
> > >> On Tue, Oct 15, 2013 at 7:02 PM, Kane Kane 
> > wrote:
> > >> > I thought if i have all replicas in sync, leader change should be
> much
> > >> > faster?
> > >> >
> > >> >
> > >> > On Tue, Oct 15, 2013 at 5:12 PM, Joel Koshy 
> > wrote:
> > >> >
> > >> >> Depending on how much data there is in those partitions it can
> take a
> > >> >> while for reassignment to actually complete. You will need to use
> the
> > >> >> --status-check-json-file option of the reassign partitions command
> to
> > >> >> determine whether partition reassignment has completed or not.
> > >> >>
> > >> >> Joel
> > >> >>
> > >> >>
> > >> >> On Tue, Oct 15, 2013 at 3:46 PM, Kane Kane 
> > >> wrote:
> > >> >> > I have 3 brokers and a topic with replication factor of 3.
> > >> >> > Somehow all partitions ended up being on the same broker.
> > >> >> > I've created topic with 3 brokers alive, and they didn't die
> since
> > >> then.
> > >> >> >
> > >> >> > Even when i try to reassign it:
> > >> >> > bin/kafka-reassign-partitions.sh --zookeeper
> > >> >> > 10.80.42.147:2181--broker-list 0,1,2 --topics-to-move-json-file
> > >> >> > ~/reassign.txt --execute
> > >> >> >
> > >> >> > The leader of all partitions after that is still the single
> broker
> > >> (first
> > >> >> > one).
> > >> >> > This is how my json file looks like:
> > >> >> > {"topics":
> > >> >> > [{"topic": "perf1"}],
> > >> >> > "version":1
> > >> >> > }
> > >> >> >
> > >> >> > The command reports success:
> > >> >> > Successfully started reassignment of partitions Map([perf1,33] ->
> > >> List(1,
> > >> >> > 0, 2), [perf1,13] -> List(2, 0, 1), [perf1,28] -> List(2, 1, 0),
> > >> >> [perf1,20]
> > >> >> > -> List(0, 1, 2), [perf1,5] -> List(0, 2, 1), [perf1,14] ->
> > List(0, 1,
> > >> >> 2),
> > >> >> > [perf1,39] -> List(1, 0, 2), [perf1,19] -> List(2, 0, 1),
> > [perf1,25]
> > >> ->
> > >> >> > List(2, 0, 1), [perf1,43] -> List(2, 0, 1), [perf1,11] -> List(0,
> > 2,
> > >> 1),
> > >> >> > [perf1,8] -> List(0, 1, 2), [perf1,22] -> List(2, 1, 0),
> > [perf1,16] ->
> > >> >> > List(2, 1, 0), [perf1,26] -> List(0, 1, 2), [perf1,41] -> List(0,
> > 2,
> > >> 1),
> > >> >> > [perf1,9] -> List(1, 0, 2), [perf1,27] -> List(1, 0, 2),
> > [perf1,44] ->
> > >> >> > List(0, 1, 2), [perf1,34] -> List(2, 1, 0), [perf1,6] -> List(1,
> 2,
> > >> 0),
> > >> >> > [perf1,1] -> List(2, 0, 1), [perf1,37] -> List(2, 0, 1),
> > [perf1,24] ->
> > >> >> > List(1, 2, 0), [perf1,29] -> List(0, 2, 1), [perf1,32] -> List(0,
> > 1,
> > >> 2),
> > >> >> > [perf1,7] -> List(2, 0, 1), [perf1,17] -> List(0, 2, 1),
> > [perf1,36] ->
> > >> >> > List(1, 2, 0), [perf1,0] -> List(1, 2, 0), [perf1,42] -> List(1,
> 2,
> > >> 0),
> > >> >> > [perf1,4] -> List(2, 1, 0), [perf1,10] -> List(2, 1, 0),
> > [perf1,30] ->
> > >> >> > List(1, 2, 0), [perf1,23] -> List(0, 2, 1), [perf1,18] -> List(1,
> > 2,
> > >> 0),
> > >> >> > [perf1,3] -> List(1, 0, 2), [perf1,35] -> List(0, 2, 1),
> > [perf1,31] ->
> > >> >> > List(2, 0, 1), [perf1,12] -> List(1, 2, 0), [perf1,2] -> List(0,
> 1,
> > >> 2),
> > >> >> > [perf1,40] -> List(2, 1, 0), [perf1,38] -> List(0, 1, 2),
> > [perf1,15]
> > >> ->
> > >> >> > List(1, 0, 2), [perf1,21] -> List(1, 0, 2))
> > >> >> >
> > >> >> > But still nothing changes. I use ~/kafka-src/bin/kafka-topics.sh
> > >> >> > --zookeeper 10.80.42.147:2181 --describe --topic perf1 to check
> > >> leaders.
> > >> >> >
> > >> >> > Thanks.
> > >> >>
> > >>
> >
>


Throughput Questions

2013-10-31 Thread hsy...@gmail.com
Hi guys, I have some throughput questions.

I tried to test throughput using both the High Level Consumer and the Simple
Consumer examples from the documentation, but I get much lower throughput from the
simple consumer than from the high-level consumer.
I run the test in the cluster and I'm sure I distribute the leader brokers
and consumer reads across different machines. So is there any suggestion for
getting high throughput?

Another question: if the produce rate of the producer is much higher than the
consume rate, what would happen? Would all the data pile up on the broker?

Thank you!

Best,
Siyuan


Is there a way to add partition to a particular topic

2013-11-08 Thread hsy...@gmail.com
Hi guys, since kafka is able to add a new broker into the cluster at runtime,
I'm wondering: is there a way to add a new partition to a specific topic at
runtime? If not, what would you do if you want to add more partitions to a
topic? Thanks!


Re: Is there a way to add partition to a particular topic

2013-11-08 Thread hsy...@gmail.com
It's in the branch, cool, I'll wait for its release. Actually I find I can
use ./kafka-delete-topic.sh and ./kafka-create-topic.sh with the same topic name
and keep the broker running. It's interesting that delete topic doesn't
actually remove the data from the brokers. So what I understand is, as long
as I deal with the errors caught on the producer and consumer side, I can
use this method instead of the add-partition tool, am I correct? Do you have
any concerns about my adding partitions this way?


On Fri, Nov 8, 2013 at 6:00 PM, Guozhang Wang  wrote:

> Hello,
>
> Please check the add-partition tool:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
>
> Guozhang
>
>
> On Fri, Nov 8, 2013 at 5:32 PM, hsy...@gmail.com  wrote:
>
> > Hi guys, since kafka is able to add new broker into the cluster at
> runtime,
> > I'm wondering is there a way to add new partition for a specific topic at
> > run time?  If not what will you do if you want to add more partition to a
> > topic? Thanks!
> >
>
>
>
> --
> -- Guozhang
>


Re: Is there a way to add partition to a particular topic

2013-11-08 Thread hsy...@gmail.com
I mean, I assume the messages not yet consumed before delete-topic will still be
delivered before you recreate the same topic, correct?


On Fri, Nov 8, 2013 at 6:30 PM, hsy...@gmail.com  wrote:

> It's in the branch, cool, I'll wait for it's release. actually I find I
> can use ./kafka-delete-topic.sh and ./kafk-create-topic.sh with same topic
> name and keep the broker running. It's interesting that delete topic
> doesn't actually remove the data from the brokers. So what I understand is
> as long as I deal with the error caught on the producer and consumer site,
> I can use this method instead of that add-topic script, am i correct? Do
> you have any concern I add topic in this way?
>
>
> On Fri, Nov 8, 2013 at 6:00 PM, Guozhang Wang  wrote:
>
>> Hello,
>>
>> Please check the add-partition tool:
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
>>
>> Guozhang
>>
>>
>> On Fri, Nov 8, 2013 at 5:32 PM, hsy...@gmail.com 
>> wrote:
>>
>> > Hi guys, since kafka is able to add new broker into the cluster at
>> runtime,
>> > I'm wondering is there a way to add new partition for a specific topic
>> at
>> > run time?  If not what will you do if you want to add more partition to
>> a
>> > topic? Thanks!
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Detailed description of metrics values?

2013-11-11 Thread hsy...@gmail.com
Hi guys,

Is there a detailed document about the attributes and object names of the
mbeans?

For example, what does the attribute "MeanRate" of the object "MessagesPerSec"
mean? Is it the mean value over the last 1 sec / 1 min?

http://kafka.apache.org/documentation.html#monitoring
only has a little information about the object names.

Best,
Siyuan


pom warning

2013-11-12 Thread hsy...@gmail.com
Hi guys,

When I built my project using maven I got WARNING

[WARNING] The POM for org.apache.kafka:kafka_2.8.0:jar:0.8.0-beta1 is
invalid, transitive dependencies (if any) will not be available: 1 problem
was encountered while building the effective model

And I looked at the kafka_2.8.0-0.8.0-beta1.pom file
and found 2 <dependencies> tags.

Is it supposed to be under one single <dependencies>?

Best
Siyuan


Re: Kafka cluster with lots of topics

2013-11-13 Thread hsy...@gmail.com
I didn't see any auto leader election for adding nodes. The data are still
skewed on the old nodes. You have to force it by running script?


On Wed, Nov 13, 2013 at 6:41 AM, Neha Narkhede wrote:

> At those many topics, zookeeper will be the main bottleneck. Leader
> election process will take very long increasing the unavailability window
> of the cluster.
>
> Thanks,
> Neha
> On Nov 13, 2013 4:49 AM, "Joe Freeman"  wrote:
>
> > Would I be correct in assuming that a Kafka cluster won't scale well to
> > support lots (tens of millions) of topics? If I understand correctly, a
> > node being added or removed would involve a leader election for each
> topic,
> > which is a relatively expensive operation?
> >
>


Re: High level consumer Blocked when there is still message in topic

2013-11-13 Thread hsy...@gmail.com
Since you have a cluster, why not distribute the consumers across different
nodes instead of threads? I think that's the only way to scale up with
kafka.
A question here: if there are more and more high-level consumers, is there a
bottleneck on the zookeeper?


On Tue, Nov 12, 2013 at 9:27 PM, Jun Rao  wrote:

> What's the max lag (reported in JMX) in the consumer? Can the consumer keep
> up with the incoming data rate?
>
> Thanks,
>
> Jun
>
>
> On Tue, Nov 12, 2013 at 7:19 PM, 李帅  wrote:
>
> > Hi,
> >
> >I use Kafka 0.8 high level consumer reads message from topic
> > stream, 3 replica and 10 paritions.
> >
> >When I use 10 threads read the stream and runing for some time (one
> > hour or one day),
> >
> > some threads block at m_stream.iterator().hasNext(), but the parition
> > still has lots of messages.
> >
> >I check consumer's fetch.message.max.bytes and broker's
> > message.max.bytes, there is no
> >
> > message size bigger than these values.
> >
> >The consumer configure is
> >props.put("zookeeper.session.timeout.ms", "4000");
> >props.put("zookeeper.sync.time.ms", "200");
> >props.put("auto.commit.interval.ms", "1000");
> >
> >
> >Please give me some option about how to avoid consumer block.
> >
> >Is there some configure parameter can fix this problem.
> >
> > Thanks!
> >
> > XiaoTian
> >
>


Re: pom warning

2013-11-13 Thread hsy...@gmail.com
Thanks!


On Tue, Nov 12, 2013 at 12:29 PM, Joe Stein  wrote:

> Hi Siyuan, we have a fix for this in 0.8.0 which is in progress being
> released.
>
> If you find the pom breaking for any reason (some build systems have
> problems with the bad pom) you can use the direct apache repository
> https://repository.apache.org/content/groups/public/ which has a fixed pom
> for 0.8.0-beta1 which couldn't get pushed to maven central since it is
> immutable.
>
> Thanks!
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ****/
>
>
> On Tue, Nov 12, 2013 at 2:56 PM, hsy...@gmail.com 
> wrote:
>
> > Hi guys,
> >
> > When I built my project using maven I got WARNING
> >
> > [WARNING] The POM for org.apache.kafka:kafka_2.8.0:jar:0.8.0-beta1 is
> > invalid, transitive dependencies (if any) will not be available: 1
> problem
> > was encountered while building the effective model
> >
> > And I looked at the kafka_2.8.0-0.8.0-beta1.pom file
> > I found 2 <dependencies> tags
> >
> > Is it supposed to be under one single <dependencies>?
> >
> > Best
> > Siyuan
> >
>


A problem of fault-tolerant high-level consumer group

2013-11-13 Thread hsy...@gmail.com
I'm working on a fault-tolerant consumer group. The idea is this: to
maximize the throughput of kafka, I request the metadata from a broker and
create #{num of partitions} consumers for each topic and distribute them on
different nodes. Moreover, there is a mechanism to detect the failure of any node
and restart it.
The problem is that if I kill one of the consumer processes, my program
detects it and relaunches a new consumer with the same group id and client id. But it
hits some error (something like a zookeeper entry doesn't exist; I
didn't keep the log) and never starts.
I think the root cause is that zookeeper detects the failure of the old consumer
process, but before it deletes the consumer, the new consumer comes up and
communicates with zookeeper; at that point zookeeper deletes the
entry of that consumer, and the new consumer fails to be recognized by
zookeeper.
The sequence is like this:
old consumer dies -> zookeeper detects it -> new consumer (same groupid/clientid)
comes up -> zookeeper deletes the consumer entry -> new consumer hits an error and is
not recognized by zookeeper

It's ok that I won't lose any data, because that data will go to other
consumers, but it's annoying because I want to keep the consumer group balanced
after failover.

Thanks,
Siyuan


Re: will this cause message loss?

2013-11-14 Thread hsy...@gmail.com
Also, if you use HEAD, you can create more partitions at runtime; you just
need a dynamic partitioner class, I think.


On Thu, Nov 14, 2013 at 7:23 AM, Neha Narkhede wrote:

> There is no way to delete topics in Kafka yet. You can add partitions to
> existing topics, but you may have to use 0.8 HEAD since we have fixed a few
> bugs on the consumer.
>
> You can read about add partitions here
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
>
> Thanks,
> Neha
> On Nov 14, 2013 5:56 AM, "Yu, Libo"  wrote:
>
> > Hi team,
> >
> > We are using beta1. I am going to delete all topics and create them with
> > more partitions.
> > But I don't want to lose any messages.
> >
> > Assume the consumers are online all the time for the following steps. The
> > consumer's
> > auto.offset.reset is set to largest.
> >
> > 1 stop publishing to the brokers.
> > 2 wait until all N messages on the brokers have been consumed
> > 3 delete all topics. That includes deleting the log files and
> > /brokers/topics/mytopic and
> > /consumers/myusergroup/owners/mytopic and
> > /consumers/myusergroup/offsets/mytopic
> > 4 recreate all the topics with more partitions.
> > 5 start the brokers
> > 6 resume publishing to the brokers
> >
> > When a consumer tries to get the next message from a newly created topic,
> > will the OutOfRange
> > Exception will reset the offset to 0 in this case?
> > Thanks,
> >
> > Libo
> >
> >
>


High-level consumer load-balancing problem

2013-11-14 Thread hsy...@gmail.com
Hi,

I have questions about the load balancing of the kafka high-level consumer.
Suppose I have 4 partitions,
and the producer throughput to these 4 partitions is like this:

partition:    0          1           2          3
bytes in:     10MB/s     10MB/s      1MB/s      1MB/s
messages in:  1kMsg/s    10kMsg/s    1kMsg/s    10kMsg/s

And I have only 2 consumers; which partitions will the 2 consumers pick up?

If the throughput changes dynamically, will the consumers re-balance
accordingly?

Thanks,
Siyuan


Re: Producer reaches a max of 7Mbps

2013-11-19 Thread hsy...@gmail.com
I think the max 50Mbps is close to the disk bottleneck.
My guess is IO is the bottleneck for kafka if you use the same setup (async
without acks).
I got throughput of about 30Mb.
Try to increase these if you don't care about latency very much:
log.flush.interval.messages=1
log.flush.interval.ms=3000



On Tue, Nov 19, 2013 at 7:43 AM, Abhinav Anand  wrote:

> Hi Neha,
>
> I thought request.required.acks has a default value of 0. I have not
> modified it and running with the same default value. At the same time what
> is the max throughput expected in 0.8 ?
>
>
> On Tue, Nov 19, 2013 at 8:43 PM, Wendy Bartlett <
> wendy.bartl...@threattrack.com> wrote:
>
> > Will Kafka 09 be backward compatible with 08?
> > 
> > From: Neha Narkhede 
> > Sent: Tuesday, November 19, 2013 9:27 AM
> > To: users@kafka.apache.org
> > Subject: Re: Producer reaches a max of 7Mbps
> >
> > I went through the performance page where  it can reach a speed of
> 50MBps.
> >
> > I think that number is true for 07, not 08. If you want higher producer
> > throughout in 08, you can set request . required.acks=0. Note that it
> means
> > that the producer does not receive server side acknowledgements if you
> use
> > that config. We plan to address the 08 throughput issue in 09.
> >
> > Thanks,
> > Neha
> > On Nov 18, 2013 11:50 PM, "Abhinav Anand"  wrote:
> >
> > > Hi,
> > >  I am using kafka producer and broker for a production setup. The
> > expected
> > > producer output is 20MBps but I am only getting max of 8MBps. I have
> > > verified that we are losing packets by directly connecting to the data
> > > source through TCP though the metrics is not reflecting any loss.
> > > I went through the performance page where  it can reach a speed of
> > 50MBps.
> > > Please look at the config and suggest if there is some configuration
> > > improvement i can do.
> > >
> > > *** *Message Size* ***
> > > Message size = 3KB
> > >
> > > * Producer Config 
> > > producer.type = async
> > > queue.buffering.max.ms = 100
> > > queue.buffering.max.messages = 4000
> > > request.timeout.ms = 3
> > > batch.num.messages = 200
> > >
> > >  Broker Config* ***
> > >
> > > num.network.threads=3
> > > num.io.threads=8
> > > socket.send.buffer.bytes=1048576
> > > socket.receive.buffer.bytes=2097152
> > > socket.request.max.bytes=104857600
> > > log.dir=/data1/kafka/logs
> > > num.partitions=1
> > > log.flush.interval.messages=1000
> > > log.flush.interval.ms=300
> > > log.retention.hours=48
> > > log.retention.bytes=107374182400
> > > log.segment.bytes=536870912
> > > log.cleanup.interval.mins=1
> > > zookeeper.connect=dare-msgq00:2181,dare-msgq01:2181,dare-msgq02:2181
> > > zookeeper.connection.timeout.ms=100
> > >
> > > --
> > > Abhinav Anand
> > >
> >
>
>
>
> --
> Abhinav Anand
>


Re: Consuming from a replica

2013-11-27 Thread hsy...@gmail.com
What I did for my project is have a thread send a metadata request to a
random broker and monitor metadata changes periodically. The good thing
is that, to my knowledge, any broker in the cluster knows the metadata for all
the topics served in the cluster. Another option is that you can always query
zookeeper to pull the latest information about the cluster.
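
A rough sketch of that monitoring thread, assuming the 0.8 kafka.javaapi
classes; the broker list and refresh interval are placeholders, and an empty
topic list in the request asks the broker for metadata of all topics in the
cluster:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataMonitor {
    public static void main(String[] args) {
        final List<String> brokers = Arrays.asList("broker1:9092", "broker2:9092"); // placeholder broker list
        final Random random = new Random();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                // Pick a random broker; any broker can answer metadata for the whole cluster.
                String[] hostPort = brokers.get(random.nextInt(brokers.size())).split(":");
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(hostPort[0], Integer.parseInt(hostPort[1]),
                            100000, 64 * 1024, "metadataMonitor");
                    // An empty topic list requests metadata for all topics in the cluster.
                    TopicMetadataResponse response =
                            consumer.send(new TopicMetadataRequest(Collections.<String>emptyList()));
                    // ... diff response.topicsMetadata() against the previous snapshot and react to leader changes ...
                } catch (Exception e) {
                    // broker unreachable or request failed; a different broker is tried on the next tick
                } finally {
                    if (consumer != null) consumer.close();
                }
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
}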


On Wed, Nov 27, 2013 at 7:57 AM, Jun Rao  wrote:

> Currently, regular consumers can only fetch from the leader replica.
> Otherwise, they will get an error in response. We allow some special
> consumers to read from follower replicas, but this is really for testing.
>
> Are you thinking of load balancing? Currently, we do load balancing across
> partitions. We assume that there are many partitions in a cluster and try
> to spread the leaders evenly among brokers. That way, even for a particular
> partition, all clients have to talk to a single broker, the overall
> workload is still balanced.
>
> Thanks,
>
> Jun
>
>
> On Wed, Nov 27, 2013 at 2:13 AM, Simon Cooper <
> simon.coo...@featurespace.co.uk> wrote:
>
> > Hi,
> >
> > I've been looking at the SimpleConsumer example, and that I've noticed
> > that it always reads from the leader, and reacts to leader changes by
> > reconnecting to the new leader. Is it possible to read from a replica in
> > ISR that's not the leader? If so, how does the consumer get notified the
> > replica it's reading from is no longer in ISR?
> >
> > Thanks,
> > SimonC
> >
>


kafka_2.8.0/0.8.0 pom seems invalid

2013-12-04 Thread hsy...@gmail.com
Hi All, I was trying to upgrade kafka to 0.8 but I get an empty jar
file for

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.8.0</artifactId>
  <version>0.8.0</version>
</dependency>

However

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.8.2</artifactId>
  <version>0.8.0</version>
</dependency>

is good for me.

BTW from the download page I can only see kafka_2.8.0_0.8.0. Where can I
download the Scala 2.10 version? Is there any difference between the different
versions?


Re: kafka_2.8.0/0.8.0 pom seems invalid

2013-12-04 Thread hsy...@gmail.com
Thanks Joe. If any 0.8.0 client is compatible with this release, I'll just
upgrade my client.


On Wed, Dec 4, 2013 at 1:59 PM, Joe Stein  wrote:

> There is a ticket for the 2.8.0 POM issue (I would send the link to it but
> JIRA seems to be down so don't know the ticket off the top of my head).
>
> The binary releases are only done using 2.8.0 but that really shouldn't
> matter since that is just the broker and your producers and consumers are
> communicating over the wire protocol using TCP/IP to it (so you can run
> whatever client you want with whatever language available and version of
> the language).  There have been talks about changing (using another Scala
> version to build the broker) but this will be more of a consensus for
> operating the broker in production with that Scala version and having
> confidence around that from use.
>
> I haven't looked into the 2.8.0 POM issue yet myself it could be code
> related or even part of the publishing process that bonked at some point
> (it happens).
>
> Have you tried doing a publish-local to see if that works?
>
> I was honestly not aware folks still used 2.8.0 and there have been talks
> about discontinuing that.
>
> Why do you need the 2.8.0 client jar but a 2.10 broker?
>
> Lastly if it becomes such a big issue for the community and a fix is
> required before we can get trunk released we could always role 0.8.1 from
> branch and make the trunk 0.8.2 however we should go through some due
> diligence since 0.8.1 is not really that far off and the trade off may not
> make sense.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> /
>
>
> On Wed, Dec 4, 2013 at 4:48 PM, hsy...@gmail.com  wrote:
>
> > Hi All, I was trying to upgrade the kafka to 0.8 but I get an empty jar
> > file for
> > <dependency>
> >   <groupId>org.apache.kafka</groupId>
> >   <artifactId>kafka_2.8.0</artifactId>
> >   <version>0.8.0</version>
> > </dependency>
> >
> >
> > However
> > <dependency>
> >   <groupId>org.apache.kafka</groupId>
> >   <artifactId>kafka_2.8.2</artifactId>
> >   <version>0.8.0</version>
> > </dependency>
> >
> > is good for me.
> >
> > BTW from the download page I can only see kafka_2.8.0_0.8.0. Where can I
> > download the scala 2.10 version. Is there any difference between
> different
> > version?
> >
>


Is there a way to delete partition at runtime?

2013-12-05 Thread hsy...@gmail.com
Hi guys,

I found there is a tool to add partitions on the fly. My question is: is
there a way to delete a partition at runtime? Thanks!

Best,
Siyuan


build error

2015-11-16 Thread hsy...@gmail.com
I got a build error on both trunk and the 0.9.0 branch:

> docs/producer_config.html (No such file or directory)

Am I missing anything before building?

Thanks,
Siyuan


Questions about new consumer API

2015-11-16 Thread hsy...@gmail.com
The new consumer API looks good. If I understand it correctly, you can use
it like the simple consumer or the high-level consumer. But I have a couple of
questions about its internal implementation.

First of all, does the consumer have any internal fetcher threads like the
high-level consumer?

When you assign multiple TopicPartitions to a consumer, how many TCP
connections does it establish to the brokers? Is it the same as the number of leader
brokers that host those partitions, or just the number of TopicPartitions? If
there is a leader broker change, does it establish new connections or use
existing connections to fetch the data? Can it continue consuming? Also, is
the connection kept open until the consumer is closed?

Thanks!

Best,
Siyuan


Re: build error

2015-11-16 Thread hsy...@gmail.com
The actual thing I want to do is build and install into my local
maven repository so I can include the new API in my dependencies. When the
release is officially out, I can have my code ready with the official
maven dependency.

Thanks,
Siyuan

On Monday, November 16, 2015, Grant Henke  wrote:

> Hi Siyuan,
>
> My guess is that you are trying to build from a subdirectory. I have a
> minor patch available to fix this that has not been pulled in yet here:
> https://github.com/apache/kafka/pull/509
>
> In the mean time, if you need to build a subproject you can execute a
> command like the following:
> gradle clients:build
>
> Thanks,
> Grant
>
> On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang  > wrote:
>
> > Siyuan,
> >
> > Which command did you use to build?
> >
> > Guozhang
> >
> > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com  <
> hsy...@gmail.com >
> > wrote:
> >
> > > I got a build error on both trunk and 0.9.0 branch
> > >
> > > > docs/producer_config.html (No such file or directory)
> > >
> > > Do I miss anything before build
> > >
> > > Thanks,
> > > Siyuan
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com  | twitter.com/gchenke |
> linkedin.com/in/granthenke
>


Re: build error

2015-11-17 Thread hsy...@gmail.com
I got a "main class not found" error, so I installed gradle 2.5 and ran gradle
build (not the wrapper).

On Mon, Nov 16, 2015 at 10:17 PM, Guozhang Wang  wrote:

> Did you just use "./gradlew build" in root directory?
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 6:41 PM, hsy...@gmail.com 
> wrote:
>
> > The actual thing I want to do is I want to build and install in my local
> > maven repository so I can include new api in my dependencies. When the
> > release is officially out, I can have both my code ready with the
> official
> > maven dependency
> >
> > Thanks,
> > Siyuan
> >
> > On Monday, November 16, 2015, Grant Henke  wrote:
> >
> > > Hi Siyuan,
> > >
> > > My guess is that you are trying to build from a subdirectory. I have a
> > > minor patch available to fix this that has not been pulled in yet here:
> > > https://github.com/apache/kafka/pull/509
> > >
> > > In the mean time, if you need to build a subproject you can execute a
> > > command like the following:
> > > gradle clients:build
> > >
> > > Thanks,
> > > Grant
> > >
> > > On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang  > > > wrote:
> > >
> > > > Siyuan,
> > > >
> > > > Which command did you use to build?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com  <
> > > hsy...@gmail.com >
> > > > wrote:
> > > >
> > > > > I got a build error on both trunk and 0.9.0 branch
> > > > >
> > > > > > docs/producer_config.html (No such file or directory)
> > > > >
> > > > > Do I miss anything before build
> > > > >
> > > > > Thanks,
> > > > > Siyuan
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Grant Henke
> > > Software Engineer | Cloudera
> > > gr...@cloudera.com  | twitter.com/gchenke |
> > > linkedin.com/in/granthenke
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: build error

2015-11-17 Thread hsy...@gmail.com
And I couldn't find wrapper jar files under the gradle folder
https://github.com/apache/kafka/tree/0.9.0/gradle



On Mon, Nov 16, 2015 at 10:17 PM, Guozhang Wang  wrote:

> Did you just use "./gradlew build" in root directory?
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 6:41 PM, hsy...@gmail.com 
> wrote:
>
> > The actual thing I want to do is I want to build and install in my local
> > maven repository so I can include new api in my dependencies. When the
> > release is officially out, I can have both my code ready with the
> official
> > maven dependency
> >
> > Thanks,
> > Siyuan
> >
> > On Monday, November 16, 2015, Grant Henke  wrote:
> >
> > > Hi Siyuan,
> > >
> > > My guess is that you are trying to build from a subdirectory. I have a
> > > minor patch available to fix this that has not been pulled in yet here:
> > > https://github.com/apache/kafka/pull/509
> > >
> > > In the mean time, if you need to build a subproject you can execute a
> > > command like the following:
> > > gradle clients:build
> > >
> > > Thanks,
> > > Grant
> > >
> > > On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang  > > > wrote:
> > >
> > > > Siyuan,
> > > >
> > > > Which command did you use to build?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com  <
> > > hsy...@gmail.com >
> > > > wrote:
> > > >
> > > > > I got a build error on both trunk and 0.9.0 branch
> > > > >
> > > > > > docs/producer_config.html (No such file or directory)
> > > > >
> > > > > Do I miss anything before build
> > > > >
> > > > > Thanks,
> > > > > Siyuan
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Grant Henke
> > > Software Engineer | Cloudera
> > > gr...@cloudera.com  | twitter.com/gchenke |
> > > linkedin.com/in/granthenke
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Questions about new consumer API

2015-11-17 Thread hsy...@gmail.com
Thanks Guozhang,

Maybe I should give a few words about what I'm trying to achieve with the new API.

Currently, I'm building a new kafka connector for Apache Apex (
http://apex.incubator.apache.org/) using the 0.9.0 API.
Apex supports dynamic partitioning, so in the old version we manage all the
consumer partitions with either a 1:1 strategy (each consumer process consumes
from only one kafka partition) or a 1:n strategy (each consumer process can
consume from multiple kafka partitions, distributed round-robin).
We also have a separate thread to monitor topic metadata changes (leader
broker change, new partition added, using internal APIs like ZkUtils etc.)
and repartition dynamically based on that (for example, auto-reconnect to the new
leader broker, or create a new consumer partition for a new kafka partition at
runtime).  You can see the high-level consumer doesn't work for this (it can only
balance between existing consumers unless you manually add a new one).  I'm wondering
whether the new consumer could save some of the work we did before.

I'm still confused about assign() and subscribe().  My understanding is that if
you use assign() only, the consumer becomes more like a simple consumer,
except that if the leader broker changes it automatically reconnects to the new
leader broker. Is that correct?   If you use the subscribe() method only, then all
the partitions will be distributed among the running consumer processes with the
same group.id using partition.assignment.strategy. Is that true?

So I assume assign() and subscribe() (and the group.id /
partition.assignment.strategy settings) cannot be used together?
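
For concreteness, here is a minimal sketch of the two modes with the 0.9.0
KafkaConsumer; the broker address, topic, and group are placeholders, and the
two modes are shown as alternatives rather than combined:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class NewConsumerModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "my-group");                 // used for offsets, and for group management with subscribe()
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // Mode 1: group management. Partitions of "my-topic" are balanced across
        // all consumers that subscribe with the same group.id.
        consumer.subscribe(Arrays.asList("my-topic"));

        // Mode 2 (instead of subscribe, not together with it): manual assignment,
        // no group management; the application decides which partitions this instance owns.
        // consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0),
        //                               new TopicPartition("my-topic", 1)));

        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
            }
        }
    }
}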

Also, with the old API we found one thread per broker is the most efficient
way to consume data. For example, if one process consumes from p1, p2, p3,
and p1, p2 are sitting on one broker b1 while p3 is sitting on another one, b2,
the best thing is to create 2 threads, each using the simple consumer API and
consuming from only one broker.  I'm wondering how I would use the new API to do
this.

Thanks,
Siyuan

On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang  wrote:

> Hi Siyuan,
>
> 1) new consumer is single-threaded, it does not maintain any internal
> threads as the old high-level consumer.
>
> 2) each consumer will only maintain one TCP connection with each broker.
> The only extra socket is the one with its coordinator. That is, if there is
> three brokers S1, S2, S3, and S1 is the coordinator for this consumer, it
> will maintain 4 sockets in total, 2 for S1 (one for fetching, one for
> coordinating) and 1 for S2 and S3 (only for fetching).
>
> 3) Currently the connection is not closed by consumer, although the
> underlying network client / selector will close idle ones after some
> timeout. So in worst case it will only maintain N+1 sockets in total for N
> Kafka brokers at one time.
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com 
> wrote:
>
> > The new consumer API looks good. If I understand it correctly you can use
> > it like simple consumer or high-level consumer. But I have couple
> questions
> > about it's internal implementation
> >
> > First of all does the consumer have any internal fetcher threads like
> > high-level consumer?
> >
> > When you assign multiple TopicPartitions to a consumer, how many TCP
> > connections it establish to the brokers. Is it same as number of leader
> > brokers that host those partitions or just number of TopicPartitions. If
> > there is any leader broker change does it establish new connections/using
> > existing connections to fetch the data? Can it continue consuming? Also
> is
> > the connection kept until the consumer is closed?
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>


Re: Questions about new consumer API

2015-11-17 Thread hsy...@gmail.com
By efficiency, I mean maximizing throughput while minimizing resources on both
the broker side and the consumer side.

One example: if you have over 200 partitions on 10 brokers and you
start 5 consumer processes to consume the data, if each one is single-threaded
and you do round-robin to distribute the load, then each one will try to
fetch from over 40 partitions one by one through up to 10 connections
(about 50 overall).  But if it's smart enough to group partitions by
broker, each process can have 2 separate threads (consuming from 2
different brokers concurrently). That seems a more optimal solution than the
other, right?

On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson  wrote:

> Hi Siyuan,
>
> Your understanding about assign/subscribe is correct. We think of topic
> subscription as enabling automatic assignment as opposed to doing manual
> assignment through assign(). We don't currently allow them to be mixed.
>
> Can you elaborate on your findings with respect to using one thread per
> broker? In what sense was it more efficient? Doing the same thing might be
> tricky with the new consumer, but I think you could do it using
> partitionsFor() to find the current partition leaders and assign() to set
> the assignment in each thread.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com 
> wrote:
>
> > Thanks Guozhang,
> >
> > Maybe I should give a few words about what I'm going to achieve with new
> > API
> >
> > Currently, I'm building a new kafka connector for Apache Apex(
> > http://apex.incubator.apache.org/) using 0.9.0 API
> > Apex support dynamic partition, so in the old version, We manage all the
> > consumer partitions in either 1:1 strategy (each consumer process
> consumes
> > only from one kafka partition) or 1:n strategy (each consumer process
> could
> > consume from multiple kafka partitions, using round-robin to distribute)
> > And we also have separate thread to monitor topic metadata change(leader
> > broker change, new partition added, using internal API like ZkUtil etc)
> > and do dynamic partition based on that(for example auto-reconnect to new
> > leader broker, create new partition to consume from new kafka partition
> at
> > runtime).  You can see High-level consumer doesn't work(It can only
> balance
> > between existing consumers unless you manually add new one)  I'm thinking
> > if the new consumer could be used to save some work we did before.
> >
> > I'm still confused with assign() and subscribe().  My understanding is if
> > you use assign() only, the consumer becomes more like a simple consumer
> > except if the leader broker changes it automatically reconnect to the new
> > leader broker, is it correct?   If you use subscribe() method only then
> all
> > the partitions will be distributed to running consumer process with same
> "
> > group.id" using "partition.assignment.strategy". Is it true?
> >
> > So I assume assign() and subscribe()(and group.id
> > partition.assignment.strategy settings) can not be used together?
> >
> > Also in the old API we found one thread per broker is the most efficient
> > way to consume data, for example, if one process consumes from p1, p2, p3
> > and p1,p2 are sitting on one broker b1, p3 is sitting on another one b2,
> > the best thing is create 2 threads each thread use simple consumer API
> and
> > only consume from one broker.  I'm thinking how do I use the new API to
> do
> > this.
> >
> > Thanks,
> > Siyuan
> >
> > On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Siyuan,
> > >
> > > 1) new consumer is single-threaded, it does not maintain any internal
> > > threads as the old high-level consumer.
> > >
> > > 2) each consumer will only maintain one TCP connection with each
> broker.
> > > The only extra socket is the one with its coordinator. That is, if
> there
> > is
> > > three brokers S1, S2, S3, and S1 is the coordinator for this consumer,
> it
> > > will maintain 4 sockets in total, 2 for S1 (one for fetching, one for
> > > coordinating) and 1 for S2 and S3 (only for fetching).
> > >
> > > 3) Currently the connection is not closed by consumer, although the
> > > underlying network client / selector will close idle ones after some
> > > timeout. So in worst case it will only maintain N+1 sockets in total
> for
> > N
> > > Kafka brokers at one time.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com 
&

Re: Questions about new consumer API

2015-11-18 Thread hsy...@gmail.com
That sounds like a good suggestion. I'm actually looking at the code and I
will start another thread for questions about that.

On Tue, Nov 17, 2015 at 5:42 PM, Jason Gustafson  wrote:

> Thanks for the explanation. Certainly you'd use less connections with this
> approach, but it might be worthwhile to do some performance analysis to see
> whether there is much difference in throughput (I'd be interested in seeing
> these results myself). Another approach that might be interesting would be
> to implement your own partition assignor which took into account the
> leaders of each partition. Then you could just use subscribe() and let
> Kafka manage the group for you. This is similar to how we were thinking of
> implementing consumer rack-awareness.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 4:04 PM, hsy...@gmail.com 
> wrote:
>
> > By efficiency, I mean maximize throughput while minimize resources on
> both
> > broker sides and consumer sides.
> >
> > One example is if you have over 200 partitions on 10 brokers and you can
> > start 5 consumer processes to consume data, if each one is single-thread
> > and you do round-robin to distribute the load then each one will try to
> > fetch from over 40 partitions one by one through 10 connections
> > possibly(overall is 50),  but if it's smart enough to group partitions by
> > brokers, each process can have 2 separate threads(consuming from 2
> > different brokers concurrently). That seems a more optimal solution than
> > another, right?
> >
> > On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson 
> > wrote:
> >
> > > Hi Siyuan,
> > >
> > > Your understanding about assign/subscribe is correct. We think of topic
> > > subscription as enabling automatic assignment as opposed to doing
> manual
> > > assignment through assign(). We don't currently allow them to be mixed.
> > >
> > > Can you elaborate on your findings with respect to using one thread per
> > > broker? In what sense was it more efficient? Doing the same thing might
> > be
> > > tricky with the new consumer, but I think you could do it using
> > > partitionsFor() to find the current partition leaders and assign() to
> set
> > > the assignment in each thread.
> > >
> > > -Jason
> > >
> > > On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com 
> > > wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > Maybe I should give a few words about what I'm going to achieve with
> > new
> > > > API
> > > >
> > > > Currently, I'm building a new kafka connector for Apache Apex(
> > > > http://apex.incubator.apache.org/) using 0.9.0 API
> > > > Apex support dynamic partition, so in the old version, We manage all
> > the
> > > > consumer partitions in either 1:1 strategy (each consumer process
> > > consumes
> > > > only from one kafka partition) or 1:n strategy (each consumer process
> > > could
> > > > consume from multiple kafka partitions, using round-robin to
> > distribute)
> > > > And we also have separate thread to monitor topic metadata
> > change(leader
> > > > broker change, new partition added, using internal API like ZkUtil
> etc)
> > > > and do dynamic partition based on that(for example auto-reconnect to
> > new
> > > > leader broker, create new partition to consume from new kafka
> partition
> > > at
> > > > runtime).  You can see High-level consumer doesn't work(It can only
> > > balance
> > > > between existing consumers unless you manually add new one)  I'm
> > thinking
> > > > if the new consumer could be used to save some work we did before.
> > > >
> > > > I'm still confused with assign() and subscribe().  My understanding
> is
> > if
> > > > you use assign() only, the consumer becomes more like a simple
> consumer
> > > > except if the leader broker changes it automatically reconnect to the
> > new
> > > > leader broker, is it correct?   If you use subscribe() method only
> then
> > > all
> > > > the partitions will be distributed to running consumer process with
> > same
> > > "
> > > > group.id" using "partition.assignment.strategy". Is it true?
> > > >
> > > > So I assume assign() and subscribe()(and group.id
> > > > partition.assignment.strategy settings) can not be used together?
>

Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Hey guys,

I saw that PartitionAssignor is not in the public API docs and the package name
is internals.

Does that mean this API is not stable and could change even in a minor release?

And in the assign() method signature, the key of the "subscriptions" map is the
memberId. What is the memberId? Can I set the id manually to identify a member?
I want to do some sticky assignment.


Thanks!

Best,
Siyuan


Re: Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Thanks Guozhang,  what is userData for in the Subscription?

On Wed, Nov 18, 2015 at 12:05 PM, Guozhang Wang  wrote:

> Currently the whole KafkaConsumer interface is tagged as "
> @InterfaceStability.Unstable", meaning that the API may change in the
> future. We have been very careful to make any dramatic public API changes
> but still cannot guarantee this will not happen.
>
> Member-Id is assigned by the server-side coordinator upon accepting the
> consumer to join the specified group, hence it cannot be manually set. But
> the memberId will not change as long as the consumer is still part of the
> members of the group, so you want to do some sticky assignment you can just
> remember the memberId -> partitions map on the consumer side in some
> persistent storage so that even when the leader who does the assignment has
> failed over other new leaders can still access the past assignment history.
>
> Guozhang
>
>
>
> On Wed, Nov 18, 2015 at 9:02 AM, hsy...@gmail.com 
> wrote:
>
> > Hey guys,
> >
> > I saw the PartitionAssignor is not in public doc API and the package name
> > is internals.
> >
> > Does it mean this API is not stable and could be changed even in minor
> > release?
> >
> > And in the assign method signature, the key for the "subscription" map is
> > memberId, what is memberId, can I manually set the id to identify member?
> > I want to do some sticky assignment.
> >
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>


Commit offsets only work for subscribe(), not assign()

2015-11-18 Thread hsy...@gmail.com
In the new API, the explicit commit-offset call only works for a subscribe()
consumer, not an assign() consumer, correct?

Best,
Siyuan


Re: Commit offsets only work for subscribe(), not assign()

2015-11-23 Thread hsy...@gmail.com
Hey Jason,

The test I did is very simple. I was using manual assignment with its own
group.id and client.id. I first started a process to consume data, then produced
some data, then killed the process, continued producing more data, and started
the process again. I didn't see anything from the time the process was killed.
Do I have to set "auto.offset.reset" to "none"? Thanks
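
For reference, this is roughly the shape of what I am doing, a stripped-down
sketch rather than the actual code (topic, group and server names are
placeholders):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignCommitTest {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-assign-group");
    props.put("client.id", "my-assign-client");
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");  // only used when no committed offset exists
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    consumer.assign(Arrays.asList(new TopicPartition("mytopic", 0)));
    try {
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<byte[], byte[]> r : records)
          System.out.println(r.offset());
        consumer.commitSync();  // a restarted process should resume from this committed offset
      }
    } finally {
      consumer.close();
    }
  }
}

My expectation was that after the restart the consumer would pick up at the last
commitSync() position rather than at the log end.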

On Fri, Nov 20, 2015 at 3:56 PM, Jason Gustafson  wrote:

> I suppose I should have added one qualification to that. The commit API
> will not work for a consumer using manual assignment if its groupId is
> shared with another consumer using automatic assignment (with subscribe()).
> When a consumer group is active, Kafka only allows commits from members of
> that group.
>
> -Jason
>
> On Fri, Nov 20, 2015 at 3:41 PM, Jason Gustafson 
> wrote:
>
> > Hey Siyuan,
> >
> > The commit API should work the same regardless whether subscribe() or
> > assign() was used. Does this not appear to be working?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Nov 18, 2015 at 4:40 PM, hsy...@gmail.com 
> > wrote:
> >
> >> In the new API, the explicit commit offset method call only works for
> >> subscribe consumer, not the assign consumer, correct?
> >>
> >> Best,
> >> Siyuan
> >>
> >
> >
>


Re: 0.9.0.0 RC4

2015-11-23 Thread hsy...@gmail.com
In http://kafka.apache.org/090/documentation.html#newconsumerconfigs, shouldn't
partition.assignment.strategy be a string rather than a list of strings?

On Fri, Nov 20, 2015 at 5:21 PM, Jun Rao  wrote:

> This is the fourth candidate for release of Apache Kafka 0.9.0.0. This a
> major release that includes (1) authentication (through SSL and SASL) and
> authorization, (2) a new java consumer, (3) a Kafka connect framework for
> data ingestion and egression, and (4) quotas. Since this is a major
> release, we will give people a bit more time for trying this out.
>
> Release Notes for the 0.9.0.0 release
>
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, Nov. 23, 6pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1
> and sha2 (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/javadoc/
>
> * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=132943b0f83831132cd46ac961cf6f1c00132565
>
> * Documentation
> http://kafka.apache.org/090/documentation.html
>
> /***
>
> Thanks,
>
> Jun
>


partitionsFor method doesn't return latest partition metadata

2015-11-30 Thread hsy...@gmail.com
Hi guys,

I want to use the partitionsFor method of the new consumer API periodically to
monitor partition metadata changes, but it seems to only issue a remote call to
the server the first time. If I add partitions after that, partitionsFor returns
stale values. Is there a way to reuse the consumer object and refresh the
metadata? Thanks!
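
One thing I am going to try in the meantime is lowering metadata.max.age.ms so
the cached metadata behind partitionsFor is refreshed more often; a sketch
(topic name is a placeholder), though I am not sure yet whether partitionsFor
alone triggers the refresh without poll():

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class PartitionWatcher {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("metadata.max.age.ms", "10000");  // refresh cached metadata every 10s instead of the default 5 min
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    try {
      while (true) {
        List<PartitionInfo> partitions = consumer.partitionsFor("mytopic");
        System.out.println("partition count: " + (partitions == null ? 0 : partitions.size()));
        Thread.sleep(30000);
      }
    } finally {
      consumer.close();
    }
  }
}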

regards,
Siyuan


Is 0.9 new consumer API compatible with 0.8.x.x broker

2015-11-30 Thread hsy...@gmail.com
Is 0.9 new consumer API compatible with 0.8.x.x broker


Re: Kafka 0.9 consumer API question

2015-12-17 Thread hsy...@gmail.com
Hi Rajiv,

I think it makes sense to return a read-only assignment. What we could improve
here is to add addPartition/removePartition methods to the consumer; then we
wouldn't have to do any operations on the set returned by the assignment()
method.

BTW, I think you can implement the PartitionAssignor interface to solve your use
case. I couldn't find the javadoc for that interface, but here is the method you
can use:

/**
 * Perform the group assignment given the member subscriptions and current cluster metadata.
 * @param metadata Current topic/broker metadata known by consumer
 * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
 * @return A map from the members to their respective assignment. This should have one entry
 *         for all members who in the input subscription map.
 */
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

The subscriptions map has each consumer's memberId as the key. It can be used as
a reference to the consumer, and you can adjust the assignments there.
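
For the sticky part, this is the kind of helper I had in mind: only a sketch of
the sticky step itself (all names are made up), independent of the
PartitionAssignor plumbing since that interface is still internal. It keeps each
member's previous partitions where possible and spreads the rest round-robin:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class StickyAssignmentSketch {

  // Keep each member's previously owned partitions when they still exist,
  // then spread the remaining partitions round-robin over the members.
  // Assumes memberIds is non-empty.
  public static Map<String, List<TopicPartition>> assignSticky(
      Set<TopicPartition> allPartitions,
      List<String> memberIds,
      Map<String, List<TopicPartition>> previousAssignment) {

    Map<String, List<TopicPartition>> result = new HashMap<String, List<TopicPartition>>();
    Set<TopicPartition> unassigned = new HashSet<TopicPartition>(allPartitions);

    for (String member : memberIds) {
      List<TopicPartition> kept = new ArrayList<TopicPartition>();
      List<TopicPartition> previous = previousAssignment.get(member);
      if (previous != null) {
        for (TopicPartition tp : previous) {
          if (unassigned.remove(tp))  // still exists and not claimed by an earlier member
            kept.add(tp);
        }
      }
      result.put(member, kept);
    }

    int i = 0;
    for (TopicPartition tp : unassigned) {
      result.get(memberIds.get(i++ % memberIds.size())).add(tp);
    }
    return result;
  }
}

The memberId keys would come straight from the subscriptions map passed to
assign(), and previousAssignment is whatever was remembered from the last
rebalance (for example in ZooKeeper or another store, as Guozhang suggested).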




On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian  wrote:

> Hi Jason,
>
> The copying is not a problem in terms of performance. It's just annoying to
> write the extra code. My point with the copy is that since the client is
> already making a copy when it returns the set to me, why would it matter if
> I modify the copy. Creating an unmodifiable set on top of a copy seems
> redundant. It would be easiest for us as users to do something like this:
>
> final Set<TopicPartition> partitions = consumer.assignment();  // This
> already returns a copy of the underlying assignment, thus ensuring that the
> internal data structures are protected.
> partitions.add(myNewTopicPartition);  // This is fine to modify since
> consumer.assignment() returns a copy.
> partitions.remove(topicPartitionToBeRemoved);
> consumer.assign(partitions);
>
> Instead we have to do something like this right now.
>
> final Set<TopicPartition> partitions = consumer.assignment();  // This
> returns a copy of the underlying assignment wrapped in an UnmodifiableSet
> which seems redundant.
> final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);  //
> We need this copy since consumer.assignment() is unmodifiable, even though
> it is a copy.
> yetAnotherCopy.add(myNewTopicPartition);
> yetAnotherCopy.remove(topicPartitionToBeRemoved);
> List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> consumer.assign(wayTooManyCopies);
>
> Thanks,
> Rajiv
>
>
> On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson 
> wrote:
>
> > Hey Rajiv,
> >
> > I agree the Set/List inconsistency is a little unfortunate (another
> > annoying one is pause() which uses a vararg). I think we should probably
> > add the following variants:
> >
> > assign(Collection)
> > subscribe(Collection)
> > pause(Collection)
> >
> > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> can
> > see your point, but I think it's a little dangerous for user code to
> depend
> > on being able to modify a collection returned from the API. Making it
> > immutable reduces the coupling with user code and gives us more freedom
> in
> > the future (not that we have any intention of changing the set type, but
> we
> > could). I think the way I might try to implement your use case would be
> to
> > maintain the assignment set yourself. You can make changes to that set
> and
> > always pass it to assign(), which would avoid the need to use
> assignment().
> > Also, I probably wouldn't be overly concerned about the copying overhead
> > unless profiling shows that it is actually a problem. Are your partition
> > assignments generally very large?
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian 
> wrote:
> >
> > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > partitions. We consume partitions based on our own logic instead of
> > > delegating that to Kafka. One of our use cases is handling a change in
> > the
> > > partitions that we consume. This means that sometimes we need to
> consume
> > > additional partitions and other times we need to stop consuming (not
> > pause
> > > but stop entirely) some of the partitions that we are currently
> polling.
> > >
> > > The semantics of the assign() call at
> > >
> > >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > is that we need to provide the entire list of subscriptions. So when we
> > > want to add or remove partitions we call the assignment() method to get
> > the
> > > existing set of TopicPartitions being polled, and then modify this set
> > and
> > > pass it back to the assign() call. However it seems weird that the
> > assign()
> > > call takes a List whereas the assignment call returns
> a
> > > Set. Further the Set returned by the method is an
> > > unmodifiable set which means to change this set we need to create a new
> > > List/Set from it and then modify the new collection. Looking at the
> code
> > 

Re: how to programatically monitor Kafka availability

2015-12-17 Thread hsy...@gmail.com
Hey Hohl,

I use the *partitionsFor* method to monitor the partition info for particular
topics.
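
Something like this minimal check, run on a timer, assuming partitionsFor
returns null or an empty list for an unknown topic and leader() is null when a
partition has no leader (class and method names are just examples):

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class TopicHealthCheck {

  // Treat the topic as reachable when metadata is returned and every
  // partition currently reports a leader.
  public static boolean allPartitionsHaveLeaders(KafkaConsumer<?, ?> consumer, String topic) {
    List<PartitionInfo> infos = consumer.partitionsFor(topic);
    if (infos == null || infos.isEmpty())
      return false;
    for (PartitionInfo info : infos) {
      if (info.leader() == null)
        return false;
    }
    return true;
  }
}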



On Tue, Dec 15, 2015 at 11:27 AM, Hohl, Ken  wrote:

> We want to be able to monitor the ability to send messages to Kafka
> topics.  We want to be aware of the inability to do so before the time we
> attempt to send a message.  What we're looking for is something like a
> heartbeat.  The reason we need this is that in our deployment environment,
> Kafka and its clients will not be co-located.  As such, network issues
> could cause Kafka to not be available to its client.
>
> We've considered using Zookeeper that's already managing the Kafka cluster
> but have not been able to determine exactly how we would use it.
>
> We've also considered requesting a JMX MBean periodically and concluding
> the cluster is not accessible if we can't get the MBean from at least 1
> broker.
>
> What is the recommended way of accomplishing what we're trying to do?
>
> Thanks.
>
> Ken Hohl
> Cars.com
>
>


Where can I find the document for consumer metrics

2015-12-17 Thread hsy...@gmail.com
I can find some broker/producer metrics here
http://kafka.apache.org/documentation.html#monitoring

but where can I find the consumer metrics docs?

Every time I have to log this to find out what metrics I want:

MetricName [name=join-rate, group=consumer-coordinator-metrics,
description=The number of group joins per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@ebea716;MetricName
[name=fetch-size-avg, group=consumer-fetch-manager-metrics, description=The
average number of bytes fetched per request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@6cb9cea;MetricName
[name=commit-latency-avg, group=consumer-coordinator-metrics,
description=The average time taken for a commit request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@21aaca22;MetricName
[name=join-time-avg, group=consumer-coordinator-metrics, description=The
average time taken for a group rejoin,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@53bc8f72;MetricName
[name=incoming-byte-rate, group=consumer-metrics, description=Bytes/second
read off all sockets,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@34f9c3e0;MetricName
[name=bytes-consumed-rate, group=consumer-fetch-manager-metrics,
description=The average number of bytes consumed per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@36c7401a;MetricName
[name=response-rate, group=consumer-metrics, description=Responses received
sent per second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5341870e;MetricName
[name=connection-creation-rate, group=consumer-metrics, description=New
connections established per second in the window.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@9f0d8f4;MetricName
[name=fetch-rate, group=consumer-fetch-manager-metrics, description=The
number of fetch requests per second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@23338045;MetricName
[name=join-time-max, group=consumer-coordinator-metrics, description=The
max time taken for a group rejoin,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5cdabd4d;MetricName
[name=io-wait-ratio, group=consumer-metrics, description=The fraction of
time the I/O thread spent waiting.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@659b7186;MetricName
[name=fetch-size-max, group=consumer-fetch-manager-metrics, description=The
maximum number of bytes fetched per request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@403a4887;MetricName
[name=assigned-partitions, group=consumer-coordinator-metrics,
description=The number of partitions currently assigned to this consumer,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@326fb802;MetricName
[name=io-time-ns-avg, group=consumer-metrics, description=The average
length of time for I/O per select call in nanoseconds.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@432b0ee3;MetricName
[name=records-consumed-rate, group=consumer-fetch-manager-metrics,
description=The average number of records consumed per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@3fde7b88;MetricName
[name=io-wait-time-ns-avg, group=consumer-metrics, description=The average
length of time the I/O thread spent waiting for a socket ready for reads or
writes in nanoseconds.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5485cfd8;MetricName
[name=select-rate, group=consumer-metrics, description=Number of times the
I/O layer checked for new I/O to perform per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2cbdcaf6;MetricName
[name=fetch-throttle-time-max, group=consumer-fetch-manager-metrics,
description=The maximum throttle time in ms,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@6057f36c;MetricName
[name=heartbeat-response-time-max, group=consumer-coordinator-metrics,
description=The max time taken to receive a response to a heartbeat
request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2e2e68de;MetricName
[name=network-io-rate, group=consumer-metrics, description=The average
number of network operations (reads or writes) on all connections per
second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@68e6de81;MetricName
[name=fetch-latency-max, group=consumer-fetch-manager-metrics,
description=The max time taken for any fetch request.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@d1a1cf5;MetricName
[name=request-size-avg, group=consumer-metrics, description=The average
size of all requests in the window..,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2d631f8b;MetricName
[name=commit-rate, group=consumer-coordinator-metrics, descri

Exceptions when programmatically start multiple kafka brokers

2015-12-21 Thread hsy...@gmail.com
I'm trying to start 2 brokers in my Kafka ingestion unit test and I got this
exception:

javax.management.InstanceAlreadyExistsException:
kafka.server:type=app-info,id=0
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
at kafka.server.KafkaServer.startup(KafkaServer.scala:239)
at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.startKafkaServer(KafkaOperatorTestBase.java:133)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.startKafkaServer(KafkaOperatorTestBase.java:143)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.beforeTest(KafkaOperatorTestBase.java:175)

Is it caused by the JMX metrics reporter?
It doesn't affect any functionality we need, but it is annoying.
How do I disable it?

Thanks!


Log level for consumer properties

2016-05-04 Thread hsy...@gmail.com
Hi,

Right now, when we initialize a Kafka consumer, it always logs the consumer
properties at INFO level. Can we move that to DEBUG level? I have to
periodically create a consumer instance just to pull some metadata for a topic,
and I don't want this noisy log.
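
In the meantime, the workaround I am considering, assuming that INFO line is
emitted by the ConsumerConfig logger (I have not confirmed the exact logger
name), is to raise just that logger in log4j.properties:

# assumption: the "ConsumerConfig values: ..." line comes from this logger at INFO
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN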

Regards,
Siyuan


log4j setting for embedded kafka server

2016-06-24 Thread hsy...@gmail.com
Hi guys,

I start the server programmatically in my application using the
KafkaServerStartable.startup() method, and in the log4j.properties settings I
add this:

log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=WARN

But I still always get INFO logs. Do you guys know how to enforce the log level
here? Thanks!


How do I turn off INFO log for embedded kafka server

2016-07-08 Thread hsy...@gmail.com
Hey guys,

I have some unit tests that run an embedded Kafka server. I want to skip all
DEBUG and INFO logs from the Kafka server, but having the settings below in
log4j.properties doesn't work. Some INFO logs still keep showing up, like this:

2016-07-08 18:01:14,288 [kafka-request-handler-4] INFO
cluster.Partition info - Partition [__consumer_offsets,4] on broker 0:
No checkpointed highwatermark is found for partition
[__consumer_offsets,4]
2016-07-08 18:01:14,286 [kafka-request-handler-2] INFO
cluster.Partition info - Partition [__consumer_offsets,39] on broker
100: No checkpointed highwatermark is found for partition
[__consumer_offsets,39]
2016-07-08 18:01:14,296 [kafka-request-handler-7] INFO  log.Log info -
Completed load of log __consumer_offsets-33 with log end offset 0
2016-07-08 18:01:14,310 [kafka-request-handler-2] INFO  log.Log info -
Completed load of log __consumer_offsets-4 with log end offset 0

Here is my setting:
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=WARN
log4j.logger.org.apache.zookeeper=WARN


ZooKeeper is fine and some of the logs from the Kafka brokers are also
skipped, but some of the logs are still showing up.
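
One workaround I may fall back to, assuming the log4j 1.x backend is what ends
up on the test classpath, is to force the levels programmatically before the
embedded broker starts, which sidesteps any properties-file lookup problems:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class EmbeddedKafkaLogging {

  // Call once in the test setup, before KafkaServerStartable.startup().
  public static void quietKafkaLogs() {
    Logger.getLogger("kafka").setLevel(Level.WARN);
    Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
  }
}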


Thanks,

Siyuan


Async producer callback?

2014-05-20 Thread hsy...@gmail.com
Hi guys,

So far, is there a way to track the async producer callback?
My requirement is basically: if all nodes for the topic go down, can I
pause the producer and, after the brokers come back online, continue to
produce from the failure point?


Best,
Siyuan


How to get last message

2014-05-28 Thread hsy...@gmail.com
Is there a way to get the last message of a partition for a given topic?


Re: Help is processing huge data through Kafka-storm cluster

2014-06-17 Thread hsy...@gmail.com
Hi Shaikh,

I have heard of throughput bottlenecks with Storm; it cannot really scale up
with Kafka. I recommend you try the DataTorrent platform
(https://www.datatorrent.com/).

The platform itself is not open-source, but it has an open-source library
(https://github.com/DataTorrent/Malhar) which contains Kafka ingestion
functionality. The library is pretty cool: it can scale up dynamically with
Kafka partitions and is fully HA.

And in your case you might be able to use the platform for free (it's free if
your application doesn't require a large amount of memory).

With the DataTorrent platform and the open-source library I can scale my
application up to 300k msg/s (10 nodes, 3 replicas, 1 KB messages, 0.8.0
client). I heard the performance of the Kafka client has been improved for the
0.8.1 release :)

Best,
Siyuan


On Sat, Jun 14, 2014 at 8:14 AM, Shaikh Ahmed  wrote:

> Hi,
>
> Daily we are downloaded 28 Million of messages and Monthly it goes up to
> 800+ million.
>
> We want to process this amount of data through our kafka and storm cluster
> and would like to store in HBase cluster.
>
> We are targeting to process one month of data in one day. Is it possible?
>
> We have setup our cluster thinking that we can process million of messages
> in one sec as mentioned on web. Unfortunately, we have ended-up with
> processing only 1200-1700 message per second.  if we continue with this
> speed than it will take min 10 days to process 30 days of data, which is
> the relevant solution in our case.
>
> I suspect that we have to change some configuration to achieve this goal.
> Looking for help from experts to support me in achieving this task.
>
> *Kafka Cluster:*
> Kafka is running on two dedicated machines with 48 GB of RAM and 2TB of
> storage. We have total 11 nodes kafka cluster spread across these two
> servers.
>
> *Kafka Configuration:*
> producer.type=async
> compression.codec=none
> request.required.acks=-1
> serializer.class=kafka.serializer.StringEncoder
> queue.buffering.max.ms=10
> batch.num.messages=1
> queue.buffering.max.messages=10
> default.replication.factor=3
> controlled.shutdown.enable=true
> auto.leader.rebalance.enable=true
> num.network.threads=2
> num.io.threads=8
> num.partitions=4
> log.retention.hours=12
> log.segment.bytes=536870912
> log.retention.check.interval.ms=6
> log.cleaner.enable=false
>
> *Storm Cluster:*
> Storm is running with 5 supervisor and 1 nimbus on IBM servers with 48 GB
> of RAM and 8TB of storage. These servers are shared with hbase cluster.
>
> *Kafka spout configuration*
> kafkaConfig.bufferSizeBytes = 1024*1024*8;
> kafkaConfig.fetchSizeBytes = 1024*1024*4;
> kafkaConfig.forceFromStart = true;
>
> *Topology: StormTopology*
> Spout   - Partition: 4
> First Bolt -  parallelism hint: 6 and Num tasks: 5
> Second Bolt -  parallelism hint: 5
> Third Bolt -   parallelism hint: 3
> Fourth Bolt   -  parallelism hint: 3 and Num tasks: 4
> Fifth Bolt  -  parallelism hint: 3
> Sixth Bolt -  parallelism hint: 3
>
> *Supervisor configuration:*
>
> storm.local.dir: "/app/storm"
> storm.zookeeper.port: 2181
> storm.cluster.mode: "distributed"
> storm.local.mode.zmq: false
> supervisor.slots.ports:
> - 6700
> - 6701
> - 6702
> - 6703
> supervisor.worker.start.timeout.secs: 180
> supervisor.worker.timeout.secs: 30
> supervisor.monitor.frequency.secs: 3
> supervisor.heartbeat.frequency.secs: 5
> supervisor.enable: true
>
> storm.messaging.netty.server_worker_threads: 2
> storm.messaging.netty.client_worker_threads: 2
> storm.messaging.netty.buffer_size: 52428800 #50MB buffer
> storm.messaging.netty.max_retries: 25
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
>
> supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
> worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
>
>
> Please let me know if more information needed..
>
> Thanks in advance.
>
> Regards,
> Riyaz
>


Re: delete topic ?

2014-06-18 Thread hsy...@gmail.com
I'm using 0.8.1.1.
I use DeleteTopicCommand to delete a topic:

String[] args = new String[4];
args[0] = "--topic";
args[1] = topicName;        // the topic you want to delete
args[2] = "--zookeeper";
args[3] = kafkaZookeepers;  // ZooKeeper connect string
DeleteTopicCommand.main(args);

You can write your own script to delete the topic, I guess. And I think it
only deletes the entry in ZooKeeper.

Best,
Siyuan



On Wed, Jun 18, 2014 at 9:13 AM, Mark Roberts  wrote:

> When we were in testing phase, we would either create a new topic with the
> correct details or shut the cluster down and hard kill the topic in
> zookeeper + local disk.  In prod we have the cluster configured via
> configuration management and auto create turned off.
>
> The ability to delete a topic in a live, running kafka cluster is tricky,
> and the implementations of it have been subtly incorrect (and therefore
> dangerous). I know that there is work happening around that, but haven't
> kept up with the status of it.  Maybe in 8.2? It sounds conceptually
> simpler to implement with the new metadata API.
>
> -Mark
>
> > On Jun 18, 2014, at 4:06, "Shlomi Hazan"  wrote:
> >
> > Hi,
> >
> > Doing some evaluation testing, and accidently create a queue with wrong
> > replication factor.
> >
> > Trying to delete as in:
> >
> > kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181
> --delete
> > --topic replicated-topic
> >
> > Yeilded:
> >
> > Command must include exactly one action: --list, --describe, --create or
> > -alter
> >
> > Event though this page (https://kafka.apache.org/documentation.html)
> says:
> >
> >
> >
> > And finally deleting a topic:
> >
> >> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
> > my_topic_name
> >
> > WARNING: Delete topic functionality is beta in 0.8.1. Please report any
> bugs
> > that you encounter on the  mailing
> list or
> >  JIRA.
> >
> > Kafka does not currently support reducing the number of partitions for a
> > topic or changing the replication factor.
> >
> > What should I do?
> >
> > Shlomi
> >
>


Re: Help is processing huge data through Kafka-storm cluster

2014-06-19 Thread hsy...@gmail.com
To clarify my last email: by 10 nodes, I mean 10 Kafka partitions distributed
across 10 different brokers. In my test, DataTorrent can scale up linearly with
Kafka partitions without any problem. Whatever you produce to Kafka, it can
easily pull into your application, and I'm quite sure it can handle much more
data than Kafka can deliver. :)

Best,
Siyuan


On Thu, Jun 19, 2014 at 4:30 PM, Shaikh Ahmed  wrote:

> Hi All,
>
> Thanks for your valuable comments.
>
> Sure, I will give a try with Samza and Data Torrent.
>
> Meanwhile, I sharing screenshot of Storm UI. Please have a look at it.
>
> Kafka producer is able to push 35 million messages to broker in two hours
> with the of approx. 4k messages per second. On other side Storm is
> consuming with the max speed of 1100 messages per second. It means Storm is
> consuming messages 4 times slower than Kafka producing.
>
> We running these systems in production and I am bit worried about
> data loss. Kafka is pushing 35 million in 2 hours and Storm is taking 7-8
> hours to process that much amount of data. There is a lag of 6 hours which
> is very scary.
>
>  Please suggest me if I can do something to improve the performance of
> existing application before moving to new system.
>
> Thanks in advance.
>
> Regards,
> Riyaz
>
>
> On Tue, Jun 17, 2014 at 10:58 PM, Neha Narkhede 
> wrote:
>
>> Samza is an open source stream processing framework built on top of Kafka
>> and YARN. It is high throughput, scalable and has in built state
>> management
>> and fault tolerance support. Though I may be biased, it is worth taking a
>> look :-)
>>
>> Thanks,
>> Neha
>>
>>
>> On Tue, Jun 17, 2014 at 10:55 AM, Robert Rodgers 
>> wrote:
>>
>> > we have been experimenting with Samza which is also worth a look.  It's
>> > basically a topic-to-topic node on Yarn.
>> >
>> >
>> >
>> > On Jun 17, 2014, at 10:44 AM, hsy...@gmail.com wrote:
>> >
>> > > Hi Shaikh,
>> > >
>> > > I heard some throughput bottleneck of storm. It cannot really scale up
>> > with
>> > > kafka.
>> > > I recommend you to try DataTorrent platform(
>> https://www.datatorrent.com/
>> > )
>> > >
>> > > The platform itself is not open-source but it has a open-source
>> library (
>> > > https://github.com/DataTorrent/Malhar) which contains a kafka
>> ingestion
>> > > functions.
>> > > The library is pretty cool, it can scale up dynamically with kafka
>> > > partitions and is fully HA.
>> > >
>> > > And in your case you might be able to use the platform for free.(It's
>> > free
>> > > if your application doesn't require large amount of memory)
>> > >
>> > > With datatorrent platform and the open-source library I can scale my
>> > > application up to 300k/s (10 nodes, 3 replica, 1kb msg, 0.8.0 client).
>> > > I heard the performance of kafka client has been improved for 0.8.1
>> > release
>> > > :)
>> > >
>> > > Best,
>> > > Siyuan
>> > >
>> > >
>> > > On Sat, Jun 14, 2014 at 8:14 AM, Shaikh Ahmed 
>> > wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> Daily we are downloaded 28 Million of messages and Monthly it goes
>> up to
>> > >> 800+ million.
>> > >>
>> > >> We want to process this amount of data through our kafka and storm
>> > cluster
>> > >> and would like to store in HBase cluster.
>> > >>
>> > >> We are targeting to process one month of data in one day. Is it
>> > possible?
>> > >>
>> > >> We have setup our cluster thinking that we can process million of
>> > messages
>> > >> in one sec as mentioned on web. Unfortunately, we have ended-up with
>> > >> processing only 1200-1700 message per second.  if we continue with
>> this
>> > >> speed than it will take min 10 days to process 30 days of data,
>> which is
>> > >> the relevant solution in our case.
>> > >>
>> > >> I suspect that we have to change some configuration to achieve this
>> > goal.
>> > >> Looking for help from experts to support me in achieving this task.
>> > >>
>> > >> *Kafka Cluster:*
>> > >> Kafka is running on two dedicated machines with 48 GB of RAM and 2TB
&

Re: Too Many Open Files Broker Error

2014-07-09 Thread hsy...@gmail.com
I have the same problem. I didn't dig deeper, but I saw this happen when I
launched Kafka in daemon mode. I found that daemon mode just launches Kafka
with nohup. It's not quite clear to me why this happens.


On Wed, Jul 9, 2014 at 9:59 AM, Lung, Paul  wrote:

> Yup. In fact, I just ran the test program again while the Kafka broker is
> still running, using the same user of course. I was able to get up to 10K
> connections with the test program. The test program uses the same java NIO
> library that the broker does. So the machine is capable of handling that
> many connections. The only issue I saw was that the NIO
> ServerSocketChannel is a bit slow at accepting connections when the total
> connection goes around 4K, but this could be due to the fact that I put
> the ServerSocketChannel in the same Selector as the 4K SocketChannels. So
> sometimes on the client side, I see:
>
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:122)
> at sun.nio.ch.IOUtil.write(IOUtil.java:93)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:352)
> at FdTest$ClientThread.run(FdTest.java:108)
>
>
> But all I have to do is sleep for a bit on the client, and then retry
> again. However, 4K does seem like a magic number, since that seems to be
> the number that the Kafka broker machine can handle before it gives me the
> "Too Many Open Files" error and eventually crashes.
>
> Paul Lung
>
> On 7/8/14, 9:29 PM, "Jun Rao"  wrote:
>
> >Does your test program run as the same user as Kafka broker?
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Tue, Jul 8, 2014 at 1:42 PM, Lung, Paul  wrote:
> >
> >> Hi Guys,
> >>
> >> I'm seeing the following errors from the 0.8.1.1 broker. This occurs most
> >> often on the Controller machine. Then the controller process crashes, and
> >> the controller bounces to other machines, which causes those machines to
> >> crash. Looking at the file descriptors being held by the process, it's only
> >> around 4000 or so (looking at . There aren't a whole lot of connections in
> >> TIME_WAIT states, and I've increased the ephemeral port range to "16000 -
> >> 64000" via "/proc/sys/net/ipv4/ip_local_port_range". I've written a Java
> >> test program to see how many sockets and files I can open. The socket is
> >> definitely limited by the ephemeral port range, which was around 22K at the
> >> time. But I can open tons of files, since the open file limit of the user
> >> is set to 100K.
> >>
> >> So given that I can theoretically open 48K sockets and probably 90K files,
> >> and I only see around 4K total for the Kafka broker, I'm really confused as
> >> to why I'm seeing this error. Is there some internal Kafka limit that I
> >> don't know about?
> >>
> >> Paul Lung
> >>
> >>
> >>
> >> java.io.IOException: Too many open files
> >>
> >> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >>
> >> at
> >>
> >>sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:16
> >>3)
> >>
> >> at kafka.network.Acceptor.accept(SocketServer.scala:200)
> >>
> >> at kafka.network.Acceptor.run(SocketServer.scala:154)
> >>
> >> at java.lang.Thread.run(Thread.java:679)
> >>
> >> [2014-07-08 13:07:21,534] ERROR Error in acceptor
> >>(kafka.network.Acceptor)
> >>
> >> java.io.IOException: Too many open files
> >>
> >> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >>
> >> at
> >>
> >>sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:16
> >>3)
> >>
> >> at kafka.network.Acceptor.accept(SocketServer.scala:200)
> >>
> >> at kafka.network.Acceptor.run(SocketServer.scala:154)
> >>
> >> at java.lang.Thread.run(Thread.java:679)
> >>
> >> [2014-07-08 13:07:21,563] ERROR [ReplicaFetcherThread-3-2124488], Error
> >> for partition [bom__021active_80__32__miniactiveitem_lvs_qn,0]
> >>to
> >> broker 2124488:class kafka.common.NotLeaderForPartitionException
> >> (kafka.server.ReplicaFetcherThread)
> >>
> >> [2014-07-08 13:07:21,558] FATAL [Replica Manager on Broker 2140112]:
> >>Error
> >> writing to highwatermark file:  (kafka.server.ReplicaManager)
> >>
> >> java.io.FileNotFoundException:
> >>
> >>/ebay/cronus/software/cronusapp_home/kafka/kafka-logs/replication-offset-
> >>checkpoint.tmp
> >> (Too many open files)
> >>
> >> at java.io.FileOutputStream.open(Native Method)
> >>
> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:209)
> >>
> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:160)
> >>
> >> at java.io.FileWriter.<init>(FileWriter.java:90)
> >>
> >> at
> >>kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> >>
> >> at
> >>
> >>kafka.server.ReplicaManager$$anonfun$checkpointH

Re: Interested in contributing to Kafka?

2014-07-16 Thread hsy...@gmail.com
Hi Jay,

I would like to take a look at the code base and maybe start working on
some jiras.

Best,
Siyuan


On Wed, Jul 16, 2014 at 3:09 PM, Jay Kreps  wrote:

> Hey All,
>
> A number of people have been submitting really nice patches recently.
>
> If you are interested in contributing and are looking for something to
> work on, or if you are contributing and are interested in ramping up
> to be a committer on the project, please let us know--we are happy to
> help you help us :-). It is often hard to know what JIRAs or projects
> would be good to work on, how hard those will be, and where to get
> started. Feel free to reach out to me, Neha, Jun, or any of the other
> committers for help with this.
>
> Cheers,
>
>  -Jay
>


Re: Interested in contributing to Kafka?

2014-07-16 Thread hsy...@gmail.com
Is there a scala API doc for the entire kafka library?


On Wed, Jul 16, 2014 at 5:34 PM, hsy...@gmail.com  wrote:

> Hi Jay,
>
> I would like to take a look at the code base and maybe start working on
> some jiras.
>
> Best,
> Siyuan
>
>
> On Wed, Jul 16, 2014 at 3:09 PM, Jay Kreps  wrote:
>
>> Hey All,
>>
>> A number of people have been submitting really nice patches recently.
>>
>> If you are interested in contributing and are looking for something to
>> work on, or if you are contributing and are interested in ramping up
>> to be a committer on the project, please let us know--we are happy to
>> help you help us :-). It is often hard to know what JIRAs or projects
>> would be good to work on, how hard those will be, and where to get
>> started. Feel free to reach out to me, Neha, Jun, or any of the other
>> committers for help with this.
>>
>> Cheers,
>>
>>  -Jay
>>
>
>


Kafka on yarn

2014-07-23 Thread hsy...@gmail.com
Hi guys,

Kafka is getting more and more popular, and in most cases people run Kafka as a
long-term service in the cluster. Is there a discussion of running Kafka on a
YARN cluster, so that we can utilize its convenient configuration/resource
management and HA? I think there is big potential and demand for that.
I found a project, https://github.com/kkasravi/kafka-yarn, but is there an
official roadmap/plan for this?

Thank you very much!

Best,
Siyuan


Re: Kafka on yarn

2014-07-23 Thread hsy...@gmail.com
Thanks guys for sharing your knowledge. Is there any other concern on the
producer/consumer side? My understanding is that the high-level consumer and the
producer refresh the cluster metadata and detect leadership changes or node
failures. I guess there shouldn't be anything to worry about if I delete one
broker and add it back on another node at runtime?


On Wed, Jul 23, 2014 at 4:44 PM, Kam Kasravi 
wrote:

> Thanks Joe for the input related to Mesos as well as acknowledging the
> need for YARN to support this type of cluster allocation - long running
> services with node locality priority.
>
> Thanks Jay - That's an interesting fact that I wasn't aware of - though I
> imagine there could possibly be a long latency for the replica data to be
> transferred to the new broker (depending on #/size of partitions). It does
> open up some possibilities to restart brokers on app master restart using
> different containers  (as well as some complications if an old container
> with old data were reallocated on restart). I had used zookeeper to store
> broker locations so the app master on restart would look for this
> information and attempt to reallocate containers on these nodes.  All this
> said, would this be part of kafka or some other framework? I can see kafka
> benefitting from this at the same time kafka's appeal IMO is it's
> simplicity. Spark has chosen to include YARN within its distribution, not
> sure what the kafka team thinks.
>
>
>
> On Wednesday, July 23, 2014 4:19 PM, Jay Kreps 
> wrote:
>
>
>
> Hey Kam,
>
> It would be nice to have a way to get a failed node back with it's
> original data, but this isn't strictly necessary, it is just a good
> optimization. As long as you run with replication you can restart a
> broker elsewhere with no data, and it will restore it's state off the
> other replicas.
>
> -Jay
>
>
> On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
>  wrote:
> > Hi
> >
> > Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a
> particular resource since the broker needs to always use its local data.
> YARN doesn't do this well, unless you provide (override) the default
> scheduler (CapacityScheduler or FairScheduler). SequenceIO did something
> along these lines for a different use case. Unfortunately replacing the
> scheduler is a global operation which would affect all App masters.
> Additionally one could argue that the broker should be run as an OS service
> and auto restarted on failure if necessary. Slider (incubating) did some of
> this groundwork but YARN still has lots of limitations in providing
> guarantees to consistently allocate a container on a particular node
> especially on appmaster restart (eg ResourceManager dies). That said, it
> might be worthwhile to enumerate all of this here with some possible
> solutions. If there is interest I could certainly list the relevant JIRA's
> along with some additional JIRA's
> >  required IMO.
> >
> > Thanks
> > Kam
> >
> >
> > On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com" <
> hsy...@gmail.com> wrote:
> >
> >
> >
> > Hi guys,
> >
> > Kafka is getting more and more popular and in most cases people run kafka
> > as long-term service in the cluster. Is there a discussion of running
> kafka
> > on yarn cluster which we can utilize the convenient
> configuration/resource
> > management and HA.  I think there is a big potential and requirement for
> > that.
> > I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> > official roadmap/plan for this?
> >
> > Thank you very much!
> >
> > Best,
> > Siyuan
>