Re: How many partitions can one single machine handle in Kafka?

2014-10-21 Thread Todd Palino
As far as the number of partitions a single broker can handle, we've set
our cap at 4000 partitions (including replicas). Above that we've seen some
performance and stability issues.

-Todd

On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She  wrote:

> hello, everyone
>
> I'm new to Kafka. I'm wondering what's the max number of partitions one
> single machine can handle in Kafka?
>
> Is there a suggested number?
>
> Thanks.
>
> xiaobinshe
>


Re: How many partitions can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
The number of brokers doesn't really matter here, as far as I can tell,
because the question is about what a single broker can handle. The number
of partitions in the cluster is governed by the ability of the controller
to manage the list of partitions for the cluster, and the ability of each
broker to keep that list (to serve metadata requests). The number of
partitions on a single broker is governed by that broker's ability to
handle the messages and files on disk. That's a much more limiting factor
than what the controller can do.
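
If you want to see where a particular broker stands against a cap like the
4000 we use, counting replicas per broker from the topic tool output is
usually enough. Here's a minimal Python sketch, assuming the 0.8.x
"kafka-topics.sh --describe --zookeeper <zk>" output format (adjust the
parsing for your version):

    # Count partitions (leader plus follower replicas) hosted on each broker
    # by parsing "kafka-topics.sh --describe" output piped in on stdin.
    import re
    import sys
    from collections import Counter

    per_broker = Counter()
    for line in sys.stdin:
        match = re.search(r"Replicas:\s*([\d,]+)", line)
        if match:
            for broker_id in match.group(1).split(","):
                per_broker[int(broker_id)] += 1

    for broker_id, count in sorted(per_broker.items()):
        warning = "  <-- over our 4000 cap" if count > 4000 else ""
        print("broker %d: %d partitions%s" % (broker_id, count, warning))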

-Todd

On Tue, Oct 21, 2014 at 2:52 PM, Neil Harkins  wrote:

> On Tue, Oct 21, 2014 at 2:10 PM, Todd Palino  wrote:
> > As far as the number of partitions a single broker can handle, we've set
> > our cap at 4000 partitions (including replicas). Above that we've seen
> some
> > performance and stability issues.
>
> How many brokers? I'm curious: what kinds of problems would affect
> a single broker with a large number of partitions, but not affect the
> entire cluster with even more partitions?
>


Re: How many partitions can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
In fact there are many more than 4000 open files. Many of our brokers run
with 28,000+ open files (regular file handles, not network connections). In
our case, we're beefing up the disk performance as much as we can by
running in a RAID-10 configuration with 14 disks.
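
If you want to check where a broker sits on file handles, looking at the
process's /proc entry is enough. A minimal sketch (Linux only; the pgrep
pattern used to find the broker pid is an assumption -- use whatever you
normally use):

    # Count open file descriptors for the broker by listing /proc/<pid>/fd,
    # and show the process's "max open files" limit for comparison.
    import os
    import subprocess

    pid = subprocess.check_output(["pgrep", "-f", "kafka.Kafka"]).split()[0].decode()
    open_fds = len(os.listdir("/proc/%s/fd" % pid))

    with open("/proc/%s/limits" % pid) as f:
        limits = [l.strip() for l in f if "open files" in l.lower()]

    print("broker pid %s: %d open file descriptors" % (pid, open_fds))
    print("\n".join(limits))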

-Todd

On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She  wrote:

> Todd,
>
> Actually I'm wondering how Kafka handles so many partitions. With one
> partition there is at least one file on disk, and with 4000 partitions
> there will be at least 4000 files.
>
> When all these partitions have write requests, how does Kafka make the
> write operations on the disk sequential (which is emphasized in the design
> document of Kafka) and make sure the disk access is efficient?
>
> Thank you for your reply.
>
> xiaobinshe
>
>
>
> 2014-10-22 5:10 GMT+08:00 Todd Palino :
>
> > As far as the number of partitions a single broker can handle, we've set
> > our cap at 4000 partitions (including replicas). Above that we've seen
> some
> > performance and stability issues.
> >
> > -Todd
> >
> > On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
> > wrote:
> >
> > > hello, everyone
> > >
> > > I'm new to kafka, I'm wondering what's the max num of partition can one
> > > siggle machine handle in Kafka?
> > >
> > > Is there an sugeest num?
> > >
> > > Thanks.
> > >
> > > xiaobinshe
> > >
> >
>


Re: How many partitions can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until
recently, sat 8 feet from my desk. The data from the wiki page is off a
little bit as well (we're running 14 disks now, and 64 GB systems).

So to hit the first questions, RAID 10 gives higher read performance, and
also allows you to suffer a disk failure without having to drop the entire
cluster. As Neha noted, you're going to take a hit on the rebuild, and
because of ongoing traffic in the cluster it will be for a long time (we
can easily take half a day to rebuild a disk). But you still get some
benefit out of the RAID over just killing the data and letting it rebuild
from the replica, because during that time the cluster is not under
replicated, so you can suffer another failure. The more servers and disks
you have, the more often disks are going to fail, not to mention other
components, both hardware and software. I like running on the safer side.

That said, I'm not sure RAID 10 is the answer either. We're going to be
doing some experimenting with other disk layouts shortly. We've inherited a
lot of our architecture, and many things have changed in that time. We're
probably going to test out RAID 5 and 6 to start with and see how much we
lose from the parity calculations.

-Todd


On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks 
wrote:

> Neha,
>
> Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is
> definitely very painful, but less so with RAID 10.
>
> We have been using the guidance here:
>
> http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site
> Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40
> or so)
>
> Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations
>
> LinkedIn
> Hardware
> We are using dual quad-core Intel Xeon machines with 24GB of memory. In
> general this should not matter too much, we only see pretty low CPU usage
> at peak even with GZIP compression enabled and a number of clients that
> don't batch requests. The memory is probably more than is needed for
> caching the active segments of the log.
> The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID
> 10 array. In general this is the performance bottleneck, and more disks is
> more better. Depending on how you configure flush behavior you may or may
> not benefit from more expensive disks (if you flush often then higher RPM
> SAS drives may be better).
> OS Settings
> We use Linux. Ext4 is the filesystem and we run using software RAID 10. We
> haven't benchmarked filesystems so other filesystems may be superior.
> We have added two tuning changes: (1) we upped the number of file
> descriptors since we have lots of topics and lots of connections, and (2)
> we upped the max socket buffer size to enable high-performance data
> transfer between data centers (described here).
>
>
> Best Regards,
>
> -Jonathan
>
>
>
> On Oct 22, 2014, at 3:44 PM, Neha Narkhede 
> wrote:
>
> > In my experience, RAID 10 doesn't really provide value in the presence of
> > replication. When a disk fails, the RAID resync process is so I/O
> intensive
> > that it renders the broker useless until it completes. When this happens,
> > you actually have to take the broker out of rotation and move the leaders
> > off of it to prevent it from serving requests in a degraded state. You
> > might as well shutdown the broker, delete the broker's data and let it
> > catch up from the leader.
> >
> > On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira 
> > wrote:
> >
> >> Makes sense. Thanks :)
> >>
> >> On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
> >>  wrote:
> >>> There are various costs when a broker fails, including broker leader
> >> election for each partition, etc., as well as exposing possible issues
> for
> >> in-flight messages, and client rebalancing etc.
> >>>
> >>> So even though replication provides partition redundancy, RAID 10 on
> >> each broker is usually a good tradeoff to prevent the typical most
> common
> >> cause of broker server failure (e.g. disk failure) as well, and overall
> >> smoother operation.
> >>>
> >>> Best Regards,
> >>>
> >>> -Jonathan
> >>>
> >>>
> >>> On Oct 22, 2014, at 11:01 AM, Gwen Shapira 
> >> wrote:
> >>>
> >>>> RAID-10?
> >>>> Interesting choice for a system where the data is already replicated
> >>>> between nodes. Is it to avoid the cost of large replication over the
> >>>> network? how large are these disks?
> >>>>

Re: How many partitions can one single machine handle in Kafka?

2014-10-23 Thread Todd Palino
Your understanding of RAID 10 is slightly off. Because it is a combination
of striping and mirroring, trying to say that there are 4000 open files per
pair of disks is not accurate. The disk, as far as the system is concerned,
is the entire RAID. Files are striped across all mirrors, so any open file
will cross all 7 mirror sets.

Even if you were to operate on a single disk, you're never going to be able
to ensure sequential disk access with Kafka. Even if you have a single
partition on a disk, there will be multiple log files for that partition
and you will have to seek to read older data. What you have to do is use
multiple spindles, with sufficiently fast disk speeds, to increase your
overall IO capacity. You can also tune to get a little more. For example,
we use a 120 second commit on that mount point to reduce the frequency of
flushing to disk.

-Todd


On Wed, Oct 22, 2014 at 10:09 PM, Xiaobin She  wrote:

> Todd,
>
> Thank you for the information.
>
> With 28,000+ files and 14 disks, that means there are on average about 4000
> open files per two disks (which are treated as one single disk), am I right?
>
> How do you manage to make all the write operations to these 4000 open
> files sequential on the disk?
>
> As far as I know, writes to different files on the same disk will
> cause random writes, which is not good for performance.
>
> xiaobinshe
>
>
>
>
> 2014-10-23 1:00 GMT+08:00 Todd Palino :
>
> > In fact there are many more than 4000 open files. Many of our brokers run
> > with 28,000+ open files (regular file handles, not network connections).
> In
> > our case, we're beefing up the disk performance as much as we can by
> > running in a RAID-10 configuration with 14 disks.
> >
> > -Todd
> >
> > On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She 
> wrote:
> >
> > > Todd,
> > >
> > > Actually I'm wondering how kafka handle so much partition, with one
> > > partition there is at least one file on disk, and with 4000 partition,
> > > there will be at least 4000 files.
> > >
> > > When all these partitions have write request, how did Kafka make the
> > write
> > > operation on the disk to be sequential (which is emphasized in the
> design
> > > document of Kafka) and make sure the disk access is effective?
> > >
> > > Thank you for your reply.
> > >
> > > xiaobinshe
> > >
> > >
> > >
> > > 2014-10-22 5:10 GMT+08:00 Todd Palino :
> > >
> > > > As far as the number of partitions a single broker can handle, we've
> > set
> > > > our cap at 4000 partitions (including replicas). Above that we've
> seen
> > > some
> > > > performance and stability issues.
> > > >
> > > > -Todd
> > > >
> > > > On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She 
> > > > wrote:
> > > >
> > > > > hello, everyone
> > > > >
> > > > > I'm new to kafka, I'm wondering what's the max num of partition can
> > one
> > > > > siggle machine handle in Kafka?
> > > > >
> > > > > Is there an sugeest num?
> > > > >
> > > > > Thanks.
> > > > >
> > > > > xiaobinshe
> > > > >
> > > >
> > >
> >
>


Re: How many partitions can one single machine handle in Kafka?

2014-10-23 Thread Todd Palino
I've mentioned this a couple times in discussions recently as well. We were
discussing the concept of infinite retention for a certain type of service,
and how it might be accomplished. My suggestion was to have a combination
of storage types and the ability for Kafka to look for segments in two
different directory structures. This way you could expand the backend
storage as needed (which could be on an external storage appliance) while
still maintaining performance for recent segments.

I still think this is something worth pursuing at some point, and it should
be relatively easy to implement within the broker.

-Todd


On Wed, Oct 22, 2014 at 11:53 PM, Neil Harkins  wrote:

> I've been thinking about this recently.
> If kafka provided cmdline hooks to be executed on segment rotation,
> similar to postgres' wal 'archive_command', configurations could store
> only the current segments and all their random i/o on flash, then once
> rotated, copy them sequentially onto larger/slower spinning disks,
> or even S3.
>
> -neil
>
> On Wed, Oct 22, 2014 at 10:09 PM, Xiaobin She 
> wrote:
> > Todd,
> >
> > Thank you for the information.
> >
> > With 28,000+ files and 14 disks, that makes there are averagely about
> 4000
> > open files on two disk ( which is treated as one single disk) , am I
> right?
> >
> > How do you manage to make the all the write operation to thest 4000 open
> > files be sequential to the disk?
> >
> > As far as I know, write operation to different files on the same disk
> will
> > cause random write, which is not good for performance.
> >
> > xiaobinshe
> >
> >
> >
> >
> > 2014-10-23 1:00 GMT+08:00 Todd Palino :
> >
> >> In fact there are many more than 4000 open files. Many of our brokers
> run
> >> with 28,000+ open files (regular file handles, not network
> connections). In
> >> our case, we're beefing up the disk performance as much as we can by
> >> running in a RAID-10 configuration with 14 disks.
> >>
> >> -Todd
> >>
> >> On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She 
> wrote:
> >>
> >> > Todd,
> >> >
> >> > Actually I'm wondering how kafka handle so much partition, with one
> >> > partition there is at least one file on disk, and with 4000 partition,
> >> > there will be at least 4000 files.
> >> >
> >> > When all these partitions have write request, how did Kafka make the
> >> write
> >> > operation on the disk to be sequential (which is emphasized in the
> design
> >> > document of Kafka) and make sure the disk access is effective?
> >> >
> >> > Thank you for your reply.
> >> >
> >> > xiaobinshe
> >> >
> >> >
> >> >
> >> > 2014-10-22 5:10 GMT+08:00 Todd Palino :
> >> >
> >> > > As far as the number of partitions a single broker can handle, we've
> >> set
> >> > > our cap at 4000 partitions (including replicas). Above that we've
> seen
> >> > some
> >> > > performance and stability issues.
> >> > >
> >> > > -Todd
> >> > >
> >> > > On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She  >
> >> > > wrote:
> >> > >
> >> > > > hello, everyone
> >> > > >
> >> > > > I'm new to kafka, I'm wondering what's the max num of partition
> can
> >> one
> >> > > > siggle machine handle in Kafka?
> >> > > >
> >> > > > Is there an sugeest num?
> >> > > >
> >> > > > Thanks.
> >> > > >
> >> > > > xiaobinshe
> >> > > >
> >> > >
> >> >
> >>
>


Re: How many partitions can one single machine handle in Kafka?

2014-10-24 Thread Todd Palino
Hmm, I haven't read the design doc lately, but I'm surprised that there's
even a discussion of sequential disk access. I suppose for small subsets of
the writes you can write larger blocks of sequential data, but that's about
the extent of it. Maybe one of the developers can speak more to that aspect.

As far as the number of files goes, it really doesn't matter that much
whether you have a few or a lot. Once you have more than one, the disk
access is random, so the performance is more like a cliff than a gentle
slope. As I said, we've found issues once we go above 4000 partitions, and
that's probably a combination of what the software can handle and the
number of open files.

-Todd


On Thu, Oct 23, 2014 at 11:19 PM, Xiaobin She  wrote:

> Todd,
>
> Thank you very much for your reply. My understanding of RAID 10 was wrong.
>
> I understand that one cannot get absolutely sequential disk access even on
> one single disk. The reason I'm interested in this question is that the
> design document of Kafka emphasizes that Kafka takes advantage of
> sequential disk access to improve disk performance, and I can't
> understand how to achieve this with thousands of open files.
>
> I thought that compared to one or a few files, thousands of open files would
> make the disk access much more random, and make the disk performance much
> weaker.
>
> You mentioned that to increase overall IO capacity, one will have to use
> multiple spindles with sufficiently fast disk speeds, but will it be more
> effective for a disk with fewer files? Or is the number of files not an
> important factor for the overall performance of Kafka?
>
> Thanks again.
>
> xiaobinshe
>
>
>
> 2014-10-23 22:01 GMT+08:00 Todd Palino :
>
> > Your understanding of RAID 10 is slightly off. Because it is a
> combination
> > of striping and mirroring, trying to say that there are 4000 open files
> per
> > pair of disks is not accurate. The disk, as far as the system is
> concerned,
> > is the entire RAID. Files are striped across all mirrors, so any open
> file
> > will cross all 7 mirror sets.
> >
> > Even if you were to operate on a single disk, you're never going to be
> able
> > to ensure sequential disk access with Kafka. Even if you have a single
> > partition on a disk, there will be multiple log files for that partition
> > and you will have to seek to read older data. What you have to do is use
> > multiple spindles, with sufficiently fast disk speeds, to increase your
> > overall IO capacity. You can also tune to get a little more. For example,
> > we use a 120 second commit on that mount point to reduce the frequency of
> > flushing to disk.
> >
> > -Todd
> >
> >
> > On Wed, Oct 22, 2014 at 10:09 PM, Xiaobin She 
> > wrote:
> >
> > > Todd,
> > >
> > > Thank you for the information.
> > >
> > > With 28,000+ files and 14 disks, that makes there are averagely about
> > 4000
> > > open files on two disk ( which is treated as one single disk) , am I
> > right?
> > >
> > > How do you manage to make the all the write operation to thest 4000
> open
> > > files be sequential to the disk?
> > >
> > > As far as I know, write operation to different files on the same disk
> > will
> > > cause random write, which is not good for performance.
> > >
> > > xiaobinshe
> > >
> > >
> > >
> > >
> > > 2014-10-23 1:00 GMT+08:00 Todd Palino :
> > >
> > > > In fact there are many more than 4000 open files. Many of our brokers
> > run
> > > > with 28,000+ open files (regular file handles, not network
> > connections).
> > > In
> > > > our case, we're beefing up the disk performance as much as we can by
> > > > running in a RAID-10 configuration with 14 disks.
> > > >
> > > > -Todd
> > > >
> > > > On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She 
> > > wrote:
> > > >
> > > > > Todd,
> > > > >
> > > > > Actually I'm wondering how kafka handle so much partition, with one
> > > > > partition there is at least one file on disk, and with 4000
> > partition,
> > > > > there will be at least 4000 files.
> > > > >
> > > > > When all these partitions have write request, how did Kafka make
> the
> > > > write
> > > > > operation on the disk to be sequential (which is emphasized in the
> > > design

Re: How many partitions can one single machine handle in Kafka?

2014-10-24 Thread Todd Palino
We haven't done any testing of Kafka on SSDs, mostly because our storage
density needs are too high. Since our IO load has been fine on the current
model, we haven't pushed in that direction yet. Additionally, I haven't
done any real load testing since I got here, which is part of why we're
going to reevaluate our storage soon.

That said, we are using SSDs for the transaction log volume on our
Zookeeper nodes, with great success. We detailed some of that in the
presentation that Jonathan linked (no latency or outstanding requests). It
helps that we use very high quality SSD drives.

-Todd


On Fri, Oct 24, 2014 at 10:44 AM, Gwen Shapira 
wrote:

> Todd,
>
> Did you load-test using SSDs?
> Got numbers to share?
>
> On Fri, Oct 24, 2014 at 10:40 AM, Todd Palino  wrote:
> > Hmm, I haven't read the design doc lately, but I'm surprised that there's
> > even a discussion of sequential disk access. I suppose for small subsets
> of
> > the writes you can write larger blocks of sequential data, but that's
> about
> > the extent of it. Maybe one of the developers can speak more to that
> aspect.
> >
> > As far as the number of files goes, it really doesn't matter that much
> > whether you have a few or a lot. Once you have more than one, the disk
> > access is random, so the performance is more like a cliff than a gentle
> > slope. As I said, we've found issues once we go above 4000 partitions,
> and
> > that's probably a combination of what the software can handle and the
> > number of open files.
> >
> > -Todd
> >
> >
> > On Thu, Oct 23, 2014 at 11:19 PM, Xiaobin She 
> wrote:
> >
> >> Todd,
> >>
> >> Thank you very much for your reply. My understanding of RAID 10 is
> wrong.
> >>
> >> I understand that one can not get absolute sequential disk access even
> on
> >> one single disk, the reason I'm interested with this question is that
> the
> >> design document of Kafka emphasize that Kafka make advantage of the
> >> sequential disk acceess to improve the disk performance, and I can' t
> >> understand how to achive this with thounds of open files.
> >>
> >> I thought that compare to one or fewer files, thounds of open files will
> >> make the disk access much more random, and make the disk performance
> much
> >> more weak.
> >>
> >> You mentioned that to increase overall IO cpapcity, one will have to use
> >> multiple spindles with sufficiently fast disk speed, but will it be more
> >> effective for the disk with fewer files? Or does the num of files is
> not an
> >> important factor for the entire performance of Kafka?
> >>
> >> Thanks again.
> >>
> >> xiaobinshe
> >>
> >>
> >>
> >> 2014-10-23 22:01 GMT+08:00 Todd Palino :
> >>
> >> > Your understanding of RAID 10 is slightly off. Because it is a
> >> combination
> >> > of striping and mirroring, trying to say that there are 4000 open
> files
> >> per
> >> > pair of disks is not accurate. The disk, as far as the system is
> >> concerned,
> >> > is the entire RAID. Files are striped across all mirrors, so any open
> >> file
> >> > will cross all 7 mirror sets.
> >> >
> >> > Even if you were to operate on a single disk, you're never going to be
> >> able
> >> > to ensure sequential disk access with Kafka. Even if you have a single
> >> > partition on a disk, there will be multiple log files for that
> partition
> >> > and you will have to seek to read older data. What you have to do is
> use
> >> > multiple spindles, with sufficiently fast disk speeds, to increase
> your
> >> > overall IO capacity. You can also tune to get a little more. For
> example,
> >> > we use a 120 second commit on that mount point to reduce the
> frequency of
> >> > flushing to disk.
> >> >
> >> > -Todd
> >> >
> >> >
> >> > On Wed, Oct 22, 2014 at 10:09 PM, Xiaobin She 
> >> > wrote:
> >> >
> >> > > Todd,
> >> > >
> >> > > Thank you for the information.
> >> > >
> >> > > With 28,000+ files and 14 disks, that makes there are averagely
> about
> >> > 4000
> >> > > open files on two disk ( which is treated as one single disk) , am I
> >> > right?
> >> > >
> >>

Re: Tuning replication

2014-11-04 Thread Todd Palino
I think your answers are pretty spot-on, Joel. Under Replicated Count is
the metric that we monitor to make sure the cluster is healthy. It lets us
know when a broker is down (because all the numbers except one broker are
elevated), or when a broker is struggling (low counts fluctuating across a
few hosts).

As far as lots of small partitions vs. a few large partitions, we prefer
the former. It means we can spread the load out over brokers more evenly.

-Todd

On Tue, Nov 4, 2014 at 10:07 AM, Joel Koshy  wrote:

> Ops-experts can share more details but here are some comments:
> >
> > * Does Kafka 'like' lots of small partitions for replication, or larger
> > ones?  ie: if I'm passing 1Gbps into a topic, will replication be happier
> > if that's one partition, or many partitions?
>
> Since you also have to account for the NIC utilization by replica
> fetches it is better to split a heavy topic into many partitions.
>
> > * How can we 'up' the priority of replication over other actions?
>
> If you do the above, this should not be necessary but you could
> increase the number of replica fetchers. (num.replica.fetchers)
>
> > * What is the most effective way to monitor the replication lag?  On
> > brokers with hundreds of partitions, the JMX data starts getting very
> > muddled and plentiful.  I'm trying to find something we can
> graph/dashboard
> > to say 'replication is in X state'.  When we look at it in aggregate, we
> > assume that 'big numbers are further behind', but then sometimes find
> > negative numbers as well?
>
> The easiest mbean to look at is the underreplicated partition count.
> This is at the broker-level so it is coarse-grained. If it is > 0 you
> can use various tools to do mbean queries to figure out which
> partition is lagging behind. Another thing you can look at is the ISR
> shrink/expand rate. If you see a lot of churn you may need to tune the
> settings that affect ISR maintenance (replica.lag.time.max.ms,
> replica.lag.max.messages).
>
>
> --
> Joel
>


Re: Tuning replication

2014-11-04 Thread Todd Palino
We have our threshold for under replicated set at anything over 2. The
reason we picked that number is because we have a cluster that tends to
take very high traffic for short periods of time, and 2 gets us around the
false positives (with a careful balance of the partitions in the cluster).
We're also holding ourselves to a fairly strict standard, so whenever we
see URP for any reason, we're investigating what's going on and resolving
it so it doesn't happen again.
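
One way to encode that kind of rule, purely as an illustration (how you
fetch the UnderReplicatedPartitions value -- JMX, an agent, etc. -- is left
to whatever monitoring stack you use, and the "sustained samples" part is
just one way to keep short bursts from paging anyone):

    THRESHOLD = 2          # anything over 2 is worth looking at on this cluster
    SUSTAINED_SAMPLES = 5  # e.g. five one-minute samples in a row

    def should_page(urp_samples):
        """urp_samples: recent under-replicated partition counts, oldest first."""
        recent = urp_samples[-SUSTAINED_SAMPLES:]
        return len(recent) == SUSTAINED_SAMPLES and all(v > THRESHOLD for v in recent)

    print(should_page([0, 0, 7, 0, 0, 0]))     # short spike: False
    print(should_page([0, 5, 6, 6, 8, 7, 7]))  # sustained problem: True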

Technically, we're supposed to be called for any URP alert. In reality, we
don't have any in normal operation unless we have a problem like a down
broker. If replicas are falling behind due to network congestion (or other
resource exhaustion), we balance things out, expand the cluster, or find
our problem producer or consumer and fix them.

-Todd


On Tue, Nov 4, 2014 at 12:13 PM, Todd S  wrote:

> Joel,
>
> Thanks for your input - it fits what I was thinking, so it's good
> confirmation.
>
> > The easiest mbean to look at is the underreplicated partition count.
> > This is at the broker-level so it is coarse-grained. If it is > 0 you
> > can use various tools to do mbean queries to figure out which
> > partition is lagging behind. Another thing you can look at is the ISR
> > shrink/expand rate. If you see a lot of churn you may need to tune the
> > settings that affect ISR maintenance (replica.lag.time.max.ms,
> > replica.lag.max.messages).
>
> and Todd Palino said:
>
> > Under Replicated Count is the metric that we monitor to make sure the
> > cluster is healthy.
>
> We report/alert on under replicated partitions.  what i'm trying to do
> is get away from event driven alerts to the NOC/ops people, and give
> them something qualitative (replication is {ok|a little
> behind|behind|really behind|really really behind|oh no we're doomed}
> so we know how to respond appropriately.  I don't really want ops
> folks getting called at 2am on a Saturday because a single replica is
> behind by a few thousand messages .. however I *do* want someone
> called if we're a billion messages behind.
>
> If I look at
> 'KAFKA|kafka.server|FetcherLagMetrics|ReplicaFetcherThread-.*:Value'
> , can I use that as my measure of badness/behindness?
>
>
> In a similar vein, at what point do you/Todd/others wake someone up?
> How many replicas out of sync, by how much?  What is the major concern
> point, vs 'meh, it'll catch up soon'?  I know it's likely different
> between different environments, but as I'm new to this, I'd love to
> know how others see things.
>
> Thanks!
>


Re: Using OffsetRequest to get the Head and Tail of a partition in a single request.

2014-11-18 Thread Todd Palino
It is not possible, due to how the results for the offset request are stored 
within the broker and API (as a map). You will have to do 2 requests to get 
both offsets. 
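
For what it's worth, with the newer kafka-python client (1.3+) the same
thing still ends up as two calls, each issuing its own offset request; the
broker and topic names here are just placeholders:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    tp = TopicPartition("my-topic", 0)

    head = consumer.beginning_offsets([tp])[tp]  # earliest offset (time = -2)
    tail = consumer.end_offsets([tp])[tp]        # log end offset (time = -1)

    print("partition 0: head=%d tail=%d backlog=%d" % (head, tail, tail - head))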

-Todd


> On Nov 18, 2014, at 8:52 PM, Thunder Stumpges  wrote:
> 
> Hey all,
> 
> We are working on a .net client, and I have a question about the 
> OffsetRequest api 
> (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI)
> 
> It seems to indicate that you would always get the "log end offset" (tail 
> offset) regardless of the input request:
> 
>"The response contains the starting offset of each segment for 
> the requested partition as well as the "log end offset" i.e. the offset of 
> the next message that would be appended to the given partition."
> 
> However from inspecting the code, and experimenting with the API, that 
> doesn't seem to be the case. We cannot seem to fetch both the head (earliest 
> offset, based on Time specified as -2L ) and Tail which we would have 
> expected based on the comment above in the protocol docs.
> 
> We can get either the earliest OR latest, but not both. We attempted to pass 
> two entries (the Array of [Partition,Time,MaxNumberOfOffsets] per the 
> protocol) for the same partition ID, one with a -1L and one with a -2L, 
> however we get only one result back, and from the code in KafkaApis.scala 
> handleOffsetRequest, it seems like there could never be multiple requests for 
> the same topic/partition.
> 
> Does anyone know if this should be possible, or if there is a work-around for 
> this? Also, should something like this go to the dev group instead?
> 
> Thanks
> Thunder
> 


Re: Increased CPU usage with 0.8.2-beta

2015-02-13 Thread Todd Palino
I'm checking into this on our side. The version we're working on jumping to
right now is not the 0.8.2 release version, but it is significantly ahead
of 0.8.1.1. We've got it deployed on one cluster and I'm making sure it's
balanced right now before I take a look at all the metrics. I'll fill in
more detail once I have it.

-Todd

On Thu, Feb 12, 2015 at 9:51 PM, Jay Kreps  wrote:

> This is a serious issue, we'll take a look.
>
> -Jay
>
> On Thu, Feb 12, 2015 at 3:19 PM, Solon Gordon  wrote:
>
> > I saw a very similar jump in CPU usage when I tried upgrading from
> 0.8.1.1
> > to 0.8.2.0 today in a test environment. The Kafka cluster there is two
> > m1.larges handling 2,000 partitions across 32 topics. CPU usage rose from
> > 40% into the 150%–190% range, and load average from under 1 to over 4.
> > Downgrading to 0.8.1.1 brought the CPU and load back to the previous
> > values.
> >
> > If there's more info that would be helpful, please let me know.
> >
> > On Thu, Feb 12, 2015 at 4:17 PM, Mathias Söderberg <
> > mathias.soederb...@gmail.com> wrote:
> >
> > > Jun,
> > >
> > > Pardon the radio silence. I booted up a new broker, created a topic
> with
> > > three (3) partitions and replication factor one (1) and used the
> > > *kafka-producer-perf-test.sh
> > > *script to generate load (using messages of roughly the same size as
> > ours).
> > > There was a slight increase in CPU usage (~5-10%) on 0.8.2.0-rc2
> compared
> > > to 0.8.1.1, but that was about it.
> > >
> > > I upgraded our staging cluster to 0.8.2.0 earlier this week or so, and
> > had
> > > to add an additional broker due to increased load after the upgrade
> (note
> > > that the incoming load on the cluster has been pretty much consistent).
> > > Since the upgrade we've been seeing an 2-3x increase in latency as
> well.
> > > I'm considering downgrading to 0.8.1.1 again to see if it resolves our
> > > issues.
> > >
> > > Best regards,
> > > Mathias
> > >
> > > On Tue Feb 03 2015 at 6:44:36 PM Jun Rao  wrote:
> > >
> > > > Mathias,
> > > >
> > > > The new hprof doesn't reveal anything new to me. We did fix the logic
> > in
> > > > using Purgatory in 0.8.2, which could potentially drive up the CPU
> > usage
> > > a
> > > > bit. To verify that, could you do your test on a single broker (with
> > > > replication factor 1) btw 0.8.1 and 0.8.2 and see if there is any
> > > > significant difference in cpu usage?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Feb 3, 2015 at 5:09 AM, Mathias Söderberg <
> > > > mathias.soederb...@gmail.com> wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > I re-ran the hprof test, for about 30 minutes again, for
> 0.8.2.0-rc2
> > > with
> > > > > the same version of snappy that 0.8.1.1 used. Attached the logs.
> > > > > Unfortunately there wasn't any improvement as the node running
> > > > 0.8.2.0-rc2
> > > > > still had a higher load and CPU usage.
> > > > >
> > > > > Best regards,
> > > > > Mathias
> > > > >
> > > > > On Tue Feb 03 2015 at 4:40:31 AM Jaikiran Pai <
> > > jai.forums2...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> On Monday 02 February 2015 11:03 PM, Jun Rao wrote:
> > > > >> > Jaikiran,
> > > > >> >
> > > > >> > The fix you provided in probably unnecessary. The channel that
> we
> > > use
> > > > in
> > > > >> > SimpleConsumer (BlockingChannel) is configured to be blocking.
> So
> > > even
> > > > >> > though the read from the socket is in a loop, each read blocks
> if
> > > > there
> > > > >> is
> > > > >> > no bytes received from the broker. So, that shouldn't cause
> extra
> > > CPU
> > > > >> > consumption.
> > > > >> Hi Jun,
> > > > >>
> > > > >> Of course, you are right! I forgot that while reading the thread
> > dump
> > > in
> > > > >> hprof output, one has to be aware that the thread state isn't
> shown
> > > and
> > > > >> the thread need not necessarily be doing any CPU activity.
> > > > >>
> > > > >> -Jaikiran
> > > > >>
> > > > >>
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jun
> > > > >> >
> > > > >> > On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg <
> > > > >> > mathias.soederb...@gmail.com> wrote:
> > > > >> >
> > > > >> >> Hi Neha,
> > > > >> >>
> > > > >> >> I sent an e-mail earlier today, but noticed now that it didn't
> > > > >> actually go
> > > > >> >> through.
> > > > >> >>
> > > > >> >> Anyhow, I've attached two files, one with output from a 10
> minute
> > > run
> > > > >> and
> > > > >> >> one with output from a 30 minute run. Realized that maybe I
> > > should've
> > > > >> done
> > > > >> >> one or two runs with 0.8.1.1 as well, but nevertheless.
> > > > >> >>
> > > > >> >> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing
> the
> > > > same
> > > > >> CPU
> > > > >> >> usage as with the beta version (basically pegging all cores).
> If
> > I
> > > > >> manage
> > > > >> >> to find the time I'll do another run with hprof on the rc2
> > version
> > > > >> later
> > > > >> >> today.
> > > > >> >>
> > > > >> >> Best 

Re: consumer lag metric

2015-02-16 Thread Todd Palino
The reason for this is the mechanism by which each of these lags is
calculated. MaxLag (and the FetcherLagMetric) are calculated by the
consumer itself using the difference between the offset it knows it is at,
and the offset that the broker has as the end of the partition. The offset
checker, however, uses the last offset that the consumer committed.
Depending on your configuration, this is somewhere behind where the
consumer actually is. For example, if your commit interval is set to 10
minutes, the number used by the offset checker can be up to 10 minutes
behind where it actually is.

So while MaxLag may be more up to date at any given time, it's actually
less accurate. Because MaxLag relies on the consumer to report it, if the
consumer breaks, you will not see an accurate lag number. This is why when
we are checking consumer lag, we use an external process that uses the
committed consumer offsets. This allows us to catch a broken consumer, as
well as an active consumer that is just falling behind.
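
A minimal sketch of that external check, using the newer kafka-python
client (group, topic, and broker names are placeholders): compare the
group's last committed offset against the broker's log end offset rather
than trusting the consumer's own MaxLag.

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(group_id="my-consumer-group",
                             bootstrap_servers="localhost:9092",
                             enable_auto_commit=False)

    tp = TopicPartition("my-topic", 0)
    committed = consumer.committed(tp)        # last offset the group committed
    log_end = consumer.end_offsets([tp])[tp]  # current end of the partition

    if committed is None:
        print("group has never committed an offset for this partition")
    else:
        print("lag for partition 0: %d messages" % (log_end - committed))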

-Todd


On Fri, Feb 13, 2015 at 9:34 PM, tao xiao  wrote:

> Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics are
> always
> much smaller than the lag shown in offset checker. any reason?
>
> On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy  wrote:
>
> > There are FetcherLagMetrics that you can take a look at. However, it
> > is probably easiest to just monitor MaxLag as that reports the maximum
> > of all the lag metrics.
> >
> > On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
> > > Hi team,
> > >
> > > Is there a metric that shows the consumer lag of a particular consumer
> > > group? similar to what offset checker provides
> > >
> > > --
> > > Regards,
> > > Tao
> >
> >
>
>
> --
> Regards,
> Tao
>


Re: consumer lag metric

2015-02-17 Thread Todd Palino
In order to do that, you'll need to run it and parse the output, and then
emit it to your metrics system of choice. This is essentially what I do - I
have a monitoring application which runs every minute and pulls the offsets
for a select set of topics and consumers, and then packages up the metrics
and sends them to our internal system.

It's not ideal. We're working on a script to calculate lag efficiently for
all consumers who commit offsets to Kafka, rather than a select set.
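
A rough sketch of the "run it and parse the output" approach, for
reference. The tool path, the Graphite host, and the exact column order of
the offset checker's output are all assumptions here -- check them against
your own install before trusting the numbers:

    import socket
    import subprocess
    import time

    out = subprocess.check_output(
        ["/opt/kafka/bin/kafka-run-class.sh", "kafka.tools.ConsumerOffsetChecker",
         "--zookeeper", "localhost:2181", "--group", "my-consumer-group"]).decode()

    metrics = []
    now = int(time.time())
    for line in out.splitlines():
        fields = line.split()
        # assumed columns: Group Topic Pid Offset logSize Lag Owner
        if len(fields) >= 7 and fields[5].isdigit():
            group, topic, partition, lag = fields[0], fields[1], fields[2], int(fields[5])
            metrics.append("kafka.lag.%s.%s.%s %d %d" % (group, topic, partition, lag, now))

    # Push to Graphite's plaintext listener.
    sock = socket.create_connection(("graphite.example.com", 2003))
    sock.sendall(("\n".join(metrics) + "\n").encode())
    sock.close()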

-Todd


On Mon, Feb 16, 2015 at 12:27 AM, tao xiao  wrote:

> Thank you Todd for your detailed explanation. Currently I export all
> metrics to graphite using the reporter configuration. is there a way I can
> do similar thing with offset checker?
>
> On Mon, Feb 16, 2015 at 4:21 PM, Todd Palino  wrote:
>
> > The reason for this is the mechanic by which each of the lags are
> > calculated. MaxLag (and the FetcherLagMetric) are calculated by the
> > consumer itself using the difference between the offset it knows it is
> at,
> > and the offset that the broker has as the end of the partition. The
> offset
> > checker, however, uses the last offset that the consumer committed.
> > Depending on your configuration, this is somewhere behind where the
> > consumer actually is. For example, if your commit interval is set to 10
> > minutes, the number used by the offset checker can be up to 10 minutes
> > behind where it actually is.
> >
> > So while MaxLag may be more up to date at any given time, it's actually
> > less accurate. Because MaxLag relies on the consumer to report it, if the
> > consumer breaks, you will not see an accurate lag number. This is why
> when
> > we are checking consumer lag, we use an external process that uses the
> > committed consumer offsets. This allows us to catch a broken consumer, as
> > well as an active consumer that is just falling behind.
> >
> > -Todd
> >
> >
> > On Fri, Feb 13, 2015 at 9:34 PM, tao xiao  wrote:
> >
> > > Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics are
> > > always
> > > much smaller than the lag shown in offset checker. any reason?
> > >
> > > On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy 
> wrote:
> > >
> > > > There are FetcherLagMetrics that you can take a look at. However, it
> > > > is probably easiest to just monitor MaxLag as that reports the
> maximum
> > > > of all the lag metrics.
> > > >
> > > > On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
> > > > > Hi team,
> > > > >
> > > > > Is there a metric that shows the consumer lag of a particular
> > consumer
> > > > > group? similar to what offset checker provides
> > > > >
> > > > > --
> > > > > Regards,
> > > > > Tao
> > > >
> > > >
> > >
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
>
>
>
> --
> Regards,
> Tao
>


Re: Need to understand consumer groups.

2015-02-17 Thread Todd Palino
I'm assuming from your description here that all of these topics are being
consumed by a single consumer (i.e. a single process that does something
different with each topic it sees). In general, you're going to get more
efficiency out of a single consumer instance that consumes multiple topics
than you will out of multiple consumers that each consume a single topic.
That means you should go with a single consumer group covering all of the
topics consumed by that single consumer.
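
As a small illustration (using the kafka-python client here just because
it's compact; topic and broker names are placeholders), the single-group,
multi-topic shape looks like this:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer("topic-a", "topic-b", "topic-c",
                             group_id="all-topics-consumer",
                             bootstrap_servers="localhost:9092",
                             enable_auto_commit=True)

    for msg in consumer:
        # dispatch on msg.topic to handle each topic differently
        print("%s[%d] offset=%d" % (msg.topic, msg.partition, msg.offset))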

If, on the other hand, you have separate processes/threads/components that
consume each topic, you'll find that it doesn't matter much either way. In
that case I would probably go with individual groups for isolation.

-Todd


On Mon, Feb 16, 2015 at 3:30 PM, Scott Chapman  wrote:

> We have several dozen topics, each with only one partition (replication factor
> of 2).
>
> We are wanting to launch console-consumer for these in a manner that will
> support saving offsets (so they can resume where they left off if they need
> to be restarted). And I know consumer groups is the mechanism for doing
> that.
>
> My question is, should we use a single consumer-group for all the console
> consumers (we are launching one for each topic) or should we be generating
> topic-specific consumer groups?
>
> Thanks in advance!
>
> -Scott.
>


Re: cross-colo writing/reading?

2015-03-02 Thread Todd Palino
Latencies like this are one of the big reasons that we run our Kafka
clusters local to the producers and consumers. Another is network
partition. As Jeff noted, mirror maker is the way to connect them together.
Our architecture uses a local cluster in each datacenter, and then an
aggregate cluster (either in those datacenters or in another) where we
mirror all the traffic from the local clusters together into one place.
This way clients can pick a local or a global view of the data, and we can
have everyone still stay in their local Kafka clusters.

-Todd


On Fri, Feb 27, 2015 at 4:21 PM, Yang  wrote:

> we tested our new application that reads and writes to kafka.
>
> at first we found the access latency is very high. then we realized that
> it's because the client and server are in different colos. moving them
> together reduces down the access time to < 4 ms.
>
>
> I was wondering if there are any techniques/properties to set , or maybe
> enhancements to be made to the kafka code, that could cater to such
> situations ?
>
> Thanks
> Yang
>


Re: cross-colo writing/reading?

2015-03-03 Thread Todd Palino
I don't know if I'd go as far as "elegant". Functional, definitely. :)

Yang, I'm not entirely sure what you're looking for here. You can already
specify the acks setting in the producer if you do not care about
acknowledgement from your remote produce requests (setting it to 0). If you
do care, you just have to tolerate the latency. The speed of light isn't
just a good idea, it's the law.
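
As a quick illustration of that trade-off (using the kafka-python producer
here as an example; the acks setting behaves the same way in the Java
producer, and the hostnames are placeholders):

    from kafka import KafkaProducer

    # Fire-and-forget across the WAN: no acknowledgement, lowest latency,
    # but you will never know if a produce request was lost.
    fast = KafkaProducer(bootstrap_servers="remote-colo:9092", acks=0)

    # Acknowledged writes: you pay the cross-colo round trip on every batch.
    safe = KafkaProducer(bootstrap_servers="remote-colo:9092", acks="all")

    fast.send("events", b"best-effort message")
    future = safe.send("events", b"message that must not be lost")
    future.get(timeout=30)  # blocks until the remote cluster acknowledges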

-Todd


On Tue, Mar 3, 2015 at 3:34 PM, Jeff Schroeder 
wrote:

> Mirror maker is about separating latency and failure domains. I think it is
> a very elegant solution to a difficult problem. My suspicion is that the
> LinkedIn / Confluent team agrees.
>
> On Tue, Mar 3, 2015 at 3:50 PM, Yang  wrote:
>
> > thanks guys.
> >
> > it's just quite a lot of ops cost to setup and monitor a separate
> cluster,
> > connected through mirror maker. sometimes if I have just a single
> > producer/consumer in a new cluster, it would be more desirable to just
> > connect it directly to an existing kafka setup.
> >
> > I remember at least in an earlier version of Cassandra (about 2 years
> ago,
> > when I still actively worked on that), you could define network
> topologies
> > , and define the different acknowledgement requirements for different
> parts
> > of the topology.  if this kind of flexibility is built into the internal
> > protocol of Kafka, it would be much nicer. personally I think the mirror
> > maker is more of a patch work than a systematic design.
> >
> > On Mon, Mar 2, 2015 at 7:53 AM, Todd Palino  wrote:
> >
> > > Latencies like this are one of the big reasons that we run our Kafka
> > > clusters local to the producers and consumers. Another is network
> > > partition. As Jeff noted, mirror maker is the way to connect them
> > together.
> > > Our architecture uses a local cluster in each datacenter, and then an
> > > aggregate cluster (either in those datacenters or in another) where we
> > > mirror all the traffic from the local clusters together into one place.
> > > This way clients can pick a local or a global view of the data, and we
> > can
> > > have everyone still stay in their local Kafka clusters.
> > >
> > > -Todd
> > >
> > >
> > > On Fri, Feb 27, 2015 at 4:21 PM, Yang  wrote:
> > >
> > > > we tested our new application that reads and writes to kafka.
> > > >
> > > > at first we found the access latency is very high. then we realized
> > that
> > > > it's because the client and server are in different colos. moving
> them
> > > > together reduces down the access time to < 4 ms.
> > > >
> > > >
> > > > I was wondering if there are any techniques/properties to set , or
> > maybe
> > > > enhancements to be made to the kafka code, that could cater to such
> > > > situations ?
> > > >
> > > > Thanks
> > > > Yang
> > > >
> > >
> >
>
>
>
> --
> Jeff Schroeder
>
> Don't drink and derive, alcohol and analysis don't mix.
> http://www.digitalprognosis.com
>


Re: mapping between disk and partition

2015-03-07 Thread Todd Palino
This is one of the major issues that we have noted with using JBOD disk
layouts: there is no tool like partition reassignment to move partitions
between disks.

Another is that the partition balance algorithm would need to be improved,
allowing for smarter selection of a mount point than simple round robin. This
could be handled with pluggable partition assignment schemes (see my comment on 
the KIP), but I think it also speaks to the need for compound reassignment 
schemes. For example, rack aware across brokers, and by mount point usage 
within the broker.
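
Until there is real tooling for this, measuring the imbalance by hand is
about all you can do. A minimal sketch (the log.dirs paths are
placeholders; point them at whatever is in your server.properties):

    import os

    LOG_DIRS = ["/data1/kafka-logs", "/data2/kafka-logs"]  # from log.dirs

    def dir_size(path):
        total = 0
        for root, _dirs, files in os.walk(path):
            for name in files:
                total += os.path.getsize(os.path.join(root, name))
        return total

    for log_dir in LOG_DIRS:
        partitions = [d for d in os.listdir(log_dir)
                      if os.path.isdir(os.path.join(log_dir, d))]
        sizes = {p: dir_size(os.path.join(log_dir, p)) for p in partitions}
        print("%s: %d partitions, %.1f GB" % (log_dir, len(partitions),
                                              sum(sizes.values()) / 1e9))
        # Show the five largest partition directories on this mount point.
        for p, s in sorted(sizes.items(), key=lambda kv: -kv[1])[:5]:
            print("  %-50s %.1f GB" % (p, s / 1e9))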

-Todd


> On Mar 7, 2015, at 11:18 AM, Jiangjie Qin  wrote:
> 
> I don't think we can specify partition-to-disk mapping now. All the
> partitions will reside in the same directory.
> Here is a wild idea, but I haven't tried this:
> 1. Create the topic and make sure all the log files are created.
> 2. Move each partition's log directory to the disk that you want it to
> reside on.
> 3. Create a link in its original path pointing to the real file.
> 
> Theoretically it should work.
> 
> Jiangjie (Becket) Qin
> 
>> On 3/7/15, 8:19 AM, "sunil kalva"  wrote:
>> 
>> please be advice on this.
>> 
>>> On Fri, Mar 6, 2015 at 2:02 AM, sunil kalva  wrote:
>>> 
>>> Hi
>>> 
>>> Can i map a specific partition to a different disk in a broker. And what
>>> is the general recommendations for disk to partition mapping for which
>>> that
>>> broker is leader. and also for replications that broker handles.
>>> 
>>> --
>>> SunilKalva
>> 
>> 
>> 
>> -- 
>> SunilKalva
> 


Re: consumer groups in python

2015-03-17 Thread Todd Palino
Yeah, this is exactly correct. The python client does not implement the
Zookeeper logic that would be needed to do a balanced consumer. While it's
certainly possible to do it (for example, Joe implemented it in Go), the
logic is non-trivial and nobody has bothered to this point. I don't think
anyone will, as the new consumer will make it much easier to implement
clients without needing to do it.

In the past, we've used an internal python module that calls a C library
underneath that does the balancing. Now we're moving to one that calls our
REST interface to Kafka, which is easier to work with. Another option that
some consumers use is to pipe messages in from the kafka-console-consumer.
This works well, but if you're not careful with stopping it you can easily
lose messages.
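
A minimal sketch of the console-consumer pipe, for what it's worth (the
script path and flags are the 0.8-era ones and may differ in your install;
note the caveat above about losing messages if you stop it carelessly):

    import subprocess

    proc = subprocess.Popen(
        ["/opt/kafka/bin/kafka-console-consumer.sh",
         "--zookeeper", "localhost:2181",
         "--topic", "my-topic"],
        stdout=subprocess.PIPE)

    try:
        for line in proc.stdout:
            message = line.rstrip(b"\n")
            print(message)  # replace with real processing
    finally:
        proc.terminate()
        proc.wait()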

-Todd


On Tue, Mar 17, 2015 at 6:47 AM, Sloot, Hans-Peter <
hans-peter.sl...@atos.net> wrote:

> Thanks
>
> I just came across this https://github.com/mumrah/kafka-python/issues/112
> It says:
> That contract of one message per consumer group only works for the
> coordinated consumers which are implemented for the JVM only (i.e., Scala
> and Java clients).
>
>
> -Original Message-
> From: Steve Miller [mailto:st...@idrathernotsay.com]
> Sent: Tuesday, March 17, 2015 2:18 PM
> To: users@kafka.apache.org
> Subject: Re: consumer groups in python
>
> It's possible that I just haven't used it but I am reasonably sure that
> the python API doesn't have a way to store offsets in ZK.  You would need
> to implement something more or less compatible with what the Scala/Java API
> does, presumably.
>
> On the plus side the python API -- possibly just because in python,
> nothing is truly private (: -- exposes offsets and offset management in
> ways that those other APIs seem not to.   Seeking, say, to approximately
> 1000 messages before the current offset is no big deal in python, nor is
> fetching oldest and newest offsets for topics (e.g., if you want to alert
> if nothing is being produced, without having to fire up a consumer).  I
> have close to zero experience with anything other than the python API and
> librdkafka but judging from questions I see here those seem to be difficult
> to do in Scala or Java.  I hope to do more with those APIs soon (and in
> fact am at ScalaDays right now in part so I can attend some intro Scala
> training (-: ).
>
> -Steve
>
>
>
> > On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter <
> hans-peter.sl...@atos.net> wrote:
> >
> > Hi,
> >
> > I wrote a small python script to consume messages from kafka.
> >
> > The consumer is defined as follows:
> > kafka = KafkaConsumer('my-replicated-topic',
> >   metadata_broker_list=['localhost:9092'],
> >   group_id='my_consumer_group',
> >   auto_commit_enable=True,
> >   auto_commit_interval_ms=30 * 1000,
> >   auto_offset_reset='smallest')
> >
> > But when I start 2 consumers simultaneously both receive all messages
> from the topic.
> > I would expect to have 1 consumer about half the number of messages and
> the other the rest.
> >
> > How can I arrange this?
> >
> > Regards Hans-Peter
> >
>


Post on running Kafka at LinkedIn

2015-03-20 Thread Todd Palino
For those who are interested in detail on how we've got Kafka set up at
LinkedIn, I have just published a new posted to our Engineering blog titled
"Running Kafka at Scale"

 https://engineering.linkedin.com/kafka/running-kafka-scale

It's a general overview of our current Kafka install, tiered architecture,
audit, and the libraries we use for producers and consumers. You'll also be
seeing more posts from the SRE team here in the coming weeks on deeper
looks into both Kafka and Samza.

Additionally, I'll be giving a talk at ApacheCon next month on running
tiered Kafka architectures. If you're in Austin for that, please come by
and check it out.

-Todd


Re: Post on running Kafka at LinkedIn

2015-03-23 Thread Todd Palino
Emmanuel, if it helps, here's a little more detail on the hardware spec we
are using at the moment:

12 CPU (HT enabled)
64 GB RAM
16 x 1TB SAS drives (2 are used as a RAID-1 set for the OS, 14 are a
RAID-10 set just for the Kafka log segments)

We don't colocate any other applications with Kafka except for a couple
monitoring agents. Zookeeper runs on completely separate nodes.

I suggest starting with looking at the basics - watch the CPU, memory, and
disk IO usage on the brokers as you are testing. You're likely going to
find one of these three is the constraint. Disk IO in particular can lead
to a significant increase in produce latency once utilization climbs even
above 10-15%.

-Todd


On Fri, Mar 20, 2015 at 3:41 PM, Emmanuel  wrote:

> This is why I'm confused, because I'm trying to benchmark and I see numbers
> that seem pretty low to me...8000 events/sec on 2 brokers with 3CPU each
> and 5 partitions should be way faster than this and I don't know where to
> start to debug...
> the kafka-consumer-perf-test script gives me ridiculously low numbers
> (1000 events/sec/thread)
>
> So what could be causing this?
> From: jbringhu...@linkedin.com.INVALID
> To: users@kafka.apache.org
> Subject: Re: Post on running Kafka at LinkedIn
> Date: Fri, 20 Mar 2015 22:16:29 +
>
> Keep in mind that these brokers aren't really stressed too much at any
> given time -- we need to stay ahead of the capacity curve.
> Your message throughput will really just depend on what hardware you're
> using. However, in the past, we've benchmarked at 400,000 to more than
> 800,000 messages / broker / sec, depending on configuration (
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> ).
>
> -Jon
> On Mar 20, 2015, at 3:03 PM, Emmanuel  wrote:
>
> 800B messages / day = 9.26M messages / sec over 1100 brokers
> = ~8400 message / broker / sec
> Do I get this right?
> Trying to benchmark my own test cluster and that's what I see with 2
> brokers...Just wondering if my numbers are good or bad...
>
>
> Subject: Re: Post on running Kafka at LinkedIn
> From: cl...@kafka.guru
> Date: Fri, 20 Mar 2015 14:27:58 -0700
> To: users@kafka.apache.org
>
> Yep! We are growing :)
>
> -Clark
>
> Sent from my iPhone
>
> On Mar 20, 2015, at 2:14 PM, James Cheng  wrote:
>
> Amazing growth numbers.
>
> At the meetup on 1/27, Clark Haskins presented their Kafka usage at the
> time. It was:
>
> Bytes in: 120 TB
> Messages In: 585 billion
> Bytes out: 540 TB
> Total brokers: 704
>
> In Todd's post, the current numbers:
>
> Bytes in: 175 TB (45% growth)
> Messages In: 800 billion (36% growth)
> Bytes out: 650 TB (20% growth)
> Total brokers: 1100 (56% growth)
>
> That much growth in just 2 months? Wowzers.
>
> -James
>
> On Mar 20, 2015, at 11:30 AM, James Cheng  wrote:
>
> For those who missed it:
>
> The Kafka Audit tool was also presented at the 1/27 Kafka meetup:
> http://www.meetup.com/http-kafka-apache-org/events/219626780/
>
> Recorded video is here, starting around the 40 minute mark:
> http://www.ustream.tv/recorded/58109076
>
> Slides are here:
> http://www.ustream.tv/recorded/58109076
>
> -James
>
> On Mar 20, 2015, at 9:47 AM, Todd Palino  wrote:
>
> For those who are interested in detail on how we've got Kafka set up at
> LinkedIn, I have just published a new post to our Engineering blog titled
> "Running Kafka at Scale"
>
>   https://engineering.linkedin.com/kafka/running-kafka-scale
>
> It's a general overview of our current Kafka install, tiered architecture,
> audit, and the libraries we use for producers and consumers. You'll also be
> seeing more posts from the SRE team here in the coming weeks on deeper
> looks into both Kafka and Samza.
>
> Additionally, I'll be giving a talk at ApacheCon next month on running
> tiered Kafka architectures. If you're in Austin for that, please come by
> and check it out.
>
> -Todd
>
>
>
>


Re: kafka audit

2015-03-23 Thread Todd Palino
We've talked about it a little bit, but part of the problem is that it is
pretty well integrated into our infrastructure, and as such it's hard to
pull it out. I illustrated this a little differently than Jon did in my
latest blog post (http://engineering.linkedin.com/kafka/running-kafka-scale):
how the producer (and consumer) bits that handle audit are integrated into
our internal libraries that wrap the open source libraries. Between the
schema-registry, the publishing of the audit data back into Kafka, the
audit consumers, and the database that is needed for storing the audit
data, it gets woven in pretty tightly.

Confluent has made a start on this by releasing a stack with schemas
integrated in. This is probably a good place to start as far as building an
open source audit service.

-Todd


On Mon, Mar 23, 2015 at 12:47 AM, Navneet Gupta (Tech - BLR) <
navneet.gu...@flipkart.com> wrote:

> Are there any plans to open source the same? What alternates do we have
> here?
>
> We are building an internal auditing framework for our entire big data
> pipeline. Kafka is one of the data sources we have (ingested data).
>
> On Mon, Mar 23, 2015 at 1:03 PM, tao xiao  wrote:
>
> > Linkedin has an excellent tool that monitors lag/data loss/data
> duplication
> > and etc. Here is the reference
> >
> >
> >
> http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015
> >
> > it is not open sourced though.
> >
> > On Mon, Mar 23, 2015 at 3:26 PM, sunil kalva 
> > wrote:
> >
> > > Hi
> > > What is best practice for adding audit feature in kafka, Is there any
> > > framework available for enabling audit feature at producer and consumer
> > > level and any UI frameworks for monitoring.
> > >
> > > tx
> > > SunilKalva
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> Thanks & Regards,
> Navneet Gupta
>


Re: Kafka - deployment size and topologies

2015-04-06 Thread Todd Palino
Luckily, I was just reviewing a lot of this information for my ApacheCon
talk next week. Those slides, and the video (I hope) will be published as
soon as the talk is done. I'll give you the information I have from
LinkedIn's point of view, but out of order :)

Our Kafka brokers are all the same model. We use a system with 12 CPU
cores, currently 2.6 GHz, with hyperthreading enabled. They have 64 GB of
memory, and dual 1G network interfaces that are bonded, but operating
active/passive. The systems have 16 1 TB SAS drives in them: 2 are
configured as RAID-1 for the OS, and the other 14 are configured as RAID-10
specifically for the Kafka log segments. This gives us a little under 7 TB
of useable space for message retention per broker.

On layout, we try to follow a few rules with varying consistency (we're
getting more strict over time):
- We do not colocate other applications with Kafka. It gets the entire
system to itself
- Zookeeper runs on 5 separate servers (also not colocated with other
applications). Those servers have the same CPU, memory, and network spec,
but they do not have all the disks. They do have 550GB SSD drives which are
dedicated for the ZK transaction logs.
- We try not to have more than 1 Kafka broker in a cluster in the same
rack. This is to minimize the kinds of failures that can take partitions
offline
- All Kafka producers and consumers are local to the datacenter that
the cluster is in. We use mirror maker and aggregate clusters to copy
messages between datacenters.

Our smallest cluster is currently 3 brokers, and our largest is 42. It
largely depends on how much retention we need, and how much traffic that
cluster is getting. Our clusters are separated out by the general type of
traffic: queuing, tracking, metrics, and logging. Queuing clusters are
generally the smallest, while metrics clusters are the largest (with
tracking close behind). We expand clusters based on the following loose
rules:
- Disk usage on the log segments partition should stay under 60% (we
have default 4 day retention)
- Network usage on each broker should stay under 75%
- Partition count (leader and follower combined) on each broker should
stay under 4000

As far as topic volume goes, it varies widely. We have topics that only see
a single message per minute (or less). Our largest topic by bytes has a
peak rate of about 290 Mbits/sec. Our largest topic by messages has a peak
rate of about 225k messages/sec. Note that those are in the same cluster.
When we are sizing topics (number of partitions), we use the following
guidelines:
- Have at least as many partitions as there are consumers in the
largest group
- Keep partition size on disk under 50GB per partition (better balance)
- Take into account any other application requirements (keyed messages,
specific topic counts required, etc.)
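To make those concrete, here's a rough back-of-the-envelope sketch of the
arithmetic (all the inputs are made-up illustrative numbers, not our
production figures):

    # Rough partition-count estimate following the guidelines above.
    # Every input here is an illustrative assumption.
    import math

    consumers_in_largest_group = 12        # size of the biggest consumer group
    retention_bytes = 600 * 1024**3        # expected on-disk retention for the topic
    max_partition_bytes = 50 * 1024**3     # keep each partition under ~50 GB on disk
    broker_count = 10                      # brokers in the cluster

    by_consumers = consumers_in_largest_group
    by_size = math.ceil(retention_bytes / max_partition_bytes)
    partitions = max(by_consumers, by_size)

    # Round up to a multiple of the broker count so the topic balances evenly.
    partitions = math.ceil(partitions / broker_count) * broker_count
    print(partitions)   # -> 20 in this example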

I hope this helps. I'll be covering some of this at my ApacheCon talk
(Kafka at Scale: Multi-Tier Architectures) and at the meet up that Jun has
set up at ApacheCon. If you have any questions, just ask!

-Todd


On Mon, Apr 6, 2015 at 9:35 AM, Rama Ramani  wrote:

> Hello,
>   I am trying to understand some of the common Kafka deployment
> sizes ("small", "medium", "large") and configuration to come up with a set
> of common templates for deployment on Linux. Some of the Qs to answer are:
>
> - Number of nodes in the cluster
> - Machine Specs (cpu, memory, number of disks, network etc.)
> - Speeds & Feeds of messages
> - What are some of the best practices to consider when laying out the
> clusters?
> -  Is there a sizing calculator for coming up with this?
>
> If you can please share pointers to existing materials or specific details
> of your deployment, that will be great.
>
> Regards
> Rama
>


Re: New broker ignoring retention

2015-04-06 Thread Todd Palino
I answered this in IRC, but the issue is that retention depends on the
modification time of the log segments on disk. When you copy a partition
from one broker to another, the mtime of the log segments on the new broker
will be now. That means the retention clock starts over again, and retention
for those partitions will grow to 2 times what it should be before dropping
back off to what you want.

We deal with this a lot as well, which is part of why we keep a lot of
headroom on our brokers when it comes to disk space. We've considered
trying to change the mtime of the files manually after a move (we have a
separate time-series database of offsets for every partition, so we can
tell what the mtime of the file "should" be within 60 seconds), but we
haven't done any experimentation with this as to whether or not it would
actually work without problems.
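Just to illustrate the idea (this is a sketch only - as I said, we have not
actually tried it - and the paths and timestamps are placeholders):

    # Sketch: reset the mtime of copied log segments so retention picks up where
    # it should. Untested; the expected timestamp would come from our offset
    # database, hard-coded here purely for illustration.
    import glob
    import os

    def expected_mtime(segment_path):
        # Placeholder lookup - in our case this would come from the time-series
        # database of offsets mentioned above.
        return 1428000000.0

    partition_dir = "/kafka-logs/mytopic-0"   # hypothetical partition directory
    for segment in glob.glob(os.path.join(partition_dir, "*.log")):
        ts = expected_mtime(segment)
        os.utime(segment, (ts, ts))            # sets both atime and mtime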

-Todd


On Mon, Apr 6, 2015 at 1:10 PM, Andrew Jorgensen <
ajorgen...@twitter.com.invalid> wrote:

> I can't find any, but does anyone know of any bugs in 0.8.1.1 that would
> cause new brokers added to an existing cluster to ignore the per-topic
> configuration for retention?
>
> I had a 8 node cluster with a topic with per topic retention set
> like: Configs:retention.ms=540. I attempted to add 2 more brokers to
> the cluster today and transfer 3 of the existing partitions over to the new
> nodes. In general, the existing nodes stay around 60% disk usage but the
> new brokers start at around 60% and then fall over the course of about two
> hours down to 0%. It is unclear to me why the new brokers are ignoring the
> log retention time and seemingly keeping the logs indefinitely. Both the
> existing brokers and the new ones have the same server.properties file
> which has the default log.retention.hours=24.
>
> To add the new brokers I ran the reassign-partition tool and just moved 3
> partitions from some other nodes to the new nodes, the reassignment seemed
> to complete successfully, there are 30 partitions and 10 brokers so each
> broker has 3 partitions.
>


Re: Increasing replication factor of existing topics

2015-04-07 Thread Todd Palino
The partition reassignment is started by writing a zookeeper node in the
admin tree. While it's possible to kick off the partition reassignment by
writing the zookeeper node that controls it directly, you have to be very
careful about doing this, making sure that the format is perfect and you
perform all the same checks that the command line tool does. If you're
asking if you can just change the /brokers/topics/* znodes directly, the
answer is no.
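For reference, the JSON that ends up in the /admin/reassign_partitions znode
looks roughly like this (topic name, partitions, and broker ids here are
placeholders):

    {"version": 1,
     "partitions": [
       {"topic": "my-topic", "partition": 0, "replicas": [1, 2]},
       {"topic": "my-topic", "partition": 1, "replicas": [2, 3]}
     ]}

The controller picks that node up, performs the moves, and deletes it when the
reassignment is complete, which is exactly why getting the format and the
sanity checks right matters.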

-Todd

On Tue, Apr 7, 2015 at 6:44 AM, Navneet Gupta (Tech - BLR) <
navneet.gu...@flipkart.com> wrote:

> Hi,
>
> I got a method to increase replication factor of topics here
> 
>
> However,  I was wondering if it's possible to do it by altering some nodes
> in zookeeper.
>
> Thoughts/suggestions welcome.
>
>
> --
> Thanks & Regards,
> Navneet Gupta
>


Re: Number of Partitions and Performance

2015-04-07 Thread Todd Palino
Going to stand with Jay here :)

I just posted an email yesterday about how we size clusters and topics.
Basically, have at least as many partitions as you have consumers in your
consumer group (preferably a multiple). If you want to balance it across
the cluster, also have it be a multiple of the number of brokers you have.
We tend to ignore the second one on most clusters, but we will expand a
topic (as long as it is not keyed) if the retention on disk exceeds 50 GB.
That's just a guideline we have so it's easier to balance the traffic and
move partitions around when needed.

-Todd


On Tue, Apr 7, 2015 at 10:28 AM, Jay Kreps  wrote:

> I think the blog post was giving that as an upper bound not a recommended
> size. I think that blog goes through some of the trade offs of having more
> or fewer partitions.
>
> -Jay
>
> On Tue, Apr 7, 2015 at 10:13 AM, François Méthot 
> wrote:
>
> > Hi,
> >
> >   We initially had configured our topics to have between 8 to 16
> partitions
> > each on a cluster of 10 brokers (vm with 2 cores, 16 GB RAM, Few TB of
> SAN
> > Disk).
> >
> > Then I came across the rule of thumb formula *100 x b x r.*
> > (
> >
> >
> http://blog.confluent.io/2015/03/12/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
> > )
> >
> > 100 x 10 brokers x 2 Replication = 2000 partitions.
> >
> > We gave it a try, but our single-threaded kafka producer performance
> > dropped by 80%.
> >
> > What is the benefits of having that much partitions?
> >
> > Is there any problem in the long run with using a topic with as few as 16
> > partitions?
> >
> >
> > Francois
> >
>


Re: Kafka - deployment size and topologies

2015-04-08 Thread Todd Palino
So as I noted, it really does depend on what you need. In the case of a
small number of topics, I would say to make the number of partitions be a
multiple of the number of brokers. That will balance them in the cluster,
while still giving you some freedom to have larger partition counts for
larger topics.

-Todd

On Wed, Apr 8, 2015 at 9:29 AM, Akshat Aranya  wrote:

> Thanks for the info, Todd.  This is very useful.  Please see my question
> inline:
>
> On Mon, Apr 6, 2015 at 10:24 AM, Todd Palino  wrote:
>
> >
> > - Partition count (leader and follower combined) on each broker
> should
> > stay under 4000
> >
> > As far as topic volume goes, it varies widely. We have topics that only
> see
> > a single message per minute (or less). Our largest topic by bytes has a
> > peak rate of about 290 Mbits/sec. Our largest topic by messages has a
> peak
> > rate of about 225k messages/sec. Note that those are in the same cluster.
> > When we are sizing topics (number of partitions), we use the following
> > guidelines:
> > - Have at least as many partitions as there are consumers in the
> > largest group
> > - Keep partition size on disk under 50GB per partition (better
> balance)
> > - Take into account any other application requirements (keyed
> messages,
> > specific topic counts required, etc.)
> >
> >  What would you say is a recommended configuration when you don't have
> too
> many topics?  It seems like having too many partitions is not recommended,
> but at the same time, you need more partitions to be able to utilize all
> the disks and handle the data rate, especially for high volume topics.
>
> I hope this helps. I'll be covering some of this at my ApacheCon talk
> > (Kafka at Scale: Multi-Tier Architectures) and at the meet up that Jun
> has
> > set up at ApacheCon. If you have any questions, just ask!
> >
> > -Todd
> >
> >
> > On Mon, Apr 6, 2015 at 9:35 AM, Rama Ramani 
> wrote:
> >
> > > Hello,
> > >   I am trying to understand some of the common Kafka deployment
> > > sizes ("small", "medium", "large") and configuration to come up with a
> > set
> > > of common templates for deployment on Linux. Some of the Qs to answer
> > are:
> > >
> > > - Number of nodes in the cluster
> > > - Machine Specs (cpu, memory, number of disks, network etc.)
> > > - Speeds & Feeds of messages
> > > - What are some of the best practices to consider when laying out the
> > > clusters?
> > > -  Is there a sizing calculator for coming up with this?
> > >
> > > If you can please share pointers to existing materials or specific
> > details
> > > of your deployment, that will be great.
> > >
> > > Regards
> > > Rama
> > >
> >
>


Re: Upper-bound on number of consumers

2015-04-09 Thread Todd Palino
1000s of partitions should not be a problem at all. Our largest clusters
have over 30k partitions in them without a problem (running on 40 brokers).
We've run into some issues when you have more than 4000 partitions (either
leader or replica) on a single broker, but that was on older code so there
may be less of an issue now. You'll want to keep an eye on your retention
settings, combined with the number of open file handles allowed for your
broker process. We run with the limit set to 200k right now so we have
plenty of headroom.

The 100k consumers I'm not as sure about. So we have active clusters that
have over 250k open network connections across all the brokers combined
(about 12-15k per broker), but most of those connections are producers, not
consumers. While the brokers themselves may be able to handle the number of
consumers, especially if you horizontally scale a bit and make sure you use
a high enough partition count so you don't get hot brokers, that's not
where I think you'll hit a problem. It's actually Zookeeper that will give
you the headache, and it will be hard to see it.

Zookeeper has a default limit of 1 MB as the size of the data in a znode.
This is usually fine, although some of the cluster commands like partition
moves and preferred replica election can hit it if you have a high number
of topics. What is less understood is that the list of child nodes of the
znode must ALSO fit inside that limit. So if you have 100k consumers, and
each group name is at least 10 letters long (don't forget overhead for a
list!), you'll blow the limit for the /consumers node. We actually ran into
this in one of our ZK clusters for a different application. It not only
caused ZK to fail, it caused corruption of the snapshots in the ensemble.

Now, you could conceivably up the limit in Zookeeper (you need to set it
the same on the servers and the clients of Zookeeper), but I think you're
going to run into other problems. Possibly with Zookeeper, with the amount
of traffic you'll get from those consumers, and also from Kafka itself not
handling the number of consumers well or hitting previously unknown race
conditions.
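(For reference, the knob in question is ZooKeeper's jute.maxbuffer system
property, in bytes, which defaults to roughly 1 MB and has to be raised on the
servers and on every client JVM, e.g. -Djute.maxbuffer=4194304 for ~4 MB.)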

Now, as far as your model goes, I think you should rethink it a little. We
have a similar model in place that we're in the process of getting rid of
for reading metrics out of Kafka. All the servers that store metrics in RRD
files consume ALL the metrics data, and then they throw out everything that
they don't have an RRD for. It's not only inefficient, it magnifies any
increase in incoming traffic many-fold on the consume side. We nearly took
down a cluster at one point because we had a 1.5 MB/sec increase in traffic
on the produce side that turned into a 100-fold increase on the consume
side. Kafka can be part of your system, but I think you should use a layer
between Kafka and the consumers to route the messages properly if that's
the way you're going to go. A queue solution that would consume the data
out of Kafka once, and separate it out into buckets with no retention to
then be pulled by your customers.

Another solution is to use keyed partitioning, if it is possible with your
architecture, to bucket the userids into separate partitions. That way you
could have the customers just consume the bucket they are interested in. It
would require more up front work to come up with the custom partitioner,
but it would be very efficient as you move forwards.
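Conceptually the partitioner is just a stable hash of the userid - something
like this sketch (not tied to any particular client library; note that
Python's built-in hash() isn't stable across processes, hence the real hash
function):

    # Sketch of keyed partitioning by userid (illustrative, not a real client API).
    import hashlib

    def partition_for(userid, num_partitions):
        # The same userid must always land in the same partition so a customer
        # only ever has to consume its own bucket.
        digest = hashlib.md5(userid.encode("utf-8")).hexdigest()
        return int(digest, 16) % num_partitions

    print(partition_for("customer-12345", 4000))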

-Todd


On Wed, Apr 8, 2015 at 7:35 PM, Ralph Caraveo  wrote:

> Hello Kafka Friends,
>
> We are considering a use-case where we'd like to have a Kafka Cluster with
> potentially 1000's of partitions using a hashed key on customer userids.
> We have heard that Kafka can support 1000's of partitions in a single
> cluster and I wanted to find out if it's reasonable to have that many
> partitions?
>
> Additionally, we'd like to have potentially 100,000's of consumers that are
> consuming a somewhat low volume of log data from these partitions.  And I'd
> also like to know if having that many consumers is reasonable with Kafka or
> recommended.
>
> The scenario would be something like we have 100,000 to 200,000 customers
> where we'd like to have their data sharded by userid into a cluster of say
> 4000 partitions.  And then we'd like to have a consumer running for each
> userid that is consuming the log data.
>
> In this scenario we'd have (assuming 100,000 userids)
>
> 100,000/4000 = 25 consumers per partition where each consumer would be
> reading each offset and ignoring whatever key is not related to the
> assigned userid that it is consuming from.
>
> My gut feeling with all of this tells me that this may not be a sound
> solution because we'd need to have a ton of file descriptors open and there
> could be a lot of overhead on Kafka managing this volume of consumers.
>
> Any guidance is appreciated...mainly I'm just looking to see if this a
> reasonable use of Kafka or if we need to go back to the drawing board.
>
> I appreciate any 

Re: Post on running Kafka at LinkedIn

2015-04-08 Thread Todd Palino
Good questions. Here are the answers...

- Yes, all brokers we run are hardware. We do not use virtual systems for
Kafka or Zookeeper

- There's a number of things we have done. I covered a lot of them last
year at ApacheCon (
http://www.slideshare.net/ToddPalino/enterprise-kafka-kafka-as-a-service).
Some of the disk tuning is changing, but we haven't settled on the final
config yet.

- We have an internal configuration system and deployment system at
LinkedIn that work together. We change configs for everything in a central
repository, and when we do deployments they are shipped out and installed.
If we make a change, we either need to wait for the next time we deploy
(current cadence is every 3 weeks), or do a manual deployment

- The number of clusters is because we have 3 major data types (tracking,
metrics, and queuing), each of which is in multiple datacenters, and each one is
aggregated. We also have clusters for logging in all datacenters, and we
have custom clusters for some other use cases that don't mesh well (either
for security or usage pattern reasons). In addition, we currently have 7
test clusters for development work and release testing.

- Most of the broker tuning is all the same, except for retention times
that may vary. We generally look at tuning the partition counts. For
example, one of our sets of clusters for outbound traffic from Hadoop is
tuned to have number of partitions for each topic equal to the number of
brokers, to keep the balance as close to perfect as possible.

- The largest challenge is around how many instances of mirror maker we
run, and keeping track of all of the data flows. Our overall management is
actually much simpler now, because even a couple years ago we did not have
all the configuration and deployment automation that we have now.

- We have fixed naming schemes for metrics and logs that are enforced by
the clients. Queuing topics are pretty much a free for all, but we don't
care if someone names something the same as another cluster's topic, as
queuing does not interact with other clusters. The tracking pipelines are
generally controlled by an internal data model committee that reviews and
approves schemas. We're moving towards more centralized management of topic
configurations to enable the developers to spin up services with less
friction.

- Repartitioning is all handled manually. We monitor the partition sizes on
disk and periodically review that and expand topics as needed. We're
working on automation for it, but 350k partitions with a per-partition
metric and alert makes parts of the monitoring system a little cranky :)

- Our monitoring systems are all open, so anyone can review the current
status. We have also developed a web console that shows overviews of the
various clusters and mirror makers, and provides tools like offset checking
(without needing the CLI tools). Our client libraries also expose a
standard set of metrics, so when they are instantiated inside an
application, they are automatically put into the monitoring system for the
customers.

-Todd



On Wed, Apr 8, 2015 at 10:50 AM, Todd S  wrote:

> Sorry to go back this far in time, I just noticed that the list had
> replied flagging my earlier email as spam, so I'll try again with better
> formatting...
>
> A few questions, hopefully you (and everyone) don't mind. Feel free to
> ignore any/all.. I am trying to learn what I can from people who are
> considerably larger than we are, so we don't have the same pains
> (hopefully)
>
> * Are all 1100 brokers hardware?
> * Is there any Hardware or OS tuning you've found beneficial?
> * How do you manage deploying config updates? In particular, how do
> you manage the broker restarts to pick up changes?
> * Why 60 clusters? What segmentation of purpose (aside from the 2
> layers detailed in this doc) do you have?
> * Do you tune the clusters for different workloads/data types?
> * What challenges have you faced running that many clusters and nodes
> vs when you were smaller?
> * How do you manage keeping topics named nicely between clusters? (not
> conflicting) .
> * How do you manage partitioning and balancing (and rebalancing when a
> topic/partition start growing very quickly)?
> * Have you/how have you enabled your users/customers to monitor their
> data flow, or do they just trust you to let them know if there are
> issues?
>
> Thanks very much, sorry for the question dump!
>
> On Mon, Mar 23, 2015 at 9:42 AM, Todd Palino  wrote:
> > Emmanuel, if it helps, here's a little more detail on the hardware spec
> we
> > are using at the moment:
> >
> > 12 CPU (HT enabled)
> > 64 GB RAM
> > 16 x 1TB SAS drives (2 are used as a RAID-1 set for the OS, 14 are a
> > RAID-10 set just for the Kafka log segments)
> >
> > We don't colocate any other applicat

Re: Replication tools to move topics/partitions gradually

2015-05-24 Thread Todd Palino
We've built tools on top of it that both build the list based on less 
information (like "clone this broker to that one") and break it down into a 
configurable number of discrete moves so it doesn't tank the cluster.
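The batching part isn't magic - it's basically taking the full plan the
reassignment tool would execute and slicing it into smaller plans that get run
(and verified) one at a time. A rough sketch of the idea:

    # Sketch: slice a full reassignment plan into small batches so only a few
    # partitions move at once. Executing and verifying each batch is left out.
    def batches(plan_partitions, batch_size):
        # plan_partitions is the "partitions" list from a reassignment JSON plan
        for i in range(0, len(plan_partitions), batch_size):
            yield {"version": 1, "partitions": plan_partitions[i:i + batch_size]}

    full_plan = [{"topic": "my-topic", "partition": p, "replicas": [1, 2]}
                 for p in range(30)]
    for batch in batches(full_plan, 5):
        # run kafka-reassign-partitions.sh --execute with this batch, then
        # --verify until it finishes, before starting the next one
        print(batch)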

And yes, I've finally started the process of separating them from the
LinkedIn-specific tooling so we can release them to everyone else :)

-Todd

> On May 24, 2015, at 7:45 PM, Henry Cai  wrote:
> 
> We have a kafka cluster with 10 brokers and we are using the kafka
> replication tool (kafka-reassign-partitions.sh) when we need to add more
> brokers to the cluster.  But this tool tends to move too many
> topic/partitions around at the same time which causes instability.  Do we
> have an option to do it more slowly (e.g. move one topic/partition at a
> step) or did some one build a tool on top of 'kafka-reassign-partitions.sh'?
> 
> Another use case is when a broker node went down, do we have a tool to move
> the topic/partitions serviced by this node to the remaining nodes (and
> doing that in a fashion which doesn't cause too much instability)?


Re: Replication tools to move topics/partitions gradually

2015-05-24 Thread Todd Palino
See, this is why I should never say anything :)

The version I have right now is very limited - it only does a clone (we needed 
it for some hardware testing) and a leader balance (does it using partition 
reassignment without actually moving partitions). We have some scripts that the 
other SREs have written that need to be incorporated which do partition 
balancing by count and by size.

Let me figure out where we are going to host this, but we can probably put out 
the script we have in the next couple weeks. Then we can just iterate on it.

-Todd

> On May 24, 2015, at 8:46 PM, Henry Cai  wrote:
> 
> Todd,
> 
> This is very promising.  Do you know when will we be able to see your tools
> released to public?
> 
>> On Sun, May 24, 2015 at 7:54 PM, Todd Palino  wrote:
>> 
>> We've built tools on top of it that both build the list based on less
>> information (like "clone this broker to that one") and break it down into a
>> configurable number of discrete moves so it doesn't tank the cluster.
>> 
>> And yes, I've finally started the process of separating them from the
>> LinkedIn-specific tooling so we can release them to everyone else :)
>> 
>> -Todd
>> 
>>>> On May 24, 2015, at 7:45 PM, Henry Cai 
>>> wrote:
>>> 
>>> We have a kafka cluster with 10 brokers and we are using the kafka
>>> replication tool (kafka-reassign-partitions.sh) when we need to add more
>>> brokers to the cluster.  But this tool tends to move too many
>>> topic/partitions around at the same time which causes instability.  Do we
>>> have an option to do it more slowly (e.g. move one topic/partition at a
>>> step) or did some one build a tool on top of
>> 'kafka-reassign-partitions.sh'?
>>> 
>>> Another use case is when a broker node went down, do we have a tool to
>> move
>>> the topic/partitions serviced by this node to the remaining nodes (and
>>> doing that in a fashion which doesn't cause too much instability)?
>> 


Re: Kafka JMX metrics meaning

2015-06-02 Thread Todd Palino
Under-replicated partitions is a must. Offline partitions is also good to monitor. We also
use the active controller metric (it's 1 or 0) in aggregate for a cluster to 
know that the controller is running somewhere. 

For more general metrics, all topics bytes in and bytes out is good. We also 
watch the leader partitions count to know when to do a preferred replica 
election. Specifically, we take the ratio of that number to the total partition 
count for the broker and keep it near 50%.
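The check itself is trivial; here's the shape of it (illustrative numbers, and
the 50% target corresponds to an even leadership spread with 2 replicas per
partition):

    # Sketch: decide when a broker is due for a preferred replica election.
    leader_count = 1200      # ReplicaManager LeaderCount for this broker
    partition_count = 3900   # ReplicaManager PartitionCount for this broker

    ratio = leader_count / float(partition_count)
    if abs(ratio - 0.50) > 0.10:   # illustrative threshold
        print("leadership skewed at %.0f%%, run a preferred replica election"
              % (ratio * 100))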

Most other things, like specific request type time and 99% metrics, we 
generally only look at when we are doing performance testing or have a specific 
concern. 

-Todd

> On Jun 2, 2015, at 1:01 PM, Aditya Auradkar  
> wrote:
> 
> Number of underreplicated partitions, total request time are some good bets.
> 
> Aditya
> 
> 
> From: Otis Gospodnetic [otis.gospodne...@gmail.com]
> Sent: Tuesday, June 02, 2015 9:56 AM
> To: users@kafka.apache.org; Marina
> Subject: Re: Kafka JMX metrics meaning
> 
> Hi,
> 
>> On Tue, Jun 2, 2015 at 12:50 PM, Marina  wrote:
>> 
>> Hi,
>> I have enabled JMX_PORT for Kafka server and am trying to understand some
>> of the metrics that are being exposed. I have two questions:
>> 1. what are the best metrics to monitor to quickly spot unhealthy Kafka
>> cluster?
> 
> People love looking at consumer lag :)
> 
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
> 
> 2. what do these metrics mean: ReplicaManager -> LeaderCount ? and
>> ReplicaManager -> PartitionCount ? I have three topics created, with one
>> partition each, and replication = 1, however the values for both of the
>> above attributes is "53". So I am not sure what the count '53' means
>> here.
>> Thanks, Marina
>> 


[ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-04 Thread Todd Palino
I am very happy to introduce Burrow, an application to provide Kafka
consumer status as a service. Burrow is different than just a "lag checker":

* Multiple Kafka cluster support - Burrow supports any number of Kafka
clusters in a single instance. You can also run multiple copies of Burrow
in parallel and only one of them will send out notifications.

* All consumers, all partitions - If the consumer is committing offsets to
Kafka (not Zookeeper), it will be available in Burrow automatically. Every
partition it consumes will be monitored simultaneously, avoiding the trap
of just watching the worst partition (MaxLag) or spot checking individual
topics.

* Status can be checked via HTTP request - There's an internal HTTP server
that provides topic and consumer lists, can give you the latest offsets for
a topic either from the brokers or from the consumer, and lets you check
consumer status.

* Continuously monitor groups with output via email or a call to an
external HTTP endpoint - Configure emails to send for bad groups, checked
continuously. Or you can have Burrow call an HTTP endpoint into another
system for handling alerts.

* No thresholds - Status is determined over a sliding window and does not
rely on a fixed limit. When a consumer is checked, it has a status
indicator that tells whether it is OK, a warning, or an error, and the
partitions that caused it to be bad are provided.


Burrow was created to address specific problems that LinkedIn has with
monitoring consumers, in particular wildcard consumers like mirror makers
and our audit consumers. Instead of checking offsets for specific consumers
periodically, it monitors the stream of all committed offsets
(__consumer_offsets) and continually calculates lag over a sliding window.

We welcome all feedback, comments, and contributors. This project is very
much under active development for us (we're using it in some of our
environments now, and working on getting it running everywhere to replace
our previous monitoring system).

Burrow is written in Go, published under the Apache License, and hosted on
GitHub at:
https://github.com/linkedin/Burrow

Documentation is on the GitHub wiki at:
https://github.com/linkedin/Burrow/wiki

-Todd


Re: Consumer lag lies - orphaned offsets?

2015-06-04 Thread Todd Palino
I just sent out a separate email about the project that I've been working
on, Burrow, to change the way we're monitoring consumer status.

Like Joel said, the mbeans can be a little tricky to work with. Internally
at LinkedIn, we've always depended more on using a tool like the Consumer
Offset Checker, where we check the consumer offsets in Zookeeper or Kafka,
check the broker's offsets, and calculate the difference as lag. This has
worked, but with wildcard consumers like mirror maker we had to resort to
spot checking one or two topics, since we couldn't feasibly monitor 20k or
30k partitions per consumer. We also had to set thresholds for lag, which
could generate false alerts when there is a spike in traffic, or could take
a long time to alert.

Burrow attacks the problem a little differently. It still monitors the
consumer and broker offsets, but it monitors the consumer offsets by
consuming the __consumer_offsets topic, so it checks every consumer and
every partition (instead of spot checking). It then looks at the consumer
over a sliding window (we're working with 10 offset commits, or 10 minutes,
right now) and uses a set of rules to determine whether the consumer is in
a good or bad state (described at
https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules). You
can then check the consumer via an HTTP call, or monitor it continuously
and send emails and/or calls to an external HTTP endpoint (like another
alert/notification system).
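Greatly simplified, the sliding-window evaluation looks something like this
(the real rules are on the wiki page linked above; this is just to give the
flavor):

    # Very simplified sketch of Burrow-style window evaluation. "window" is a
    # list of (committed_offset, lag) samples for one partition, oldest first.
    def evaluate(window):
        offsets = [o for o, _ in window]
        lags = [l for _, l in window]
        if min(lags) == 0:
            return "OK"                        # caught up at some point in the window
        if offsets[-1] == offsets[0]:
            return "ERROR (stalled)"           # commits happening, offsets not moving
        if all(b >= a for a, b in zip(lags, lags[1:])):
            return "WARNING (falling behind)"  # lag never decreased over the window
        return "OK"

    print(evaluate([(100, 5), (120, 7), (140, 9), (160, 12)]))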

It's up on GitHub at https://github.com/linkedin/Burrow if you would like
to take a look and give it a try.

-Todd


On Thu, Jun 4, 2015 at 5:52 PM, Joel Koshy  wrote:

> Hi Otis,
>
> Yes this is a limitation in the old consumer. i.e., a number of
> per-topic/partition mbeans remain even on a rebalance. Those need to
> be de-registered. So if you stop consuming from some partition after a
> rebalance, that lag mbean currently remains, which is why it stays
> flat. This is a known issue.
>
> On the restart, the lag goes down to zero because - well the mbeans
> get recreated and the consumer starts fetching. If the fetch request
> reads up to the end of the log then the mbean will report zero. Your
> actual committed offset may be behind though which is why your true
> lag is > 0.
>
> The lag mbeans are useful, but have a number of limitations - it
> depends on active fetches in progress; it also does not exactly
> correspond with your actual processed (and committed) offset. The most
> reliable way to monitor application lag is to use the committed
> offsets and the current log end offsets. Todd has been doing a lot of
> interesting work in making lag monitoring less painful and can comment
> more.
>
> Joel
>
> On Thu, Jun 04, 2015 at 04:55:44PM -0400, Otis Gospodnetić wrote:
> > Hi,
> >
> > On Thu, Jun 4, 2015 at 4:26 PM, Scott Reynolds 
> wrote:
> >
> > > I believe the JMX metrics reflect the consumer PRIOR to committing
> offsets
> > > to Kafka / Zookeeper. But when you query from the command line using
> the
> > > kafka tools, you are just getting the committed offsets.
> > >
> >
> > Even if that were the case, and maybe it is, it doesn't explain why the
> > ConsumerLag in JMX often remains *completely constant*... forever... until
> > the consumer is restarted.  You see what I mean?
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> >
> > > On Thu, Jun 4, 2015 at 1:23 PM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Here's something potentially useful.
> > > >
> > > > 1) Before: https://apps.sematext.com/spm-reports/s/eQ9WhLegW9 - the
> > > "flat
> > > > Lag situation"
> > > >
> > > > 2) I restarted the consumer whose lag is shown in the above graph
> > > >
> > > > 3) After restart: https://apps.sematext.com/spm-reports/s/4YGkcUP9ms
> -
> > > NO
> > > > lag at all!?
> > > >
> > > > So that 81560 Lag value that was stuck in JMX is gone.  Went down to
> 0.
> > > > Kind of makes sense - the whole consumer was restarted, consumer/java
> > > > process was restarted, everything that was in JMX got reset, and if
> there
> > > > is truly no consumer lag it makes sense that the values in JMX are 0.
> > > >
> > > > HOWEVER, is the Lag *really* always *exactly* 0?  No way.  Look what
> > > Offset
> > > > Checker shows for this one consumer:
> > > >
> > > > af_servers  spm_cluster_topic-new-cdh  18  220551962  220586078  34116  af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > > af_servers  spm_cluster_topic-new-cdh  19  161936440  161960377  23937  af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > > af_servers  spm_cluster_topic-new-cdh  20  248308642  248340350  31708  af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > > af_servers  spm_cluster_topic-new-cdh  21

Re: [ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-09 Thread Todd Palino
For mirror maker and our audit application, we've been using
Kafka-committed offsets for some time now. We've got a few other consumers
who are using it, but we haven't actively worked on moving the bulk of them
over. It's been less critical since we put the ZK transaction logs on SSD.

And yeah, this is specific for kafka-committed offsets. I'm looking at some
options for handling Zookeeper as well, but since our goal with this was to
monitor our own infrastructure applications and move forwards, it hasn't
gotten a lot of my attention yet.

-Todd


On Tue, Jun 9, 2015 at 11:53 AM, Jason Rosenberg  wrote:

> Hi Todd,
>
> Thanks for open sourcing this, I'm excited to take a look.
>
> It looks like it's specific to offsets stored in kafka (and not zookeeper)
> correct?  I assume by that that LinkedIn is using the kafka storage now in
> production?
>
> Jason
>
> On Thu, Jun 4, 2015 at 9:43 PM, Todd Palino  wrote:
>
> > I am very happy to introduce Burrow, an application to provide Kafka
> > consumer status as a service. Burrow is different than just a "lag
> > checker":
> >
> > * Multiple Kafka cluster support - Burrow supports any number of Kafka
> > clusters in a single instance. You can also run multiple copies of Burrow
> > in parallel and only one of them will send out notifications.
> >
> > * All consumers, all partitions - If the consumer is committing offsets
> to
> > Kafka (not Zookeeper), it will be available in Burrow automatically.
> Every
> > partition it consumes will be monitored simultaneously, avoiding the trap
> > of just watching the worst partition (MaxLag) or spot checking individual
> > topics.
> >
> > * Status can be checked via HTTP request - There's an internal HTTP
> server
> > that provides topic and consumer lists, can give you the latest offsets
> for
> > a topic either from the brokers or from the consumer, and lets you check
> > consumer status.
> >
> > * Continuously monitor groups with output via email or a call to an
> > external HTTP endpoint - Configure emails to send for bad groups, checked
> > continuously. Or you can have Burrow call an HTTP endpoint into another
> > system for handling alerts.
> >
> > * No thresholds - Status is determined over a sliding window and does not
> > rely on a fixed limit. When a consumer is checked, it has a status
> > indicator that tells whether it is OK, a warning, or an error, and the
> > partitions that caused it to be bad are provided.
> >
> >
> > Burrow was created to address specific problems that LinkedIn has with
> > monitoring consumers, in particular wildcard consumers like mirror makers
> > and our audit consumers. Instead of checking offsets for specific
> consumers
> > periodically, it monitors the stream of all committed offsets
> > (__consumer_offsets) and continually calculates lag over a sliding
> window.
> >
> > We welcome all feedback, comments, and contributors. This project is very
> > much under active development for us (we're using it in some of our
> > environments now, and working on getting it running everywhere to replace
> > our previous monitoring system).
> >
> > Burrow is written in Go, published under the Apache License, and hosted
> on
> > GitHub at:
> > https://github.com/linkedin/Burrow
> >
> > Documentation is on the GitHub wiki at:
> > https://github.com/linkedin/Burrow/wiki
> >
> > -Todd
> >
>


Re: How to manage the consumer group id?

2015-06-10 Thread Todd Palino
For us, group ID is a configuration parameter of the application. So we
store it in configuration files (generally on disk) and maintain it there
through our configuration and deployment infrastructure. As you pointed
out, hard coding the group ID into the application is not usually a good
pattern.

If you want to reset, you have a couple choices. One is that you can just
switch group names and start fresh. Another is that you can shut down the
consumer and delete the existing consumer group, then restart. You could
also stop, edit the offsets to set them to something specific (if you need
to roll back to a specific point, for example), and restart.

-Todd


On Wed, Jun 10, 2015 at 1:20 PM, James Cheng  wrote:

> Hi,
>
> How are people specifying/persisting/resetting the consumer group
> identifier ("group.id") when using the high-level consumer?
>
> I understand how it works. I specify some string and all consumers that
> use that same string will help consume a topic. The partitions will be
> distributed amongst them for consumption. And when they save their offsets,
> the offsets will be saved according to the consumer group. That all makes
> sense to me.
>
> What I don't understand is the best way to set and persist them, and reset
> them if needed. For example, do I simply hardcode the string in my code? If
> so, then all deployed instances will have the same value (that's good). If
> I want to bring up a test instance of that code, or a new installation,
> though, then it will also share the load (that's bad).
>
> If I pass in a value to my instances, that lets me have different test and
> production instances of the same code (that's good), but then I have to
> persist my consumer group id somewhere outside of the process (on disk, in
> zookeeper, etc). Which then means I need some way to manage *that*
> identifier (that's... just how it is?).
>
> What if I decide that I want my app to start over? In the case of
> log-compacted streams, I want to throw away any processing I did and start
> "from the beginning". Do I change my consumer group, which effective resets
> everything? Or do I delete my saved offsets, and then resume with the same
> consumer group? The latter is functionally equivalent to the former.
>
> Thanks,
> -James
>
>


Re: How to manage the consumer group id?

2015-06-11 Thread Todd Palino
When we need to delete a group, we do it in Zookeeper directly. When we
need to roll back offsets, we use the Import/Export tool classes to do it,
because it's a little more efficient than working in Zookeeper. You can
find the details on the tools at
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools
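For the Zookeeper-direct case it's nothing fancier than removing the group's
subtree under /consumers once the consumers are stopped - for example, a
sketch with the kazoo client (hosts and group name are placeholders):

    # Sketch: delete an old-style (Zookeeper-based) consumer group once it is stopped.
    from kazoo.client import KazooClient

    zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
    zk.start()
    zk.delete("/consumers/my-application-prod", recursive=True)
    zk.stop()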

I believe that as of right now (and please someone correct me if I'm
wrong), the Import/Export tools are not available yet for Kafka-committed
offsets. There is a patch for this, and we have a version of it built
internally. I think it's waiting for some of the KIP work before it is
finalized.

-Todd


On Wed, Jun 10, 2015 at 4:48 PM, James Cheng  wrote:

>
> > On Jun 10, 2015, at 1:26 PM, Todd Palino  wrote:
> >
> > For us, group ID is a configuration parameter of the application. So we
> > store it in configuration files (generally on disk) and maintain it there
> > through our configuration and deployment infrastructure. As you pointed
> > out, hard coding the group ID into the application is not usually a good
> > pattern.
> >
> > If you want to reset, you have a couple choices. One is that you can just
> > switch group names and start fresh. Another is that you can shut down the
> > consumer and delete the existing consumer group, then restart. You could
> > also stop, edit the offsets to set them to something specific (if you
> need
> > to roll back to a specific point, for example), and restart.
> >
>
> Thanks Todd. That helps. The "on disk" storage doesn't work well if you
> are running consumers in ephemeral nodes like EC2 machines, but in that
> case, I guess you would save the group ID in some other data store ("on
> disk, but elsewhere") associated with your "application cluster" rather
> than any one node of the cluster.
>
> I often hear about people saving their offsets using the consumer, and
> monitoring offsets for lag. I don't hear much about people deleting or
> changing/setting offsets by other means. How is it usually done? Are there
> tools to change the offsets, or do people go into zookeeper to change them
> directly? Or, for broker-stored offsets, use the Kafka APIs?
>
> -James
>
> > -Todd
> >
> >
> > On Wed, Jun 10, 2015 at 1:20 PM, James Cheng  wrote:
> >
> >> Hi,
> >>
> >> How are people specifying/persisting/resetting the consumer group
> >> identifier ("group.id") when using the high-level consumer?
> >>
> >> I understand how it works. I specify some string and all consumers that
> >> use that same string will help consume a topic. The partitions will be
> >> distributed amongst them for consumption. And when they save their
> offsets,
> >> the offsets will be saved according to the consumer group. That all
> makes
> >> sense to me.
> >>
> >> What I don't understand is the best way to set and persist them, and
> reset
> >> them if needed. For example, do I simply hardcode the string in my
> code? If
> >> so, then all deployed instances will have the same value (that's good).
> If
> >> I want to bring up a test instance of that code, or a new installation,
> >> though, then it will also share the load (that's bad).
> >>
> >> If I pass in a value to my instances, that lets me have different test
> and
> >> production instances of the same code (that's good), but then I have to
> >> persist my consumer group id somewhere outside of the process (on disk,
> in
> >> zookeeper, etc). Which then means I need some way to manage *that*
> >> identifier (that's... just how it is?).
> >>
> >> What if I decide that I want my app to start over? In the case of
> >> log-compacted streams, I want to throw away any processing I did and
> start
> >> "from the beginning". Do I change my consumer group, which effective
> resets
> >> everything? Or do I delete my saved offsets, and then resume with the
> same
> >> consumer group? The latter is functionally equivalent to the former.
> >>
> >> Thanks,
> >> -James
> >>
> >>
>
>


Re: [ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-12 Thread Todd Palino
The invalid ACL error is an error that is passed back from Zookeeper. What
version of Zookeeper are you using, and have you set up ACLs within it? I'm
not able to see this on our ZK (3.4.6 with no ACLs).

-Todd

On Fri, Jun 12, 2015 at 9:34 AM, Roger Hoover 
wrote:

> Hi,
>
> I was trying to give burrow a try and got a ZK error "invalid ACL
> specified".  Any suggestions on what's going wrong?
>
> 1434044348908673512 [Critical] Cannot get ZK notifier lock: zk: invalid ACL
> specified
>
>
> Here's my config:
>
>
> [general]
>
> logdir=log
>
> logconfig=logging.cfg
>
> pidfile=burrow.pid
>
> client-id=burrow-lagchecker
>
> group-blacklist=^(console-consumer-|python-kafka-consumer-).*$
>
>
> [zookeeper]
>
> hostname=host1
>
> port=2181
>
> timeout=6
>
> lock-path=/burrow/notifier
>
>
> [kafka "sit"]
>
> broker=host1
>
> broker=host2
>
> broker=host3
>
> broker=host4
>
> broker-port=9092
>
> zookeeper=host1
>
> zookeeper=host2
>
> zookeeper=host3
>
> zookeeper-port=2181
>
> zookeeper-path=/
>
> offsets-topic=__consumer_offsets
>
>
> [tickers]
>
> broker-offsets=60
>
>
> [lagcheck]
>
> intervals=10
>
> expire-group=604800
>
>
> [httpserver]
>
> server=on
>
> port=7000
>
> On Tue, Jun 9, 2015 at 12:34 PM, Todd Palino  wrote:
>
> > For mirror maker and our audit application, we've been using
> > Kafka-committed offsets for some time now. We've got a few other
> consumers
> > who are using it, but we haven't actively worked on moving the bulk of
> them
> > over. It's been less critical since we put the ZK transaction logs on
> SSD.
> >
> > And yeah, this is specific for kafka-committed offsets. I'm looking at
> some
> > options for handling Zookeeper as well, but since our goal with this was
> to
> > monitor our own infrastructure applications and move forwards, it hasn't
> > gotten a lot of my attention yet.
> >
> > -Todd
> >
> >
> > On Tue, Jun 9, 2015 at 11:53 AM, Jason Rosenberg 
> wrote:
> >
> > > Hi Todd,
> > >
> > > Thanks for open sourcing this, I'm excited to take a look.
> > >
> > > It looks like it's specific to offsets stored in kafka (and not
> > zookeeper)
> > > correct?  I assume by that that LinkedIn is using the kafka storage now
> > in
> > > production?
> > >
> > > Jason
> > >
> > > On Thu, Jun 4, 2015 at 9:43 PM, Todd Palino  wrote:
> > >
> > > > I am very happy to introduce Burrow, an application to provide Kafka
> > > > consumer status as a service. Burrow is different than just a "lag
> > > > checker":
> > > >
> > > > * Multiple Kafka cluster support - Burrow supports any number of
> Kafka
> > > > clusters in a single instance. You can also run multiple copies of
> > Burrow
> > > > in parallel and only one of them will send out notifications.
> > > >
> > > > * All consumers, all partitions - If the consumer is committing
> offsets
> > > to
> > > > Kafka (not Zookeeper), it will be available in Burrow automatically.
> > > Every
> > > > partition it consumes will be monitored simultaneously, avoiding the
> > trap
> > > > of just watching the worst partition (MaxLag) or spot checking
> > individual
> > > > topics.
> > > >
> > > > * Status can be checked via HTTP request - There's an internal HTTP
> > > server
> > > > that provides topic and consumer lists, can give you the latest
> offsets
> > > for
> > > > a topic either from the brokers or from the consumer, and lets you
> > check
> > > > consumer status.
> > > >
> > > > * Continuously monitor groups with output via email or a call to an
> > > > external HTTP endpoint - Configure emails to send for bad groups,
> > checked
> > > > continuously. Or you can have Burrow call an HTTP endpoint into
> another
> > > > system for handling alerts.
> > > >
> > > > * No thresholds - Status is determined over a sliding window and does
> > not
> > > > rely on a fixed limit. When a consumer is checked, it has a status
> > > > indicator that tells whether it is OK, a warning, or an error, and
> the
> > > > partitions that caused it to be bad are provided.
> > > >
> > > >
> > > > Burrow was created to address specific problems that LinkedIn has
> with
> > > > monitoring consumers, in particular wildcard consumers like mirror
> > makers
> > > > and our audit consumers. Instead of checking offsets for specific
> > > consumers
> > > > periodically, it monitors the stream of all committed offsets
> > > > (__consumer_offsets) and continually calculates lag over a sliding
> > > window.
> > > >
> > > > We welcome all feedback, comments, and contributors. This project is
> > very
> > > > much under active development for us (we're using it in some of our
> > > > environments now, and working on getting it running everywhere to
> > replace
> > > > our previous monitoring system).
> > > >
> > > > Burrow is written in Go, published under the Apache License, and
> hosted
> > > on
> > > > GitHub at:
> > > > https://github.com/linkedin/Burrow
> > > >
> > > > Documentation is on the GitHub wiki at:
> > > > https://github.com/linkedin/Burrow/wiki
> > > >
> > > > -Todd
> > > >
> > >
> >
>


Re: [ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-12 Thread Todd Palino
Can you open an issue on the github page please, and we can investigate
further there?

-Todd

On Fri, Jun 12, 2015 at 10:22 AM, Roger Hoover 
wrote:

> Thanks, Todd.  I'm also using ZK 3.4.6 with no ACLs.  I'm surprised and
> currently stumped by this error.
>
> On Fri, Jun 12, 2015 at 9:49 AM, Todd Palino  wrote:
>
> > The invalid ACL error is an error that is passed back from Zookeeper.
> What
> > version of Zookeeper are you using, and have you set up ACLs within it?
> I'm
> > not able to see this on our ZK (3.4.6 with no ACLs).
> >
> > -Todd
> >
> > On Fri, Jun 12, 2015 at 9:34 AM, Roger Hoover 
> > wrote:
> >
> > > Hi,
> > >
> > > I was trying to give burrow a try and got a ZK error "invalid ACL
> > > specified".  Any suggestions on what's going wrong?
> > >
> > > 1434044348908673512 [Critical] Cannot get ZK notifier lock: zk: invalid
> > ACL
> > > specified
> > >
> > >
> > > Here's my config:
> > >
> > >
> > > [general]
> > >
> > > logdir=log
> > >
> > > logconfig=logging.cfg
> > >
> > > pidfile=burrow.pid
> > >
> > > client-id=burrow-lagchecker
> > >
> > > group-blacklist=^(console-consumer-|python-kafka-consumer-).*$
> > >
> > >
> > > [zookeeper]
> > >
> > > hostname=host1
> > >
> > > port=2181
> > >
> > > timeout=6
> > >
> > > lock-path=/burrow/notifier
> > >
> > >
> > > [kafka "sit"]
> > >
> > > broker=host1
> > >
> > > broker=host2
> > >
> > > broker=host3
> > >
> > > broker=host4
> > >
> > > broker-port=9092
> > >
> > > zookeeper=host1
> > >
> > > zookeeper=host2
> > >
> > > zookeeper=host3
> > >
> > > zookeeper-port=2181
> > >
> > > zookeeper-path=/
> > >
> > > offsets-topic=__consumer_offsets
> > >
> > >
> > > [tickers]
> > >
> > > broker-offsets=60
> > >
> > >
> > > [lagcheck]
> > >
> > > intervals=10
> > >
> > > expire-group=604800
> > >
> > >
> > > [httpserver]
> > >
> > > server=on
> > >
> > > port=7000
> > >
> > > On Tue, Jun 9, 2015 at 12:34 PM, Todd Palino 
> wrote:
> > >
> > > > For mirror maker and our audit application, we've been using
> > > > Kafka-committed offsets for some time now. We've got a few other
> > > consumers
> > > > who are using it, but we haven't actively worked on moving the bulk
> of
> > > them
> > > > over. It's been less critical since we put the ZK transaction logs on
> > > SSD.
> > > >
> > > > And yeah, this is specific for kafka-committed offsets. I'm looking
> at
> > > some
> > > > options for handling Zookeeper as well, but since our goal with this
> > was
> > > to
> > > > monitor our own infrastructure applications and move forwards, it
> > hasn't
> > > > gotten a lot of my attention yet.
> > > >
> > > > -Todd
> > > >
> > > >
> > > > On Tue, Jun 9, 2015 at 11:53 AM, Jason Rosenberg 
> > > wrote:
> > > >
> > > > > Hi Todd,
> > > > >
> > > > > Thanks for open sourcing this, I'm excited to take a look.
> > > > >
> > > > > It looks like it's specific to offsets stored in kafka (and not
> > > > zookeeper)
> > > > > correct?  I assume by that that LinkedIn is using the kafka storage
> > now
> > > > in
> > > > > production?
> > > > >
> > > > > Jason
> > > > >
> > > > > On Thu, Jun 4, 2015 at 9:43 PM, Todd Palino 
> > wrote:
> > > > >
> > > > > > I am very happy to introduce Burrow, an application to provide
> > Kafka
> > > > > > consumer status as a service. Burrow is different than just a
> "lag
> > > > > > checker":
> > > > > >
> > > > > > * Multiple Kafka cluster support - Burrow supports any number of
> > > Kafka
> > > > > > clusters in a single instance. You can also run multiple 

Re: [ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-14 Thread Todd Palino
It took a little while for this to post (aligning with company PR and all),
but there's a detailed description of what Burrow is on the LinkedIn
Engineering Blog now:

http://engineering.linkedin.com/apache-kafka/burrow-kafka-consumer-monitoring-reinvented

-Todd


On Thu, Jun 4, 2015 at 6:43 PM, Todd Palino  wrote:

> I am very happy to introduce Burrow, an application to provide Kafka
> consumer status as a service. Burrow is different than just a "lag checker":
>
> * Multiple Kafka cluster support - Burrow supports any number of Kafka
> clusters in a single instance. You can also run multiple copies of Burrow
> in parallel and only one of them will send out notifications.
>
> * All consumers, all partitions - If the consumer is committing offsets to
> Kafka (not Zookeeper), it will be available in Burrow automatically. Every
> partition it consumes will be monitored simultaneously, avoiding the trap
> of just watching the worst partition (MaxLag) or spot checking individual
> topics.
>
> * Status can be checked via HTTP request - There's an internal HTTP server
> that provides topic and consumer lists, can give you the latest offsets for
> a topic either from the brokers or from the consumer, and lets you check
> consumer status.
>
> * Continuously monitor groups with output via email or a call to an
> external HTTP endpoint - Configure emails to send for bad groups, checked
> continuously. Or you can have Burrow call an HTTP endpoint into another
> system for handling alerts.
>
> * No thresholds - Status is determined over a sliding window and does not
> rely on a fixed limit. When a consumer is checked, it has a status
> indicator that tells whether it is OK, a warning, or an error, and the
> partitions that caused it to be bad are provided.
>
>
> Burrow was created to address specific problems that LinkedIn has with
> monitoring consumers, in particular wildcard consumers like mirror makers
> and our audit consumers. Instead of checking offsets for specific consumers
> periodically, it monitors the stream of all committed offsets
> (__consumer_offsets) and continually calculates lag over a sliding window.
>
> We welcome all feedback, comments, and contributors. This project is very
> much under active development for us (we're using it in some of our
> environments now, and working on getting it running everywhere to replace
> our previous monitoring system).
>
> Burrow is written in Go, published under the Apache License, and hosted on
> GitHub at:
> https://github.com/linkedin/Burrow
>
> Documentation is on the GitHub wiki at:
> https://github.com/linkedin/Burrow/wiki
>
> -Todd
>
>


Re: Re: Closing socket connection to /192.115.190.61. (kafka.network.Processor)

2015-06-19 Thread Todd Palino
I don't think this got changed until after 0.8.2. I believe the change is
still in trunk and not a released version. We haven't even picked it up
internally at LinkedIn yet.
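In the meantime, one way to quiet it is to raise the log level for that class
in config/log4j.properties (the logger name matches the class shown in the log
line; I haven't verified this on 0.8.2.1 myself):

    log4j.logger.kafka.network.Processor=WARN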

-Todd


On Fri, Jun 19, 2015 at 12:03 AM, bit1...@163.com  wrote:

> Thank you for the reply.
> I am using kafka_2.10-0.8.2.1,and I didn't change the log things in Kafka.
>
>
>
> bit1...@163.com
>
> From: Joe Stein
> Date: 2015-06-19 13:43
> To: users
> Subject: Re: Closing socket connection to /192.115.190.61.
> (kafka.network.Processor)
> What version of Kafka are you using? This was changed to debug level in
> 0.8.2.
>
> ~ Joestein
> On Jun 18, 2015 10:39 PM, "bit1...@163.com"  wrote:
>
> > Hi,
> > I have started the kafka server as a background process, however, the
> > following INFO log appears on the console every 10 seconds.
> > Looks it is not an error since its log level is INFO. How could I
> suppress
> > this annoying log? Thanks
> >
> >
> > [2015-06-19 13:34:10,884] INFO Closing socket connection to /
> > 192.115.190.61. (kafka.network.Processor)
> >
> >
> >
> > bit1...@163.com
> >
>


Re: data loss - replicas

2015-06-22 Thread Todd Palino
I assume that you are considering the data loss to be the difference in
size between the two directories? This is generally not a good guideline,
as the batching and compression will be different between the two replicas.

-Todd


On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram 
wrote:

> Hi,
>
> I noticed a data loss while storing in kafka logs.
> Generally, the leader hands the request to followers; is there a data loss in
> that process?
>
> topic 'jun8' with 2 replicas and 8 partitions
>
> *Broker 1*[user@ jun8-6]$ ls -ltr
> total 7337500
> -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> -rw-rw-r-- 1 user user1127512 Jun 22 12:45 15195331.index
> -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> -rw-rw-r-- 1 user user1108544 Jun 22 12:48 16509739.index
> -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> -rw-rw-r-- 1 user user1129064 Jun 22 12:52 17823869.index
> -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> -rw-rw-r-- 1 user user1161152 Jun 22 13:17 19136798.index
> -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> -rw-rw-r-- 1 user user1152448 Jun 22 13:21 20451309.index
> *-rw-rw-r-- 1 user user 1073740588 Jun 22 13:39 21764229.log*
> -rw-rw-r-- 1 user user1241168 Jun 22 13:39 21764229.index
> -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> -rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> [user@ jun8-6]$
>
>
>
> *Broker 2*[user@ jun8-6]$ ls -ltr
> total 7340468
> -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> -rw-rw-r-- 1 user user1857144 Jun 22 12:45 15195331.index
> -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> -rw-rw-r-- 1 user user1857168 Jun 22 12:48 16509739.index
> -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> -rw-rw-r-- 1 user user1857752 Jun 22 12:52 17823869.index
> -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> -rw-rw-r-- 1 user user1857440 Jun 22 13:17 19136798.index
> -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> -rw-rw-r-- 1 user user1856968 Jun 22 13:21 20451309.index
> *-rw-rw-r-- 1 user user 1073722781 Jun 22 13:39 21764229.log*
> -rw-rw-r-- 1 user user1762288 Jun 22 13:39 21764229.index
> -rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> [user@ jun8-6]$
>


Re: Is trunk safe for production?

2015-06-23 Thread Todd Palino
Yes and no. We're running a version about a month behind trunk at any given
time here at LinkedIn. That's generally the amount of time we spend testing
and going through our release process internally (less if there are no
problems). So it can be done.

That said, we also have several Kafka contributors and a committer who work
on Kafka constantly here. When talking to others about how we run Kafka at
LinkedIn, I usually say "we run trunk so you don't have to." Unless you
have someone to track down and work on fixing the bugs, it's probably a
good idea to stick with the release versions, unless you run in a
development environment where you can tolerate failures and performance
regressions.

-Todd


On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> I am planning to use for the producer part. How stable is trunk generally?
>
> --
> Regards
> Vamsi Subhash
>


Re: data loss - replicas

2015-06-23 Thread Todd Palino
Thanks, Joel. I remember a case where we saw a difference like this between
two brokers that was not due to retention settings or any other obvious
problem, but I can't remember exactly what we determined the cause was.

-Todd

On Mon, Jun 22, 2015 at 4:22 PM, Joel Koshy  wrote:

> The replicas do not have to decompress/recompress so I don't think
> that would contribute to this.
>
> There may be some corner cases such as:
> - Multiple unclean leadership elections in sequence
> - Changing the compression codec for a topic on the fly - different
>   brokers may see this config change at almost (but not exactly) the
>   same time, but not sure if you are using that feature.
>
> You may want to use the DumpLogSegments tool to actually compare the
> offsets present in both log files.
>
> On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote:
> > I assume that you are considering the data loss to be the difference in
> > size between the two directories? This is generally not a good guideline,
> > as the batching and compression will be different between the two
> replicas.
> >
> > -Todd
> >
> >
> > On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram 
> > wrote:
> >
> > > Hi,
> > >
> > > I noticed a data loss while storing in kafka logs.
> > > Generally, leader hands the request to  followers, is there a data
> loss in
> > > that process?
> > >
> > > topic 'jun8' with 2 replicas and 8 partitions
> > >
> > > *Broker 1*[user@ jun8-6]$ ls -ltr
> > > total 7337500
> > > -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> > > -rw-rw-r-- 1 user user1127512 Jun 22 12:45
> 15195331.index
> > > -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> > > -rw-rw-r-- 1 user user1108544 Jun 22 12:48
> 16509739.index
> > > -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> > > -rw-rw-r-- 1 user user1129064 Jun 22 12:52
> 17823869.index
> > > -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> > > -rw-rw-r-- 1 user user1161152 Jun 22 13:17
> 19136798.index
> > > -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> > > -rw-rw-r-- 1 user user1152448 Jun 22 13:21
> 20451309.index
> > > *-rw-rw-r-- 1 user user 1073740588 Jun 22 13:39
> 21764229.log*
> > > -rw-rw-r-- 1 user user1241168 Jun 22 13:39
> 21764229.index
> > > -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> > > -rw-rw-r-- 1 user user   10485760 Jun 22 13:42
> 23077448.index
> > > [user@ jun8-6]$
> > >
> > >
> > >
> > > *Broker 2*[user@ jun8-6]$ ls -ltr
> > > total 7340468
> > > -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> > > -rw-rw-r-- 1 user user1857144 Jun 22 12:45
> 15195331.index
> > > -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> > > -rw-rw-r-- 1 user user1857168 Jun 22 12:48
> 16509739.index
> > > -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> > > -rw-rw-r-- 1 user user1857752 Jun 22 12:52
> 17823869.index
> > > -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> > > -rw-rw-r-- 1 user user1857440 Jun 22 13:17
> 19136798.index
> > > -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> > > -rw-rw-r-- 1 user user1856968 Jun 22 13:21
> 20451309.index
> > > *-rw-rw-r-- 1 user user 1073722781 Jun 22 13:39
> 21764229.log*
> > > -rw-rw-r-- 1 user user1762288 Jun 22 13:39
> 21764229.index
> > > -rw-rw-r-- 1 user user   10485760 Jun 22 13:42
> 23077448.index
> > > -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> > > [user@ jun8-6]$
> > >
>
> --
> Joel
>


Re: Leap Second Troubles

2015-07-09 Thread Todd Palino
Did you hit the problems in the Kafka brokers and consumers during the
Zookeeper problem, or after you had already cleared it?

For us, we decided to skip the leap second problem (even though we're
supposedly on a version that doesn't have that bug) by shutting down ntpd
everywhere and then allowing it to slowly adjust the time afterwards
without sending the leap second.
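
The mechanics are simple -- something like the following, where the service
name depends on your distro and the -x flag is what tells ntpd to slew the
clock instead of stepping it (treat this as a sketch, not our exact runbook):

  # before 23:59:59 UTC on June 30, stop ntpd so the leap second never arrives
  sudo service ntp stop
  # once the leap second has passed, bring ntpd back in slew mode so the one
  # second of drift is corrected gradually
  sudo ntpd -gx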

-Todd


On Thu, Jul 9, 2015 at 7:58 AM, Christofer Hedbrandh  wrote:

> Hi Kafka users,
>
> ZooKeeper in our staging environment was running on a very old ubuntu
> version, that was exposed to the "leap second causes spuriously high CPU
> usage" bug.
>
> https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1020285
>
> As a result, when the leap second arrived, the ZooKeeper CPU usage went up
> to 100% and stayed there. In response to this, we restarted one ZooKeeper
> process. The ZooKeeper restart unfortunately made the situation much worse
> as we hit three different (possibly related) Kafka problems. We are using
> Kafka 0.8.2 brokers, consumers and producers.
>
>
> #1
> One of our three brokers was kicked out or ISR for some (most but not all)
> partitions, and was continuously logging "Cached zkVersion [XX] not equal
> to that in zookeeper, skip updating ISR" over and over (until I eventually
> stopped this broker).
>
> INFO Partition [topic-x,xx] on broker 1: Shrinking ISR for partition
> [topic-x,xx] from 1,2,3 to 1 (kafka.cluster.Partition)
> INFO Partition [topic-x,xx] on broker 1: Cached zkVersion [62] not equal to
> that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> INFO Partition [topic-y,yy] on broker 1: Shrinking ISR for partition
> [topic-y,yy] from 1,2,3 to 1 (kafka.cluster.Partition)
> INFO Partition [topic-y,yy] on broker 1: Cached zkVersion [39] not equal to
> that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> INFO Partition [topic-z,zz] on broker 1: Shrinking ISR for partition
> [topic-z,zz] from 1,2,3 to 1 (kafka.cluster.Partition)
> INFO Partition [topic-z,zz] on broker 1: Cached zkVersion [45] not equal to
> that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> etc.
>
> Searching the users@kafka.apache.org archive and Googling for this log
> output, gives me similar descriptions but nothing that exactly describes
> this.
> It is very similar to this, but without the "ERROR Conditional update of
> path ..." part.
> https://www.mail-archive.com/users@kafka.apache.org/msg07044.html
>
>
> #2
> The remaining two brokers were logging this every five seconds or so.
> INFO conflict in /brokers/ids/xxx data:
>
> {"jmx_port":xxx,"timestamp":"1435712198759","host":"xxx","version":1,"port":9092}
> stored data:
>
> {"jmx_port":xxx,"timestamp":"1435711782536","host":"xxx","version":1,"port":9092}
> (kafka.utils.ZkUtils$)
> INFO I wrote this conflicted ephemeral node
>
> [{"jmx_port":xxx,"timestamp":"1435712198759","host":"xxx","version":1,"port":9092}]
> at /brokers/ids/xxx a while back in a different session, hence I will
> backoff for this node to be deleted by Zookeeper and retry
> (kafka.utils.ZkUtils$)
>
> It sounds very much like we hit this bug
> https://issues.apache.org/jira/browse/KAFKA-1387
>
>
> #3
> The most serious issue that resulted was that some consumer groups failed
> to claim all partitions. When using the ConsumerOffsetChecker, the owner of
> some partitions was listed as "none", the lag was constantly increasing,
> and it was clear that no consumers were processing these messages.
>
> It is exactly what Dave Hamilton is describing here, but from this email
> chain no one seems to know what caused it.
> https://www.mail-archive.com/users%40kafka.apache.org/msg13364.html
>
> It may be reasonable to assume that the consumer rebalance failures we also
> saw has something to do with this. But why the rebalance failed is still
> unclear.
>
> ERROR k.c.ZookeeperConsumerConnector: error during syncedRebalance
> kafka.common.ConsumerRebalanceFailedException: xxx can't rebalance after 4
> retries
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
>
>
> I am curious to hear if anyone else had similar problems to this?
>
> And also if anyone can say if these are all known bugs that are being
> tracked with some ticket number?
>
>
> Thanks,
> Christofer
>
> P.S. Eventually after ZooKeeper and Kafka broker and consumer restarts
> everything returned to normal.
>


Re: Leap Second Troubles

2015-07-10 Thread Todd Palino
OK, in that case then I'm thinking that you ran into issues that were a
natural result of the Zookeeper ensemble having very high CPU usage.
Unfortunate, but this would not be an unexpected situation when your ZK
ensemble is having significant problems.

-Todd


On Fri, Jul 10, 2015 at 10:21 AM, Christofer Hedbrandh <
christo...@knewton.com> wrote:

> Todd, the Kafka problems started when one of three ZooKeeper nodes was
> restarted.
>
> On Thu, Jul 9, 2015 at 12:10 PM, Todd Palino  wrote:
>
> > Did you hit the problems in the Kafka brokers and consumers during the
> > Zookeeper problem, or after you had already cleared it?
> >
> > For us, we decided to skip the leap second problem (even though we're
> > supposedly on a version that doesn't have that bug) by shutting down ntpd
> > everywhere and then allowing it to slowly adjust the time afterwards
> > without sending the leap second.
> >
> > -Todd
> >
> >
> > On Thu, Jul 9, 2015 at 7:58 AM, Christofer Hedbrandh <
> > christo...@knewton.com
> > > wrote:
> >
> > > Hi Kafka users,
> > >
> > > ZooKeeper in our staging environment was running on a very old ubuntu
> > > version, that was exposed to the "leap second causes spuriously high
> CPU
> > > usage" bug.
> > >
> > > https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1020285
> > >
> > > As a result, when the leap second arrived, the ZooKeeper CPU usage went
> > up
> > > to 100% and stayed there. In response to this, we restarted one
> ZooKeeper
> > > process. The ZooKeeper restart unfortunately made the situation much
> > worse
> > > as we hit three different (possibly related) Kafka problems. We are
> using
> > > Kafka 0.8.2 brokers, consumers and producers.
> > >
> > >
> > > #1
> > > One of our three brokers was kicked out or ISR for some (most but not
> > all)
> > > partitions, and was continuously logging "Cached zkVersion [XX] not
> equal
> > > to that in zookeeper, skip updating ISR" over and over (until I
> > eventually
> > > stopped this broker).
> > >
> > > INFO Partition [topic-x,xx] on broker 1: Shrinking ISR for partition
> > > [topic-x,xx] from 1,2,3 to 1 (kafka.cluster.Partition)
> > > INFO Partition [topic-x,xx] on broker 1: Cached zkVersion [62] not
> equal
> > to
> > > that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> > > INFO Partition [topic-y,yy] on broker 1: Shrinking ISR for partition
> > > [topic-y,yy] from 1,2,3 to 1 (kafka.cluster.Partition)
> > > INFO Partition [topic-y,yy] on broker 1: Cached zkVersion [39] not
> equal
> > to
> > > that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> > > INFO Partition [topic-z,zz] on broker 1: Shrinking ISR for partition
> > > [topic-z,zz] from 1,2,3 to 1 (kafka.cluster.Partition)
> > > INFO Partition [topic-z,zz] on broker 1: Cached zkVersion [45] not
> equal
> > to
> > > that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> > > etc.
> > >
> > > Searching the users@kafka.apache.org archive and Googling for this log
> > > output, gives me similar descriptions but nothing that exactly
> describes
> > > this.
> > > It is very similar to this, but without the "ERROR Conditional update
> of
> > > path ..." part.
> > > https://www.mail-archive.com/users@kafka.apache.org/msg07044.html
> > >
> > >
> > > #2
> > > The remaining two brokers were logging this every five seconds or so.
> > > INFO conflict in /brokers/ids/xxx data:
> > >
> > >
> >
> {"jmx_port":xxx,"timestamp":"1435712198759","host":"xxx","version":1,"port":9092}
> > > stored data:
> > >
> > >
> >
> {"jmx_port":xxx,"timestamp":"1435711782536","host":"xxx","version":1,"port":9092}
> > > (kafka.utils.ZkUtils$)
> > > INFO I wrote this conflicted ephemeral node
> > >
> > >
> >
> [{"jmx_port":xxx,"timestamp":"1435712198759","host":"xxx","version":1,"port":9092}]
> > > at /brokers/ids/xxx a while back in a different session, hence I will
> > > backoff for this node to be deleted by Zookeeper and retry
> > > (kafka.utils.ZkUtils$)
> > >
> > > It sounds very much like we hit this bug
> > > https://issues.apache.org/jira/browse/KAFKA-1387

Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Todd Palino
This is interesting. We have seen something similar internally at LinkedIn
with one particular topic (and Avro schema), and only once in a while.
We've seen it happen 2 or 3 times so far. We had chalked it up to bad
content in the message, figuring that the sender was doing something like
sending a long stream of a single character, in error, which was creating a
highly compressible message. Given these cases, I'm no longer certain
that's the case.

Becket, you had been taking a look at this internally. Do you have any
thoughts on this?

-Todd


On Tue, Jul 14, 2015 at 11:18 AM, JIEFU GONG  wrote:

> @Gwen
> I am having a very very similar issue where I am attempting to send a
> rather small message and it's blowing up on me (my specific error is:
> Invalid receive (size = 1347375956 larger than 104857600)). I tried to
> change the relevant settings but it seems that this particular request is
> of 1340 mbs (and davids will be 1500 mb) and attempting to change the
> setting will give you another error saying there is not enough memory in
> the java heap. Any insight here?
>
> Specifically I am speculating the issue is indeed what Shayne has said
> about encoding: I am trying to use apachebench to send a post request to a
> kafka server but it is returning the above error -- do I have to format the
> data in any way, as this might be the reason why I'm experiencing this issue.
>
>
> On Sun, Jul 12, 2015 at 6:35 AM, Shayne S  wrote:
>
> > Your payload is so small that I suspect it's an encoding issue. Is your
> > producer set to expect a byte array and you're passing a string? Or vice
> > versa?
> >
> > On Sat, Jul 11, 2015 at 11:08 PM, David Montgomery <
> > davidmontgom...@gmail.com> wrote:
> >
> > > I cant send this s simple payload using python.
> > >
> > > topic: topic-test-development
> > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> > >
> > >
> > > No handlers could be found for logger "kafka.conn"
> > > Traceback (most recent call last):
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 81,
> > > in 
> > > test_send_data_to_realtimenode()
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 38,
> > > in test_send_data_to_realtimenode
> > > response = producer.send_messages(test_topic,test_payload)
> > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > > line 54, in send_messages
> > > topic, partition, *msg
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 349, in send_messages
> > > return self._send_messages(topic, partition, *msg)
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 390, in _send_messages
> > > fail_on_error=self.sync_fail_on_error
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 480,
> > > in send_produce_request
> > > (not fail_on_error or not self._raise_on_response_error(resp))]
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 247,
> > > in _raise_on_response_error
> > > raise resp
> > > kafka.common.FailedPayloadsError
> > >
> > > Here is what is in my logs
> > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
> is
> > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > (kafka.network.Processor)
> > >
> > >
> > >
> > > Server is 4 gigs of ram.
> > >
> > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
> kafka-server-start.sh
> > >
> > > So.why?
> > >
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Todd Palino
It could be a client error, but we're seeing it show up in Mirror Maker.

-Todd


On Tue, Jul 14, 2015 at 1:27 PM, JIEFU GONG  wrote:

> Got it, looks like I didn't understand the request process and am failing
> to use AB properly. Thanks for the help everyone! I suspect you might be
> running into a similar error, David.
>
> On Tue, Jul 14, 2015 at 11:56 AM, Jay Kreps  wrote:
>
> > This is almost certainly a client bug. Kafka's request format is size
> > delimited messages in the form
> > <4 byte size N><N bytes of content>
> > If the client sends a request with an invalid size or sends a partial
> > request the server will see effectively random bytes from the next
> request
> > as the size of the next message and generally reject the request (or fail
> > to parse it).
> >
> > -Jay
> >
> > On Sat, Jul 11, 2015 at 9:08 PM, David Montgomery <
> > davidmontgom...@gmail.com
> > > wrote:
> >
> > > I cant send this s simple payload using python.
> > >
> > > topic: topic-test-development
> > > payload: {"utcdt": "2015-07-12T03:59:36", "ghznezzhmx": "apple"}
> > >
> > >
> > > No handlers could be found for logger "kafka.conn"
> > > Traceback (most recent call last):
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 81,
> > > in 
> > > test_send_data_to_realtimenode()
> > >   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line
> > 38,
> > > in test_send_data_to_realtimenode
> > > response = producer.send_messages(test_topic,test_payload)
> > >   File
> "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py",
> > > line 54, in send_messages
> > > topic, partition, *msg
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 349, in send_messages
> > > return self._send_messages(topic, partition, *msg)
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py",
> > > line 390, in _send_messages
> > > fail_on_error=self.sync_fail_on_error
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 480,
> > > in send_produce_request
> > > (not fail_on_error or not self._raise_on_response_error(resp))]
> > >   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line
> > 247,
> > > in _raise_on_response_error
> > > raise resp
> > > kafka.common.FailedPayloadsError
> > >
> > > Here is what is in my logs
> > > [2015-07-12 03:29:58,103] INFO Closing socket connection to
> > > /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497
> is
> > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > (kafka.network.Processor)
> > >
> > >
> > >
> > > Server is 4 gigs of ram.
> > >
> > > I used export KAFKA_HEAP_OPTS=-Xmx256M -Xms128M in
> kafka-server-start.sh
> > >
> > > So.why?
> > >
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>


Re: Custom Zookeeper install with kafka

2015-07-22 Thread Todd Palino
Yes, we use ZK 3.4.6 exclusively at LinkedIn and there's no problem.

-Todd

> On Jul 22, 2015, at 9:49 AM, Adam Dubiel  wrote:
> 
> Hi,
> 
> I don't think it matters much which version of ZK will you use (meaning
> minor/patch versions). We have been using 3.4.6 for some time and it works
> flawlessly.
> 
> BR,
> Adam
> 
> 2015-07-22 18:40 GMT+02:00 Prabhjot Bharaj :
> 
>> Hi,
>> 
>> I've read on the Kafka documentation page that the zookeeper version used
>> is 3.3.4
>> 
>> However, at my work, I've noticed certain problems with v3.3.4 (and the
>> problems are documented here: http://zookeeper.apache.org/releases.html
>> 
>> The latest stable version of zookeeper is 3.4.6 and has a lot of bug fixes
>> from 3.3.4
>> 
>> Has anyone tried using this version of zookeeper with kafka 0.8.2.1 ?
>> 
>> Regards,
>> Prabhjot
>> 


Re: Specify leader when doing partition reassignment

2015-08-05 Thread Todd Palino
To make sure you have a complete answer here, the order of the replica list
that you specify in the partition reassignment will affect the leader
selection, but if the current leader is in the new replica list, it will
not cause the leadership to change.

That is, if your current replica list is [1, 2] and the leader is 1, and
you change the replica list to [3, 1] nothing happens immediately except
that 2 is removed as a replica and 3 is added. Upon a failure of broker 1
or a preferred replica election, broker 3 will take over leadership. If the
new replica list is [3, 4], then upon completion of the reassignment,
broker 3 will be selected as the new leader.

The controller will prefer to select the first broker in the replica list
as the leader. This means that if you do a partition reassignment and
change the replica list from [1, 2] to [2, 1], nothing happens at first.
But upon the next preferred replica election, broker 2 will be selected as
the leader.
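
So if you want a specific broker to end up as the leader, list it first in
the replica list and then trigger a preferred replica election once the
reassignment has completed. A rough sketch, with the broker IDs, topic, and
Zookeeper connect string all being illustrative -- reassign.json contains:

  {"version":1,"partitions":[
    {"topic":"test5","partition":0,"replicas":[3,1]}
  ]}

and then:

  bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassign.json --execute
  bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181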

-Todd


On Wed, Aug 5, 2015 at 3:37 AM, tao xiao  wrote:

> I think I figured it out. As pointed out in
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
>
> If the leader is not in the reassigned replica a new leader will be elected
> and the tool doesn't pick the first one from the RAR as the leader
>
> On Wed, 5 Aug 2015 at 18:32 Jilin Xie  wrote:
>
> > Check the --replica-assignment  parameter of the kafka-topics.sh.
> > It does what you need.
> > And there should also be similar configs in the api if you wanna do so by
> > coding.
> >
> > On Wed, Aug 5, 2015 at 6:18 PM, tao xiao  wrote:
> >
> > > Hi team,
> > >
> > > Is it possible to specify a leader broker for each topic partition when
> > > doing partition reassignment?
> > >
> > > For example I have following json. Is the first broker in the replicas
> > list
> > > by default the leader of the partition e.g. broker 3 is the leader of
> > topic
> > > test5 and broker 2 is the leader of topic test3. or does Kafka
> > > automatically pick the leader based on the leadership ratio
> > > if auto.leader.rebalance.enable is on
> > >
> > > {
> > >   "version": 1,
> > >   "partitions": [
> > > {
> > >   "topic": "test5",
> > >   "partition": 0,
> > >   "replicas": [3, 1]
> > > },
> > > {
> > >   "topic": "test3",
> > >   "partition": 0,
> > >   "replicas": [2,3]
> > > },
> > > {
> > >   "topic": "test1",
> > >   "partition": 0,
> > >   "replicas": [1,2]
> > > }
> > >   ]
> > > }
> > >
> >
>


Re: Documentation typo for offsets.topic.replication.factor ?

2015-08-05 Thread Todd Palino
That's exactly right. We've been talking about this internally at LinkedIn, and 
how to solve it. I think the best option would be to have the broker throw an 
error on offset commits until there are enough brokers to fulfill the 
configured RF.

We've seen this several times now when bootstrapping test and development 
clusters. You can work around it by making sure all consumers are shut down 
until you have all the brokers started. But that's not a good long term 
solution for this problem. 

-Todd


> On Aug 5, 2015, at 6:28 PM, James Cheng  wrote:
> 
> Hi,
> 
> My kafka cluster has a __consumer_offsets topic with 50 partitions (the 
> default for offsets.topic.num.partitions) but with a replication factor of 
> just 1 (the default for offsets.topic.replication.factor should be 3).
> 
> From the docs http://kafka.apache.org/documentation.html:
> 
> offsets.topic.replication.factor3The replication factor for the 
> offset commit topic. A higher setting (e.g., three or four) is recommended in 
> order to ensure higher availability. If the offsets topic is created when 
> fewer brokers than the replication factor then the offsets topic will be 
> created with fewer replicas.
> 
> 
> I'm guessing there's a typo there? I'm guessing it should be:
> 
> If the offsets topic is created when fewer brokers than the replication 
> factor [are active], then the offsets topic will be created with fewer 
> replicas.
> 
> Or something along those lines?
> 
> Thanks,
> -James
> 
> 


Re: Recovery of Kafka cluster takes very long time

2015-08-10 Thread Todd Palino
It looks like you did an unclean shutdown of the cluster, in which case
each open log segment in each partition needs to be checked upon startup.
It doesn't really have anything to do with RF=3 specifically, but it does
mean that each of your brokers has 6000 partitions to check.

What is the setting of num.recovery.threads.per.data.dir in your broker
configuration? The default is 1, which means that upon startup and
shutdown, the broker only uses 1 thread for checking/closing log segments.
If you increase this, it will parallelize both the startup and shutdown
process. This is particularly helpful for recovering from unclean shutdown.
We generally set it to the number of CPUs in the system, because we want a
fast recovery.
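
For example, on a 16-core broker we would set something like this in
server.properties (the value is illustrative -- size it to your hardware):

  num.recovery.threads.per.data.dir=16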

-Todd


On Mon, Aug 10, 2015 at 8:57 AM, Alexey Sverdelov <
alexey.sverde...@googlemail.com> wrote:

> Hi all,
>
> I have a 3 node Kafka cluster. There are ten topics, every topic has 600
> partitions with RF3.
>
> So, after cluster restart I can see the following log message like "INFO
> Recovering unflushed segment 0 in log..." and the complete recovery of 3
> nodes takes about 2+ hours.
>
> I don't know why it takes so long? Is it because of RF=3?
>
> Have a nice day,
> Alexey
>


Re: Raid vs individual disks

2015-08-21 Thread Todd Palino
At LinkedIn, we are using a RAID-10 of 14 disks. This is using software
RAID. I recently did some performance testing with RAID 0, 5, and 6. I
found that 5 and 6 underperformed significantly, possibly due to the parity
calculations. RAID 0 had a sizable performance gain over 10, and I would
expect single disks to perform similarly. I didn't test individual disks
because that setup lacks some balancing ability that we would need.
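
For reference, creating an equivalent software RAID-10 (assuming Linux md)
looks roughly like this -- device names, filesystem, and mount point are all
illustrative:

  mdadm --create /dev/md0 --level=10 --raid-devices=14 /dev/sd[b-o]
  mkfs.ext4 /dev/md0
  mount /dev/md0 /data/kafka-logs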

-Todd


On Friday, August 21, 2015, Prabhjot Bharaj  wrote:

> Hi,
>
> I've gone through the details mentioned about Raid and individual disks in
> the ops section of the documentation
>
> But, I would like to know what performance boost we can get with individual
> disks.
> Is anybody using Kafka with multiple disks or all are raid into 1 big disk
> ?
>
> Regards,
> Prabcs
>


Re: How to monitor lag when "kafka" is used as offset.storage?

2015-09-03 Thread Todd Palino
You can use the emailer config in Burrow to send alerts directly (it will
monitor specific groups and send emails out when there is a problem). If
you need something more complex than that, I think the best practice is
always to send the output into an general alert/notification system.

-Todd

On Wednesday, September 2, 2015, shahab  wrote:

> Thanks Noah. I installed Burrow and played with it a little bit. It seems
> as you pointed out I need to implement the alerting system myself. Do you
> know any other Kafka tools that can give alerts?
>
> best,
> /Shahab
>
> On Wed, Sep 2, 2015 at 1:44 PM, noah >
> wrote:
>
> > We use Burrow . There are rest
> > endpoints you can use to get offsets and manually calculate lag, but if
> you
> > are focused on alerting, I'd use its consumer statuses as they are a bit
> > smarter than a simple lag calculation.
> >
> > On Wed, Sep 2, 2015 at 4:08 AM shahab  > wrote:
> >
> > > Hi,
> > >
> > > I wonder how we can monitor lag (difference between consumer offset and
> > log
> > > ) when "kafka" is set as offset.storage?  because the
> "kafka-run-class.sh
> > > kafka.tools.ConsumerOffsetChecker ... " does work only when zookeeper
> is
> > > used as storage manager.
> > >
> > > best,
> > > /Shahab
> > >
> >
>


Re: API to query cluster metadata on-demand

2015-09-03 Thread Todd Palino
What Gwen said :)

We developed a python web service internally called Menagerie that provides
this functionality for both Kafka and Zookeeper. We use it to drive a web
dashboard for stats, our (old style) lag checking, and some other CLI
tools. Unfortunately it ties into too much internal LinkedIn tooling for us
to open source.

That's one of the reasons we released Burrow (
https://github.com/linkedin/Burrow). The primary use is to do lag checking
for consumers as a service. But I'm also moving functionality from
Menagerie into it. Right now you can use it to fetch topic lists, partition
counts, and broker offsets. You can also get information for consumers (as
long as they are committing offsets to Kafka and not ZK).

If it looks useful and there's some bit of info you'd like it to provide,
you can submit a github issue and I'll take a look at it.
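
For example, requests along these lines work today (the host, port, cluster
name, and group name are illustrative -- the wiki documents the full set of
endpoints):

  # list topics, and fetch broker offsets for one topic
  curl http://burrow.example.com:8000/v2/kafka/local/topic
  curl http://burrow.example.com:8000/v2/kafka/local/topic/example-topic
  # list consumer groups, and check the status of one group
  curl http://burrow.example.com:8000/v2/kafka/local/consumer
  curl http://burrow.example.com:8000/v2/kafka/local/consumer/example-group/status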

-Todd

On Thursday, September 3, 2015, Andrew Otto  wrote:

> If you don’t mind doing it with a C CLI:
>
> https://github.com/edenhill/kafkacat
>
> $ kafkacat -L -b mybroker
>
> But, uhhh, you probably want a something in the Java API.
>
> :)
>
>
> > On Sep 3, 2015, at 13:58, Gwen Shapira >
> wrote:
> >
> > Ah, I wish.
> >
> > We are working on it :)
> >
> > On Thu, Sep 3, 2015 at 9:10 AM, Simon Cooper <
> > simon.coo...@featurespace.co.uk > wrote:
> >
> >> Is there a basic interface in the new client APIs to get the list of
> >> topics on a cluster, and get information on the topics (offsets, sizes,
> >> etc), without having to deal with a producer or consumer? I just want a
> >> basic synchronous API to query the metadata as-is. Does this exist in
> some
> >> form?
> >>
> >> Thanks,
> >> Simon
> >>
>
>


Re: Amount of partitions

2015-09-04 Thread Todd Palino
Jun's post is a good start, but I find it's easier to talk in terms of more
concrete reasons and guidance for having fewer or more partitions per topic.

Start with the number of brokers in the cluster. This is a good baseline
for the minimum number of partitions in a topic, as it will assure balance
over the cluster. Of course, if you have lots of topics, you can
potentially skip past this as you'll end up with balanced load in the
aggregate, but I think it's a good practice regardless. As with all other
advice here, there are always exceptions. If you really, really, really
need to assure ordering of messages, you might be stuck with a single
partition for some use cases.

In general, you should pick more partitions if a) the topic is very busy,
or b) you have more consumers. Looking at the second case first, you always
want to have at least as many partitions in a topic as you have individual
consumers in a consumer group. So if you have 16 consumers in a single
group, you will want the topic they consume to have at least 16 partitions.
In fact, you may also want to always have a multiple of the number of
consumers so that you have even distribution. How many consumers you have
in a group is going to be driven more by what you do with the messages once
they are consumed, so here you'll be looking from the bottom of your stack
up, until you get to Kafka.

How busy the topic is, in contrast, is a matter of looking from the top down,
through the producer, to Kafka. It's also a little more difficult to provide
guidance on. We have
a policy of expanding partitions for a topic whenever the size of the
partition on disk (full retention over 4 days) is larger than 50 GB. We
find that this gives us a few benefits. One is that it takes a reasonable
amount of time when we need to move a partition from one broker to another.
Another is that when we have partitions that are larger than this, the rate
tends to cause problems with consumers. For example, we see mirror maker
perform much better, and have less spiky lag problems, when we stay under
this limit. We're even considering revising the limit down a little, as
we've had some reports from other wildcard consumers that they've had
problems keeping up with topics that have partitions larger than about 30
GB.

The last thing to look at is whether or not you are producing keyed
messages to the topic. If you're working with unkeyed messages, there is no
problem. You can usually add partitions whenever you want to down the road
with little coordination with producers and consumers. If you are producing
keyed messages, there is a good chance you do not want to change the
distribution of keys to partitions at various points in the future when you
need to size up. This means that when you first create the topic, you
probably want to create it with enough partitions to deal with growth over
time, both on the produce and consume side, even if that is too many
partitions right now by other measures. For example, we have one client who
requested 720 partitions for a particular set of topics. The reasoning was
that they are producing keyed messages, they wanted to account for growth,
and they wanted even distribution of the partitions to consumers as they
grow. 720 happens to have a lot of factors, so it was a good number for
them to pick.
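
In other words, for a keyed topic it's worth creating the partitions up front
even if the count looks excessive today. For example (topic name, counts, and
Zookeeper string all illustrative):

  bin/kafka-topics.sh --zookeeper zk1:2181 --create --topic keyed-events \
    --partitions 720 --replication-factor 2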

As a note, we have up to 5000 partitions per broker right now on current
hardware, and we're moving to new hardware (more disk, 256 GB of memory,
10gig interfaces) where we're going to have up to 12,000. Our default
partition count for most clusters is 8, and we've got topics up to 512
partitions in some places just taking into account the produce rate alone
(not counting those 720-partition topics that aren't that busy). Many of
our brokers run with over 10k open file handles for regular files alone,
and over 50k open when you include network.
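
If you run with partition counts like this, make sure the open file limit for
the broker is raised well beyond the defaults. A quick way to see where a
running broker stands, assuming a typical Linux install:

  BROKER_PID=$(pgrep -f kafka.Kafka)
  ls /proc/$BROKER_PID/fd | wc -l               # file handles currently open
  grep 'open files' /proc/$BROKER_PID/limits    # the limit it was started with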

-Todd



On Fri, Sep 4, 2015 at 8:11 AM, tao xiao  wrote:

> Here is a good doc to describe how to choose the right number of partitions
>
>
> http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
>
> On Fri, Sep 4, 2015 at 10:08 PM, Jörg Wagner 
> wrote:
>
> > Hello!
> >
> > Regarding the recommended amount of partitions I am a bit confused.
> > Basically I got the impression that it's better to have lots of
> partitions
> > (see information from linkedin etc). On the other hand, a lot of
> > performance benchmarks floating around show only a few partitions are
> being
> > used.
> >
> > Especially when considering the difference between hdd and ssds and also
> > the amount thereof, what is the way to go?
> >
> > In my case, I seem to have the best stability and performance issues with
> > few partitions *per hdd*, and only one io thread per disk.
> >
> > What are your experiences and recommendations?
> >
> > Cheers
> > Jörg
> >
>
>
>
> --
> Regards,
> Tao
>


Re: Amount of partitions

2015-09-08 Thread Todd Palino
How large are your messages compressed? 50k requests/sec could equate to as
little as 50 KB/sec of traffic per topic, 50 GB/sec, or more. The size of
the messages is going to be pretty important when considering overall
throughput here. Additionally, what kind of network interfaces are you
using on your brokers?

If you're working with 400 partitions per topic, that's around 3600
partitions total, or about 2400 partitions per broker (assuming replication
factor 2). This isn't an unreasonable number of partitions per-broker. We
regularly run twice as many partitions as this with only 64 GB of memory. I
don't think partition count is your problem here. Depending on the message
size, you could easily be doing too much.

Let's assume a message size of 2 KB. Given that you have 3 topics doing 50k
req/sec, that would be 100 MB/sec per topic. With 3 brokers, I'm going to
assume that you're running replication factor 2. This means that each
broker has an inbound traffic rate equivalent to 2 of those topics, or 200
MB/sec. Already, this means that you had better be running something more
than a gigabit network interface. If not, I can pin down your first problem
right there. Given that you're seeing a lot of replication messages in the
logs, I would start here by adding more brokers and balancing out your
traffic further.

Each of your brokers also has an outbound traffic rate of 100 MB/sec just
for the inter-broker replication traffic. You're running a mirror maker,
which means that you have to double that. So you're doing 200 MB/sec
inbound, and 200 MB/sec outbound from each broker. How many copies of
mirror maker are you running in your consumer group? If you're only running
1 copy of mirror maker, that means that single box has to handle 300 MB/sec
of inbound and outbound traffic, and it also has to decompress and
recompress all of that traffic. My experience has been that a single thread
of mirror maker (1 server, running with 1 consumer thread) will max out
around 12 MB/sec. You're not too far off, especially if you have problems
on the broker side that is reducing the efficiency of your system.
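
If you are only running one instance, the first thing to try is more consumer
threads, and then more mirror maker instances in the same group. Roughly, with
the config file names and counts being illustrative:

  bin/kafka-mirror-maker.sh --consumer.config source-cluster.properties \
    --producer.config target-cluster.properties \
    --whitelist '.*' --num.streams 8 --num.producers 4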

I don't think you're looking at anything specific to the kernel version or
the JDK at this point, I think you may just be underpowered (depending on
the answers to the questions buried in there).

-Todd


On Mon, Sep 7, 2015 at 1:08 AM, Jörg Wagner  wrote:

> Thank you very much for both replies.
>
> @Tao
> Thanks, I am aware of and have read that article. I am asking because my
> experience is completely different :/. Everytime we go beyond 400
> partitions the cluster really starts breaking apart.
>
> @Todd
> Thank you, very informative.
>
> Or Details:
> 3 Brokers: 192GB Ram, 27 Disks for log.dirs, 9 topics and estimated 50k
> requests / second on 3 of the topics, the others are negligible.
> Ordering is not required, messages are not keyed
>
> The 3 main topics are one per DC (3 DCs) and being mirrored to the others.
>
>
> The issue arises when we use over 400 partitions, which I think we require
> due to performance and mirroring. Partitions get out of sync and the log is
> being spammed by replicator messages. At the core, we start having massive
> stability issues.
>
> Additionally, the mirrormaker only gets 2k Messages per *minute* through
> with a stable setup of 81 partitions (for the 3 main topics).
>
> Has anyone experienced this and can give more insight? We have been doing
> testing for weeks, compared configuration and setups, without finding the
> main cause.
> Can this be a Kernel (version/configuration) or Java(7) issue?
>
> Cheers
> Jörg
>
>
>
> On 04.09.2015 20:24, Todd Palino wrote:
>
>> Jun's post is a good start, but I find it's easier to talk in terms of
>> more
>> concrete reasons and guidance for having fewer or more partitions per
>> topic.
>>
>> Start with the number of brokers in the cluster. This is a good baseline
>> for the minimum number of partitions in a topic, as it will assure balance
>> over the cluster. Of course, if you have lots of topics, you can
>> potentially skip past this as you'll end up with balanced load in the
>> aggregate, but I think it's a good practice regardless. As with all other
>> advice here, there are always exceptions. If you really, really, really
>> need to assure ordering of messages, you might be stuck with a single
>> partition for some use cases.
>>
>> In general, you should pick more partitions if a) the topic is very busy,
>> or b) you have more consumers. Looking at the second case first, you
>> always
>> want to have at least as many partitions in a topic as you have individual
>> consumers in a consumer group. So if you have 16 consumers in a single
>> group, you will want the topic they consume to have at least 16 partitions.

Re: Question on Kafka Replication

2015-09-15 Thread Todd Palino
I put an answer to this on Stack Overflow. Basically, that's not how RF
works for Kafka. It's not a guarantee, it's just how the partitions are
created, and how it is reported when something is down (under replicated
partitions). While there is an option to do auto leader rebalancing,
there's no equivalent option for auto partition migration. It's a fairly
resource intensive task, moving all that data around. If you want to move
replicas around when something is down, you have to do it manually.

That said, it may be interesting to consider.

-Todd


On Tue, Sep 15, 2015 at 7:47 AM, Dhyan  wrote:

> Hi All,
>
> Below is my partition information for the topic **xx_json_topic** .This is
> a Kafka cluster with three nodes .
>
> All nodes up :
>
> Topic: xx_json_topicPartitionCount:4
> ReplicationFactor:2Configs:
> Topic: xx_json_topicPartition: 0Leader: 1   Replicas:
> 3,1   Isr: 3,1
> Topic: xx_json_topicPartition: 1Leader: 2   Replicas:
> 1,2   Isr: 2,1
> Topic: xx_json_topicPartition: 2Leader: 2   Replicas:
> 2,3   Isr: 2,3
> Topic: xx_json_topicPartition: 3Leader: 3   Replicas:
> 3,2   Isr: 2,3
>
> At this point..  if i bring down the node "node-1" ..It looks like below :
>
> Topic: xx_json_topicPartitionCount:4
> ReplicationFactor:2Configs:
> Topic: xx_json_topicPartition: 0Leader: 3   Replicas:
> 3,1   Isr: 3
> Topic: xx_json_topicPartition: 1Leader: 2   Replicas:
> 1,2   Isr: 2
> Topic: xx_json_topicPartition: 2Leader: 2   Replicas:
> 2,3   Isr: 2,3
> Topic: xx_json_topicPartition: 3Leader: 3   Replicas:
> 3,2   Isr: 2,3
>
> My question is ..if kafka knows that the node-1 is down and it needs to
> maintain the replication factor-2 ,wouldn't it make node 3 a replica for
> partition-1 && node-2 a replica for partition-0  then make node-3 and
> node-2 part of their Isr ?
>
> Or you think Kafka doesn't promise that...
> If replication factor is 2 ..It doesn't mean that data will be available
> in atleast 2 nodes at all time(---like consistency level in Cassandra) .
>
> I also have this question posted below :
>
> http://stackoverflow.com/questions/32588784/would-kafka-create-a-new-follower-if-one-of-its-replica-is-down-to-keep-up-with
> <
> http://stackoverflow.com/questions/32588784/would-kafka-create-a-new-follower-if-one-of-its-replica-is-down-to-keep-up-with
> >
>
> —Dhyan


Re: Delay in Zookeeper Offset updates

2015-09-17 Thread Todd Palino
Consumer offsets in Zookeeper are not handled by the Kafka brokers at all -
the consumer writes those directly to Zookeeper. Most likely, what you are
seeing is the interval over which the consumer is committing offsets.
Assuming that you are using the auto.commit.enable setting (it defaults to
true, so unless you have changed that in the consumer configuration it will
apply here), you should check the setting of auto.commit.interval.ms for your
consumer. I believe the default is still 60 seconds (or a config value of
60000). If you're seeing the values change in ZK every 5-6 seconds, then it
sounds like it is set much lower.
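
For reference, the relevant old-consumer properties are these (shown here with
the defaults I mentioned):

  auto.commit.enable=true
  auto.commit.interval.ms=60000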

The setting for the offset commit interval is entirely up to you. The lower
the value, the more often offsets are checkpointed to Zookeeper. However
that means that your consumer is writing to Zookeeper more often, and that
has performance impacts. In an environment like ours, where there is a
large number of consumers for a given cluster, we actually had to increase
the interval to 10 minutes for quite some time because the load on
Zookeeper was just too high. Once we moved the ZK transaction logs to SSDs,
we were able to drop that back to 60 seconds.

-Todd



On Thu, Sep 17, 2015 at 1:14 PM, nitin sharma 
wrote:

> anyone faced this issue?
>
> Regards,
> Nitin Kumar Sharma.
>
>
> On Wed, Sep 2, 2015 at 5:32 PM, nitin sharma 
> wrote:
>
> > Hi All,
> >
> > i have run into a weird issue with my Kafka setup.
> >
> > I see that it takes around 5-6 sec for Zookeeper to update the offset for
> > Kafka topics.
> >
> > I am running "ConsumerOffsetChecker" tool to see that lag and what i
> found
> > that even when my consumer is not up, it takes 5-6 sec for Zookeeper to
> > show the updated Offset .
> >
> > Is this behavior can be fixed? I have tried adding following parameters
> in
> > my Kafka server.property files but no luck.
> >
> > log.flush.interval.messages=1
> > log.default.flush.scheduler.interval.ms=500
> >
> >
> > ConsumerOffsetChecker:
> >  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect
> > localhost:2181 --group perfConsumer
> >
> >
> > Regards,
> > Nitin Kumar Sharma.
> >
> >
>


Re: Closing connection messages

2015-09-17 Thread Todd Palino
This message is regarding a normal connection close. You see it in the logs
for any connection - consumers, producers, replica fetchers. It can be
particularly noisy because metadata requests often happen on their own
connection.

The log message has been moved to debug level in recent commits (it
actually was changed a while back but snuck back in with one of the larger
sets of changes). In general, I'd say you should just ignore it for now.
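
If the noise is a problem in the meantime, you can turn that one logger down
in the broker's log4j configuration (config/log4j.properties in a standard
install):

  log4j.logger.kafka.network.Processor=WARN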

-Todd


On Wed, Sep 16, 2015 at 9:55 PM, Rajiv Kurian  wrote:

> My broker logs are full of messages of the following type of log message:
>
> INFO  [kafka-network-thread-9092-1] [kafka.network.Processor
>   ]: Closing socket connection to /some_ip_that_I_know.
>
> I see at least one every 4-5  seconds. Something I identified was that the
> ip of the closed clients was always from one class of application. This
> application mostly consumes kafka partitions (rarely produces) and it uses
> the SimpleConsumer to consume data, make requests about partition
> leadership (to keep it up to date) and to also make requests about the
> latest offset in a set of partitions (for metrics).
>
> The data consumption happens with long running consumers, but the one off
> requests like partition leadership requests and offset queries happen with
> short-lived consumers that are closed after a request is served. However
> the volume of closing "Closing socket connection" log messages is higher
> than what I think the rate of these short lived requests should be. My
> guess is that something else is going on.
>
> Is there a way for me to track what client is exactly having its connection
> closed? Each of my SimpleConsumer clients uses a particular client name
> when connecting to the broker. Is there a way to enable additional logging
> which would give me this data along with the "Closing socket connection"
> message?
>
> Thanks,
>
> Rajiv
>


Re: Log Cleaner Thread Stops

2015-09-18 Thread Todd Palino
Yes, this is a known concern, and it should be fixed with recent commits.
In the meantime, you'll have to do a little manual cleanup.

The problem you're running into is a corrupt message in the offsets topic.
We've seen this a lot. What you need to do is set the topic configuration
to remove the cleanup.policy config, and set retention.ms and segment.ms to
something reasonably low. I suggest using a value of 3 or 4 times your
commit interval for consumers. Then wait until the log segments are reaped
(wait twice as long as the retention.ms you chose, to be safe). Once this
is done, you can set the topic configuration back the way it was (remove
segment.ms and retention.ms configs, and set cleanup.policy=compact).
Lastly, you'll need to do a rolling bounce of the cluster to restart the
brokers (which restarts the log cleaner threads). Technically, you only
need to restart brokers where the threads have died, but it's easier to
just restart all of them.
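
Concretely, the sequence looks something like the following. The Zookeeper
string and times are illustrative, I'm assuming a commit interval of around a
minute, and the delete flag is --deleteConfig on 0.8.2 (check kafka-topics.sh
--help for the exact name on your version):

  # temporarily make the partition a normal time-retained log
  bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic __consumer_offsets \
    --deleteConfig cleanup.policy --config retention.ms=300000 \
    --config segment.ms=300000

  # wait for the old segments to be reaped, then restore compaction
  bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic __consumer_offsets \
    --config cleanup.policy=compact --deleteConfig retention.ms \
    --deleteConfig segment.ms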

Keep in mind that when you do this, you are deleting old offsets. If your
consumers are all live and healthy, this shouldn't be a problem because
they will just continue to commit their offsets properly. But if you have
an offline consumer, you'll lose the committed offsets by doing this.

-Todd


On Fri, Sep 18, 2015 at 5:31 AM, John Holland <
john.holl...@objectpartners.com> wrote:

> I've been experiencing this issue across several of our environments ever
> since we enabled the log cleaner for the __consumer_offsets topic.
>
> We are on version 0.8.2.1 of kafka, using the new producer.  All of our
> consumers are set to commit to kafka only.
>
> Below is the stack trace in the log I've encountered across several
> different clusters.  A simple restart of kafka will allow compaction to
> continue on all of the other partitions but the incorrect one will always
> fail.
>
> Here are the values for it from the kafka-topics --describe command:
>
> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
> Configs:segment.bytes=104857600,cleanup.policy=compact
>
> Are there any recommendations on how to prevent this and the best way to
> recover from this exception?  This is causing disk space to fill up quickly
> on the node.
>
> I did see an open issue that seems very similar to this
> https://issues.apache.org/jira/browse/KAFKA-1641 but this is the
> __consumer_offsets topic which I have not had any part in setting up nor
> producing to.
>
> [2015-09-18 02:57:25,520] INFO Cleaner 0: Beginning cleaning of log
> __consumer_offsets-17. (kafka.log.LogCleaner)
> [2015-09-18 02:57:25,520] INFO Cleaner 0: Building offset map for
> __consumer_offsets-17... (kafka.log.LogCleaner)
> [2015-09-18 02:57:25,609] INFO Cleaner 0: Building offset map for log
> __consumer_offsets-17 for 46 segments in offset range [468079184,
> 528707475). (kafka.log.LogCleaner)
> [2015-09-18 02:57:25,645] ERROR [kafka-log-cleaner-thread-0], Error due to
>  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Last clean offset
> is 468079184 but segment base offset is 0 for log __consumer_offsets-17.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
> at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2015-09-18 02:57:25,654] INFO [kafka-log-cleaner-thread-0], Stopped
>  (kafka.log.LogCleaner)
>
> -John
>


Re: Log Cleaner Thread Stops

2015-09-18 Thread Todd Palino
I think the last major issue with log compaction (that it couldn't handle
compressed messages) was committed as part of
https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
certain what version this will end up in. It may be part of 0.8.2.2.

Regardless, you'll probably be OK now. We've found that once we clean this
issue up once it doesn't appear to recur. As long as you're not writing in
compressed messages to a log compacted topic (and that won't happen with
__consumer_offsets, as it's managed by the brokers themselves - it would
only be if you were using other log compacted topics), you're likely in the
clear now.

-Todd


On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
john.holl...@objectpartners.com> wrote:

> Thanks!
>
> I did what you suggested and it worked except it was necessary for me to
> remove the cleaner-offset-checkpoint file from the data directory and
> restart the servers.  The log indicates all is well.
>
> Do you know what version the fix to this will be in? I'm not looking
> forward to dealing with this on a reoccurring basis.
>
> -John
>
> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
>
> > Yes, this is a known concern, and it should be fixed with recent commits.
> > In the meantime, you'll have to do a little manual cleanup.
> >
> > The problem you're running into is a corrupt message in the offsets
> topic.
> > We've seen this a lot. What you need to do is set the topic configuration
> > to remove the cleanup.policy config, and set retention.ms and segment.ms
> > to
> > something reasonably low. I suggest using a value of 3 or 4 times your
> > commit interval for consumers. Then wait until the log segments are
> reaped
> > (wait twice as long as the retention.ms you chose, to be safe). Once
> this
> > is done, you can set the topic configuration back the way it was (remove
> > segment.ms and retention.ms configs, and set cleanup.policy=compact).
> > Lastly, you'll need to do a rolling bounce of the cluster to restart the
> > brokers (which restarts the log cleaner threads). Technically, you only
> > need to restart brokers where the threads have died, but it's easier to
> > just restart all of them.
> >
> > Keep in mind that when you do this, you are deleting old offsets. If your
> > consumers are all live and healthy, this shouldn't be a problem because
> > they will just continue to commit their offsets properly. But if you have
> > an offline consumer, you'll lose the committed offsets by doing this.
> >
> > -Todd
> >
> >
> > On Fri, Sep 18, 2015 at 5:31 AM, John Holland <
> > john.holl...@objectpartners.com> wrote:
> >
> > > I've been experiencing this issue across several of our environments
> ever
> > > since we enabled the log cleaner for the __consumer_offsets topic.
> > >
> > > We are on version 0.8.2.1 of kafka, using the new producer.  All of our
> > > consumers are set to commit to kafka only.
> > >
> > > Below is the stack trace in the log I've encountered across several
> > > different clusters.  A simple restart of kafka will allow compaction to
> > > continue on all of the other partitions but the incorrect one will
> always
> > > fail.
> > >
> > > Here are the values for it from the kafka-topics --describe command:
> > >
> > > Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
> > > Configs:segment.bytes=104857600,cleanup.policy=compact
> > >
> > > Are there any recommendations on how to prevent this and the best way
> to
> > > recover from this exception?  This is causing disk space to fill up
> > quickly
> > > on the node.
> > >
> > > I did see an open issue that seems very similar to this
> > > https://issues.apache.org/jira/browse/KAFKA-1641 but this is the
> > > __consumer_offsets topic which I have not had any part in setting up
> nor
> > > producing to.
> > >
> > > [2015-09-18 02:57:25,520] INFO Cleaner 0: Beginning cleaning of log
> > > __consumer_offsets-17. (kafka.log.LogCleaner)
> > > [2015-09-18 02:57:25,520] INFO Cleaner 0: Building offset map for
> > > __consumer_offsets-17... (kafka.log.LogCleaner)
> > > [2015-09-18 02:57:25,609] INFO Cleaner 0: Building offset map for log
> > > __consumer_offsets-17 for 46 segments in offset range [468079184,
> > > 528707475). (kafka.log.LogCleaner)
> > > [2015-09-18 02:57:25,645] ERROR [kafka-log-cleaner-thread-0], Error due
> > to

Re: log.retention.hours not working?

2015-09-21 Thread Todd Palino
Retention is going to be based on a combination of both the retention and
segment settings (as a side note, it's recommended to use the millisecond-based
configs such as log.retention.ms rather than the hours configs; the hours
variants are there for legacy reasons, but the ms configs are more consistent).
As messages are received by Kafka, they are written to the current open log
segment for each partition. That segment is rotated when either the size limit
(log.segment.bytes) or the time limit (log.roll.hours on the broker, or
segment.ms as a per-topic override) is reached. Once that happens, the log segment
is closed and a new one is opened. Only after a log segment is closed can
it be deleted via the retention settings. Once the log segment is closed
AND either all the messages in the segment are older than log.retention.ms
OR the total partition size is greater than log.retention.bytes, then the
log segment is purged.

As a note, the default segment limit is 1 gibibyte. So if you've only
written in 1k of messages, you have a long way to go before that segment
gets rotated. This is why the retention is referred to as a minimum time.
You can easily retain much more than you're expecting for slow topics.
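
If you want to see rotation and deletion happen quickly on a low-volume test
topic, one option is a per-topic override (a sketch assuming the stock
kafka-topics.sh tool; the ZooKeeper host and topic name are placeholders):

kafka-topics.sh --zookeeper zk:2181 --alter --topic test \
  --config segment.ms=3600000 --config retention.ms=3600000

With both overrides at one hour, a segment is rolled after at most an hour even
if it is nowhere near log.segment.bytes, and becomes eligible for deletion
roughly an hour after it is closed.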

-Todd


On Mon, Sep 21, 2015 at 7:28 PM, allen chan 
wrote:

> I guess that kind of makes sense.
> The following section in the config is what confused me:
> *"# The following configurations control the disposal of log segments. The
> policy can*
> *# be set to delete segments after a period of time, or after a given size
> has accumulated.*
> *# A segment will be deleted whenever *either* of these criteria are met.
> Deletion always happens*
> *# from the end of the log."*
>
> That makes it sound like deletion will happen if either of the criteria is
> met.
> I thought the whole idea of those two settings (time and bytes) is telling
> the application when it will need to delete.
>
>
>
> On Mon, Sep 21, 2015 at 7:10 PM, noah  wrote:
>
> > "minimum age of a log file to be eligible for deletion" Key word is
> > minimum. If you only have 1k logs, Kafka doesn't need to delete anything.
> > Try to push more data through and when it needs to, it will start
> deleting
> > old logs.
> >
> > On Mon, Sep 21, 2015 at 8:58 PM allen chan  >
> > wrote:
> >
> > > Hi,
> > >
> > > Just brought up new kafka cluster for testing.
> > > Was able to use the console producers to send 1k of logs and received
> it
> > on
> > > the console consumer side.
> > >
> > > The one issue that i have right now is that the retention period does
> not
> > > seem to be working.
> > >
> > > *# The minimum age of a log file to be eligible for deletion*
> > > *log.retention.hours=1*
> > >
> > > I have waited for almost 2 hours and the 1k of logs are still in kafka.
> > >
> > > I did see these messages pop up on the console
> > > *[2015-09-21 17:12:01,236] INFO Scheduling log segment 0 for log test-1
> > for
> > > deletion. (kafka.log.Log)*
> > > *[2015-09-21 17:13:01,238] INFO Deleting segment 0 from log test-1.
> > > (kafka.log.Log)*
> > > *[2015-09-21 17:13:01,239] INFO Deleting index
> > > /var/log/kafka/test-1/.index.deleted
> > > (kafka.log.OffsetIndex)*
> > >
> > > I know the logs are still in there because i am using
> > > the kafka-consumer-offset-checker.sh and it says how many messages the
> > > logSize is.
> > >
> > > What am i missing in my configuration?
> > >
> > >
> > >
> > > Thanks!
> > >
> > > --
> > > Allen Michael Chan
> > >
> >
>
>
>
> --
> Allen Michael Chan
>


Re: Log Cleaner Thread Stops

2015-09-24 Thread Todd Palino
Well, in general you can't currently use compressed messages in any topic
that has compaction turned on regardless of whether or not you are using
Kafka-committed offsets. The log compaction thread will die either way.
There's only one compaction thread for the broker that runs on all topics
that use compaction.
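
A quick way to spot this is to look for errors from the cleaner thread and to
check which topics have compaction turned on (a sketch assuming the default
log4j layout, which sends the cleaner output to log-cleaner.log; paths and
hostnames are placeholders):

grep -i error /path/to/kafka/logs/log-cleaner.log
kafka-topics.sh --zookeeper zk:2181 --describe | grep cleanup.policy=compact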

Jason, to address your question, it's probably wise to wait for now.
Zookeeper offsets work, so unless it's broke, don't fix it for now. We're
using Kafka-committed offsets at LinkedIn for our mirror makers and our
auditor application (both of which are considered infrastructure
applications for Kafka), but we're not encouraging other internal users to
switch over just yet.

-Todd


On Wed, Sep 23, 2015 at 3:21 PM, James Cheng  wrote:

>
> On Sep 18, 2015, at 10:25 AM, Todd Palino  wrote:
>
> > I think the last major issue with log compaction (that it couldn't handle
> > compressed messages) was committed as part of
> > https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
> > certain what version this will end up in. It may be part of 0.8.2.2.
> >
> > Regardless, you'll probably be OK now. We've found that once we clean
> this
> > issue up once it doesn't appear to recur. As long as you're not writing
> in
> > compressed messages to a log compacted topic (and that won't happen with
> > __consumer_offsets, as it's managed by the brokers themselves - it would
> > only be if you were using other log compacted topics), you're likely in
> the
> > clear now.
> >
>
> Todd,
>
> If I understand your description of the problem, you are saying that
> enabling log compaction on a topic with compressed messages can (will?)
> cause the log cleaner to crash when it encounters those compressed
> messages. And the death of the cleaner thread will prevent log compaction
> from running on other topics, even ones that don't have compressed messages.
>
> That means if we have a cluster where we want to use log compaction on
> *any* topic, we need to either:
> 1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade to
> some version it is applied)
> OR
> 2) make sure that we don't use compressed messages in *any* topic that has
> log compaction turned on.
>
> And, more specifically, if we want to make use of __consumer_offsets, then
> we cannot use compressed messages in any topic that has compaction turned
> on.
>
> Is that right?
> -James
>
> > -Todd
> >
> >
> > On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
> > john.holl...@objectpartners.com> wrote:
> >
> >> Thanks!
> >>
> >> I did what you suggested and it worked except it was necessary for me to
> >> remove the cleaner-offset-checkpoint file from the data directory and
> >> restart the servers.  The log indicates all is well.
> >>
> >> Do you know what version the fix to this will be in? I'm not looking
> >> forward to dealing with this on a reoccurring basis.
> >>
> >> -John
> >>
> >> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
> >>
> >>> Yes, this is a known concern, and it should be fixed with recent
> commits.
> >>> In the meantime, you'll have to do a little manual cleanup.
> >>>
> >>> The problem you're running into is a corrupt message in the offsets
> >> topic.
> >>> We've seen this a lot. What you need to do is set the topic
> configuration
> >>> to remove the cleanup.policy config, and set retention.ms and
> segment.ms
> >>> to
> >>> something reasonably low. I suggest using a value of 3 or 4 times your
> >>> commit interval for consumers. Then wait until the log segments are
> >> reaped
> >>> (wait twice as long as the retention.ms you chose, to be safe). Once
> >> this
> >>> is done, you can set the topic configuration back the way it was
> (remove
> >>> segment.ms and retention.ms configs, and set cleanup.policy=compact).
> >>> Lastly, you'll need to do a rolling bounce of the cluster to restart
> the
> >>> brokers (which restarts the log cleaner threads). Technically, you only
> >>> need to restart brokers where the threads have died, but it's easier to
> >>> just restart all of them.
> >>>
> >>> Keep in mind that when you do this, you are deleting old offsets. If
> your
> >>> consumers are all live and healthy, this shouldn't be a problem because
> >>> they will just continue to commit th

Re: Log Cleaner Thread Stops

2015-09-24 Thread Todd Palino
For now, that's the way it is. Historically, we've only monitored the lag
for our infrastructure applications. Other users are responsible for their
own checking, typically using the maxlag mbean or some application specific
metric. Besides the core, we've probably got a dozen or so consumers moved
over to Kafka committed offsets at this point.

Of course, just those apps do cover well over a hundred consumer groups :)
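
(For reference, the lag metric mentioned above is the old high-level consumer's
MaxLag mbean, which in 0.8.x is exposed as something like
kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=<client-id>.)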

-Todd

On Thursday, September 24, 2015, James Cheng  wrote:

>
> > On Sep 24, 2015, at 8:11 PM, Todd Palino  > wrote:
> >
> > Well, in general you can't currently use compressed messages in any topic
> > that has compaction turned on regardless of whether or not you are using
> > Kafka-committed offsets. The log compaction thread will die either way.
> > There's only one compression thread for the broker that runs on all
> topics
> > that use compaction.
> >
> > Jason, to address your question, it's probably wise to wait for now.
> > Zookeeper offsets work, so unless it's broke, don't fix it for now. We're
> > using Kafka-committed offsets at LinkedIn for our mirror makers and our
> > auditor application (both of which are considered infrastructure
> > applications for Kafka), but we're not encouraging other internal users
> to
> > switch over just yet.
> >
>
> Burrow depends on kafka-commited offsets, doesn’t it? I guess that means
> Burrow is only being used to monitor your mirror makers and auditor
> application, then?
>
> -James
>
> > -Todd
> >
> >
> > On Wed, Sep 23, 2015 at 3:21 PM, James Cheng  > wrote:
> >
> >>
> >> On Sep 18, 2015, at 10:25 AM, Todd Palino  > wrote:
> >>
> >>> I think the last major issue with log compaction (that it couldn't
> handle
> >>> compressed messages) was committed as part of
> >>> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm
> not
> >>> certain what version this will end up in. It may be part of 0.8.2.2.
> >>>
> >>> Regardless, you'll probably be OK now. We've found that once we clean
> >> this
> >>> issue up once it doesn't appear to recur. As long as you're not writing
> >> in
> >>> compressed messages to a log compacted topic (and that won't happen
> with
> >>> __consumer_offsets, as it's managed by the brokers themselves - it
> would
> >>> only be if you were using other log compacted topics), you're likely in
> >> the
> >>> clear now.
> >>>
> >>
> >> Todd,
> >>
> >> If I understand your description of the problem, you are saying that
> >> enabling log compaction on a topic with compressed messages can (will?)
> >> cause the log cleaner to crash when it encounters those compressed
> >> messages. And the death of the cleaner thread will prevent log
> compaction
> >> from running on other topics, even ones that don't have compressed
> messages.
> >>
> >> That means if we have a cluster where we want to use log compaction on
> >> *any* topic, we need to either:
> >> 1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade
> to
> >> some version it is applied)
> >> OR
> >> 2) make sure that we don't use compressed messages in *any* topic that
> has
> >> log compaction turned on.
> >>
> >> And, more specifically, if we want to make use of __consumer_offsets,
> then
> >> we cannot use compressed messages in any topic that has compaction
> turned
> >> on.
> >>
> >> Is that right?
> >> -James
> >>
> >>> -Todd
> >>>
> >>>
> >>> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
> >>> john.holl...@objectpartners.com > wrote:
> >>>
> >>>> Thanks!
> >>>>
> >>>> I did what you suggested and it worked except it was necessary for me
> to
> >>>> remove the cleaner-offset-checkpoint file from the data directory and
> >>>> restart the servers.  The log indicates all is well.
> >>>>
> >>>> Do you know what version the fix to this will be in? I'm not looking
> >>>> forward to dealing with this on a reoccurring basis.
> >>>>
> >>>> -John
> >>>>
> >>>> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  > wrote:
> >>>>
> >>>>> Yes, this is a known concern

Re: Frequent Consumer and Producer Disconnects

2015-09-25 Thread Todd Palino
I don't see the logs attached, but what does the GC look like in your
applications? A lot of times this is caused (at least on the consumer side)
by the Zookeeper session expiring due to excessive GC activity, which
causes the consumers to go into a rebalance and change up their connections.
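
One way to check is to turn on GC logging in the consumer JVM and compare the
pause times against the consumer's ZooKeeper session timeout (a sketch with
placeholder paths; zookeeper.session.timeout.ms defaults to 6000 for the 0.8.x
high-level consumer):

java -Xloggc:/tmp/consumer-gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ... YourConsumerApp

# in the consumer properties, if long pauses can't be eliminated:
zookeeper.session.timeout.ms=30000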

-Todd


On Fri, Sep 25, 2015 at 1:25 PM, Gwen Shapira  wrote:

> How busy are the clients?
>
> The brokers occasionally close idle connections, this is normal and
> typically not something to worry about.
> However, this shouldn't happen to consumers that are actively reading data.
>
> I'm wondering if the "consumers not making any progress" could be due to a
> different issue, and because they are idle, the connection closes (vs the
> other way around).
>
> On Thu, Sep 24, 2015 at 2:32 PM, noah  wrote:
>
> > We are having issues with producers and consumers frequently fully
> > disconnecting (from both the brokers and ZK) and reconnecting without any
> > apparent cause. On our production systems it can happen anywhere from
> every
> > 10-15 seconds to 15-20 minutes. On our less beefy test systems and
> > developer laptops, it can happen almost constantly.
> >
> > We see no errors in the logs (sample attached), just a message for each
> of
> > our consumers and producers disconnecting, then reconnecting. On the
> > systems where it happens constantly, the consumers are not making any
> > progress.
> >
> > The logs on the brokers are equally unhelpful, they show only frequent
> > connects and reconnects, without any apparent cause.
> >
> > What could be causing this behavior?
> >
> >
>


Re: Frequent Consumer and Producer Disconnects

2015-09-25 Thread Todd Palino
That rebalance cycle doesn't look endless. I see that you started 23
consumers, and I see 23 rebalances finishing successfully, which is
correct. You will see rebalance messages from all of the consumers you
started. It all happens within about 2 seconds, which is fine. I agree that
there are a lot of log messages, but I'm not seeing anything that is
particularly a problem here. After the segment of log you provided, your
consumers will be running properly. Now, given you have a topic with 16
partitions, and you're running 23 consumers, 7 of those consumer threads
are going to be idle because they do not own partitions.

-Todd


On Fri, Sep 25, 2015 at 3:27 PM, noah  wrote:

> We're seeing this the most on developer machines that are starting up
> multiple high level consumers on the same topic+group as part of service
> startup. The consumers do not seem to get a chance to consume anything
> before they disconnect.
>
> These are developer topics, so it is possible/likely that there isn't
> anything for them to consume in the topic, but the same service will start
> producing, so I would expect them to not be idle for long.
>
> Could it be the way we are bringing up multiple consumers at the same time is
> hitting some sort of endless rebalance cycle? And/or the resulting
> thrashing is causing them to time out, rebalance, etc.?
>
> I've tried attaching the logs again. Thanks!
>
> On Fri, Sep 25, 2015 at 3:33 PM Todd Palino  wrote:
>
>> I don't see the logs attached, but what does the GC look like in your
>> applications? A lot of times this is caused (at least on the consumer
>> side)
>> by the Zookeeper session expiring due to excessive GC activity, which
>> causes the consumers to go into a rebalance and change up their
>> connections.
>>
>> -Todd
>>
>>
>> On Fri, Sep 25, 2015 at 1:25 PM, Gwen Shapira  wrote:
>>
>> > How busy are the clients?
>> >
>> > The brokers occasionally close idle connections, this is normal and
>> > typically not something to worry about.
>> > However, this shouldn't happen to consumers that are actively reading
>> data.
>> >
>> > I'm wondering if the "consumers not making any progress" could be due
>> to a
>> > different issue, and because they are idle, the connection closes (vs
>> the
>> > other way around).
>> >
>> > On Thu, Sep 24, 2015 at 2:32 PM, noah  wrote:
>> >
>> > > We are having issues with producers and consumers frequently fully
>> > > disconnecting (from both the brokers and ZK) and reconnecting without
>> any
>> > > apparent cause. On our production systems it can happen anywhere from
>> > every
>> > > 10-15 seconds to 15-20 minutes. On our less beefy test systems and
>> > > developer laptops, it can happen almost constantly.
>> > >
>> > > We see no errors in the logs (sample attached), just a message for
>> each
>> > of
>> > > our our consumers and producers disconnecting, then reconnecting. On
>> the
>> > > systems where it happens constantly, the consumers are not making any
>> > > progress.
>> > >
>> > > The logs on the brokers are equally unhelpful, they show only frequent
>> > > connects and reconnects, without any apparent cause.
>> > >
>> > > What could be causing this behavior?
>> > >
>> > >
>> >
>>
>


Re: Frequent Consumer and Producer Disconnects

2015-09-26 Thread Todd Palino
Topic creation should only cause a rebalance for wildcard consumers (and I
believe that is regardless of whether or not the wildcard covers the topic
- once the ZK watch fires a rebalance is going to happen).

Back to the original concern, it would be helpful to see more of that log,
in that case. When a rebalance is triggered, there will be a log message
that will indicate why. This is going to be caused by a change in the group
membership (which has a number of causes, but at least it narrows it down)
or a topic change. Figuring out why the consumers are rebalancing is the
first step to trying to reduce it.

-Todd


On Saturday, September 26, 2015, noah  wrote:

> Thanks, that gives us some more to look at.
>
> That is unfortunately a small section of the log file. When we hit this
> problem (which is not every time,) it will continue like that for hours.
>
> We also still have developers creating topics semi-regularly, which it
> seems like can cause the high level consumer to disconnect?
>
>
> On Fri, Sep 25, 2015 at 6:16 PM Todd Palino  > wrote:
>
>> That rebalance cycle doesn't look endless. I see that you started 23
>> consumers, and I see 23 rebalances finishing successfully, which is
>> correct. You will see rebalance messages from all of the consumers you
>> started. It all happens within about 2 seconds, which is fine. I agree that
>> there are a lot of log messages, but I'm not seeing anything that is
>> particularly a problem here. After the segment of log you provided, your
>> consumers will be running properly. Now, given you have a topic with 16
>> partitions, and you're running 23 consumers, 7 of those consumer threads
>> are going to be idle because they do not own partitions.
>>
>> -Todd
>>
>>
>> On Fri, Sep 25, 2015 at 3:27 PM, noah > > wrote:
>>
>>> We're seeing this the most on developer machines that are starting up
>>> multiple high level consumers on the same topic+group as part of service
>>> startup. The consumers do not seem to get a chance to consume anything
>>> before they disconnect.
>>>
>>> These are developer topics, so it is possible/likely that there isn't
>>> anything for them to consume in the topic, but the same service will start
>>> producing, so I would expect them to not be idle for long.
>>>
>>> Could it be the way we are bring up multiple consumers at the same time
>>> is hitting some sort of endless rebalance cycle? And/or the resulting
>>> thrashing is causing them to time out, rebalance, etc.?
>>>
>>> I've tried attaching the logs again. Thanks!
>>>
>>> On Fri, Sep 25, 2015 at 3:33 PM Todd Palino >> > wrote:
>>>
>>>> I don't see the logs attached, but what does the GC look like in your
>>>> applications? A lot of times this is caused (at least on the consumer
>>>> side)
>>>> by the Zookeeper session expiring due to excessive GC activity, which
>>>> causes the consumers to go into a rebalance and change up their
>>>> connections.
>>>>
>>>> -Todd
>>>>
>>>>
>>>> On Fri, Sep 25, 2015 at 1:25 PM, Gwen Shapira >>> > wrote:
>>>>
>>>> > How busy are the clients?
>>>> >
>>>> > The brokers occasionally close idle connections, this is normal and
>>>> > typically not something to worry about.
>>>> > However, this shouldn't happen to consumers that are actively reading
>>>> data.
>>>> >
>>>> > I'm wondering if the "consumers not making any progress" could be due
>>>> to a
>>>> > different issue, and because they are idle, the connection closes (vs
>>>> the
>>>> > other way around).
>>>> >
>>>> > On Thu, Sep 24, 2015 at 2:32 PM, noah >>> > wrote:
>>>> >
>>>> > > We are having issues with producers and consumers frequently fully
>>>> > > disconnecting (from both the brokers and ZK) and reconnecting
>>>> without any
>>>> > > apparent cause. On our production systems it can happen anywhere
>>>> from
>>>> > every
>>>> > > 10-15 seconds to 15-20 minutes. On our less beefy test systems and
>>>> > > developer laptops, it can happen almost constantly.
>>>> > >
>>>> > > We see no errors in the logs (sample attached), just a message for
>>>> each
>>>> > of
>>>> > > our our consumers and producers disconnecting, then reconnecting.
>>>> On the
>>>> > > systems where it happens constantly, the consumers are not making
>>>> any
>>>> > > progress.
>>>> > >
>>>> > > The logs on the brokers are equally unhelpful, they show only
>>>> frequent
>>>> > > connects and reconnects, without any apparent cause.
>>>> > >
>>>> > > What could be causing this behavior?
>>>> > >
>>>> > >
>>>> >
>>>>
>>>
>>


Re: Log Cleaner Thread Stops

2015-09-28 Thread Todd Palino
This is correct, compression isn't used for the offsets at all. If, for
some reason, you do have either a compressed or a corrupt message somewhere
in the topic, the method I mentioned previously will flush it out. We
haven't seen that as a recurring problem, so fixing it once is sufficient.

-Todd


On Mon, Sep 28, 2015 at 9:53 AM, Jason Rosenberg  wrote:

> Just to clarify too, if the only use case for log-compaction we use is for
> the __consumer_offsets, we should be ok, correct?  I assume compression is
> not used by default for consumer offsets?
>
> Jason
>
> On Fri, Sep 25, 2015 at 12:15 AM, Todd Palino  wrote:
>
> > For now, that's the way it is. Historically, we've only monitored the lag
> > for our infrastructure applications. Other users are responsible for
> their
> > own checking, typically using the maxlag mbean or some application
> specific
> > metric. Besides the core, we've probably got a dozen or so consumers
> moved
> > over to Kafka committed offsets at this point.
> >
> > Of course, just those apps do cover well over a hundred consumer groups
> :)
> >
> > -Todd
> >
> > On Thursday, September 24, 2015, James Cheng  wrote:
> >
> > >
> > > > On Sep 24, 2015, at 8:11 PM, Todd Palino  > > > wrote:
> > > >
> > > > Well, in general you can't currently use compressed messages in any
> > topic
> > > > that has compaction turned on regardless of whether or not you are
> > using
> > > > Kafka-committed offsets. The log compaction thread will die either
> way.
> > > > There's only one compression thread for the broker that runs on all
> > > topics
> > > > that use compaction.
> > > >
> > > > Jason, to address your question, it's probably wise to wait for now.
> > > > Zookeeper offsets work, so unless it's broke, don't fix it for now.
> > We're
> > > > using Kafka-committed offsets at LinkedIn for our mirror makers and
> our
> > > > auditor application (both of which are considered infrastructure
> > > > applications for Kafka), but we're not encouraging other internal
> users
> > > to
> > > > switch over just yet.
> > > >
> > >
> > > Burrow depends on kafka-commited offsets, doesn’t it? I guess that
> means
> > > Burrow is only being used to monitor your mirror makers and auditor
> > > application, then?
> > >
> > > -James
> > >
> > > > -Todd
> > > >
> > > >
> > > > On Wed, Sep 23, 2015 at 3:21 PM, James Cheng  > > > wrote:
> > > >
> > > >>
> > > >> On Sep 18, 2015, at 10:25 AM, Todd Palino  > > > wrote:
> > > >>
> > > >>> I think the last major issue with log compaction (that it couldn't
> > > handle
> > > >>> compressed messages) was committed as part of
> > > >>> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but
> I'm
> > > not
> > > >>> certain what version this will end up in. It may be part of
> 0.8.2.2.
> > > >>>
> > > >>> Regardless, you'll probably be OK now. We've found that once we
> clean
> > > >> this
> > > >>> issue up once it doesn't appear to recur. As long as you're not
> > writing
> > > >> in
> > > >>> compressed messages to a log compacted topic (and that won't happen
> > > with
> > > >>> __consumer_offsets, as it's managed by the brokers themselves - it
> > > would
> > > >>> only be if you were using other log compacted topics), you're
> likely
> > in
> > > >> the
> > > >>> clear now.
> > > >>>
> > > >>
> > > >> Todd,
> > > >>
> > > >> If I understand your description of the problem, you are saying that
> > > >> enabling log compaction on a topic with compressed messages can
> > (will?)
> > > >> cause the log cleaner to crash when it encounters those compressed
> > > >> messages. And the death of the cleaner thread will prevent log
> > > compaction
> > > >> from running on other topics, even ones that don't have compressed
> > > messages.
> > > >>
> > > >> That means if we have a cluster where we want to use log compaction
> on
>

Re: number of topics given many consumers and groups within the data

2015-09-30 Thread Todd Palino
So I disagree with the idea to use custom partitioning, depending on your
requirements. Having a consumer consume from a single partition is not
(currently) that easy. If you don't care which consumer gets which
partition (group), then it's not that bad. You have 20 partitions, you have
20 consumers, and you use custom partitioning as noted. The consumers use
the high level consumer with a single group, each one will get one
partition each, and it's pretty straightforward. If a consumer crashes, you
will end up with two partitions on one of the remaining consumers. If this
is OK, this is a decent solution.

If, however, you require that each consumer always have the same group of
data, and you need to know what that group is beforehand, it's more
difficult. You need to use the simple consumer to do it, which means you
need to implement a lot of logic for error and status code handling
yourself, and do it right. In this case, I think your idea of using 400
separate topics is sound. This way you can still use the high level
consumer, which takes care of the error handling for you, and your data is
separated out by topic.

Provided it is not an issue to implement it in your producer, I would go
with the separate topics. Alternately, if you're not sure you always want
separate topics, you could go with something similar to your second idea,
but have a consumer read the single topic and split the data out into 400
separate topics in Kafka (no need for Cassandra or Redis or anything else).
Then your real consumers can all consume their separate topics. Reading and
writing the data one extra time is much better than rereading all of it 400
times and throwing most of it away.

-Todd


On Wed, Sep 30, 2015 at 9:06 AM, Ben Stopford  wrote:

> Hi Shaun
>
> You might consider using a custom partition assignment strategy to push
> your different “groups" to different partitions. This would allow you walk
> the middle ground between "all consumers consume everything” and “one topic
> per consumer” as you vary the number of partitions in the topic, albeit at
> the cost of a little extra complexity.
>
> Also, not sure if you’ve seen it but there is quite a good section in the
> FAQ here <
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowmanytopicscanIhave?>
> on topic and partition sizing.
>
> B
>
> > On 29 Sep 2015, at 18:48, Shaun Senecal 
> wrote:
> >
> > Hi
> >
> >
> > I heave read Jay Kreps post regarding the number of topics that can be
> handled by a broker (
> https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka),
> and it has left me with more questions that I dont see answered anywhere
> else.
> >
> >
> > We have a data stream which will be consumed by many consumers (~400).
> We also have many "groups" within our data.  A group in the data
> corresponds 1:1 with what the consumers would consume, so consumer A only
> ever see group A messages, consumer B only consumes group B messages, etc.
> >
> >
> > The downstream consumers will be consuming via a websocket API, so the
> API server will be the thing consuming from kafka.
> >
> >
> > If I use a single topic with, say, 20 partitions, the consumers in the
> API server would need to re-read the same messages over and over for each
> consumer, which seems like a waste of network and a potential bottleneck.
> >
> >
> > Alternatively, I could use a single topic with 20 partitions and have a
> single consumer in the API put the messages into cassandra/redis (as
> suggested by Jay), and serve out the downstream consumer streams that way.
> However, that requires using a secondary sorted storage, which seems like a
> waste (and added complexity) given that Kafka already has the data exactly
> as I need it.  Especially if cassandra/redis are required to maintain a
> long TTL on the stream.
> >
> >
> > Finally, I could use 1 topic per group, each with a single partition.
> This would result in 400 topics on the broker, but would allow the API
> server to simply serve the stream for each consumer directly from kafka and
> wont require additional machinery to serve out the requests.
> >
> >
> > The 400 topic solution makes the most sense to me (doesnt require extra
> services, doesnt waste resources), but seem to conflict with best
> practices, so I wanted to ask the community for input.  Has anyone done
> this before?  What makes the most sense here?
> >
> >
> >
> >
> > Thanks
> >
> >
> > Shaun
>
>


Re: Kafka Consumers getting overlapped data

2015-09-30 Thread Todd Palino
What Python library are you using?

In addition, there's no real guarantee that any two libraries will
implement consumer balancing using the same algorithm (if they do it at
all).

-Todd


On Wednesday, September 30, 2015, Rahul R  wrote:

> I have 2 kafka consumers. Both the consumers have the same group_id. One is
> written in java [1] and the other in python. According to the documentation
> [2] ,  if both the consumers have the same group_id , then I should be
> getting non-overlapping set of data . But in this case, both the consumers
> are getting all the data. I have not digressed from the default setting.
> Any idea on whats going on ?
>
> Thanks,
> ./R
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> [2] http://kafka.apache.org/documentation.html
>


Re: mapping events to topics

2015-10-08 Thread Todd Palino
Multiple topics is the model I would recommend for what you have described.
LinkedIn has an environment where we have a wide mix, in a lot of different
clusters. We have some topics that have one producer and one consumer
(queuing). We have some topics that are multi-producer (tracking and
metrics, mostly). Some of those are multi-consumer (tracking), and some are
mostly single consumer (metrics). Besides all of this, we have a couple
wildcard consumers that read everything (our audit system, and mirror
makers).

In your case, the rules engine sounds like a similar consumer case as our
audit consumer. I would not make the determination as to how many topics
you need based on that consumer because of that. Since the majority of what
you're describing is consumers who are interested in discrete data sets, go
with breaking out the topics based on that (all other things being equal).
While Gwen is absolutely right about her guidelines, consuming and throwing
away most of the data is a cardinal sin and should be avoided. Multi-topic
consumers are much less of a problem to deal with. Personally, I wouldn't
bother combining the messages into a separate topic for the rules engine -
I would just consume all the topics.

You mentioned message ordering, and that can present an issue. Now, you'd
likely have this problem regardless of how many topics you use, as ordering
is only guaranteed in a single partition. So you'd either have to have one
partition, or you would have to use some sort of partitioning scheme on the
messages that means hard ordering of all the messages matters less.
Obviously, when you have multiple topics it's the same as having multiple
partitions. You need to decide how important ordering within Kafka is to
your application, and if it can be handled separately inside of the
application.

-Todd



On Thu, Oct 8, 2015 at 8:50 AM, Mark Drago  wrote:

> Gwen,
>
> Thanks for your reply.  I understand all of the points you've made.  I
> think the challenge for us is that we have some consumers that are
> interested in messages of one type, but we also have a rules engine that is
> checking for events of many types and acting on them.
>
> If we put discrete event types on individual topics:
>   * Our rules engine would have to monitor many of these topics (10-20)
>   * Other consumers would only see messages they care about
>
> If we put all event types on one topic:
>   * Our rules engine only has to monitor one topic
>   * Other consumers would parse and then discard the majority of the
> messages that they see
>
> Perhaps a better approach would be to have different topics for the
> different use cases?  This would be similar to an approach that merges
> smaller topics together as needed.  So, each event type would be on its own
> topic but then we would put a subset of those messages on another topic
> that is destined for the rules engine.  The consumers that only care about
> one message type would listen on dedicated topics and the rules engine
> would just monitor one topic for all of the events that it cares about.  We
> would need to have something moving/creating messages on the rules engine
> topic.  We may also run in to another set of problems as the ordering of
> messages of different types no longer exists as they're coming from
> separate topics.
>
> I'm curious to hear if anyone else has been in a similar situation and had
> to make a judgement call about the best approach to take.
>
> Thanks,
> Mark.
>
> > I usually approach this question by looking at possible consumers. You
> > usually want each consumer to read from relatively few topics, use most
> > of the messages it receives, and have fairly cohesive logic for using these
> > messages. Signs that things went wrong with too few topics:
> > * Consumers that throw away 90% of the messages on topics they read
> > * Consumers with gigantic switch statements for handling all the different
> > message types they get
> > Signs that you have too many topics:
> > * Every consumer needs to read messages from 20 different topics in order
> > to construct the objects it actually uses
> > If you ever did data modeling for a datawarehouse, this will look a bit
> > familiar :)
> > Gwen
> > On Tue, Oct 6, 2015 at 4:46 PM, Mark Drago  wrote:
> >
> > Hello,
> > >
> > > At my organization we are already using kafka in a few areas, but we're
> > > looking to expand our use and we're struggling with how best to
> > distribute
> > > our events on to topics.
> > >
> > > We have on the order of 30 different kinds of events that we'd like to
> > > distribute via kafka. We have one or two consumers that have a need to
> > > consume many of these types of events (~20 out of the 30) and we have
> > other
> > > consumers that are only interested in one type of event.
> > >
> > > We're trying to decide between a model where we have one topic
> containing
> > > many kinds of events or a model where we have many topics each
> containing
> > > one type of event. We h

Re: Kafka availability guarantee

2015-10-11 Thread Todd Palino
To answer the question, yes, it is incorrect. There are a few things you
can do to minimize problems. One is to disable unclean leader election, use
acks=-1 on the producers, have an RF of 3 or greater, and set the min ISR
to 2. This means that the topic will only be available if there are at
least 2 replicas in sync, your producers will all wait for acknowledgements
from all in sync replicas (therefore, at least 2) before considering
produce requests to be complete, and if you get in a situation where all
three replicas go down, the cluster will not perform an unclean leader
election (which can lose messages).

Basically, you have to trade availability for correctness here. You get to
pick one.
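
As a concrete sketch of that setup (hostnames and the topic name are
placeholders; min.insync.replicas can be set broker-wide or as a per-topic
override, and acks=all is the new producer's spelling of -1):

# broker config (server.properties)
unclean.leader.election.enable=false
min.insync.replicas=2

# topic created with 3 replicas
kafka-topics.sh --zookeeper zk:2181 --create --topic important --partitions 8 --replication-factor 3

# producer config
acks=all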

-Todd


On Sun, Oct 11, 2015 at 5:10 PM,  wrote:

> You can enable unclean leader election, which would allow the lagging
> partition to still become leader. There would be some data loss (offsets
> between the leggy partition and the old leader) but the partition would
> stay online and available.
>
>
>
> Sent from my BlackBerry 10 smartphone on the TELUS network.
>   Original Message
> From: Elias Levy
> Sent: Sunday, October 11, 2015 5:00 PM
> To: users@kafka.apache.org
> Reply To: users@kafka.apache.org
> Subject: Kafka availability guarantee
>
> Reading through the Kafka documentation for statements regarding Kafka's
> availability guarantees one comes across this statement:
>
> *With this ISR model and f+1 replicas, a Kafka topic can tolerate f
> failures without losing committed messages.*
>
> In my opinion, this appears incorrect or at best misleading. Consider a
> partition with a replication factor of 3. If one of the replicas lags, but
> does not fail, the ISR will be shrank to a set of 2 replicas, the leader
> and and one follower. The leader will consider the message committed when
> itself and the in sync follower write the message to their respective
> logs. Where a concurrent failure of 2 nodes occur, specifically the
> failure of the leader and the in sync follower, there won't be any
> remaining in sync replicas to take over as leader without potential message
> loss. Therefore Kafka cannot tolerate any failure of *f* nodes, where *f*
> is N - 1 and N is the replication factor. Kafka can only tolerate a failure
> of *f* if we take N to be the ISR set size, which is a dynamic value and
> not a topic configuration parameter that can me set a priori. Kafka can
> tolerate some failures of *f* replicas when N is the replication factor, so
> long as at least one in sync replica survives, but it can't tolerate all
> such failures.
>
> Am I wrong?
>


Re: Does Kafka recover all data if node is reimaged

2015-10-12 Thread Todd Palino
Yes. As long as you have not reassigned the partitions to other brokers,
the wiped broker will rebuild from replication. Keep in mind, however, that
if you are using the retention by time configuration, you will have 2x
retention on that broker for the length of retention. This means that if
your retention is 1 week, replication will copy over the last week's worth
of data. That data will not be expired for 1 week, as the expiration is
based on the file modification time.

There is work ongoing that will resolve this extra retention problem.

-Todd

On Monday, October 12, 2015, Rajasekar Elango 
wrote:

> I was wondering if a kafka broker node get reimaged and all data is wiped
> off, Will kafka recover all data on node from replication?
>
> --
> Thanks,
> Raja.
>


Re: G1 tuning

2015-10-14 Thread Todd Palino
We've had no problems with G1 in all of our clusters with varying load
levels. I think we've seen an occasional long GC here and there, but
nothing recurring at this point.

What's the full command line that you're using with all the options?

-Todd


On Wed, Oct 14, 2015 at 2:18 PM, Scott Clasen  wrote:

> You can also use -Xmn with that gc to size the new gen such that those
> buffers don't get tenured
>
> I don't think that's an option with G1
>
> On Wednesday, October 14, 2015, Cory Kolbeck  wrote:
>
> > I'm not sure that will help here, you'll likely have the same
> > medium-lifetime buffers getting into the tenured generation and forcing
> > large collections.
> >
> > On Wed, Oct 14, 2015 at 10:00 AM, Gerrit Jansen van Vuuren <
> > gerrit...@gmail.com > wrote:
> >
> > > Hi,
> > >
> > > I've seen pauses using G1 in other applications and have found that
> > > -XX:+UseParallelGC
> > > -XX:+UseParallelOldGC  works best if you're having GC issues in general
> > on
> > > the JVM.
> > >
> > >
> > > Regards,
> > >  Gerrit
> > >
> > > On Wed, Oct 14, 2015 at 4:28 PM, Cory Kolbeck  > > wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I'm a bit new to the operational side of G1, but pretty familiar with
> > its
> > > > basic concept. We recently set up a Kafka cluster to support a new
> > > product,
> > > > and are seeing some suboptimal GC performance. We're using the
> > parameters
> > > > suggested in the docs, except for having switched to java 1.8_40 in
> > order
> > > > to get better memory debugging. Even though the cluster is handling
> > only
> > > > 2-3k messages per second per node, we see periodic 11-18 second
> > > > stop-the-world pauses on a roughly hourly cadence. I've turned on
> > > > additional GC logging, and see no humongous allocations, it all seems
> > to
> > > be
> > > > buffers making it into the tenured gen. They appear to be
> collectable,
> > as
> > > > the collection triggered by dumping the heap collects them all. Ideas
> > for
> > > > additional diagnosis or tuning very welcome.
> > > >
> > > > --Cory
> > > >
> > >
> >
>


Re: Where is replication factor stored?

2015-10-16 Thread Todd Palino
Actually, be very careful with this. There are two different things stored
in Zookeeper, and depending on what you're interested in you want to make
sure you're looking at the right one.

If you want to know the replica assignment - that is, what brokers a given
partition is assigned to - you need to look at the following path:
/brokers/topics/(topic)

The data of that znode is JSON formatted, and the 'partitions' key is a
dictionary where the key is a string representation of the partition
number (i.e. it's not 0, it's "0") and the value is a list of the replicas
that the partition is assigned to. It's worth noting that this replica list
is also the order in which the preferred leader is selected (the first
replica in the list that is in sync will be selected as the leader).

If you want to know what the current in sync replicas are - that is, out of
the assigned replica list, which ones are currently considered to be in
sync, you need to look at the following path:
/brokers/topics/(topic)/partitions/(partition number)/state

The data of that znode is also JSON formatted, and the 'isr' key is a list
of the replicas that are currently considered to be in sync. The important
distinction here is that this list can be shorter than the actual assigned
replica list (from the znode above) if not all of the replicas are in sync.
The state znode also has a 'leader' key which holds the broker ID of the
replica that is currently the leader for that partition.

-Todd


On Fri, Oct 16, 2015 at 5:25 PM, Edward Ribeiro 
wrote:

> Hey, Guozhang,
>
> On Fri, Oct 16, 2015 at 6:20 PM, Guozhang Wang  wrote:
>
> > The replica list can be from at /brokers/topics//
> > partitions//state
> >
>
> Nice, good to know. Thanks! :)
>
> Regards,
> Edward​
>


Re: Where is replication factor stored?

2015-10-16 Thread Todd Palino
Sorry, I forgot the tl;dr on that :)

If you want to know the replication factor for a given partition, you want
to check the length of the replica list in the /brokers/topics/(topic) data
for that partition. Note that all the partitions for a topic do not have to
have the same replication factor (you can use partition reassignment to
change it). But if they are not all the same, some of the tooling will
break (such as altering the partition count for the topic).

-Todd


On Fri, Oct 16, 2015 at 5:39 PM, Todd Palino  wrote:

> Actually, be very careful with this. There are two different things stored
> in Zookeeper, and depending on what you're interested in you want to make
> sure you're looking at the right one.
>
> If you want to know the replica assignment - that is, what brokers a given
> partition is assigned to - you need to look at the following path:
> /brokers/topics/(topic)
>
> The data of that znode is JSON formatted, and the 'partitions' key is a
> dictionary where they key is a string representation of the partition
> number (i.e. it's not 0, it's "0") and the value is a list of the replicas
> that the partition is assigned to. It's worth noting that this replica list
> is also the order in which the preferred leader is selected (the first
> replica in the list that is in sync will be selected as the leader).
>
> If you want to know what the current in sync replicas are - that is, out
> of the assigned replica list, which ones are currently considered to be in
> sync, you need to look at the following path:
> /brokers/topics/(topic)/partitions/(partition number)/state
>
> The data of that znode is also JSON formatted, and the 'isr' key is a list
> of the replicas that are currently considered to be in sync. The important
> distinction here is that this list can be shorter than the actual assigned
> replica list (from the znode above) if not all of the replicas are in sync.
> The state znode also has a 'leader' key which holds the broker ID of the
> replica that is currently the leader for that partition.
>
> -Todd
>
>
> On Fri, Oct 16, 2015 at 5:25 PM, Edward Ribeiro 
> wrote:
>
>> Hey, Guozhang,
>>
>> On Fri, Oct 16, 2015 at 6:20 PM, Guozhang Wang 
>> wrote:
>>
>> > The replica list can be from at /brokers/topics//
>> > partitions//state
>> >
>>
>> Nice, good to know. Thanks! :)
>>
>> Regards,
>> Edward​
>>
>
>


Re: Load balancer for Kafka brokers

2015-11-03 Thread Todd Palino
We use loadbalancers for our producer configurations, but what you need to
keep in mind is that that connection is only used for metadata requests.
The producer queries the loadbalancer IP for metadata for the topic, then
disconnects and reconnects directly to the Kafka brokers for producing
messages. With the older producer lib, it periodically reconnects to the
loadbalancer to refresh metadata. With the newer producer lib, it actually
caches information about all the brokers locally and queries them directly
for metadata refreshes moving forwards (and therefore does not use the
loadbalancer again).

In your situation, it sounds like you want to put all the individual broker
connections through the GSLB as well. In order to do this, you would have
to:

- have an individual GSLB configuration for each broker, where that config
has an active/passive setup with 1 broker from each DC (Not too bad)
- configure the announced hostnames for each broker to be the same in the
active and passive DC (A little tricky)
- maintain the exact same partition to broker mapping, including
leadership, in each DC (Virtually impossible)

In short, I don’t think this is a reasonable thing to do. You’re not going
to be able to assure the exact partition mapping, especially not in the
face of Zookeeper timeouts or hardware failures that will cause partition
leadership to move around. This will result in partitions becoming
unavailable as soon as one of the clusters shifts just a little bit.

A better way to approach this is probably to set up a front-end service,
such as a REST endpoint for Kafka, which receives produce requests and
publishes them to the local Kafka cluster. Then you can put that endpoint
behind the GSLB, and you do not have to worry about the makeup of the Kafka
clusters themselves. Your producers would all send their messages through
the GSLB to that endpoint, rather than talking to Kafka directly.

-Todd



On Tue, Nov 3, 2015 at 10:15 AM, Cassa L  wrote:

> Hi,
>  Has anyone used load balancers between publishers and  Kafka brokers? I
> want to do active-passive setup of Kafka in two datacenters.  My question
> is can I add GSLB layer between these two Kafka clusters to configure
> automatic fail over while publishing data?
>
> Thanks,
> LCassa
>


Re: Change kafka broker ids dynamically

2015-11-06 Thread Todd Palino
I’m not quite sure why you would need to do this - the broker IDs are not
significant outside of the internal metadata. But this is what you would
have to do for each move (assuming you are running with at least
replication factor 2):

1) Shut down the broker
2) Clear its partition data
3) Reconfigure the broker to the new ID
4) Restart the broker
5) Issue a partition reassignment to reassign all of broker 1’s partitions
to broker 0
6) Wait for the broker to replicate all its partitions from other members
of the cluster

That’s a lot of moving data around, just to renumber. You can’t just issue
the reassignment while the broker is down, and not delete the partitions,
because the ID number 0 is unknown, so the reassignment will fail (the
broker is not online). If you wanted to shut the entire cluster down you
could, in theory, walk through the Zookeeper tree manually changing all the
replica information. That assumes you can shut the whole cluster down for a
while.
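
For step 5, the reassignment itself is the standard kafka-reassign-partitions.sh
flow (a sketch with a placeholder topic and a single partition; it assumes the
partition had been assigned to brokers 1 and 2, so the new list swaps 1 for 0):

cat > reassign.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"mytopic","partition":0,"replicas":[0,2]}
]}
EOF
kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file reassign.json --execute
kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file reassign.json --verify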

-Todd



On Fri, Nov 6, 2015 at 1:23 PM, Arathi Maddula 
wrote:

> Hi,
> Is it possible to change the broker.ids property for a node belonging to a
> Kafka cluster? For example, currently if I  have brokers with ids 1,2,3. If
> I want to stop broker 1,  can I change broker.id to 0 (with current id =
> 1) in server.properties and meta.properties files and then restart broker
> 1. Can I repeat this for brokers 2 and 3 as well?
>
> Thanks,
> Arathi
>
>


Re: Help on understanding kafka-topics.sh output

2015-11-22 Thread Todd Palino
Replicas and Isr are both a comma separated list of broker IDs. So in this
output, I am seeing that you have two Kafka brokers with IDs 1 and 2. You
have a topic, capture, with 16 partitions at replication factor 1 (1
replica per partition). The broker with ID 2 is not online, which is why it
shows in the Replica list for some partitions (meaning that it is assigned
to be a replica), but not in the Isr list (which would indicate that it is
currently in-sync).

The Leader field is the broker ID which is currently the leader for that
partition. For the partitions that are assigned to broker 1, you see that
broker 1 is the leader. For the partitions that are assigned to broker 2,
the leader is listed as -1, which indicates that there is no available
leader. These partitions are considered offline and cannot be produced to
or consumed from. When broker 2 comes back online, the controller will
perform an unclean leader election and select broker 2 (the only replica
available) as the leader for those partitions.
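
As a side note, kafka-topics.sh can filter for exactly these cases, which is
handy on larger clusters (the flags below exist in the 0.8.x tool):

kafka-topics.sh --zookeeper x:2181 --describe --unavailable-partitions
kafka-topics.sh --zookeeper x:2181 --describe --under-replicated-partitions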

-Todd


On Sun, Nov 22, 2015 at 11:39 AM, Jan Algermissen <
algermissen1...@icloud.com> wrote:

> Hi,
>
> I have a topic with 16 partitions that shows the following output for
>
> kafka-topics.sh --zookeeper x:2181 --topic capture --describe
>
> Can anyone explain to me what the difference in replicas means and what
> Leader of -1 means?
>
> In the logs of my producer I see that no messages seem to be sent to the
> partitions with '-1' and th eproducer buffer becomes exhausted afetr a
> while (maybe that is related?)
>
> Jan
>
> Topic:capture   PartitionCount:16   ReplicationFactor:1
>  Configs:
>
> Topic: capture  Partition: 0Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 1Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 2Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 3Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 4Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 5Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 6Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 7Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 8Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 9Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 10   Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 11   Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 12   Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 13   Leader: 1   Replicas: 1
>  Isr: 1
> Topic: capture  Partition: 14   Leader: -1  Replicas: 2
>  Isr:
> Topic: capture  Partition: 15   Leader: 1   Replicas: 1
>  Isr: 1


Re: Help on understanding kafka-topics.sh output

2015-11-22 Thread Todd Palino
Hopefully one of the developers can jump in here. I believe there is a
future you can use to get the errors back from the producer. In addition,
you should check the following configs on the producer:

request.required.acks - this controls whether or not your producer is going
to wait for an acknowledgement from the broker, and how many brokers it
waits for
request.timeout.ms - how long the producer waits to satisfy the acks
setting before marking the request failed
retry.backoff.ms - how long the producer waits between retries
message.send.max.retries - the maximum number of retries the producer will
attempt a failed request

-Todd


On Sun, Nov 22, 2015 at 12:31 PM, Jan Algermissen <
algermissen1...@icloud.com> wrote:

> Hi Todd,
>
> yes, correct - thanks.
>
> However, what I am not getting is that the KafkaProducer (see my other
> mail from today) silently accepts the messages and fills them up in the
> buffer until it is exhausted instead of saying that the broker is not
> reachable.
>
> IOW, it seems from an application perspective I am unable to detect that
> messages are not being sent out. Is this normal behavior and I am simply
> doing something wrong or could it be a producer bug?
>
> Jan
>
> Config and code again:
>
> ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
> ProducerConfig.RETRIES_CONFIG -> "0",
> ProducerConfig.ACKS_CONFIG -> "1",
> ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none",
> ProducerConfig.TIMEOUT_CONFIG -> new Integer(3),
> // ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384),
> ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10),
> ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432),
> ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false),
> ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
> "org.apache.kafka.common.serialization.StringSerializer",
> ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
> "org.apache.kafka.common.serialization.StringSerializer"
>
>
> 
>
> kafkaProducer.send(new ProducerRecord[String,String](topic, key, data),new
> Callback {
>  def onCompletion(recordMetadata: RecordMetadata, e: Exception):Unit = {
>if(e != null) {
>      logger.error(s"Could not send $data",e)
>}
>logger.info("The offset of the record we just sent is: " +
> recordMetadata.offset())
>()
>  }
>
> })
>
>
> > On 22 Nov 2015, at 20:49, Todd Palino  wrote:
> >
> > Replicas and Isr are both a comma separated list of broker IDs. So in
> this
> > output, I am seeing that you have two Kafka brokers with IDs 1 and 2. You
> > have a topic, capture, with 16 partitions at replication factor 1 (1
> > replica per partition). The broker with ID 2 is not online, which is why
> it
> > shows in the Replica list for some partitions (meaning that it is
> assigned
> > to be a replica), but not in the Isr list (which would indicate that it
> is
> > currently in-sync).
> >
> > The Leader field is the broker ID which is currently the leader for that
> > partition. For the partitions that are assigned to broker 1, you see that
> > broker 1 is the leader. For the partitions that are assigned to broker 2,
> > the leader is listed as -1, which indicates that there is no available
> > leader. These partitions are considered offline and cannot be produced to
> > or consumed from. When broker 2 comes back online, the controller will
> > perform an unclean leader election and select broker 2 (the only replica
> > available) as the leader for those partitions.
> >
> > -Todd
> >
> >
> > On Sun, Nov 22, 2015 at 11:39 AM, Jan Algermissen <
> > algermissen1...@icloud.com> wrote:
> >
> >> Hi,
> >>
> >> I have a topic with 16 partitions that shows the following output for
> >>
> >> kafka-topics.sh --zookeeper x:2181 --topic capture --describe
> >>
> >> Can anyone explain to me what the difference in replicas means and what
> >> Leader of -1 means?
> >>
> >> In the logs of my producer I see that no messages seem to be sent to the
> >> partitions with '-1' and the producer buffer becomes exhausted after a
> >> while (maybe that is related?)
> >>
> >> Jan
> >>
> >> Topic:capture  PartitionCount:16  ReplicationFactor:1  Configs:
> >>     Topic: capture  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
> >>     Topic: capture  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
> >>     ...

Re: Number of partitions and disks in a topic

2015-12-01 Thread Todd Palino
Getting the partitioning right now is only important if your messages are
keyed. If they’re not, stop reading, start with a fairly low number of
partitions, and expand as needed.

1000 partitions per topic is generally not normal. It’s not really a
problem, but you want to size topics appropriately. Every partition
represents open file handles and overhead on the cluster controller. But if
you’re working with keyed messages, size for your eventual data size. We
use a general guideline of keeping partitions on disk under 25 GB (for 4
days of retention - so ~6 GB of compressed messages per day). We find this
gives us a good spread of data in the cluster, and represents a reasonable
amount of network throughput per partition, so it allows us to scale
easily. It also makes for fewer issues with replication within the cluster,
and mirroring to other clusters.
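
To make that concrete with made-up numbers: a topic receiving 300 GB of
compressed messages per day, retained for 4 days, is roughly 1,200 GB on
disk, so at ~25 GB per partition you would want on the order of 48
partitions (rounded to something with useful factors for your consumer
counts).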

Outside of a guideline like that, partition based on how you want to spread
out your keys. We have a user who wanted 720 partitions for a given topic
because it has a large number of factors, which allows them to run a
variety of counts of consumers and have balanced load.

As far as multiple disks goes, yes, Kafka can make use of multiple log
dirs. However, there are caveats. It’s fairly naive about how it assigns
partitions to disks, and partitions are assigned by the controller to a
broker with no knowledge of the disks underneath. The broker then makes the
assignment to a single disk. In addition, there’s no tool for moving
partitions from one mount point to another without shutting down the broker
and doing it manually.
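
For what it's worth, multiple log directories are just a comma-separated
list in server.properties (the paths here are made up):

  log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs

The broker picks one of those directories for each new partition on its own
(essentially the one with the fewest partitions), so a handful of very large
partitions can still leave the disks unevenly filled.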

-Todd

On Tue, Dec 1, 2015 at 4:31 AM, Guillermo Ortiz 
wrote:

> Hello,
>
> I want to size the kafka cluster with just one topic and I'm going to
> process the data with Spark and others applications.
>
> If I have six hard drives per node, is Kafka smart enough to deal with
> them? I guess that memory is very important at this point and all data is
> cached in memory. Is it possible to configure Kafka to use many
> directories, as in HDFS, each one on a different disk?
>
> I'm not sure about the number of partitions either. I have read
>
> http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
> and they talk about a number of partitions much higher than I had thought.
> Is it normal to have a topic with 1000 partitions? I was thinking about
> two/four partitions per node. Is my thinking wrong?
>
> As I'm going to process data with Spark, I could have numberPartitions
> equals numberExecutors in Spark as max, always thinking in the future and
> sizing higher than that.
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: how to reset kafka offset in zookeeper

2015-12-18 Thread Todd Palino
The way to reset to smallest is to stop the consumer, delete the consumer
group from Zookeeper, and then restart with the property set to smallest.
Once your consumer has recreated the group and committed offsets, you can
change the auto.offset.reset property back to largest (if that is your
preference).
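
As a rough sketch of the sequence (the group name and ZK string are made up,
and the recursive delete command depends on your ZooKeeper version):

  # 1. stop the consumer
  # 2. delete the group from ZooKeeper
  bin/zookeeper-shell.sh zkhost:2181 rmr /consumers/my-flume-group
  # 3. restart the consumer once with this override in its config
  auto.offset.reset=smallest
  # 4. after offsets have been committed again, set it back to largest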

-Todd

On Friday, December 18, 2015, Akhilesh Pathodia 
wrote:

> Hi,
>
> I want to reset the kafka offset in zookeeper so that the consumer will
> start reading messages from first offset. I am using flume as a consumer to
> kafka. I have set the kafka property kafka.auto.offset.reset to "smallest",
> but it does not reset the offset in zookeeper and that's why flume will not
> read messages from first offset.
>
> Is there any way to reset kafka offset in zookeeper?
>
> Thanks,
> Akhilesh
>


-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: how to reset kafka offset in zookeeper

2015-12-18 Thread Todd Palino
That works if you want to set to an arbitrary offset, Marko. However in the
case the OP described, wanting to reset to smallest, it is better to just
delete the consumer group and start the consumer with auto.offset.reset set
to smallest. The reason is that while you can pull the current smallest
offsets from the brokers and set them in Zookeeper for the consumer, by the
time you do that the smallest offset is likely no longer valid. This means
you’re going to resort to the offset reset logic anyways.

-Todd


On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći 
wrote:

> You can also do this:
> 1. stop consumers
> 2. export offsets from ZK
> 3. make changes to the exported file
> 4. import offsets to ZK
> 5. start consumers
>
> e.g.
> bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group group-name
> --output-file /tmp/zk-offsets --zkconnect localhost:2181
> bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
> /tmp/zk-offsets --zkconnect localhost:2181
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext <http://sematext.com/> | Contact
> <http://sematext.com/about/contact.html>
>
> On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil  wrote:
>
> > Hi,
> >
> > I noticed that a consumer in the new consumer API supports setting the
> > offset for a partition to beginning. I assume doing so also would update
> > the offset in Zookeeper eventually.
> >
> > Cheers,
> > Jens
> >
> > On Friday, December 18, 2015, Akhilesh Pathodia <
> > pathodia.akhil...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I want to reset the kafka offset in zookeeper so that the consumer will
> > > start reading messages from first offset. I am using flume as a
> consumer
> > to
> > > kafka. I have set the kafka property kafka.auto.offset.reset to
> > "smallest",
> > > but it does not reset the offset in zookeeper and that's why flume will
> > not
> > > read messages from first offset.
> > >
> > > Is there any way to reset kafka offset in zookeeper?
> > >
> > > Thanks,
> > > Akhilesh
> > >
> >
> >
> > --
> > Jens Rantil
> > Backend engineer
> > Tink AB
> >
> > Email: jens.ran...@tink.se
> > Phone: +46 708 84 18 32
> > Web: www.tink.se
> >
> > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > <
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > >
> >  Twitter <https://twitter.com/tink>
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: how to reset kafka offset in zookeeper

2015-12-18 Thread Todd Palino
Yes, that’s right. It’s just work for no real gain :)

-Todd

On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći 
wrote:

> Hmm, I guess you're right Tod :)
> Just to confirm, you meant that, while you're changing the exported file it
> might happen that one of the segment files becomes eligible for cleanup by
> retention, which would then make the imported offsets out of range?
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext <http://sematext.com/> | Contact
> <http://sematext.com/about/contact.html>
>
> On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino  wrote:
>
> > That works if you want to set to an arbitrary offset, Marko. However in
> the
> > case the OP described, wanting to reset to smallest, it is better to just
> > delete the consumer group and start the consumer with auto.offset.reset
> set
> > to smallest. The reason is that while you can pull the current smallest
> > offsets from the brokers and set them in Zookeeper for the consumer, by
> the
> > time you do that the smallest offset is likely no longer valid. This
> means
> > you’re going to resort to the offset reset logic anyways.
> >
> > -Todd
> >
> >
> > On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći  >
> > wrote:
> >
> > > You can also do this:
> > > 1. stop consumers
> > > 2. export offsets from ZK
> > > 3. make changes to the exported file
> > > 4. import offsets to ZK
> > > 5. start consumers
> > >
> > > e.g.
> > > bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group group-name
> > > --output-file /tmp/zk-offsets --zkconnect localhost:2181
> > > bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
> > > /tmp/zk-offsets --zkconnect localhost:2181
> > >
> > > Marko Bonaći
> > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > Solr & Elasticsearch Support
> > > Sematext <http://sematext.com/> | Contact
> > > <http://sematext.com/about/contact.html>
> > >
> > > On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I noticed that a consumer in the new consumer API supports setting
> the
> > > > offset for a partition to beginning. I assume doing so also would
> > update
> > > > the offset in Zookeeper eventually.
> > > >
> > > > Cheers,
> > > > Jens
> > > >
> > > > On Friday, December 18, 2015, Akhilesh Pathodia <
> > > > pathodia.akhil...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I want to reset the kafka offset in zookeeper so that the consumer
> > will
> > > > > start reading messages from first offset. I am using flume as a
> > > consumer
> > > > to
> > > > > kafka. I have set the kafka property kafka.auto.offset.reset to
> > > > "smallest",
> > > > > but it does not reset the offset in zookeeper and that's why flume
> > will
> > > > not
> > > > > read messages from first offset.
> > > > >
> > > > > Is there any way to reset kafka offset in zookeeper?
> > > > >
> > > > > Thanks,
> > > > > Akhilesh
> > > > >
> > > >
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.ran...@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > > > <
> > > >
> > >
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > > > >
> > > >  Twitter <https://twitter.com/tink>
> > > >
> > >
> >
> >
> >
> > --
> > *—-*
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: how to reset kafka offset in zookeeper

2015-12-19 Thread Todd Palino
If what you want to do is reset to smallest, all you need to do is stop the
consumer, delete the group from Zookeeper, and restart the consumer. It
will automatically create the group again.

You only need to export the offsets first if you later need to reset to
where you were in the partitions.

-Todd

On Saturday, December 19, 2015, Akhilesh Pathodia <
pathodia.akhil...@gmail.com> wrote:

> What is the process for deleting the consumer group from zookeeper? Should
> I export offset, delete and then import?
>
> Thanks,
> Akhilesh
>
> On Fri, Dec 18, 2015 at 11:32 PM, Todd Palino  > wrote:
>
> > Yes, that’s right. It’s just work for no real gain :)
> >
> > -Todd
> >
> > On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći  >
> > wrote:
> >
> > > Hmm, I guess you're right Tod :)
> > > Just to confirm, you meant that, while you're changing the exported
> file
> > it
> > > might happen that one of the segment files becomes eligible for cleanup
> > by
> > > retention, which would then make the imported offsets out of range?
> > >
> > > Marko Bonaći
> > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > Solr & Elasticsearch Support
> > > Sematext <http://sematext.com/> | Contact
> > > <http://sematext.com/about/contact.html>
> > >
> > > On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino  > wrote:
> > >
> > > > That works if you want to set to an arbitrary offset, Marko. However
> in
> > > the
> > > > case the OP described, wanting to reset to smallest, it is better to
> > just
> > > > delete the consumer group and start the consumer with
> auto.offset.reset
> > > set
> > > > to smallest. The reason is that while you can pull the current
> smallest
> > > > offsets from the brokers and set them in Zookeeper for the consumer,
> by
> > > the
> > > > time you do that the smallest offset is likely no longer valid. This
> > > means
> > > > you’re going to resort to the offset reset logic anyways.
> > > >
> > > > -Todd
> > > >
> > > >
> > > > On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći <
> > marko.bon...@sematext.com 
> > > >
> > > > wrote:
> > > >
> > > > > You can also do this:
> > > > > 1. stop consumers
> > > > > 2. export offsets from ZK
> > > > > 3. make changes to the exported file
> > > > > 4. import offsets to ZK
> > > > > 5. start consumers
> > > > >
> > > > > e.g.
> > > > > bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group
> group-name
> > > > > --output-file /tmp/zk-offsets --zkconnect localhost:2181
> > > > > bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
> > > > > /tmp/zk-offsets --zkconnect localhost:2181
> > > > >
> > > > > Marko Bonaći
> > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log
> > Management
> > > > > Solr & Elasticsearch Support
> > > > > Sematext <http://sematext.com/> | Contact
> > > > > <http://sematext.com/about/contact.html>
> > > > >
> > > > > On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil  >
> > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I noticed that a consumer in the new consumer API supports
> setting
> > > the
> > > > > > offset for a partition to beginning. I assume doing so also would
> > > > update
> > > > > > the offset in Zookeeper eventually.
> > > > > >
> > > > > > Cheers,
> > > > > > Jens
> > > > > >
> > > > > > On Friday, December 18, 2015, Akhilesh Pathodia <
> > > > > > pathodia.akhil...@gmail.com >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I want to reset the kafka offset in zookeeper so that the
> > consumer
> > > > will
> > > > > > > start reading messages from first offset. I am using flume as a
> > > > > consumer
> > > > > > to
> > > > > > > kafka. I have set the kafka property kafka.auto.offset.reset to
>

Re: how to reset kafka offset in zookeeper

2015-12-19 Thread Todd Palino
There’s no simple command. You’ll need to use either zookeeper-shell.sh or
zkCli.sh or something similar that lets you explore and edit Zookeeper and
do a recursive delete on the group name in the consumers tree. I’m not sure
how Cloudera’s interface differs, however, or if they provide a separate
tool for deleting a consumer group.

-Todd


On Sat, Dec 19, 2015 at 11:34 AM, Akhilesh Pathodia <
pathodia.akhil...@gmail.com> wrote:

> What is the command  to delete  group from zookeeper? I dont find
> /consumer/ directory? I am using cloudera, is there any place on cloudera
> manager where I can delete the group?
>
> Thanks
>
> On Sat, Dec 19, 2015 at 11:47 PM, Todd Palino  wrote:
>
> > If what you want to do is reset to smallest, all you need to do is stop
> the
> > consumer, delete the group from Zookeeper, and restart the consumer. It
> > will automatically create the group again.
> >
> > You only need to export the offsets first if you later need to reset to
> > where you were in the partitions.
> >
> > -Todd
> >
> > On Saturday, December 19, 2015, Akhilesh Pathodia <
> > pathodia.akhil...@gmail.com> wrote:
> >
> > > What is the process for deleting the consumer group from zookeeper?
> > Should
> > > I export offset, delete and then import?
> > >
> > > Thanks,
> > > Akhilesh
> > >
> > > On Fri, Dec 18, 2015 at 11:32 PM, Todd Palino  > > > wrote:
> > >
> > > > Yes, that’s right. It’s just work for no real gain :)
> > > >
> > > > -Todd
> > > >
> > > > On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći <
> > marko.bon...@sematext.com
> > > >
> > > > wrote:
> > > >
> > > > > Hmm, I guess you're right Tod :)
> > > > > Just to confirm, you meant that, while you're changing the exported
> > > file
> > > > it
> > > > > might happen that one of the segment files becomes eligible for
> > cleanup
> > > > by
> > > > > retention, which would then make the imported offsets out of range?
> > > > >
> > > > > Marko Bonaći
> > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log
> > Management
> > > > > Solr & Elasticsearch Support
> > > > > Sematext <http://sematext.com/> | Contact
> > > > > <http://sematext.com/about/contact.html>
> > > > >
> > > > > On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino  > > > wrote:
> > > > >
> > > > > > That works if you want to set to an arbitrary offset, Marko.
> > However
> > > in
> > > > > the
> > > > > > case the OP described, wanting to reset to smallest, it is better
> > to
> > > > just
> > > > > > delete the consumer group and start the consumer with
> > > auto.offset.reset
> > > > > set
> > > > > > to smallest. The reason is that while you can pull the current
> > > smallest
> > > > > > offsets from the brokers and set them in Zookeeper for the
> > consumer,
> > > by
> > > > > the
> > > > > > time you do that the smallest offset is likely no longer valid.
> > This
> > > > > means
> > > > > > you’re going to resort to the offset reset logic anyways.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći <
> > > > marko.bon...@sematext.com 
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > You can also do this:
> > > > > > > 1. stop consumers
> > > > > > > 2. export offsets from ZK
> > > > > > > 3. make changes to the exported file
> > > > > > > 4. import offsets to ZK
> > > > > > > 5. start consumers
> > > > > > >
> > > > > > > e.g.
> > > > > > > bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group
> > > group-name
> > > > > > > --output-file /tmp/zk-offsets --zkconnect localhost:2181
> > > > > > > bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
> > > > > > > /tmp/zk-offsets --zkconnect localhost:2181
> > > > > > >
> > > >

Re: Kafka + ZooKeeper on the same hardware?

2016-01-14 Thread Todd Palino
I’d say it depends on load and usage. It can definitely be done, and we’ve
done it here in places, though we don’t anymore. Part of the luxury of
being able to get the hardware we want. In general, it’s probably easier to
do with 0.9 and Kafka-committed offsets, because the consumers don’t need
to talk to ZK as much. It’s probably even easier with the new
consumer, but I can’t speak to that at all.

One of the gotchas is that ZK really should have its transaction log on an
isolated device so that sequential writes do not require seeks. This could
be a separate disk or an SSD drive. An example of a really bad place to put
it would be on the same device as your Kafka log segments :) Depending on
your load, it may not be critical to use a separate device.
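
For reference, the separation is just two settings in zoo.cfg (paths here
are made up):

  dataDir=/export/zookeeper/snapshots
  dataLogDir=/export/zookeeper/txlog

If dataLogDir is left unset the transaction log shares dataDir, which is
exactly the situation you want to avoid when stacking ZK next to Kafka.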

As Gwen noted, it all comes down to load. Your availability will be fine,
you just need to figure out if the services can share the load.

-Todd


On Thu, Jan 14, 2016 at 9:25 AM, Gwen Shapira  wrote:

> It depends on load :)
> As long as there is no contention, you are fine.
>
> On Thu, Jan 14, 2016 at 6:06 AM, Erik Forsberg  wrote:
>
> > Hi!
> >
> > Pondering how to configure Kafka clusters and avoid having too many
> > machines to manage.. Would it be recommended to run say a 3 node kafka
> > cluster where you also run your 3 node zookeeper cluster on the same
> > machines?
> >
> > I guess the answer is that "it depends on load", but would be interested
> > in any opinions on this anyway.
> >
> > Thanks!
> > \EF
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Apache Kafka Case Studies

2016-02-03 Thread Todd Palino
To follow up on the blog post of mine that Jens pasted, I also have a
number of talk presentations up at http://www.slideshare.net/toddpalino

If you download the actual presentation files, you’ll be able to access the
slide notes in most of those that have a lot more information.

On Wed, Feb 3, 2016 at 7:47 AM, Jens Rantil  wrote:

> Hi Joe,
>
> This might be interesting:
> https://engineering.linkedin.com/kafka/running-kafka-scale
>
> Cheers,
> Jens
>
> On Wed, Feb 3, 2016 at 4:15 PM, Joe San  wrote:
>
> > Dear Kafka users,
> >
> > I'm looking for some case studies around using Kafka on big projects.
> > Specifically, I'm looking for some architectural insights into how I
> could
> > orchestrate my data pipeline using Kafka on an enterprise system.
> >
> > Some pointers on some architectural best practices, slides on how some
> > organisation X used Apache Kafka in their landscape would be ideal.
> >
> > Any suggestions?
> >
> > Thanks and Regards,
> > Joe
> >
>
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> >
>  Twitter <https://twitter.com/tink>
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Kafka broker decommission steps

2016-03-04 Thread Todd Palino
To answer your questions…

1 - Not in the way you want it to. There is a setting for automatic leader
election (which I do not recommend anyone use at this time), but all that
does is pick which of the currently assigned replicas should be the leader.
It does not reassign partitions from one broker to another. Kafka does not
have a facility for doing this automatically.

2 - No. The most you can do is move all the partitions off and then
immediately shut down the broker process. Any broker that is live in the
cluster can, and will, get partitions assigned to it by the controller.

For what you want to do, you need you use the partition reassignment
command line tool that ships with Kafka to reassign partitions from the old
broker to the new one. Once that is complete, you can double check that the
old broker has no partitions left and shut it down. I have a tool that we
use internally to make this a lot easier, and I’m in the process of getting
a repository set up to make it available via open source. It allows for
more easily removing and adding brokers, and rebalancing partitions in a
cluster without having to craft the reassignments by hand.
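
As an illustration of doing it by hand (topic name, partitions, and broker
IDs here are all made up), a reassignment that moves partitions off broker 4
and onto broker 5 would look something like:

  cat > move.json <<'EOF'
  {"version":1,"partitions":[
    {"topic":"my-topic","partition":0,"replicas":[1,5]},
    {"topic":"my-topic","partition":1,"replicas":[2,5]}
  ]}
  EOF
  bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
    --reassignment-json-file move.json --execute
  bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
    --reassignment-json-file move.json --verify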

-Todd


On Fri, Mar 4, 2016 at 5:07 AM, Muqtafi Akhmad 
wrote:

> dear Kafka users,
>
> I have some questions regarding decommissioning kafka broker node and
> replacing it with the new one. Lets say that we have three broker nodes and
> each topic in Kafka has replication factor = 3, we upgrade one node with
> the following steps :
> 1. add one broker node to cluster
> 2. shutdown old broker node
>
> My questions are
> 1. When we add one new broker to the cluster will it trigger Kafka topic /
> group leadership rebalance?
> 2. Is there any way to disable the to-be-decommissioned node to hold no
> topic/group leadership (acting as passive copy) so that it can be
> decommissioned with minimal effect to Kafka clients?
>
> Thank you,
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [kafka] decreasing Lag

2016-03-07 Thread Todd Palino
So it looks like you need more logstash consumers, but you’ll want to look
at the consumers you have and make sure they are working well and they’re
not getting bogged down somewhere else, which is causing them to consume
slower. Assuming they’re working fine, you can add 4 more.

If that doesn’t drop it down, you can then look at increasing the number of
partitions and increasing the number of logstash consumers further. While
you may get some benefit from increasing partitions without increasing the
consumer count, you’ll most likely have to do both.
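
If you do go the partition route, the change itself is a one-liner (the new
count here is just an example), and the group will rebalance when you add
the extra logstash instances:

  bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
    --topic bro-logs --partitions 16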

-Todd


On Mon, Mar 7, 2016 at 8:46 AM, Tim Desrochers 
wrote:

> I am new to Kafka so please excuse me if this is a very basic question.
>
> I have a cluster set up with 3 zookeepers and 9 brokers.  I have network
> security logs flowing into the kafka cluster.  I am using logstash to read
> them from the cluster and ingest them into an elasticsearch cluster.
>
> My current settings are mostly default.  I created a topic with 8
> partitions.  I have 4 logstash consumers reading that topic and feeding my
> ES cluster.  My problem is I can't keep up with real time at the moment.  I
> am constantly falling behind and logs are building on my kafka cluster.
>
> When I run:
> $ /opt/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> --group logstash --zookeeper localhost:2181 --topic bro-logs
>
> I get the following:
> logstash   bro-logs   0   25937394   29935485   3998091   logstash_OP-01-VM-553-1457301346564-d14fd84a-0
> logstash   bro-logs   1   25929594   29935506   4005912   logstash_OP-01-VM-553-1457301346564-d14fd84a-0
> logstash   bro-logs   2   26710728   29935519   3224791   logstash_OP-01-VM-554-1457356976268-fa8c24b9-0
> logstash   bro-logs   3   3887940    6372075    2484135   logstash_OP-01-VM-554-1457356976268-fa8c24b9-0
> logstash   bro-logs   4   3978342    6372074    2393732   logstash_OP-01-VM-555-1457368235387-c6b8bd1f-0
> logstash   bro-logs   5   3984965    6372075    2387110   logstash_OP-01-VM-555-1457368235387-c6b8bd1f-0
> logstash   bro-logs   6   4017715    6372076    2354361   logstash_OP-01-VM-556-1457368464998-8edb13df-0
> logstash   bro-logs   7   4022484    6372074    2349590   logstash_OP-01-VM-556-1457368464998-8edb13df-0
>
> from what I understand the Lag column is telling me that there are a whole
> bunch of logs waiting in the cluster to be processed.
>
> So my question is, should I spin up more logstash consumers to read from
> the kafka cluster and feed the ES cluster?  Should I increase or decrease
> partitions?  What can be done to increase the amount of logs being read
> from the cluster and ingested into Elastisearch?
>
> Like I said, very new to kafka.
>
> Thanks for the help
> Tim
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Re-Balancing Kafka topics - Best practices

2017-06-13 Thread Todd Palino
A few things here…

1) auto.leader.rebalance.enable can have serious performance impacts on
larger clusters. It’s currently in need of some development work to enable
it to batch leader elections into smaller groups and back off between them,
as well as have a better backoff after broker startup. I don’t recommend
using it.

2) auto.leader.rebalance.enable is not going to get you what you’re looking
for. It only changes the leader for a partition to the “optimal” leader (I
put that in quotes because it’s a pretty dumb algorithm. It’s whichever
replica is listed first). It does not move partitions around to assure you
have a balance of traffic across the cluster.

If you want to rebalance partitions, you have a couple options right now:
1) Run kafka-reassign-partitions.sh. It will move all of the partitions
around and try and assure an even count on each broker. It does not balance
traffic, however, (if you have a really busy partition and a really slow
partition, it considers them equal).
2) Use an external tool like https://github.com/linkedin/kafka-tools
kafka-assigner. This is a script we developed at LinkedIn for doing
operations that involve moving partitions around and provides a number of
different ways to rebalance traffic.

There are other tools available for doing this, but right now it requires
something external to the Apache Kafka core.
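
As a sketch of option 1 (topic name, broker IDs, and ZK string are made up),
you can have the tool generate a candidate plan and review it before
executing it:

  cat > topics.json <<'EOF'
  {"version":1,"topics":[{"topic":"my-topic"}]}
  EOF
  bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --generate \
    --topics-to-move-json-file topics.json --broker-list "1,2,3"

Keep in mind the proposed assignment is count-based only, per the caveat
above.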

-Todd


On Tue, Jun 13, 2017 at 5:30 PM, karan alang  wrote:

> Hi All,
>
> For re-balancing Kafka partitions, we can set the property
> auto.leader.rebalance.enable = true in the server.properties file.
>
> Is that the recommended way, or is it better to rebalance the Kafka
> partitions manually (using the scripts kafka-preferred-replica-election.sh
> and kafka-reassign-partitions.sh)?
>
> One of the blogs mentioned that it is preferable to re-balance Kafka
> topics manually, since setting auto.leader.rebalance.enable = true causes
> issues.
>
> Pls let me know.
> Any other best practices wrt. Re-balancing kafka topics ?
>
> thanks!
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Kafka Monitoring

2017-06-20 Thread Todd Palino
Not for monitoring Kafka. We pull the JMX metrics two ways - one is a
container that wraps around the Kafka application and annotates the beans
to be emitted to Kafka as metrics, which gets pulled into our
autometrics/InGraphs system for graphing. But for alerting, we use an agent
that polls the critical metrics via JMX and pushes them into a separate
system (that doesn’t use Kafka). ELK is used for log analysis for other
applications.

Kafka-monitor is what we built/use for synthetic traffic monitoring for
availability. And Burrow for monitoring consumers.

-Todd


On Tue, Jun 20, 2017 at 9:53 AM, Andrew Hoblitzell <
ahoblitz...@salesforce.com> wrote:

> Using Elasticsearch, Logstash, and Kibana is a pretty popular pattern at
> LinkedIn.
>
> Also giving honorable mentions to Kafka Monitor and Kafka Manager since
> they hadn't been mentioned yet
> https://github.com/yahoo/kafka-manager
> https://github.com/linkedin/kafka-monitor
>
> Thanks,
>
> Andrew Hoblitzell
> Sr. Software Engineer, Salesforce
>
>
> On Tue, Jun 20, 2017 at 9:37 AM, Todd S  wrote:
>
> > You can look at enabling JMX on kafka (
> > https://stackoverflow.com/questions/36708384/enable-jmx-on-kafka-brokers
> )
> > using
> > JMXTrans (https://github.com/jmxtrans/jmxtrans) and a config (
> > https://github.com/wikimedia/puppet-kafka/blob/master/
> > kafka-jmxtrans.json.md)
> > to gather stats, and insert them into influxdb (
> > https://www.digitalocean.com/community/tutorials/how-to-
> > monitor-system-metrics-with-the-tick-stack-on-centos-7)
> > then graph the resulsts with grafana (
> > https://softwaremill.com/monitoring-apache-kafka-with-influxdb-grafana/,
> > https://grafana.com/dashboards/721)
> >
> > This is likely a solid day of work to get working nicely, but it also
> > enables you to do a lot of extra cool stuff for monitoring, more than
> just
> > Kafka.  JMXTrans can be a bit of a pain, because Kafkas JMX metrics are
> ..
> > plentiful ... but the example configuration above should get you started.
> > Using Telegraf to collect system stats and graph them with Grafana is
> > really simple and powerful, as the Grafana community has a lot of
> pre-built
> > content you can steal and make quick wins with.
> >
> > Monitoring Kafka can be a beast, but there is a lot of useful data there
> > for if(when?) there is a problem.  The more time you spend with the
> > metrics, the more you start to get a feel for the internals.
> >
> > On Mon, Jun 19, 2017 at 6:52 PM, Muhammad Arshad <
> > muhammad.ars...@alticeusa.com> wrote:
> >
> > > Hi,
> > >
> > > wanted to see if there is Kafka monitoring which is available. I am
> > > looking to the following:
> > >
> > >
> > >
> > > how much data came in at a certain time.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > *Muhammad Faisal Arshad*
> > >
> > > Manager, Enterprise Data Quality
> > >
> > > Data Services & Architecture
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Tuning up mirror maker for high thruput

2017-07-23 Thread Todd Palino
One of the best pieces of advice I can offer is that you really need to run
the mirror maker in the same physical/network location as the Kafka cluster
you are producing to. Latency on the consumer side can be more easily
absorbed than latency on the producer side, as to assure that we have
proper message ordering and reliability, we need to restrict in flight
batches to 1. So that means that our produce connection is constrained to
be very thin, and latency makes a huge impact. Meanwhile, on the consume
side we’re fetching large batches of messages, many at a time, so
round-trip latency has less of an impact. I really can’t stress this
enough. We set up some mirror makers in the opposite configuration for
security reasons, and it’s been a huge problem even with tuning.

In addition to this, you will want to assure that your OS (and then the
mirror maker and broker) tuning is taking into account the latency. Here’s
a good reference for the OS side (for Linux):
http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/

Once you have the OS tuned, you’ll need to adjust the broker tuning on the
clusters you are consuming from, since that is the high latency side. The
configuration for that is socket.send.buffer.bytes, and it probably makes
sense to set this to -1 (which means use the OS configuration). You can do
the same with socket.receive.buffer.bytes, but it’s not as critical with
this setup. On the mirror maker, the configuration is on the consumer side,
and it’s called receive.buffer.bytes. Again, you can set this to -1 to use
the OS configuration. Make sure to restart the applications after making
all these changes, of course.
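
To make that concrete, here is a rough sketch of the pieces involved (the
10MB ceiling is only an example - derive your own from the bandwidth-delay
product):

  # OS (Linux) - allow larger TCP windows
  sysctl -w net.core.rmem_max=10485760
  sysctl -w net.core.wmem_max=10485760
  sysctl -w net.ipv4.tcp_rmem="4096 87380 10485760"
  sysctl -w net.ipv4.tcp_wmem="4096 65536 10485760"

  # broker on the consumed-from (high latency) side, in server.properties
  socket.send.buffer.bytes=-1

  # mirror maker consumer.properties
  receive.buffer.bytes=-1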

-Todd


On Sat, Jul 22, 2017 at 1:27 AM, James Cheng  wrote:

> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
> producer. One scenario that he described was tuning for situations where
> you had high network latency. See slides at https://www.slideshare.net/
> mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
> and video at https://youtu.be/oQe7PpDDdzA
>
> -James
>
> Sent from my iPhone
>
> > On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
> >
> > We're trying to set up mirror maker to mirror data from EU dc to US dc.
> The
> > network delay is ~150 ms. In recent test; we realized that mirror maker
> is
> > not keeping up with load and have a lag trending upward all the time.
> >
> > What are configurations that can be tuned up to make it work for the
> higher
> > throughput ?
> > How to decide number of producer and consumer threads ? ( number of topic
> > partitions / brokers ? )
> >
> >
> > *Environment* ( both source and destination cluster )
> >
> > Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
> >
> > queue.size = 1
> > queue.byte.size = 100MB
> >
> > 2 brokers on source, 3 brokers on destination
> >
> >
> > *Mirror maker configs :*
> >
> > Producer properties :
> > request.timeout.ms=12
> > block.on.buffer.full=TRUE
> > retries=20
> > acks=all
> >
> >
> > Consumer properties:
> > request.timeout.ms=12
> > auto.offset.reset=latest
> > enable.auto.commit=false
> >
> > We've configured 4 producer and consumer threads.
> > There is no security set up as of now so it's all PLAINTEXT.
> >
> > We have 4 topics are white listed to sync from EU to US. Only one of them
> > is high throughput. We also have a message handler to strip off some
> > sensitive information from EU to US but it only works on a low thru put
> > topic; the message handler still try to process the other topics but let
> it
> > pass thru.
> >
> > Thanks,
> > Sunil Parmar
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Tuning up mirror maker for high thruput

2017-07-24 Thread Todd Palino
We haven’t had any problem after tuning the default send/receive buffers in
the OS up to 10MB. Linux uses a sliding window, so if you have short
latencies, you won’t use as much of the buffer and you should see very
little, if any, impact.

-Todd


On Mon, Jul 24, 2017 at 2:20 PM, James Cheng  wrote:

> Todd,
>
> I have a question about the OS/broker tuning that you are talking about on
> the source cluster. Aside from mirrormaker (which you say should be running
> in the remote destination datacenter), presumably there will be other
> consumers in the source datacenter as well. How does the OS/broker tuning
> affect those consumers that are close to the source datacenter? Will they
> continue to function well?
>
> -James
>
> > On Jul 23, 2017, at 7:16 AM, Todd Palino  wrote:
> >
> > One of the best pieces of advice I can offer is that you really need to
> run
> > the mirror maker in the same physical/network location as the Kafka
> cluster
> > you are producing to. Latency on the consumer side can be more easily
> > absorbed than latency on the producer side, as to assure that we have
> > proper message ordering and reliability, we need to restrict in flight
> > batches to 1. So that means that our produce connection is contstrained
> to
> > be very thin, and latency makes a huge impact. Meanwhile, on the consume
> > side we’re fetching large batches of messages, many at a time, so
> > round-trip latency has less of an impact. I really can’t stress this
> > enough. We set up some mirror makers in the opposite configuration for
> > security reasons, and it’s been a huge problem even with tuning.
> >
> > In addition to this, you will want to assure that your OS (and then the
> > mirror maker and broker) tuning is taking into account the latency.
> Here’s
> > a good reference for the OS side (for Linux):
> > http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/
> >
> > Once you have the OS tuned, you’ll need to adjust the broker tuning on
> the
> > clusters you are consuming from, since that is the high latency side. The
> > configuration for that is socket.send.buffer.bytes, and it probably makes
> > sense to set this to -1 (which means use the OS configuration). You can
> do
> > the same with socket.receive.buffer.bytes, but it’s not as critical with
> > this setup. On the mirror maker, the configuration is on the consumer
> side,
> > and it’s called receive.buffer.bytes. Again, you can set this to -1 to
> use
> > the OS configuration. Make sure to restart the applications after making
> > all these changes, of course.
> >
> > -Todd
> >
> >
> > On Sat, Jul 22, 2017 at 1:27 AM, James Cheng 
> wrote:
> >
> >> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
> >> producer. One scenario that he described was tuning for situations where
> >> you had high network latency. See slides at https://www.slideshare.net/
> >> mobile/JiangjieQin/producer-performance-tuning-for-apache-
> kafka-63147600
> >> and video at https://youtu.be/oQe7PpDDdzA
> >>
> >> -James
> >>
> >> Sent from my iPhone
> >>
> >>> On Jul 21, 2017, at 9:25 AM, Sunil Parmar 
> wrote:
> >>>
> >>> We're trying to set up mirror maker to mirror data from EU dc to US dc.
> >> The
> >>> network delay is ~150 ms. In recent test; we realized that mirror maker
> >> is
> >>> not keeping up with load and have a lag trending upward all the time.
> >>>
> >>> What are configurations that can be tuned up to make it work for the
> >> higher
> >>> throughput ?
> >>> How to decide number of producer and consumer threads ? ( number of
> topic
> >>> partitions / brokers ? )
> >>>
> >>>
> >>> *Environment* ( both source and destination cluster )
> >>>
> >>> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
> >>>
> >>> queue.size = 1
> >>> queue.byte.size = 100MB
> >>>
> >>> 2 brokers on source, 3 brokers on destination
> >>>
> >>>
> >>> *Mirror maker configs :*
> >>>
> >>> Producer properties :
> >>> request.timeout.ms=12
> >>> block.on.buffer.full=TRUE
> >>> retries=20
> >>> acks=all
> >>>
> >>>
> >>> Consumer properties:
> >>> request.timeout.ms=12
> >>> auto.offset.reset=latest
> >>> enable.auto.commit=false
> >>>
> >>> We've configured 4 producer and consumer threads.
> >>> There is no security set up as of now so it's all PLAINTEXT.
> >>>
> >>> We have 4 topics are white listed to sync from EU to US. Only one of
> them
> >>> is high throughput. We also have a message handler to strip off some
> >>> sensitive information from EU to US but it only works on a low thru put
> >>> topic; the message handler still try to process the other topics but
> let
> >> it
> >>> pass thru.
> >>>
> >>> Thanks,
> >>> Sunil Parmar
> >>
> >
> >
> >
> > --
> > *Todd Palino*
> > Senior Staff Engineer, Site Reliability
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
>
>


-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Avoid jvm swapping

2017-08-07 Thread Todd Palino
To avoid swap you should set swappiness to 1, not 0. 1 is a request (don't
swap if avoidable) whereas 0 is a demand (processes will be OOM-killed
instead of swapping).
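
For reference, the runtime change and the persistent setting are (assuming a
reasonably modern Linux):

  sysctl -w vm.swappiness=1
  echo "vm.swappiness=1" >> /etc/sysctl.conf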

However, I'm wondering why you are running such large heaps. Most of the ZK
heap is used for storage of the data in memory, and it's obvious from your
setup that this is a development instance. So if ZK is only being used for
that Kafka cluster you are testing, you can go with a smaller heap.

Also, for what reason are you running a 12 GB heap for Kafka? Even our
largest production clusters at LinkedIn are using a heap size of 6 GB right
now. You want to leave memory open for the OS to use for buffers and cache
in order to get better performance from consumers. You can see from that
output that it's trying to.

It really looks like you're just overloading your system. In which case
swapping is to be expected.

-Todd



On Aug 7, 2017 8:34 AM, "Gabriel Machado"  wrote:

Hi,

I have a 3 nodes cluster with 18 GB RAM and 2 GB swap.
Each node have the following JVMs (Xms=Xmx) :
- Zookeeper 2GB
- Kafka 12 GB
- Kafka mirror-maker DCa 1 GB
- Kafka mirror-maker DCb 1 GB

All the JVMs consume 16 GB. That leaves 2 GB for the OS (debian jessie 64
bits).
Why i have no swap free on these virtual machines ?

# free -m
             total       used       free     shared    buffers     cached
Mem:         18105      17940        164          0         38       6667
-/+ buffers/cache:      11235       6869
Swap:         2047       2045          2


I've read I should avoid JVM swapping.
What is the best way to do that?
- modify the swappiness threshold
- unmount all swap partitions
- force the JVM to stay in memory with mlockall (
https://github.com/LucidWorks/mlockall-agent)
- Other solution

Gabriel.


Re: Avoid jvm swapping

2017-08-07 Thread Todd Palino
In production, you probably want to avoid stacking up the applications like
this. There’s a number of reasons:
1) Kafka’s performance is significantly increased by other applications not
polluting the OS page cache
2) Zookeeper has specific performance requirements - among them are a
dedicated disk for transaction logs that it can sequentially write to
3) Mirror maker chews up a lot of CPU and memory with decompression and
recompression of messages

Particular sizing of your systems is going to be dependent on the amount of
data you are moving around, but at the very least I would recommend that
your Kafka brokers, Zookeeper ensemble, and mirror makers be on separate
systems (stacking up the mirror makers on a common system is fine,
however). The Kafka brokers will need CPU and memory, and of course decent
storage to meet your retention and performance requirements. ZK needs a bit
of memory, and very good disk for the transaction logs, but its CPU
requirements are pretty light. Mirror maker needs CPU and memory, but it
has no real need of disk performance at all.

Sizing the brokers, you can probably get away with 3 or 4 GB of heap (this
is based on my experience running really large clusters at LinkedIn - even
at that heap size we were good for a long time), using G1 garbage
collection. The guidelines in the Kafka documentation for this are the ones
that I have developed over the last few years here. Reserve the rest of the
memory for the OS to manage - buffers and cache is your friend.
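
For illustration (the heap size here is just an example), these usually get
set through the environment that kafka-server-start.sh picks up:

  export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
  export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC \
    -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"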

-Todd


On Mon, Aug 7, 2017 at 11:06 AM, Gabriel Machado 
wrote:

> Thanks Todd, i will set swapiness to 1.
>
> These machines will be the future production cluster for our main
> datacenter. We have 2 remote datacenters.
> Kafka will buffer logs and Elasticsearch will index them.
>
> Is it a bad practice to have all these JVMs on the same virtual machine ?
> What do you recommend (number of machines, quantity of GB, CPU...) ? For
> the moment, each node has 4 vcpu.
>
> Gabriel.
>
> 2017-08-07 15:45 GMT+02:00 Todd Palino :
>
> > To avoid swap you should set swappiness to 1, not 0. 1 is a request
> (don't
> > swap if avoidable) whereas 0 is a demand (processes will be killed as OOM
> > instead of swapping.
> >
> > However, I'm wondering why you are running such large heaps. Most of the
> ZK
> > heap is used for storage of the data in memory, and it's obvious from
> your
> > setup that this is a development instance. So if ZK is only being used
> for
> > that Kafka cluster you are testing, you can go with a smaller heap.
> >
> > Also, for what reason are you running a 12 GB heap for Kafka? Even our
> > largest production clusters at LinkedIn are using a heap size of 6 GB
> right
> > now. You want to leave memory open for the OS to use for buffers and
> cache
> > in order to get better performance from consumers. You can see from that
> > output that it's trying to.
> >
> > It really looks like you're just overloading your system. In which case
> > swapping is to be expected.
> >
> > -Todd
> >
> >
> >
> > On Aug 7, 2017 8:34 AM, "Gabriel Machado" 
> wrote:
> >
> > Hi,
> >
> > I have a 3 nodes cluster with 18 GB RAM and 2 GB swap.
> > Each node have the following JVMs (Xms=Xmx) :
> > - Zookeeper 2GB
> > - Kafka 12 GB
> > - Kafka mirror-maker DCa 1 GB
> > - Kafka mirror-maker DCb 1 GB
> >
> > All th JVMs consume 16 GB. It leaves 2 GB for the OS (debian jessie 64
> > bits).
> > Why i have no swap free on these virtual machines ?
> >
> > # free -m
> >              total       used       free     shared    buffers     cached
> > Mem:         18105      17940        164          0         38       6667
> > -/+ buffers/cache:      11235       6869
> > Swap:         2047       2045          2
> >
> >
> > I've read i should avoid jvm swapping.
> > What is the best way to do that ?
> > - modify swapiness threshold
> > - unmount all swap partition
> > - force the jvm to stay in memory with mlockall (
> > https://github.com/LucidWorks/mlockall-agent)
> > - Other solution
> >
> > Gabriel.
> >
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Kafka MirrorMaker - target or source datacenter deployment

2017-09-14 Thread Todd Palino
Always in the target datacenter. While you can set up mirror maker for no
data loss operation, it’s still a good idea to put the connection more
likely to fail (remote) on the consumer side. Additionally, there are
significant performance problems with setting it up for remote produce as
you must run with one in flight batch in order to maintain message ordering.
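
As a sketch of what that implies for the mirror maker producer config (the
values are illustrative):

  # producer.properties for the mirror maker
  acks=all
  retries=2147483647
  max.in.flight.requests.per.connection=1

and on the mirror maker command line, --abort.on.send.failure=true so a
failed send stops the mirror rather than silently dropping data.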

-Todd


On Thu, Sep 14, 2017 at 9:46 PM, Vu Nguyen  wrote:

> Many of the descriptions and diagrams online describe deploying Kafka
> MirrorMaker into the target data center (near the target Kafka cluster).
> Since MirrorMaker is supposed to not lose messages, does it matter which
> data center MirrorMaker is deployed in--source or target data center (with
> any Kafka MirrorMaker version 0.10.1+)?
>
> It might be easier to collect and observe metrics in the source data center
> if MirrorMaker is also in the source data center (near the source Kafka
> cluster), especially if I can't aggregate the metrics from the 2 data
> centers.  Is there anything else that would influence me to deploy in
> MirrorMaker in either data center?
>
> Thanks
>
> Vu
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Change replication factor for a topic in the runtime

2017-09-19 Thread Todd Palino
You can do this using the kafka-reassign-partitions tool (or using a 3rd
party tool like kafka-assigner in github.com/linkedin/kafka-tools) to
explicitly assign the partitions to an extra replica, or remove a replica.
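
For example (topic, partition, and broker IDs made up), going from 2
replicas to 3 is just a reassignment that lists the additional broker:

  cat > increase-rf.json <<'EOF'
  {"version":1,"partitions":[
    {"topic":"my-topic","partition":0,"replicas":[1,2,3]}
  ]}
  EOF
  bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
    --reassignment-json-file increase-rf.json --execute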

-Todd


On Tue, Sep 19, 2017 at 3:45 PM, Devendar Rao 
wrote:

> Is it possible to change the replication factor in runtime? We're using
> 10.x version.
>
> Thanks,
> Devendar
>



-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


New release of Burrow!

2017-12-03 Thread Todd Palino
Hey, all. I wanted to let everyone know that I just released a brand new
version of Burrow that resolves a lot of issues that have been collecting.
We’ve been working hard on this internally for the last few months, and
it’s been running in production at LinkedIn for a few weeks now. So I’m
happy to announce that I’ve just released version 1.0.0

https://github.com/linkedin/Burrow/releases/latest

First off, from a user point of view, there’s a few big things:
1) The releases are now built for Linux, Windows, and OS X and you can
download them directly at https://github.com/linkedin/Burrow/releases/latest
2) The Docker image is now built and pushed as toddpalino/burrow
3) The config has changed significantly. Please review
https://github.com/linkedin/Burrow/wiki/Configuration

For features, there’s a lot of change:
1) Full support for topic deletion in Kafka
2) Full support for both TLS and SASL
3) Offset handling and evaluation is much cleaner and less prone to missing
commits and false alerts
4) Lag numbers are now intuitive - if a consumer stops, the lag will
increase

From a developer point of view, this is a ground-up rewrite of Burrow:
1) Everything is modular, making it easier to add modules for custom
consumer types, or notifiers
2) The internals have a significant amount of test coverage, meaning it’s
easier to know when a change will break things
3) CI is finally set up for testing on every PR, and for building releases
4) Burrow can be started as a library. This will make it easier to build
applications that wrap it for custom configuration or logging systems.

There’s a number of things to be worked on still, and I’ll be starting
these as I have time:
1) Metrics for Burrow itself
2) Making it easier to provide custom modules without a fork
3) Better docs for setting up notifiers, and more sample configs and
templates

As always, please let me know if there are any issues. I’ve set up #burrow
on the Confluent Community Slack for discussion, though I will probably be
setting up Gitter on the project directly as well. And for bugs, there are
always the Github issues, or PRs for contributing!

-Todd


-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino

