Re: AWS EC2 deployment best practices

2014-09-30 Thread James Cheng
I'm also interested in hearing more about deploying Kafka in AWS.

I was also considering options like your 1a and 2. I ran some calculations, and 
one interesting thing I came across was bandwidth costs between AZs.

In 1a, if you can have your producers and consumers in the same AZ as the 
"master", then you won't have to pay any bandwidth costs for your 
producers/consumers. You will have to pay bandwidth costs for the mirror-maker 
traffic between clusters in different AZs.

In 2, if your producers and consumers are writing/reading to different AZs, 
then you are paying bandwidth costs between AZs for both producers and 
consumers. In my cost calculation for a modest size cluster, my bandwidth costs 
were roughly the same as my (EC2 instance + EBS) costs.
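
To make that concrete with illustrative numbers (assuming cross-AZ transfer at 
roughly $0.01/GB in each direction, i.e. about $0.02 per GB moved): 50 MB/s of 
produce traffic crossing an AZ boundary is about 4.3 TB/day, or roughly $86/day 
in transfer charges, and it doubles again if consumers also read across AZs. An 
on-demand c3.2xlarge was on the order of $10/day at the time, so the two bills 
end up in the same ballpark surprisingly quickly.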

An idea for #2 is to deploy your producers and your consumers so that they are 
always in the AZ that contains the partitions they want to read/write. Or, said 
another way, move your partitions to the brokers in the same AZs as your 
producers/consumers. I think it's doable, but it means that you'd want to write 
a Kafka client library that is aware of your AZs, and also manage the cluster 
partitions in sync with your producer/consumer deployments.
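
As a sketch of what the AZ-aware piece could look like against the old 0.8 
producer's Partitioner interface (the "local.partitions" property here is 
hypothetical; in practice you'd refresh the list of partitions whose leaders 
sit in your AZ from ZooKeeper or your deployment tooling):

import java.util.ArrayList;
import java.util.List;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class AzAwarePartitioner implements Partitioner {
    // partitions whose leaders live in this producer's AZ, maintained out of band
    private final List<Integer> localPartitions = new ArrayList<Integer>();

    public AzAwarePartitioner(VerifiableProperties props) {
        // hypothetical property, e.g. "0,3,5", supplied by deployment tooling
        for (String p : props.getString("local.partitions").split(","))
            localPartitions.add(Integer.valueOf(p.trim()));
    }

    public int partition(Object key, int numPartitions) {
        // hash the key, but only across the partitions in our own AZ;
        // the mask keeps the hash non-negative, like Kafka's Utils.abs
        int i = (key.hashCode() & 0x7fffffff) % localPartitions.size();
        return localPartitions.get(i);
    }
}

You'd plug it in via partitioner.class in the producer config; the hard part 
remains keeping that partition list in sync with reassignments.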

With ephemeral disks, I imagine that Kafka would become network bound. In case 
you find it useful, I ran some network performance tests against different EC2 
instances. I only went as far as c3.4xlarge.

https://docs.google.com/spreadsheets/d/1QF-4EO3PQ_YOLbvf6HKpqBTNQ8fyYeRuDMrlDYlK0yQ/pubchart?oid=1634430904&format=interactive

-James

On Sep 30, 2014, at 7:47 AM, Philip O'Toole  
wrote:

> OK, yeah, speaking from experience I would be comfortable with using the 
> ephemeral storage if it's replicated across AZs. More and more EC2 instances 
> have local SSDs, so you'll get great IO. Of course, you'd better monitor your 
> instances, and if an instance terminates, you're vulnerable if a second 
> instance is lost. It might argue for 3 copies.
> 
> As you correctly pointed out in your original e-mail, the Loggly setup 
> predated 0.8 -- so there was no replication to worry about. We ran 3-broker 
> clusters, and put each broker of a cluster in a different AZ. This did mean 
> that during an AZ failure certain brokers would be unavailable (but the 
> messages were still on disk, ready for processing when the AZ came back 
> online), but it also meant that there were always some Kafka brokers running 
> somewhere that were reachable, and incoming traffic could be sent there. The 
> Producers we wrote took care of dealing with this. In other words the 
> pipeline kept moving data.
> 
> 
> Of course, in a healthy pipeline, each message was written to ES within a 
> matter of seconds, and we had replication there (as outlined in the 
> accompanying talk). It all worked very well.
> 
> 
> Philip
> 
> 
> -
> http://www.philipotoole.com 
> 
> 
> On Tuesday, September 30, 2014 2:49 PM, Joe Crobak  wrote:
> 
> 
> 
> I didn't know about KAFKA-1215, thanks. I'm not sure it would fully address
> my concern of a producer writing to a partition leader in a different AZ,
> though.
> 
> To answer your question, I was thinking ephemerals with replication, yes.
> With a reservation, it's pretty easy to get e.g. two i2.xlarge for an
> amortized cost below a single m2.2xlarge with the same amount of EBS
> storage and provisioned IOPs.
> 
> 
> On Mon, Sep 29, 2014 at 9:40 PM, Philip O'Toole <
> philip.oto...@yahoo.com.invalid> wrote:
> 
>> If only Kafka had rack awareness... you could run 1 cluster and set up the
>> replicas in different AZs.
>> 
>> 
>> https://issues.apache.org/jira/browse/KAFKA-1215
>> 
>> As for your question about ephemeral versus EBS, I presume you are
>> proposing to use ephemeral *with* replicas, right?
>> 
>> 
>> Philip
>> 
>> 
>> 
>> -
>> http://www.philipotoole.com
>> 
>> 
>> On Monday, September 29, 2014 9:45 PM, Joe Crobak 
>> wrote:
>> 
>> 
>> 
>> We're planning a deploy to AWS EC2, and I was hoping to get some advice on
>> best practices. I've seen the Loggly presentation [1], which has some good
>> recommendations on instance types and EBS setup. Aside from that, there
>> seem to be several options in terms of multi-Availability Zone (AZ)
>> deployment. The ones we're considering are:
>> 
>> 1) Treat each AZ as a separate data center. Producers write to the kafka
>> cluster in the same AZ. For consumption, two options:
>> 1a) Designate one cluster the "master" cluster and use MirrorMaker. This
>> was discussed here [2] where some gotchas related to offset management were
>> raised.
>> 1b) Build consumers to consume from both clusters (e.g. two Camus jobs, one
>> for each cluster).
>> 
>> Pros:
>> * if there's a network partition between AZs (or extra latency), the
>> consumer(s) will catch up once the event is resolved.
>> * If an AZ goes offline, only unprocessed data in that AZ is lost

If you run Kafka in AWS or Docker, how do you persist data?

2015-02-26 Thread James Cheng
Hi,

I know that Netflix might be talking about "Kafka on AWS" at the March meetup, 
but I wanted to bring up the topic anyway.

I'm sure that some people are running Kafka in AWS. Is anyone running Kafka 
within docker in production? How does that work?

For both of these, how do you persist data? If on AWS, do you use EBS? Do you 
use ephemeral storage and then rely on replication? And if using docker, do you 
persist data outside the docker container and on the host machine?

And related, how do you deal with broker failure? Do you simply replace it, and 
repopulate a new broker via replication? Or do you bring back up the broker 
with the persisted files?

Trying to learn about what people are doing, beyond "on premises and dedicated 
hardware".

Thanks,
-James



Re: Kafka 0.8.2 log cleaner

2015-03-02 Thread James Cheng
Ivan,

I think log.cleaner.delete.retention.ms does just that?

"The amount of time to retain delete tombstone markers for log compacted 
topics. This setting also gives a bound on the time in which a consumer must 
complete a read if they begin from offset 0 to ensure that they get a valid 
snapshot of the final stage (otherwise delete tombstones may be collected 
before they complete their scan). This setting can be overridden on a per-topic 
basis (see the per-topic configuration section)."

http://kafka.apache.org/documentation.html#brokerconfigs
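
And the per-topic override, if you want it on just one topic, would look 
something like this (check the flags against your version):

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
        --topic my-compacted-topic --config delete.retention.ms=86400000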

-James

> On Mar 2, 2015, at 8:57 AM, Ivan Balashov  wrote:
> 
> Svante,
> 
> Not sure if I understand your suggestion correctly, but I do think
> that enabling retention for deleted values would make a useful
> addition to the "compact" policy. Otherwise some data is bound to be
> hanging around not used.
> 
> Guozhang, could this potentially deserve a feature request?
> 
> Thanks,
> 
> 
> 2015-03-02 19:40 GMT+03:00 svante karlsson :
>> Wouldn't it be rather simple to add a retention time on "deleted" items, i.e.
>> keys with a null value, for topics that are compacted?
>> 
>> The retention time would then be set to some "large" time to allow all
>> consumers to understand that a previous k/v is being deleted.



Re: Database Replication Question

2015-03-04 Thread James Cheng
Another thing to think about is delivery guarantees. Exactly once, at least 
once, etc.

If you have a publisher that consumes from the database log and pushes out to 
Kafka, and then the publisher crashes, what happens when it starts back up? 
Depending on how you keep track of the database's transaction id/scn/offset, 
you may end up re-publishing events that you already published out to the kafka 
topic.

I am also working on database replication, namely from MySQL to Kafka. I'm 
using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ in 
order to get exactly once processing, so that I don't have any duplicates in my 
kafka stream.

Specifically, I have the publisher write messages to a single topic (I 
think/hope that Kafka's throughput is high enough). I include MySQL's binary 
log coordinates into my output messages. Upon startup, I read back the "end" of 
my topic to find out what messages I published. This gives me 2 pieces of 
information:
1) The MySQL binary log coordinates, so I know where to start again.
2) The messages that I last published, to make sure that I don't re-publish 
them.
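
For illustration, a sketch of what one of those publishes could look like with 
the 0.8.2 producer (the topic name, key, and JSON layout here are all 
hypothetical, just to show the binlog coordinates riding along in the value):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BinlogPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // the value carries the row image plus the binlog coordinates it came
        // from, so reading back the tail of the topic tells us where to resume
        String value = "{\"binlog_file\":\"mysql-bin.000042\",\"binlog_pos\":1337,"
            + "\"row\":{\"id\":1,\"name\":\"foo\"}}";
        producer.send(new ProducerRecord<String, String>(
            "all-tables", "mytable:1", value));
        producer.close();
    }
}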

That does mean that all data from all tables is in a single topic. I will 
probably have a consumer that will read that "all tables" topic, and split the 
data out into separate topics, for consumers who just want a subset of the data.

-James

On Mar 4, 2015, at 9:28 AM, Jonathan Hodges  wrote:

> Yes you are right on the oplog per partition as well as that mapping well
> to the Kafka partitions.  I think we are making this harder than it is
> based on previous attempts and trying to leverage something like Databus
> for propagating log changes from MongoDB and Cassandra since it requires a
> scn.  Sounds like direct Kafka makes more sense for these use cases.
> Thanks again!
> 
> 
> On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps  wrote:
> 
>> Hey Josh,
>> 
>> NoSQL DBs may actually be easier because they themselves generally don't
>> have a global order. I.e. I believe Mongo has a per-partition oplog, is
>> that right? Their partitions would match our partitions.
>> 
>> -Jay
>> 
>> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader  wrote:
>> 
>>> Thanks everyone for your responses!  These are great.  It seems our cases
>>> matches closest to Jay's recommendations.
>>> 
>>> The one part that sounds a little tricky is point #5 'Include in each
>>> message the database's transaction id, scn, or other identifier '.  This
>> is
>>> pretty straightforward with the RDBMS case that I mentioned, but I could
>>> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
>>> which might not always have a readily available monotonic id,
>> particularly
>>> in failover scenarios.  I guess in that case we can think about creating
>>> this id ourselves from the single producer.
>>> 
>>> Xiao,
>>> 
>>> I think in the Kafka failover cases you mention if we also store the
>> offset
>>> with replicated data we should be able to pick up where we left off since
>>> we are using the low level consumer.  Maybe I am missing your point
>>> though...
>>> 
>>> Guozhang,
>>> 
>>> Very good point that we didn't think of.  We will need to think this
>>> through, as you say avoid resending other messages in a batch if one is
>>> failed.  I wonder if we might also manage this on the consumer side too
>>> with idempotency.  Thanks for raising this!
>>> 
>>> Josh
>>> 
>>> 
>>> 
>>> On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
>>> 
 Hey Josh,
 
 Sorry, after reading the code: Kafka does fsync the data using a separate
 thread. The recovery point (oldest transaction timestamp) can be read
>> from
 the file recovery-point-offset-checkpoint.
 
 You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
>>> you
 think the speed is not quick enough. When the workload is huge, the
 bottleneck could be in your target side or source side. That means
>> your
 apply process could have enough jobs to do.
 
 Basically, you need to keep reading this file to determine the
>> oldest
 timestamps of all relevant partitions. Then, apply the transactions
>> until
 that timestamp.
 
 Note, this does not protect the transaction consistency. This is just
>> for
 ensuring the data at the target side is consistent at one timestamp
>> when
 you have multiple channels to send data changes. The implementation
>> should
 be simple if you can understand the concepts. I am unable to find the
>>> filed
 patent application about it. This is one related paper. It covers the
>>> main
 concepts about the issues you are facing. "Inter-Data-Center
>> Large-Scale
 Database Replication Optimization - A Workload Driven Partitioning
>>> Approach"
 
 Hopefully, you understood what I explained above.
 
 Best wishes,
 
 Xiao Li
 
 On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
 
> Hey Josh,
> 
>

Re: Database Replication Question

2015-03-04 Thread James Cheng

> On Mar 3, 2015, at 4:18 PM, Guozhang Wang  wrote:
> 
> Additionally to Jay's recommendation, you also need to have some special
> cares in error handling of the producer in order to preserve ordering since
> producer uses batching and async sending. That is, if you already sent
> messages 1,2,3,4,5 to producer but later on be notified that message 3
> failed to send, you need to avoid continue sending messages 4,5 before 3
> gets fixed or dropped.
> 

Guozhang, how would we do this? Would this require sending each message 
individually and waiting for acknowledgment of each message?

Send 1
Wait for ack
Send 2
Wait for ack
etc

If I try to send 1,2,3,4,5 in a batch, is it possible that the broker could 
receive 1,2 and 4,5, and that only 3 would fail? Or is it always a contiguous 
chunk, and then the first failure would cause the rest of the batch to abort?
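
For what it's worth, here's how I'd picture the send-one-wait-one version with 
the new (0.8.2) producer, since send() returns a Future (a minimal sketch; 
halting on the first failure is my simplification):

import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedSender {
    public static void sendInOrder(Producer<String, String> producer,
                                   List<ProducerRecord<String, String>> records) {
        for (ProducerRecord<String, String> record : records) {
            try {
                producer.send(record).get(); // block until this record is acked
            } catch (Exception e) {
                // stop here so records 4 and 5 never overtake a failed record 3
                throw new RuntimeException("send failed; halting to preserve order", e);
            }
        }
    }
}

That obviously gives up batching, which is why I'm asking about the batch 
behavior.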

-James

> Guozhang
> 
> On Tue, Mar 3, 2015 at 3:45 PM, Xiao  wrote:
> 
>> Hey Josh,
>> 
>> Transactions can be applied in parallel in the consumer side based on
>> transaction dependency checking.
>> 
>> http://www.google.com.ar/patents/US20080163222
>> 
>> This patent documents how it works. It is easy to understand; however, you
>> also need to consider the hash collision issues. This has been implemented
>> in IBM Q Replication since 2001.
>> 
>> Thanks,
>> 
>> Xiao Li
>> 
>> 
>> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
>> 
>>> Hey Josh,
>>> 
>>> As you say, ordering is per partition. Technically it is generally
>> possible
>>> to publish all changes to a database to a single partition--generally the
>>> kafka partition should be high throughput enough to keep up. However
>> there
>>> are a couple of downsides to this:
>>> 1. Consumer parallelism is limited to one. If you want a total order to
>> the
>>> consumption of messages you need to have just 1 process, but often you
>>> would want to parallelize.
>>> 2. Often what people want is not a full stream of all changes in all
>> tables
>>> in a database but rather the changes to a particular table.
>>> 
>>> To some extent the best way to do this depends on what you will do with
>> the
>>> data. However if you intend to have lots
>>> 
>>> I have seen pretty much every variation on this in the wild, and here is
>>> what I would recommend:
>>> 1. Have a single publisher process that publishes events into Kafka
>>> 2. If possible use the database log to get these changes (e.g. mysql
>>> binlog, Oracle xstreams, golden gate, etc). This will be more complete
>> and
>>> more efficient than polling for changes, though that can work too.
>>> 3. Publish each table to its own topic.
>>> 4. Partition each topic by the primary key of the table.
>>> 5. Include in each message the database's transaction id, scn, or other
>>> identifier that gives the total order within the record stream. Since
>> there
>>> is a single publisher this id will be monotonic within each partition.
>>> 
>>> This seems to be the best set of tradeoffs for most use cases:
>>> - You can have parallel consumers up to the number of partitions you
>> chose
>>> that still get messages in order per ID'd entity.
>>> - You can subscribe to just one table if you like, or to multiple tables.
>>> - Consumers who need a total order over all updates can do a "merge"
>> across
>>> the partitions to reassemble the fully ordered set of changes across all
>>> tables/partitions.
>>> 
>>> One thing to note is that the requirement of having a single consumer
>>> process/thread to get the total order isn't really so much a Kafka
>>> restriction as it just is a restriction about the world, since if you had
>>> multiple threads even if you delivered messages to them in order their
>>> processing might happen out of order (just do to the random timing of the
>>> processing).
>>> 
>>> -Jay
>>> 
>>> 
>>> 
>>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader  wrote:
>>> 
 Hi Kafka Experts,
 
 
 
 We have a use case around RDBMS replication where we are investigating
 Kafka.  In this case ordering is very important.  Our understanding is
 ordering is only preserved within a single partition.  This makes sense
>> as
 a single thread will consume these messages, but our question is can we
 somehow parallelize this for better performance?   Is there maybe some
 partition key strategy trick to have your cake and eat it too in terms
>> of
 keeping ordering, but also able to parallelize the processing?
 
 
 
 I am sorry if this has already been asked, but we tried to search
>> through
 the archives and couldn’t find this response.
 
 
 
 Thanks,
 
 Josh
 
>> 
>> 
> 
> 
> --
> -- Guozhang



Re: Database Replication Question

2015-03-05 Thread James Cheng

On Mar 5, 2015, at 12:59 AM, Xiao  wrote:

> Hi, James, 
> 
> This design regarding the restart point has a few potential issues, I think. 
> 
> - The restart point is based on the messages that you last published. The 
> message could be pruned. How large is your log.retention.hours?

That's a good point. In my case, I will be publishing this into a log compacted 
topic. My goal is that by reading the log compacted topic from beginning to 
end, and then continually applying new items as they come in, it will 
always contain the final state of the database. So messages will only be pruned 
if they are being overwritten by later ones, and in that case, the "later" one 
is the one that I now care about.

> - If the Kafka message order is different from your log sequence, your 
> replication might lose the data. 

Good point. I think it depends on use case, and also if it's possible to 
"recover" lost data.

I don't think I'll be losing any messages, but it's possible that, due to 
network delays, the items might arrive in Kafka out of order. And in my 
use case, that would be bad because "last item wins". In the log compaction 
case, I would be able to detect that because the "last" item in Kafka for a 
particular primary key would not match the state of the item in the mysql 
database. I could repair that item by re-publishing what is in the database, 
out to the stream.

I planned to have an auditor monitor the "correctness" of the Kafka topic, to 
ensure that it reflected the state of the database. I think I will now need to 
add a "correction" component that will republish any items that are incorrect.

> First, I think you can maintain a local persistence media for recording the 
> last published message id. 

That works, but there is a small window of failure that can result in 
duplicates.

If you normally:
1) Write to Kafka
2) Write your "last published message id" somewhere.

You might crash between steps 1 and 2. When you come back up, you might end up 
re-publishing your last event.

-James

> Second, if you can add into each message a strictly increasing dense ID in 
> the producers, you can easily recover the sequences in the consumers. If so, 
> you can have multiple producers publish the messages at the same time. This 
> could improve your throughput and your consumers can easily identify if any 
> message is lost due to any reason. 
> 
> Best wishes, 
> 
> Xiao Li
> 
> 
> On Mar 4, 2015, at 4:59 PM, James Cheng  wrote:
> 
>> Another thing to think about is delivery guarantees. Exactly once, at least 
>> once, etc.
>> 
>> If you have a publisher that consumes from the database log and pushes out 
>> to Kafka, and then the publisher crashes, what happens when it starts back 
>> up? Depending on how you keep track of the database's transaction 
>> id/scn/offset, you may end up re-publishing events that you already 
>> published out to the kafka topic.
>> 
>> I am also working on database replication, namely from MySQL to Kafka. I'm 
>> using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ 
>> in order to get exactly once processing, so that I don't have any duplicates 
>> in my kafka stream.
>> 
>> Specifically, I have the publisher write messages to a single topic (I 
>> think/hope that Kafka's throughput is high enough). I include MySQL's binary 
>> log coordinates into my output messages. Upon startup, I read back the "end" 
>> of my topic to find out what messages I published. This gives me 2 pieces of 
>> information:
>> 1) The MySQL binary log coordinates, so I know where to start again.
>> 2) The messages that I last published, to make sure that I don't re-publish 
>> them.
>> 
>> That does mean that all data from all tables is in a single topic. I will 
>> probably have a consumer that will read that "all tables" topic, and split 
>> the data out into separate topics, for consumers who just want a subset of 
>> the data.
>> 
>> -James
>> 
>> On Mar 4, 2015, at 9:28 AM, Jonathan Hodges  wrote:
>> 
>>> Yes you are right on the oplog per partition as well as that mapping well
>>> to the Kafka partitions.  I think we are making this harder than it is
>>> based on previous attempts and trying to leverage something like Databus
>>> for propagating log changes from MongoDB and Cassandra since it requires a
>>> scn.  Sounds like direct Kafka makes more sense for these use cases.
>>> Thanks again!
>>> 
>>> 
>>> On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps  wrote:
>>> 
>>

Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-10 Thread James Cheng
Hi,

Sorry to bring up this old thread, but my question is about this exact thing:

Guozhang, you said:
> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> partitions.
> 
> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> 
> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> respectively.


You said that in the createMessageStreamsByFilter case, if topic AC had no 
messages in it and consumer.timeout.ms = -1, then the 3 threads might all be 
blocked waiting for data to arrive from topic AC, and so messages from BC would 
not be processed.

createMessageStreamsByFilter("*C" => 1) (single stream) would have the same 
problem but just worse. Behind the scenes, is there a single thread that is 
consuming (round-robin?) messages from the different partitions and inserting 
them all into a single queue for the application code to process? And that is 
why a single partition with no messages will block the other messages from 
getting through?

What about createMessageStreams("AC" => 1)? That creates a single stream that 
contains messages from multiple partitions, which might be on different 
brokers. Does that also suffer the same problem, where if one partition has no 
messages, the application would not receive messages from the other 
partitions?

Thanks,
-James


On Feb 11, 2015, at 8:13 AM, Guozhang Wang  wrote:

> The new consumer will be released in 0.9, which is targeted for end of this
> quarter.
> 
> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao  wrote:
> 
>> Do you know when the new consumer API will be publicly available?
>> 
>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang 
>> wrote:
>> 
>>> Yes, it can get stuck. For example, AC and BC are processed by two
>>> different processes and the AC processor gets stuck, hence AC messages will
>>> fill up in the consumer's buffer and eventually prevent the fetcher
>> thread
>>> from putting more data into it; the fetcher thread will be blocked on that and
>>> not be able to fetch BC.
>>> 
>>> This issue has been addressed in the new consumer client, which is
>>> single-threaded with non-blocking APIs.
>>> 
>>> Guozhang
>>> 
>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao  wrote:
>>> 
 Thank you Guozhang for your detailed explanation. In your example
 createMessageStreamsByFilter("*C" => 3), since threads are shared among
 topics, there may be a situation where all 3 threads get stuck
>> with
 topic AC, e.g. the topic is empty, which will be holding the connecting
>> threads
 (setting consumer.timeout.ms=-1), hence there is no thread to serve
>> topic
 BC. Do you think this situation will happen?
 
 On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang 
>>> wrote:
 
> I was not clear before .. for createMessageStreamsByFilter each
>> matched
> topic will have num-threads, but shared: i.e. there will be totally
> num-threads created, but each thread will be responsible for fetching
>>> all
> matched topics.
> 
> A more concrete example: say you have topic AC: 3 partitions, topic
>>> BC: 6
> partitions.
> 
> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
>>> will
> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
>>> respectively;
> 
> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
>> will
>>> be
> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> respectively.
> 
> Guozhang
> 
> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao 
>>> wrote:
> 
>> Guozhang,
>> 
>> Do you mean that each regex matched topic owns number of threads
>> that
 get
>> passed in to createMessageStreamsByFilter ? For example in below
>> code
 If
> I
>> have 3 matched topics each of which has 2 partitions then I should
>>> have
> 3 *
>> 2 = 6 threads in total with each topic owning 2 threads.
>> 
>> TopicFilter filter = new Whitelist(".*");
>> 
>> int threadTotal = 2;
>> 
>> List> streams = connector
>> .createMessageStreamsByFilter(filter, threadTotal);
>> 
>> 
>> But what I observed from the log is different
>> 
>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
>>> following
>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
>> consumers:
>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>> test1234dd5_localhost-1423585444070-82f23758-1)
>> 
>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>> partition 1
>> 
>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>> test1234dd5_localhost-1423585444

Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-11 Thread James Cheng

On Mar 11, 2015, at 9:12 AM, Guozhang Wang  wrote:

> Hi James,
> 
> What I meant before is that a single fetcher may be responsible for putting
> fetched data to multiple queues according to the construction of the
> streams setup, where each queue may be consumed by a different thread. And
> the queues are actually bounded. Now say if there are two queues that are
> getting data from the same fetcher F, and are consumed by two different
> user threads A and B. If thread A for some reason got slowed / hung
> consuming data from queue 1, then queue 1 will eventually get full, and F
> trying to put more data to it will be blocked. Since F is parked on trying
> to put data to queue 1, queue 2 will not get more data from it, and thread
> B may hence get starved. Does that make sense now?
> 

Yes, that makes sense. That is the scenario where one thread of a consumer can 
cause a backup in the queue, which would cause other threads to not receive 
data.

What about the situation I described, where a thread consumes a queue that is 
supposed to be filled with messages from multiple partitions? If partition A 
has no messages and partitions B and C do, how will the fetcher behave? Will 
the processing thread receive messages from partitions B and C?

Thanks,
-James


> Guozhang
> 
> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng  wrote:
> 
>> Hi,
>> 
>> Sorry to bring up this old thread, but my question is about this exact
>> thing:
>> 
>> Guozhang, you said:
>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>> partitions.
>>> 
>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>> 
>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>> respectively.
>> 
>> 
>> You said that in the createMessageStreamsByFilter case, if topic AC had no
>> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
>> be blocked waiting for data to arrive from topic AC, and so messages from
>> BC would not be processed.
>> 
>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>> same problem but just worse. Behind the scenes, is there a single thread
>> that is consuming (round-robin?) messages from the different partitions and
>> inserting them all into a single queue for the application code to process?
>> And that is why a single partition with no messages will block the other
>> messages from getting through?
>> 
>> What about createMessageStreams("AC" => 1)? That creates a single stream
>> that contains messages from multiple partitions, which might be on
>> different brokers. Does that also suffer the same problem, where if one
>> partition has no messages, the application would not receive messages
>> from the other partitions?
>> 
>> Thanks,
>> -James
>> 
>> 
>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang  wrote:
>> 
>>> The new consumer will be released in 0.9, which is targeted for end of
>> this
>>> quarter.
>>> 
>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao  wrote:
>>> 
>>>> Do you know when the new consumer API will be publicly available?
>>>> 
>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang 
>>>> wrote:
>>>> 
>>>>> Yes, it can get stuck. For example, AC and BC are processed by two
>>>>> different processes and the AC processor gets stuck, hence AC messages
>> will
>>>>> fill up in the consumer's buffer and eventually prevent the fetcher
>>>> thread
>>>>> from putting more data into it; the fetcher thread will be blocked on that
>> and
>>>>> not be able to fetch BC.
>>>>> 
>>>>> This issue has been addressed in the new consumer client, which is
>>>>> single-threaded with non-blocking APIs.
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao 
>> wrote:
>>>>> 
>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>> createMessageStreamsByFilter("*C" => 3)  since threads are shared
>> among
>>>>>> topics there may be a situation where all 3 threads get stuck
>>>> with
>>>>>> topic AC e.

Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-12 Thread James Cheng
Ah, I understand now. I didn't realize that there was one fetcher thread per 
broker. 

Thanks Tao & Guozhang!
-James


On Mar 11, 2015, at 5:00 PM, tao xiao  wrote:

> Fetcher threads are per-broker; it is ensured that there is at least one
> fetcher thread per broker. The fetcher thread sends the broker a fetch request
> asking for all partitions. So if A, B, C are on the same broker, the fetcher
> thread is still able to fetch data from A, B, C even though A returns no data.
> The same logic applies across different brokers.
> 
> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng  wrote:
> 
>> 
>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang  wrote:
>> 
>>> Hi James,
>>> 
>>> What I meant before is that a single fetcher may be responsible for
>> putting
>>> fetched data to multiple queues according to the construction of the
>>> streams setup, where each queue may be consumed by a different thread.
>> And
>>> the queues are actually bounded. Now say if there are two queues that are
>>> getting data from the same fetcher F, and are consumed by two different
>>> user threads A and B. If thread A for some reason got slowed / hung
>>> consuming data from queue 1, then queue 1 will eventually get full, and F
>>> trying to put more data to it will be blocked. Since F is parked on
>> trying
>>> to put data to queue 1, queue 2 will not get more data from it, and
>> thread
>>> B may hence get starved. Does that make sense now?
>>> 
>> 
>> Yes, that makes sense. That is the scenario where one thread of a consumer
>> can cause a backup in the queue, which would cause other threads to not
>> receive data.
>> 
>> What about the situation I described, where a thread consumes a queue that
>> is supposed to be filled with messages from multiple partitions? If
>> partition A has no messages and partitions B and C do, how will the fetcher
>> behave? Will the processing thread receive messages from partitions B and C?
>> 
>> Thanks,
>> -James
>> 
>> 
>>> Guozhang
>>> 
>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng  wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Sorry to bring up this old thread, but my question is about this exact
>>>> thing:
>>>> 
>>>> Guozhang, you said:
>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>> BC: 6
>>>>> partitions.
>>>>> 
>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
>> will
>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
>> respectively;
>>>>> 
>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will
>> be
>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>> respectively.
>>>> 
>>>> 
>>>> You said that in the createMessageStreamsByFilter case, if topic AC had
>> no
>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads might
>> all
>>>> be blocked waiting for data to arrive from topic AC, and so messages
>> from
>>>> BC would not be processed.
>>>> 
>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>>>> same problem but just worse. Behind the scenes, is there a single thread
>>>> that is consuming (round-robin?) messages from the different partitions
>> and
>>>> inserting them all into a single queue for the application code to
>> process?
>>>> And that is why a single partition with no messages will block the other
>>>> messages from getting through?
>>>> 
>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
>>>> that contains messages from multiple partitions, which might be on
>>>> different brokers. Does that also suffer the same problem, where if one
>>>> partition has no messages, the application would not receive
>> messages
>>>> from the other partitions?
>>>> 
>>>> Thanks,
>>>> -James
>>>> 
>>>> 
>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang  wrote:
>>>> 
>>>>> The new consumer will be released in 0.9, which is targeted for end of
>>>> this
>>>>> quarter.
>>>>> 
>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao 
>>

Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-16 Thread James Cheng
Super cool, and super simple.

I like how it is pretty much a pure translation of the binlog into Kafka, with 
no interpretation of the events. That means people can layer whatever they want 
on top of it. They would have to understand what the mysql binary events mean, 
but they would just have to interact with kafka, and not with mysql.

Will you be working on the reconnection strategy, so that you resume from the 
last binlog position that you left off at? Do you anticipate that there will be 
duplicate events in the output stream, or are you going to go for exactly-once?

-James

On Mar 16, 2015, at 7:18 AM, Pierre-Yves Ritschard  wrote:

> Hi kafka,
> 
> I just wanted to mention I published a very simple project which can
> connect as MySQL replication client and stream replication events to
> kafka: https://github.com/pyr/sqlstream
> 
> When you don't have control over an application, it can provide a simple
> way of consolidating SQL data in kafka.
> 
> This is an early release and there are a few caveats (mentioned in the
> README), mostly the poor partitioning which I'm going to evolve quickly
> and the reconnection strategy which doesn't try to keep track of binlog
> position, other than that, it should work as advertised.
> 
> Cheers,
>  - pyr



Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread James Cheng
This is a great set of projects!

We should put this list of projects on a site somewhere so people can more 
easily see and refer to it. These aren't Kafka-specific, but most seem to be 
"MySQL CDC." Does anyone have a place where they can host a page? Preferably a 
wiki, so we can keep it up to date easily.

-James

On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey  
wrote:

> Pretty much a hijack / plug as well (=
> 
> https://github.com/mardambey/mypipe
> 
> "MySQL binary log consumer with the ability to act on changed rows and
> publish changes to different systems with emphasis on Apache Kafka."
> 
> Mypipe currently encodes events using Avro before pushing them into Kafka
> and is Avro schema repository aware. The project is young; and patches for
> improvements are appreciated (=
> 
> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan  wrote:
> 
>> Great work.
>> Sorry for kinda hijacking this thread, but I though that we had built
>> some-thing on mysql bin log event propagator and wanted to share it .
>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop).
>> Its
>> a change propagation frame-work. It has relays which listens to bin logs of
>> Mysql, keeps track of SCNs  and has consumers which can then (transform/map
>> or interpret as is) the bin log-event to a destination. Consumers also keep
>> track of SCNs and a slow consumer can go back to a previous SCN if it wants
>> to re-listen to events  ( similar to kafka's consumer view ).
>> 
>> All the producers/consumers are extensible and you can write your own
>> custom consumer and feed off the data to it.
>> 
>> Common use-cases:
>> a) Archive mysql based data into say hbase
>> b) Move mysql based data to say a search store for serving reads.
>> 
>> It has a decent ( not an awesome :) ) console too which gives a nice human
>> readable view of where the producers and consumers are.
>> 
>> Current supported producers are mysql bin logs, hbase wall-edits.
>> 
>> 
>> Further insights/reviews/feature reqs/pull reqs/advices are all welcome.
>> 
>> --
>> Arya
>> 
>> Arya
>> 
>> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
>> wrote:
>> 
>>> Really really nice!
>>> 
>>> Thank you.
>>> 
>>> On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard >> 
>>> wrote:
 Hi kafka,
 
 I just wanted to mention I published a very simple project which can
 connect as MySQL replication client and stream replication events to
 kafka: https://github.com/pyr/sqlstream
 
 When you don't have control over an application, it can provide a
>> simple
 way of consolidating SQL data in kafka.
 
 This is an early release and there are a few caveats (mentioned in the
 README), mostly the poor partitioning which I'm going to evolve quickly
 and the reconnection strategy which doesn't try to keep track of binlog
 position, other than that, it should work as advertised.
 
 Cheers,
  - pyr
>>> 
>> 
> 
> 
> 
> -- 
> Hisham Mardam-Bey
> http://hisham.cc/



Re: Kafka 0.9 consumer API

2015-03-19 Thread James Cheng
Those are pretty much the best javadocs I've ever seen. :)

Nice job, Kafka team.

-James

> On Mar 19, 2015, at 9:40 PM, Jay Kreps  wrote:
> 
> Err, here:
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> 
> -Jay
> 
> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps  wrote:
> 
>> The current work in progress is documented here:
>> 
>> 
>> On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian 
>> wrote:
>> 
>>> Is there a link to the proposed new consumer non-blocking API?
>>> 
>>> Thanks,
>>> Rajiv
>>> 
>> 
>> 



Re: Post on running Kafka at LinkedIn

2015-03-20 Thread James Cheng
For those who missed it:

The Kafka Audit tool was also presented at the 1/27 Kafka meetup:
http://www.meetup.com/http-kafka-apache-org/events/219626780/

Recorded video is here, starting around the 40 minute mark:
http://www.ustream.tv/recorded/58109076

Slides are here:
http://www.ustream.tv/recorded/58109076

-James

> On Mar 20, 2015, at 9:47 AM, Todd Palino  wrote:
> 
> For those who are interested in detail on how we've got Kafka set up at
> LinkedIn, I have just published a new posted to our Engineering blog titled
> "Running Kafka at Scale"
> 
> https://engineering.linkedin.com/kafka/running-kafka-scale
> 
> It's a general overview of our current Kafka install, tiered architecture,
> audit, and the libraries we use for producers and consumers. You'll also be
> seeing more posts from the SRE team here in the coming weeks on deeper
> looks into both Kafka and Samza.
> 
> Additionally, I'll be giving a talk at ApacheCon next month on running
> tiered Kafka architectures. If you're in Austin for that, please come by
> and check it out.
> 
> -Todd



Re: Post on running Kafka at LinkedIn

2015-03-20 Thread James Cheng
Oops. Thank you Tao Xiao!

-James

> On Mar 20, 2015, at 11:49 AM, tao xiao  wrote:
> 
> here is the slide
> 
> http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015
> 
> On Sat, Mar 21, 2015 at 2:36 AM, Xiao  wrote:
> 
>> Hi, James,
>> 
>> Thank you for sharing it!
>> 
>> The links for the videos and slides are the same. Could you check the link
>> for the slides?
>> 
>> Xiao Li
>> 
>> On Mar 20, 2015, at 11:30 AM, James Cheng  wrote:
>> 
>>> For those who missed it:
>>> 
>>> The Kafka Audit tool was also presented at the 1/27 Kafka meetup:
>>> http://www.meetup.com/http-kafka-apache-org/events/219626780/
>>> 
>>> Recorded video is here, starting around the 40 minute mark:
>>> http://www.ustream.tv/recorded/58109076
>>> 
>>> Slides are here:
>>> http://www.ustream.tv/recorded/58109076
>>> 
>>> -James
>>> 
>>>> On Mar 20, 2015, at 9:47 AM, Todd Palino  wrote:
>>>> 
>>>> For those who are interested in detail on how we've got Kafka set up at
>>>> LinkedIn, I have just published a new posted to our Engineering blog
>> titled
>>>> "Running Kafka at Scale"
>>>> 
>>>>   https://engineering.linkedin.com/kafka/running-kafka-scale
>>>> 
>>>> It's a general overview of our current Kafka install, tiered
>> architecture,
>>>> audit, and the libraries we use for producers and consumers. You'll
>> also be
>>>> seeing more posts from the SRE team here in the coming weeks on deeper
>>>> looks into both Kafka and Samza.
>>>> 
>>>> Additionally, I'll be giving a talk at ApacheCon next month on running
>>>> tiered Kafka architectures. If you're in Austin for that, please come by
>>>> and check it out.
>>>> 
>>>> -Todd
>>> 
>> 
>> 
> 
> 
> -- 
> Regards,
> Tao



Re: Post on running Kafka at LinkedIn

2015-03-20 Thread James Cheng
Amazing growth numbers.

At the meetup on 1/27, Clark Haskins presented their Kafka usage at the time. 
It was:

Bytes in: 120 TB
Messages In: 585 billion
Bytes out: 540 TB
Total brokers: 704

In Todd's post, the current numbers:

Bytes in: 175 TB (45% growth)
Messages In: 800 billion (36% growth)
Bytes out: 650 TB (20% growth)
Total brokers: 1100 (56% growth)

That much growth in just 2 months? Wowzers.

-James

> On Mar 20, 2015, at 11:30 AM, James Cheng  wrote:
> 
> For those who missed it:
> 
> The Kafka Audit tool was also presented at the 1/27 Kafka meetup:
> http://www.meetup.com/http-kafka-apache-org/events/219626780/
> 
> Recorded video is here, starting around the 40 minute mark:
> http://www.ustream.tv/recorded/58109076
> 
> Slides are here:
> http://www.ustream.tv/recorded/58109076
> 
> -James
> 
>> On Mar 20, 2015, at 9:47 AM, Todd Palino  wrote:
>> 
>> For those who are interested in detail on how we've got Kafka set up at
>> LinkedIn, I have just published a new posted to our Engineering blog titled
>> "Running Kafka at Scale"
>> 
>>https://engineering.linkedin.com/kafka/running-kafka-scale
>> 
>> It's a general overview of our current Kafka install, tiered architecture,
>> audit, and the libraries we use for producers and consumers. You'll also be
>> seeing more posts from the SRE team here in the coming weeks on deeper
>> looks into both Kafka and Samza.
>> 
>> Additionally, I'll be giving a talk at ApacheCon next month on running
>> tiered Kafka architectures. If you're in Austin for that, please come by
>> and check it out.
>> 
>> -Todd
> 



Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-23 Thread James Cheng
I created a wiki page that lists all the MySQL replication options that people 
posted, plus a couple others. People may/may not find it useful.

https://github.com/wushujames/mysql-cdc-projects/wiki

I wasn't sure where to host it, so I put it up on a Github Wiki.

-James

On Mar 17, 2015, at 11:09 PM, Xiao  wrote:

> LinkedIn's Gobblin compaction tool is using Hive to perform the compaction. 
> Does it mean Lumos is replaced?
> 
> Confused…
> 
> On Mar 17, 2015, at 10:00 PM, Xiao  wrote:
> 
>> Hi, all,
>> 
>> Do you know whether Linkedin plans to open source Lumos in the near future?
>> 
>> I found the answer from Qiao Lin’s post about replication from Oracle/mySQL 
>> to Hadoop.
>> 
>>  - https://engineering.linkedin.com/data-ingestion/gobblin-big-data-ease
>> 
>> At the source side, it can be DataBus-based or file based.
>> 
>> At the target side, it is Lumos to rebuild the snapshots due to inability to 
>> do an update/delete in Hadoop.
>> 
>> The slides about Lumos:
>>  http://www.slideshare.net/Hadoop_Summit/th-220p230-cramachandranv1
>> The talk about Lumos:
>>  https://www.youtube.com/watch?v=AGlRjlrNDYk
>> 
>> Event publishing is different from database replication. Kafka is used for 
>> change publishing or maybe also used for sending changes (recorded in files).
>> 
>> Thanks,
>> 
>> Xiao Li
>> 
>> On Mar 17, 2015, at 7:26 PM, Arya Ketan  wrote:
>> 
>>> AFAIK, LinkedIn uses Databus to do the same. Aesop is built on top of
>>> Databus, extending its beautiful capabilities to MySQL and HBase.
>>> On Mar 18, 2015 7:37 AM, "Xiao"  wrote:
>>> 
>>>> Hi, all,
>>>> 
>>>> Do you know how Linkedin team publishes changed rows in Oracle to Kafka? I
>>>> believe they already knew the whole problem very well.
>>>> 
>>>> Using triggers? or directly parsing the log? or using any Oracle
>>>> GoldenGate interfaces?
>>>> 
>>>> Any lesson or any standard message format? Could the Linkedin people share
>>>> it with us? I believe it can help us a lot.
>>>> 
>>>> Thanks,
>>>> 
>>>> Xiao Li
>>>> 
>>>> 
>>>> On Mar 17, 2015, at 12:26 PM, James Cheng  wrote:
>>>> 
>>>>> This is a great set of projects!
>>>>> 
>>>>> We should put this list of projects on a site somewhere so people can
>>>> more easily see and refer to it. These aren't Kafka-specific, but most seem
>>>> to be "MySQL CDC." Does anyone have a place where they can host a page?
>>>> Preferably a wiki, so we can keep it up to date easily.
>>>>> 
>>>>> -James
>>>>> 
>>>>> On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey <
>>>> hisham.mardam...@gmail.com> wrote:
>>>>> 
>>>>>> Pretty much a hijack / plug as well (=
>>>>>> 
>>>>>> https://github.com/mardambey/mypipe
>>>>>> 
>>>>>> "MySQL binary log consumer with the ability to act on changed rows and
>>>>>> publish changes to different systems with emphasis on Apache Kafka."
>>>>>> 
>>>>>> Mypipe currently encodes events using Avro before pushing them into
>>>> Kafka
>>>>>> and is Avro schema repository aware. The project is young; and patches
>>>> for
>>>>>> improvements are appreciated (=
>>>>>> 
>>>>>> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan 
>>>> wrote:
>>>>>> 
>>>>>>> Great work.
>>>>>>> Sorry for kinda hijacking this thread, but I though that we had built
>>>>>>> some-thing on mysql bin log event propagator and wanted to share it .
>>>>>>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop
>>>> ).
>>>>>>> Its
>>>>>>> a change propagation frame-work. It has relays which listens to bin
>>>> logs of
>>>>>>> Mysql, keeps track of SCNs  and has consumers which can then
>>>> (transform/map
>>>>>>> or interpret as is) the bin log-event to a destination. Consumers also
>>>> keep
>>>>>>> track of SCNs and a slow consumer can go back to a previous SCN if it
>>>> wants
>>>>>>> to re-listen to events  ( simi

How to consume from a specific topic, as well as a wildcard of topics?

2015-04-03 Thread James Cheng
Hi,

I want to consume from both a specific topic "a_topic" as well as all topics 
that match a certain prefix "prefix.*".

When I do that using a single instance of a ConsumerConnector, I get a hang 
when creating the 2nd set of message streams.

Code:
ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("a_topic", new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);

// do stuff with resulting streams

TopicFilter whitelist = new Whitelist("prefix\\.*");
List<KafkaStream<byte[], byte[]>> wildcardStreams =
    consumer.createMessageStreamsByFilter(whitelist, 1);

It hangs when inside createMessageStreamsByFilter(), within 
createEphemeralPathExpectConflictHandleZKBug(): There is an info() message 
saying:
info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path)
  + "hence I will backoff for this node to be deleted by Zookeeper and retry")

Is this expected to work? Can a ConsumerConnector be used like this, or should 
I have 2 ConsumerConnectors: one for the specific topic, and another for the 
wildcarded topics?

It works when I use 2 ConsumerConnectors, but I just wanted to check if this is 
expected or not.
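
For reference, the two-connector version that works looks roughly like this 
(same consumerConfig for both; regex as above):

ConsumerConnector specific = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("a_topic", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> specificStreams =
    specific.createMessageStreams(topicCountMap);

ConsumerConnector wildcard = Consumer.createJavaConsumerConnector(consumerConfig);
List<KafkaStream<byte[], byte[]>> wildcardStreams =
    wildcard.createMessageStreamsByFilter(new Whitelist("prefix\\.*"), 1);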

Thanks,
-James



Re: serveral questions about auto.offset.reset

2015-04-14 Thread James Cheng
"What to do when there is no initial offset in ZooKeeper or if an offset is out 
of range"

I personally find the name "auto.offset.reset" to be somewhat confusing. That's 
mostly because I only knew of it as the "no initial offset" setting.

An alternate name could be "auto.offset.initial", to handle the "no initial 
offset in Zookeeper" case. But that doesn't describe the "offset out of range" 
case.
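
Either way, it's worth setting explicitly in the consumer config so the 
behavior is a deliberate choice, e.g.:

    auto.offset.reset=smallest
    # or "largest" (the default): start from only newly arriving messages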

-James

On Apr 13, 2015, at 10:28 PM, Ewen Cheslack-Postava  wrote:

> On Mon, Apr 13, 2015 at 10:10 PM, bit1...@163.com  wrote:
> 
>> Hi, Kafka experts:
>> 
>> I got serveral questions about auto.offset.reset. This configuration
>> parameter governs how  consumer read the message from Kafka when there is
>> no initial offset in ZooKeeper or if an offset is out of range.
>> 
>> Q1. "no initial offset in zookeeper "  means that there isn't any consumer
>> to consume the message yet(The offset is set once the consumer starts to
>> consume)?
>> 
> 
> Yes, or if you consumed messages, but auto offset commit is disabled and
> you haven't explicitly committed any offsets.
> 
> 
>> Q2:  What does "offset is out of range" mean? Can you eleborate one
>> scenario when "offset is out of range" could happen?
>> 
> 
> Kafka uses a retention policy for topics to expire data and clean it up. If
> some messages expire and your consumer hasn't run in awhile, the last
> committed offset may no longer exist.
> 
> 
>> 
>> auto.offset.reset has two values:smallest and largest.
>> Assume one scenario: A producer has produced 10 messages to kafka, and
>> there is no consumer yet to consume it.
>> Q3: If auto.offset.reset is set to "smallest", does it mean that the
>> consumer will read the message from the offset 0?(0 is smallest here)
>> 
> 
> Yes.
> 
> 
>> Q4: If auto.offset.reset is set to "largest", does it mean that the
>> consumer will not read any message but wait until new messages come?
>> 
> 
> Also correct. This is why in the quickstart you need to use the
> --from-beginning flag on the console consumer. Since the consumer is
> executed after the console producer it wouldn't see any messages unless it
> set auto.offset.reset to smallest, which is what --from-beginning does.
> 
> 
>> 
>> 
>> 
>> bit1...@163.com
>> 
> 
> 
> 
> -- 
> Thanks,
> Ewen



New and old producers partition messages differently

2015-04-24 Thread James Cheng
Hi,

I was playing with the new producer in 0.8.2.1 using partition keys ("semantic 
partitioning" I believe is the phrase?). I noticed that the default partitioner 
in 0.8.2.1 does not partition items the same way as the old 0.8.1.1 default 
partitioner was doing. For a test item, the old producer was sending it to 
partition 0, whereas the new producer was sending it to partition 4.

Digging in the code, it appears that the partitioning logic is different 
between the old and new producers. Both of them hash the key, but they use 
different hashing algorithms.

Old partitioner:
./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:

  def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
  }

New partitioner:
./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:

} else {
// hash the key to choose a partition
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}

Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2 isn't 
the same logic as hashCode, especially since hashCode is overrideable).

Was it intentional that the hashing algorithm would change between the old and 
new producer? If so, was this documented? I don't know if anyone was relying on 
the old default partitioner, as opposed to going round-robin or using their own 
custom partitioner. Do you expect it to change in the future? I'm guessing that 
one of the main reasons to have a custom hashing algorithm is so that you are 
in full control of the partitioning and can keep it stable (as opposed to being 
reliant on hashCode()).
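
For anyone who wants to see the difference for themselves, a quick comparison 
(assuming the 0.8.2 kafka-clients jar on the classpath; the 0x7fffffff mask is 
what Kafka's Utils.abs does):

import org.apache.kafka.common.utils.Utils;

public class PartitionerComparison {
    public static void main(String[] args) {
        String key = "test-key";
        int numPartitions = 8;

        // old producer: Utils.abs(key.hashCode) % numPartitions
        int oldPartition = (key.hashCode() & 0x7fffffff) % numPartitions;

        // new producer: murmur2 over the serialized key bytes
        // (StringSerializer would hand the producer UTF-8 bytes)
        int newPartition = (Utils.murmur2(key.getBytes()) & 0x7fffffff) % numPartitions;

        // the two will usually disagree
        System.out.println("old=" + oldPartition + " new=" + newPartition);
    }
}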

Thanks,
-James



Re: New and old producers partition messages differently

2015-04-27 Thread James Cheng

On Apr 26, 2015, at 9:03 PM, Gwen Shapira  wrote:

> Definitely +1 for advertising this in the docs.
> 
> What I can't figure out is the upgrade path... if my application assumes
> that all data for a single user is in one partition (so it subscribes to a
> single partition and expects everything about a specific subset of users to
> be in that partition), this assumption will not survive an upgrade to
> 0.8.2.X.  I think the assumption of stable hash partitions even after
> upgrades is pretty reasonable (i.e. I made it about gazillion times without
> thinking twice). Note that in this story my app wasn't even upgraded - it
> broke because a producer upgraded to a new API.
> 

Agreed. And part of the whole thing about Kafka is decoupling producer from 
consumer. If you upgrade the producer and "break" the stream, you may break 
consumers that you don't even know exist.

(Side question: How do you even identify who your consumers are? The only way I 
can think of is by looking at something like consumer groups and offsets, and 
finding mappings between those and the people who wrote those.)

-James

> If we advertise: "upgrading to the new producer API may break consumers",
> we may need to offer a work-around to allow people to upgrade producers
> anyway.
> Perhaps we can say "wait for Sriharsha's partitioner patch and write a
> custom partitioner that uses hashcode()".
> 
> Gwen
> 
> 
> 
> On Sun, Apr 26, 2015 at 7:57 AM, Jay Kreps  wrote:
> 
>> This was actually intentional.
>> 
>> The problem with relying on hashCode is that
>> (1) it is often a very bad hash function,
>> (2) it is not guaranteed to be consistent from run to run (i.e. if you
>> restart the jvm the value of hashing the same key can change!),
>> (3) it is not available outside the jvm so non-java producers can't use the
>> same function.
>> 
>> In general at the moment different producers don't use the same hash code
>> so I think this is not quite as bad as it sounds. Though it would be good
>> to standardize things.
>> 
>> I think the most obvious thing we could do here would be to do a much
>> better job of advertising this in the docs, though, so people don't get
>> bitten by it.
>> 
>> -Jay
>> 
>> On Fri, Apr 24, 2015 at 5:48 PM, James Cheng  wrote:
>> 
>>> Hi,
>>> 
>>> I was playing with the new producer in 0.8.2.1 using partition keys
>>> ("semantic partitioning" I believe is the phrase?). I noticed that the
>>> default partitioner in 0.8.2.1 does not partition items the same way as
>> the
>>> old 0.8.1.1 default partitioner was doing. For a test item, the old
>>> producer was sending it to partition 0, whereas the new producer was
>>> sending it to partition 4.
>>> 
>>> Digging in the code, it appears that the partitioning logic is different
>>> between the old and new producers. Both of them hash the key, but they
>> use
>>> different hashing algorithms.
>>> 
>>> Old partitioner:
>>> ./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:
>>> 
>>>  def partition(key: Any, numPartitions: Int): Int = {
>>>Utils.abs(key.hashCode) % numPartitions
>>>  }
>>> 
>>> New partitioner:
>>> 
>>> 
>> ./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:
>>> 
>>>} else {
>>>// hash the key to choose a partition
>>>return Utils.abs(Utils.murmur2(record.key())) %
>> numPartitions;
>>>}
>>> 
>>> Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2
>>> isn't the same logic as hashCode, especially since hashCode is
>>> overrideable).
>>> 
>>> Was it intentional that the hashing algorithm would change between the
>> old
>>> and new producer? If so, was this documented? I don't know if anyone was
>>> relying on the old default partitioner, as opposed to going round-robin
>> or
>>> using their own custom partitioner. Do you expect it to change in the
>>> future? I'm guessing that one of the main reasons to have a custom
>> hashing
>>> algorithm is so that you are full control of the partitioning and can
>> keep
>>> it stable (as opposed to being reliant on hashCode()).
>>> 
>>> Thanks,
>>> -James
>>> 
>>> 
>> 
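
A side-by-side sketch of the two partitioning schemes discussed in this thread,
for readers comparing them. PartitionChooser is a hypothetical helper, not a
Kafka class; note that Kafka's Utils.abs masks the sign bit (n & 0x7fffffff)
rather than calling Math.abs, which avoids the Math.abs(Integer.MIN_VALUE)
pitfall:

    import org.apache.kafka.common.utils.Utils;

    public final class PartitionChooser {
        // Old (0.8.1) default producer behavior: JVM-dependent, overridable hashCode.
        public static int oldStyle(Object key, int numPartitions) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }

        // New (0.8.2) producer behavior: murmur2 over the serialized key bytes,
        // stable across JVM restarts and implementable outside Java.
        public static int newStyle(byte[] serializedKey, int numPartitions) {
            return (Utils.murmur2(serializedKey) & 0x7fffffff) % numPartitions;
        }
    }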



Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?

2015-05-08 Thread James Cheng
Hi,

I want to use the high level consumer to read all partitions for a topic, and 
know when I have reached "the end". I know "the end" might be a little vague, 
since items keep showing up, but I'm trying to get as close as possible. I know 
that more messages might show up later, but I want to know when I've received 
all the items that are currently available in the topic.

Is there a standard/recommended way to do this?

I know one way to do it is to first issue an OffsetRequest for each partition, 
which would get me the last offset, and then use that information in my high 
level consumer to detect when I've reached the message with that offset. 
Which is exactly what the SimpleConsumer example does 
(https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).
 That involves finding the leader for the partition, etc etc. Not hard, but a 
bunch of steps.
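
(For later readers: newer Java consumers expose this directly. A sketch,
assuming a KafkaConsumer<String, String> named "consumer" with an assignment,
and a hypothetical process() function; endOffsets() arrived in the clients
well after this thread, so with 0.8.x you would issue the OffsetRequest per
partition as described above:)

    // Capture the end offsets once, then consume until every partition has
    // caught up. endOffsets() returns the offset of the *next* message to be
    // written, hence the +1 comparison.
    Map<TopicPartition, Long> end =
            new HashMap<>(consumer.endOffsets(consumer.assignment()));
    end.values().removeIf(offset -> offset == 0L);  // empty partitions are already "at the end"
    while (!end.isEmpty()) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(100))) {
            process(r);
            TopicPartition tp = new TopicPartition(r.topic(), r.partition());
            Long target = end.get(tp);
            if (target != null && r.offset() + 1 >= target)
                end.remove(tp);  // reached "the end" for this partition
        }
    }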

I noticed that kafkacat has an option similar to what I'm looking for:
  -e Exit successfully when last message received

Looking at the code, it appears that a FetchRequest returns the 
HighwaterMarkOffset for a partition, and the API docs confirm that: 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Does the Java high-level consumer expose the HighwaterMarkOffset in any way? I 
looked but I couldn't find such a thing.

Thanks,
-James



Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?

2015-05-11 Thread James Cheng
Thanks everyone.

To answer Charlie's question:

I'm doing some simple stream processing. I have Topics A,B, and C, all using 
log compaction and all records having primary keys. The data in Topic A is 
essentially a routing table that tells me which primary keys in Topics B and C 
I should pay attention to. So before I start consuming B and C, I need to have 
all/most of Topic A loaded into a local routing table.  As Topic A is updated, 
then I will continue to update my routing table, and use it to continually 
process events coming from B and C.

Hope that makes sense.
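
A minimal sketch of the Topic A loading loop, assuming the 0.8 high-level
consumer (a kafka.javaapi.consumer.ConsumerConnector named "consumer"), byte[]
keys/values with UTF-8 keys, and eliding imports; a null message is a
log-compaction tombstone:

    Map<String, byte[]> routingTable = new HashMap<>();
    KafkaStream<byte[], byte[]> stream = consumer
            .createMessageStreams(Collections.singletonMap("topic-a", 1))
            .get("topic-a").get(0);
    // This loop blocks forever, which fits the design: after the initial load
    // it keeps folding new updates to Topic A into the routing table.
    for (MessageAndMetadata<byte[], byte[]> m : stream) {
        String key = new String(m.key(), StandardCharsets.UTF_8);
        if (m.message() == null) routingTable.remove(key);  // tombstone: key deleted
        else routingTable.put(key, m.message());
    }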

All of the solutions look good. Will, that patch does exactly what I want, but 
I'm not sure I want to patch Kafka right now. I'll keep it in mind. Thanks.

-James

On May 9, 2015, at 10:42 AM, Charlie Knudsen  
wrote:

> Hi James,
> What are you trying to do exactly? If all you are trying to do is monitor
> how far behind a consumer is getting you could use the ConsumerOffsetChecker.
> As described in the link below.
> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
> 
> Each message being processed will also have the offset and partition
> attached to it. I suppose that information, plus info from a fetch response,
> would let you determine this within an application.
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
> 
> Does that help?
> 
> 
> On Fri, May 8, 2015 at 6:04 PM, James Cheng  wrote:
> 
>> Hi,
>> 
>> I want to use the high level consumer to read all partitions for a topic,
>> and know when I have reached "the end". I know "the end" might be a little
>> vague, since items keep showing up, but I'm trying to get as close as
>> possible. I know that more messages might show up later, but I want to know
>> when I've received all the items that are currently available in the topic.
>> 
>> Is there a standard/recommended way to do this?
>> 
>> I know one way to do it is to first issue an OffsetRequest for each
>> partition, which would get me the last offset, and then use that
>> information in my high level consumer to detect when I've reached the
>> message with that offset. Which is exactly what the SimpleConsumer example
>> does (
>> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).
>> That involves finding the leader for the partition, etc etc. Not hard, but
>> a bunch of steps.
>> 
>> I noticed that kafkacat has an option similar to what I'm looking for:
>>  -e Exit successfully when last message received
>> 
>> Looking at the code, it appears that a FetchRequest returns the
>> HighwaterMarkOffset for a partition, and the API docs confirm that:
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
>> 
>> Does the Java high-level consumer expose the HighwaterMarkOffset in any
>> way? I looked but I couldn't find such a thing.
>> 
>> Thanks,
>> -James
>> 
>> 



Re: Log end offset

2015-05-11 Thread James Cheng
Vamsi,

There is another thread going on right now about this exact topic:

"Is there a way to know when I've reached the end of a partition (consumed all 
messages) when using the high-level consumer?"
http://search-hadoop.com/m/uyzND1Eb3e42NMCWl

-James

On May 10, 2015, at 11:48 PM, Achanta Vamsi Subhash 
 wrote:

> Hi,
> 
> What is the best way for finding out the log end offset for a topic?
> Currently I am using the SimpleConsumer getLastOffset logic mentioned in:
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> 
> But we are running into ClosedChannelException for some of the topics. We
> use Kafka for offset storage and version 0.8.2.1.
> 
> What is the ideal way to compute the topic log end offset?
> 
> -- 
> Regards
> Vamsi Subhash



Re: Is fetching from in-sync replicas possible?

2015-05-27 Thread James Cheng

On May 26, 2015, at 1:44 PM, Joel Koshy  wrote:

>> Apologies if this question has been asked before. If I understand things
>> correctly a client can only fetch from the leader of a partition, not from
>> an (in-sync) replica. I have a use case where it would be very beneficial
>> if it were possible to fetch from a replica instead of just the leader, and
>> I wonder why it is not allowed? Are there any consistency problems with
>> allowing it, for example? Is there any way to configure Kafka to allow it?
> 
> Yes this should be possible.  I don't think there are any consistency
> issues (barring any bugs) since we never expose past the
> high-watermark and the follower HW is strictly <= leader HW. Can you
> file a jira for this?
> 

Wouldn't this allow Kafka to scale to handle a lot more consumer traffic? 
Currently, consumers all have to read from the leader, which means that the 
network/disk bandwidth of a particular leader is the bottleneck. If consumers 
could read from in-sync replicas, then a single node no longer is the 
bottleneck for reads. You could scale out your read capacity as far as you want.

-James


>> The use case is a Kafka cluster running in EC2 across three availability
>> zones.
> 
> Out of curiosity - what's the typical latency (distribution) you see
> between zones?
> 
> Joel



Re: Is fetching from in-sync replicas possible?

2015-05-27 Thread James Cheng

On May 27, 2015, at 11:23 AM, Joel Koshy  wrote:

> That's right - it should not help significantly assuming even
> distribution of leaders and even distribution of partition volume
> (average inbound messages/sec).
> 

Aditya, Joel,

Oh, right, that makes sense. If I had a 10-partition topic across 10 nodes 
where each leader handles 1/10th of the consumer traffic for that topic, I 
could change that and instead have a 100-partition topic across 100 nodes, and 
then each leader would only have to handle 1/100th of the consumer traffic for 
that topic.

-James


> Theo's use-case is a bit different though in which you want to avoid
> cross-zone consumer reads especially if you have a high fan-out in
> number of consumers.
> 
> On Wed, May 27, 2015 at 05:56:56PM +, Aditya Auradkar wrote:
>> Is that necessarily the case? On a cluster hosting partitions, assuming the 
>> leaders are evenly distributed, every node should receive a roughly equal 
>> share of the traffic. It does help a lot when the consumer throughput of a 
>> single partition exceeds the capacity of a single leader but at that point 
>> the topic ideally needs more partitions.
>> 
>> Aditya
>> 
>> 
>> From: James Cheng [jch...@tivo.com]
>> Sent: Wednesday, May 27, 2015 10:50 AM
>> To: users@kafka.apache.org
>> Subject: Re: Is fetching from in-sync replicas possible?
>> 
>> On May 26, 2015, at 1:44 PM, Joel Koshy  wrote:
>> 
>>>> Apologies if this question has been asked before. If I understand things
>>>> correctly a client can only fetch from the leader of a partition, not from
>>>> an (in-sync) replica. I have a use case where it would be very beneficial
>>>> if it were possible to fetch from a replica instead of just the leader, and
>>>> I wonder why it is not allowed? Are there any consistency problems with
>>>> allowing it, for example? Is there any way to configure Kafka to allow it?
>>> 
>>> Yes this should be possible.  I don't think there are any consistency
>>> issues (barring any bugs) since we never expose past the
>>> high-watermark and the follower HW is strictly <= leader HW. Can you
>>> file a jira for this?
>>> 
>> 
>> Wouldn't this allow Kafka to scale to handle a lot more consumer traffic? 
>> Currently, consumers all have to read from the leader, which means that the 
>> network/disk bandwidth of a particular leader is the bottleneck. If 
>> consumers could read from in-sync replicas, then a single node no longer is 
>> the bottleneck for reads. You could scale out your read capacity as far as 
>> you want.
>> 
>> -James
>> 
>> 
>>>> The use case is a Kafka cluster running in EC2 across three availability
>>>> zones.
>>> 
>>> Out of curiosity - what's the typical latency (distribution) you see
>>> between zones?
>>> 
>>> Joel
>> 
> 



How to manage the consumer group id?

2015-06-10 Thread James Cheng
Hi,

How are people specifying/persisting/resetting the consumer group identifier 
("group.id") when using the high-level consumer?

I understand how it works. I specify some string and all consumers that use 
that same string will help consume a topic. The partitions will be distributed 
amongst them for consumption. And when they save their offsets, the offsets 
will be saved according to the consumer group. That all makes sense to me.

What I don't understand is the best way to set and persist them, and reset them 
if needed. For example, do I simply hardcode the string in my code? If so, then 
all deployed instances will have the same value (that's good). If I want to 
bring up a test instance of that code, or a new installation, though, then it 
will also share the load (that's bad). 

If I pass in a value to my instances, that lets me have different test and 
production instances of the same code (that's good), but then I have to persist 
my consumer group id somewhere outside of the process (on disk, in zookeeper, 
etc). Which then means I need some way to manage *that* identifier (that's... 
just how it is?).
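
To make the "pass in a value" option concrete, a minimal sketch with the 0.8
high-level consumer; the properties file path is illustrative and would be
laid down by the deployment tooling:

    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream("/etc/myapp/consumer.properties")) {
        props.load(in);  // supplies group.id, per environment (test vs. production)
    }
    props.setProperty("zookeeper.connect", "zk1:2181");
    ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));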

What if I decide that I want my app to start over? In the case of log-compacted 
streams, I want to throw away any processing I did and start "from the 
beginning". Do I change my consumer group, which effective resets everything? 
Or do I delete my saved offsets, and then resume with the same consumer group? 
The latter is functionally equivalent to the former.

Thanks,
-James



Re: Changing replication factor for an existing topic

2015-06-10 Thread James Cheng
Airbnb's kafkat tool has a "set-replication-factor" option. I've never tried it 
myself.

https://github.com/airbnb/kafkat

-James

> On Jun 10, 2015, at 4:20 PM, Aditya Auradkar  
> wrote:
> 
> The replica list that you specify can be used to increment/decrement the 
> replication factor.
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 
> Aditya
> 
> 
> From: Robin Yamaguchi [ro...@tune.com]
> Sent: Wednesday, June 10, 2015 4:05 PM
> To: users@kafka.apache.org
> Subject: Changing replication factor for an existing topic
> 
> Greetings,
> 
> With Kafka 0.8.2.1, is there a way to either raise or lower the replication
> factor of an existing topic?  Searching through this list, there was
> mention that the tool has yet to be developed.  It doesn't seem possible
> still looking at the replication tools wiki, nor with yahoo's kafka
> manager.  Thought I give this list another shot.
> 
> Thank you,
> Robin
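
To make the replica-list approach concrete: a sketch of a reassignment file
that takes one partition from one replica to three (topic name and broker ids
are illustrative), applied with kafka-reassign-partitions.sh as in the linked
documentation:

    {"version":1,
     "partitions":[{"topic":"my-topic","partition":0,"replicas":[1,2,3]}]}

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file increase-replication-factor.json --execute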



Re: How to manage the consumer group id?

2015-06-10 Thread James Cheng

> On Jun 10, 2015, at 1:26 PM, Todd Palino  wrote:
> 
> For us, group ID is a configuration parameter of the application. So we
> store it in configuration files (generally on disk) and maintain it there
> through our configuration and deployment infrastructure. As you pointed
> out, hard coding the group ID into the application is not usually a good
> pattern.
> 
> If you want to reset, you have a couple choices. One is that you can just
> switch group names and start fresh. Another is that you can shut down the
> consumer and delete the existing consumer group, then restart. You could
> also stop, edit the offsets to set them to something specific (if you need
> to roll back to a specific point, for example), and restart.
> 

Thanks Todd. That helps. The "on disk" storage doesn't work well if you are 
running consumers on ephemeral nodes like EC2 instances, but in that case, I 
guess you would save the group ID in some other data store ("on disk, but 
elsewhere") associated with your "application cluster" rather than any one node 
of the cluster.

I often hear about people saving their offsets using the consumer, and 
monitoring offsets for lag. I don't hear much about people deleting or 
changing/setting offsets by other means. How is it usually done? Are there 
tools to change the offsets, or do people go into zookeeper to change them 
directly? Or, for broker-stored offsets, use the Kafka APIs?

-James

> -Todd
> 
> 
> On Wed, Jun 10, 2015 at 1:20 PM, James Cheng  wrote:
> 
>> Hi,
>> 
>> How are people specifying/persisting/resetting the consumer group
>> identifier ("group.id") when using the high-level consumer?
>> 
>> I understand how it works. I specify some string and all consumers that
>> use that same string will help consume a topic. The partitions will be
>> distributed amongst them for consumption. And when they save their offsets,
>> the offsets will be saved according to the consumer group. That all makes
>> sense to me.
>> 
>> What I don't understand is the best way to set and persist them, and reset
>> them if needed. For example, do I simply hardcode the string in my code? If
>> so, then all deployed instances will have the same value (that's good). If
>> I want to bring up a test instance of that code, or a new installation,
>> though, then it will also share the load (that's bad).
>> 
>> If I pass in a value to my instances, that lets me have different test and
>> production instances of the same code (that's good), but then I have to
>> persist my consumer group id somewhere outside of the process (on disk, in
>> zookeeper, etc). Which then means I need some way to manage *that*
>> identifier (that's... just how it is?).
>> 
>> What if I decide that I want my app to start over? In the case of
>> log-compacted streams, I want to throw away any processing I did and start
>> "from the beginning". Do I change my consumer group, which effective resets
>> everything? Or do I delete my saved offsets, and then resume with the same
>> consumer group? The latter is functionally equivalent to the former.
>> 
>> Thanks,
>> -James
>> 
>> 



Re: Using Kafka as a persistent store

2015-07-13 Thread James Cheng
For what it's worth, I did something similar to Rad's suggestion of 
"cold-storage" to add long-term archiving when using Amazon Kinesis. Kinesis is 
also a message bus, but only has a 24 hour retention window.

I wrote a Kinesis consumer that would take all messages from Kinesis and save 
them into S3. I stored them in S3 in such a way that the structure mirrors the 
original Kinesis stream, and all message metadata is preserved (message offsets 
and primary keys, for example).

This means that I can write a "consumer" that would consume from S3 files in 
the same way that it would consume from the Kinesis stream itself. And the data 
is structured such that when you are done reading from S3, you can connect to 
the Kinesis stream at the point where the S3 archive left off.

This effectively allowed me to add a configurable retention period when 
consuming from Kinesis.

-James

On Jul 13, 2015, at 11:45 AM, Tim Smith  wrote:

> I have had a similar issue where I wanted a single source of truth between
> Search and HDFS. First, if you zoom out a little, eventually you are going
> to have some compute engine(s) process the data. If you store it in a
> compute neutral tier like kafka then you will need to suck the data out at
> runtime and stage it for the compute engine to use. So pick your poison,
> process at ingest and store multiple copies of data, one per compute
> engine, OR store in a neutral store and process at runtime. I am not saying
> one is better than the other but that's how I see the trade-off so
> depending on your use cases, YMMV.
> 
> What I do is:
> - store raw data into kafka
> - use spark streaming to transform data to JSON and post it back to kafka
> - Hang multiple data stores off kafka that ingest the JSON
> - Not do any other transformations in the "consumer" stores and store the
> copy as immutable event
> 
> So I do have multiple copies (one per compute tier) but they all look the
> same.
> 
> Unless different compute engines, natively start to use a common data
> storage format, I don't see how one could get away from storing multiple
> copies. Primarily, I see Lucene based products have their format, the
> Hadoop ecosystem seems congregating around Parquet and then the NoSQL
> players have their formats (one per each product).
> 
> My 2 cents worth :)
> 
> 
> 
> On Mon, Jul 13, 2015 at 10:35 AM, Daniel Schierbeck <
> daniel.schierb...@gmail.com> wrote:
> 
>> Am I correct in assuming that Kafka will only retain a file handle for the
>> last segment of the log? If the number of handles grows unbounded, then it
>> would be an issue. But I plan on writing to this topic continuously anyway,
>> so not separating data into cold and hot storage is the entire point.
>> 
>> Daniel Schierbeck
>> 
>>> On 13. jul. 2015, at 15.41, Scott Thibault <
>> scott.thiba...@multiscalehn.com> wrote:
>>> 
>>> We've tried to use Kafka not as a persistent store, but as a long-term
>>> archival store.  An outstanding issue we've had with that is that the
>>> broker holds on to an open file handle on every file in the log!  The
>> other
>>> issue we've had is when you create a long-term archival log on shared
>>> storage, you can't simply access that data from another cluster b/c of
>> meta
>>> data being stored in zookeeper rather than in the log.
>>> 
>>> --Scott Thibault
>>> 
>>> 
>>> On Mon, Jul 13, 2015 at 4:44 AM, Daniel Schierbeck <
>>> daniel.schierb...@gmail.com> wrote:
>>> 
 Would it be possible to document how to configure Kafka to never delete
 messages in a topic? It took a good while to figure this out, and I see it
 as an important use case for Kafka.
 
 On Sun, Jul 12, 2015 at 3:02 PM Daniel Schierbeck <
 daniel.schierb...@gmail.com> wrote:
 
> 
>> On 10. jul. 2015, at 23.03, Jay Kreps  wrote:
>> 
>> If I recall correctly, setting log.retention.ms and log.retention.bytes
>> to -1 disables both.
> 
> Thanks!
> 
>> 
>> On Fri, Jul 10, 2015 at 1:55 PM, Daniel Schierbeck <
>> daniel.schierb...@gmail.com> wrote:
>> 
>>> 
 On 10. jul. 2015, at 15.16, Shayne S  wrote:
 
 There are two ways you can configure your topics, log compaction and with
 no cleaning. The choice depends on your use case. Are the records uniquely
 identifiable and will they receive updates? Then log compaction is the way
 to go. If they are truly read only, you can go without log compaction.
>>> 
>>> I'd rather be free to use the key for partitioning, and the records are
>>> immutable — they're event records — so disabling compaction altogether
>>> would be preferable. How is that accomplished?
 
 We have a small process which consumes a topic and performs upserts to our
 various database engines. It's easy to change how it all works and simply
 consume the single source of truth 
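
For reference, a sketch of the broker-level settings Jay recalls above for
never deleting messages (per-topic overrides would use retention.ms and
retention.bytes instead):

    # server.properties -- disable time- and size-based deletion
    log.retention.ms=-1
    log.retention.bytes=-1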

New producer and ordering of Callbacks when sending to multiple partitions

2015-07-13 Thread James Cheng
Hi,

I'm trying to understand the new producer, and the order in which the Callbacks 
will be called.

From my understanding, records are batched up per partition. So all records 
destined for a specific partition will be sent in order, and that means that 
their callbacks will be called in order.

What about message batches that cover multiple partitions? E.g. If I send three 
messages to three partitions A, B, and C, in the following order:

A1 A2 A3 B1 B2 B3 C1 C2 C3

Then is it possible that messages B1 B2 B3 will be sent prior to A1 A2 A3? 
Which means the callbacks for B1 B2 B3 will also be called prior to the ones 
from A1 A2 A3?

Thanks,
-James
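
To make the scenario concrete, a sketch with the new producer, assuming a
KafkaProducer<String, String> named "producer"; partitions and keys are
illustrative, and the callback just records completion. The producer documents
callback ordering only within a single partition, so A's, B's, and C's
callbacks may interleave with each other:

    Callback logOrder = (metadata, exception) -> {
        if (exception == null)
            System.out.printf("partition %d offset %d completed%n",
                              metadata.partition(), metadata.offset());
    };
    producer.send(new ProducerRecord<>("topic", 0, "key", "A1"), logOrder);  // partition A
    producer.send(new ProducerRecord<>("topic", 1, "key", "B1"), logOrder);  // partition B
    producer.send(new ProducerRecord<>("topic", 2, "key", "C1"), logOrder);  // partition C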



Re: New producer in production

2015-07-17 Thread James Cheng
http://kafka.apache.org/documentation.html, Section 3.4.

> 3.4 New Producer Configs
> 
> We are working on a replacement for our existing producer. The code is 
> available in trunk now and can be considered beta quality. Below is the 
> configuration for the new producer.

Sivananda might have seen it elsewhere, but this is where I found it.

-James

On Jul 17, 2015, at 9:49 AM, Jay Kreps  wrote:

> Hey Sivananda,
> 
> That's actually no longer true and likely a documentation bug. Where did
> you see that?
> 
> -Jay
> 
> On Fri, Jul 17, 2015 at 9:35 AM, Sivananda Reddy 
> wrote:
> 
>> Hi,
>> 
>> Kafka documentation says that the new producer is in Beta state; how safe is
>> it to use the new producer in production? This is the first time I am
>> using Kafka for my application messaging needs. Please let me know.
>> 
>> Thank you,
>> Siva.
>> 



Re: New producer in production

2015-07-17 Thread James Cheng
Be aware that the old producer and new producer have different partitioning 
algorithms:
http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3C4A6E5F2C-232E-4288-8F9F-3F9D9AE05718%40tivo.com%3E

This could bite you if you rely on key-based partitioning and are switching 
from the old producer to the new producer.

Siva, this is less important for you, since you are starting to use kafka for 
the first time. And this is not important at all if you provide your own 
partitioning algorithm.

-James

On Jul 17, 2015, at 1:23 PM, Sivananda Reddy  wrote:

> Hi Jay,
> 
> I found it here:
> http://kafka.apache.org/documentation.html#newproducerconfigs, the same
> link is reported by James.
> 
> @Joel: Thanks a lot for the info, I will use new producer
> 
> Regards,
> Siva.
> 
> On Fri, Jul 17, 2015 at 12:02 PM, James Cheng  wrote:
> 
>> http://kafka.apache.org/documentation.html, Section 3.4.
>> 
>>> 3.4 New Producer Configs
>>> 
>>> We are working on a replacement for our existing producer. The code is
>> available in trunk now and can be considered beta quality. Below is the
>> configuration for the new producer.
>> 
>> Sivananda might have seen it elsewhere, but this is where I found it.
>> 
>> -James
>> 
>> On Jul 17, 2015, at 9:49 AM, Jay Kreps  wrote:
>> 
>>> Hey Sivananda,
>>> 
>>> That's actually no longer true and likely a documentation bug. Where did
>>> you see that?
>>> 
>>> -Jay
>>> 
>>> On Fri, Jul 17, 2015 at 9:35 AM, Sivananda Reddy <
>> sivananda2...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Kafka documentation says that the new producer is in Beta state; how safe
>>>> is it to use the new producer in production? This is the first time I am
>>>> using Kafka for my application messaging needs. Please let me know.
>>>> 
>>>> Thank you,
>>>> Siva.
>>>> 
>> 
>> 



Consuming from Kafka but don't need to save offsets

2015-07-20 Thread James Cheng
Hi,

I have a web service that serves up some data that it obtains from a kafka 
topic. When the process starts up, it wants to load the entire kafka topic into 
memory, and serve the data up from an in-memory hashtable. The data in the 
topic has primary keys and is log compacted, and so the total dataset will be 
small enough to fit in memory. My web service will only start serving up data 
when the entire topic is loaded. (And for that, 
https://issues.apache.org/jira/browse/KAFKA-1977 would be super useful).

I am only storing this data in memory. In the event of process death or 
restart, my in-memory state is gone, and so I will always want to rebuild it by 
again consuming the topic from the earliest offset. I will never need to 
checkpoint my offsets.

Also, I will have N instances of this application, each one needing to consume 
the entire topic. This is how I plan to do horizontal scaling of my web service.

I would like to use the high level consumer, so that I don't need to manually 
discover which broker is the leader, and so that I don't have to handle leader 
rebalancing.
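
Concretely, the setup I have in mind looks something like this (a minimal
sketch with the 0.8.2 high-level consumer; the group naming ties into question
2 below, and the "myapp-" prefix is illustrative):

    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "myapp-" + UUID.randomUUID());  // one-time-use group per boot
    props.put("auto.offset.reset", "smallest");  // no committed offsets, so start from the beginning
    props.put("auto.commit.enable", "false");    // never checkpoint; rebuild on every restart
    ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));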

A couple questions:
1) Does this use case make sense? Is this pattern used by anyone else? I like 
it because it makes my web service completely stateless.
2) In order to make each instance consume all partitions of the topic, I need 
each consumer group id to be unique to that process. So I was thinking of just 
using a UUID or something similar. What is the "cost" of creating a new 
consumer group id? If I am creating a new one every time I start my 
application, would I be cluttering up zookeeper or the __consumer_offsets 
topic? Note there will only ever be N instances of my application running. 
Since I never will need to checkpoint my offsets, does that affect my question 
about "cluttering up" zookeeper/kafka? Are old consumer groups ever cleaned up 
out of zookeeper or the __consumer_offsets topic?
3) Are the stored offsets used for any other reason, aside from at startup of a 
new consumer? Are offsets used after rebalancing when partition leaders change 
due to broker failure? I know that offsets can be used for Burrow-like 
monitoring.
4) Since I don't need support for checkpointing, another option is to use the 
SimpleConsumer. The sample code at 
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 
looks fairly comprehensive. It handles discovery of the partition leader, and 
handles leader rebalancing. Are there any other situations that I should be 
aware of before relying on that sample code?
5) Will any of this change when the new consumer comes out? Will the 
SimpleConsumer still exist when the new consumer comes out?

Thanks,
-James



Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread James Cheng

> On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava  wrote:
> 
> On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić  wrote:
> 
>> Hello Apache Kafka community,
>> 
>> I find new consumer poll/seek javadoc a bit confusing. Just by reading docs
>> I'm not sure what the outcome will be, what is expected in following
>> scenario:
>> 
>> - kafkaConsumer is instantiated with auto-commit off
>> - kafkaConsumer.subscribe(someTopic)
>> - kafkaConsumer.position is called for every TopicPartition HLC is actively
>> subscribed on
>> 
>> and then when doing multiple poll calls in succession (without calling
>> commit), does seek have to be called in between poll calls to position HLC
>> to skip what was read in previous poll, or does HLC keep that state
>> (position after poll) in memory, so that next poll (without seek in between
>> two poll calls) will continue from where last poll stopped?
>> 
> 
> The position is tracked in-memory within the consumer, so as long as there
> isn't a consumer rebalance, consumption will just proceed with subsequent
> messages (i.e. the behavior I think most people would find intuitive).
> However, if a rebalance occurs (another consumer instance joins the group
> or some leave), then a partition may be assigned to a different consumer
> instance that has no idea about the current position and will restart based
> on the offset reset setting (because attempting to fetch the committed
> offset will fail since no offsets have been committed).
> 

Ewen,

What happens if there is a broker failure and a new broker becomes the 
partition leader? Does the high level consumer start listening to the new 
partition leader at the in-memory position, or does it restart based on saved 
offsets?

Thanks,
-James

> -Ewen
> 
> 
>> Could be it's just me not understanding this from javadoc. If not, maybe
>> javadoc can be improved to make this (even) more obvious.
>> 
>> Kind regards,
>> Stevo Slavic.
>> 
> 
> 
> 
> -- 
> Thanks,
> Ewen
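
For reference, a sketch of the behavior Ewen describes, with auto-commit off
(method signatures were still shifting in trunk at the time, so this follows
the later stabilized API; "running" and handle() are stand-ins):

    consumer.subscribe(Collections.singletonList("some-topic"));
    while (running) {
        // No seek() between polls: the consumer tracks its position in memory,
        // so each poll continues where the previous one stopped, until a
        // rebalance hands the partition to another instance.
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            handle(record);
        }
    }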



Checkpointing with custom metadata

2015-08-03 Thread James Cheng
According to 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
 we can store custom metadata with our checkpoints. It looks like the high 
level consumer does not support committing offsets with metadata, and that in 
order to checkpoint with custom metadata, we have to issue the 
OffsetCommitRequest ourselves. Is that correct?

Thanks,
-James
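
(For later readers: the released new Java consumer did eventually expose
commit metadata via OffsetAndMetadata; the 0.8 high-level consumer has no
equivalent, hence the question above. A sketch, where nextOffset is an assumed
variable holding the next offset to consume:)

    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
            new TopicPartition("topic-a", 0),
            new OffsetAndMetadata(nextOffset, "still doing initial copy of A"));
    consumer.commitSync(offsets);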



Re: Checkpointing with custom metadata

2015-08-03 Thread James Cheng
Nice new email address, Gwen. :)

On Aug 3, 2015, at 3:17 PM, Gwen Shapira  wrote:

> You are correct. You can see that ZookeeperConsumerConnector is hardcoded
> with null metadata.
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L310
> 
> More interesting, it looks like the Metadata is not exposed in the new
> KafkaConsumer either.
> 
> Mind sharing what did you plan to use it for? this will help us figure out
> how to expose it :)
> 

I'm juggling around a couple designs on what I'm trying to do, so it may turn 
out that I actually don't need it.

My generic answer is, I have some application state related to the processing 
of a topic, and I wanted to store it somewhere persistent that will survive 
across process death and disk failure. So storing it with the offset seemed 
like a nice solution. I could alternatively store it in a standalone kafka 
topic instead.

The more detailed answer: I'm doing a time-based merge sort of 3 topics (A, B, 
and C) and outputting the results into a new output topic (let's call it 
"sorted-topic"). Except that during the initial creation of "sorted-topic", 
I want all of topic A to be output to sorted-topic first, followed by 
an on-going merge sort of topics B, C, and any updates to A that come along.

I was trying to handle the situation of what happens if I crash when initially 
copying topic A into sorted-topic. And I was thinking that I could save some 
metadata in my topic A checkpoint that says "still doing initial copy of A". So 
that way, when I start up next time, I would know to continue copying topic A 
to the output. Once I have finished copying all of A to sorted-topic, I would 
store "finished doing initial copy of A" into my checkpoint, so that upon 
restart, I would know to start doing the merge sort of A, B, and C.

I have a couple other designs that seem cleaner, tho, so I might not actually 
need it.

-James

> Gwen
> 
> 
> On Mon, Aug 3, 2015 at 1:52 PM, James Cheng  wrote:
> 
>> According to
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
>> we can store custom metadata with our checkpoints. It looks like the high
>> level consumer does not support committing offsets with metadata, and that
>> in order to checkpoint with custom metadata, we have to issue the
>> OffsetCommitRequest ourselves. Is that correct?
>> 
>> Thanks,
>> -James
>> 
>> 



Documentation typo for offsets.topic.replication.factor ?

2015-08-05 Thread James Cheng
Hi,

My kafka cluster has a __consumer_offsets topic with 50 partitions (the default 
for offsets.topic.num.partitions) but with a replication factor of just 1 (the 
default for offsets.topic.replication.factor should be 3).

From the docs http://kafka.apache.org/documentation.html:

offsets.topic.replication.factor (default: 3) -- The replication factor for the 
offset commit topic. A higher setting (e.g., three or four) is recommended in 
order to ensure higher availability. If the offsets topic is created when fewer 
brokers than the replication factor then the offsets topic will be created with 
fewer replicas.


I'm guessing there's a typo there? I'm guessing it should be:

If the offsets topic is created when fewer brokers than the replication factor 
[are active], then the offsets topic will be created with fewer replicas.

Or something along those lines?

Thanks,
-James




Re: Log Cleaner Thread Stops

2015-09-23 Thread James Cheng

On Sep 18, 2015, at 10:25 AM, Todd Palino  wrote:

> I think the last major issue with log compaction (that it couldn't handle
> compressed messages) was committed as part of
> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
> certain what version this will end up in. It may be part of 0.8.2.2.
> 
> Regardless, you'll probably be OK now. We've found that once we clean this
> issue up once it doesn't appear to recur. As long as you're not writing in
> compressed messages to a log compacted topic (and that won't happen with
> __consumer_offsets, as it's managed by the brokers themselves - it would
> only be if you were using other log compacted topics), you're likely in the
> clear now.
> 

Todd,

If I understand your description of the problem, you are saying that enabling 
log compaction on a topic with compressed messages can (will?) cause the log 
cleaner to crash when it encounters those compressed messages. And the death of 
the cleaner thread will prevent log compaction from running on other topics, 
even ones that don't have compressed messages.

That means if we have a cluster where we want to use log compaction on *any* 
topic, we need to either:
1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade to some 
version it is applied)
OR
2) make sure that we don't use compressed messages in *any* topic that has log 
compaction turned on.

And, more specifically, if we want to make use of __consumer_offsets, then we 
cannot use compressed messages in any topic that has compaction turned on.

Is that right?
-James
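
For reference, a rough sketch of the temporary reconfiguration Todd describes
further down, using the 0.8.2-era topic tool; flag spellings vary across
versions, so treat this as illustrative:

    # temporarily disable compaction and let retention reap the bad segments
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic __consumer_offsets \
        --deleteConfig cleanup.policy --config retention.ms=240000 --config segment.ms=240000

    # after the segments are reaped, restore compaction
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic __consumer_offsets \
        --config cleanup.policy=compact --deleteConfig retention.ms --deleteConfig segment.ms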

> -Todd
> 
> 
> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
> john.holl...@objectpartners.com> wrote:
> 
>> Thanks!
>> 
>> I did what you suggested and it worked except it was necessary for me to
>> remove the cleaner-offset-checkpoint file from the data directory and
>> restart the servers.  The log indicates all is well.
>> 
>> Do you know what version the fix to this will be in? I'm not looking
>> forward to dealing with this on a reoccurring basis.
>> 
>> -John
>> 
>> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
>> 
>>> Yes, this is a known concern, and it should be fixed with recent commits.
>>> In the meantime, you'll have to do a little manual cleanup.
>>> 
>>> The problem you're running into is a corrupt message in the offsets
>> topic.
>>> We've seen this a lot. What you need to do is set the topic configuration
>>> to remove the cleanup.policy config, and set retention.ms and segment.ms
>>> to
>>> something reasonably low. I suggest using a value of 3 or 4 times your
>>> commit interval for consumers. Then wait until the log segments are
>> reaped
>>> (wait twice as long as the retention.ms you chose, to be safe). Once
>> this
>>> is done, you can set the topic configuration back the way it was (remove
>>> segment.ms and retention.ms configs, and set cleanup.policy=compact).
>>> Lastly, you'll need to do a rolling bounce of the cluster to restart the
>>> brokers (which restarts the log cleaner threads). Technically, you only
>>> need to restart brokers where the threads have died, but it's easier to
>>> just restart all of them.
>>> 
>>> Keep in mind that when you do this, you are deleting old offsets. If your
>>> consumers are all live and healthy, this shouldn't be a problem because
>>> they will just continue to commit their offsets properly. But if you have
>>> an offline consumer, you'll lose the committed offsets by doing this.
>>> 
>>> -Todd
>>> 
>>> 
>>> On Fri, Sep 18, 2015 at 5:31 AM, John Holland <
>>> john.holl...@objectpartners.com> wrote:
>>> 
 I've been experiencing this issue across several of our environments
>> ever
 since we enabled the log cleaner for the __consumer_offsets topic.
 
 We are on version 0.8.2.1 of kafka, using the new producer.  All of our
 consumers are set to commit to kafka only.
 
 Below is the stack trace in the log I've encountered across several
 different clusters.  A simple restart of kafka will allow compaction to
 continue on all of the other partitions but the incorrect one will
>> always
 fail.
 
 Here are the values for it from the kafka-topics --describe command:
 
 Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
 Configs:segment.bytes=104857600,cleanup.policy=compact
 
 Are there any recommendations on how to prevent this and the best way
>> to
 recover from this exception?  This is causing disk space to fill up
>>> quickly
 on the node.
 
 I did see an open issue that seems very similar to this
 https://issues.apache.org/jira/browse/KAFKA-1641 but this is the
 __consumer_offsets topic which I have not had any part in setting up
>> nor
 producing to.
 
 [2015-09-18 02:57:25,520] INFO Cleaner 0: Beginning cleaning of log
 __consumer_offsets-17. (kafka.log.LogCleaner)
 [2015-09-18 02:57:25,520] INFO Cleaner 0: Building offset map for
 __consumer_offsets-17

Re: Log Cleaner Thread Stops

2015-09-24 Thread James Cheng

> On Sep 24, 2015, at 8:11 PM, Todd Palino  wrote:
>
> Well, in general you can't currently use compressed messages in any topic
> that has compaction turned on regardless of whether or not you are using
> Kafka-committed offsets. The log compaction thread will die either way.
> There's only one compaction thread for the broker that runs on all topics
> that use compaction.
>
> Jason, to address your question, it's probably wise to wait for now.
> Zookeeper offsets work, so unless it's broke, don't fix it for now. We're
> using Kafka-committed offsets at LinkedIn for our mirror makers and our
> auditor application (both of which are considered infrastructure
> applications for Kafka), but we're not encouraging other internal users to
> switch over just yet.
>

Burrow depends on kafka-committed offsets, doesn’t it? I guess that means Burrow 
is only being used to monitor your mirror makers and auditor application, then?

-James

> -Todd
>
>
> On Wed, Sep 23, 2015 at 3:21 PM, James Cheng  wrote:
>
>>
>> On Sep 18, 2015, at 10:25 AM, Todd Palino  wrote:
>>
>>> I think the last major issue with log compaction (that it couldn't handle
>>> compressed messages) was committed as part of
>>> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
>>> certain what version this will end up in. It may be part of 0.8.2.2.
>>>
>>> Regardless, you'll probably be OK now. We've found that once we clean
>> this
>>> issue up once it doesn't appear to recur. As long as you're not writing
>> in
>>> compressed messages to a log compacted topic (and that won't happen with
>>> __consumer_offsets, as it's managed by the brokers themselves - it would
>>> only be if you were using other log compacted topics), you're likely in
>> the
>>> clear now.
>>>
>>
>> Todd,
>>
>> If I understand your description of the problem, you are saying that
>> enabling log compaction on a topic with compressed messages can (will?)
>> cause the log cleaner to crash when it encounters those compressed
>> messages. And the death of the cleaner thread will prevent log compaction
>> from running on other topics, even ones that don't have compressed messages.
>>
>> That means if we have a cluster where we want to use log compaction on
>> *any* topic, we need to either:
>> 1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade to
>> some version it is applied)
>> OR
>> 2) make sure that we don't use compressed messages in *any* topic that has
>> log compaction turned on.
>>
>> And, more specifically, if we want to make use of __consumer_offsets, then
>> we cannot use compressed messages in any topic that has compaction turned
>> on.
>>
>> Is that right?
>> -James
>>
>>> -Todd
>>>
>>>
>>> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
>>> john.holl...@objectpartners.com> wrote:
>>>
>>>> Thanks!
>>>>
>>>> I did what you suggested and it worked except it was necessary for me to
>>>> remove the cleaner-offset-checkpoint file from the data directory and
>>>> restart the servers.  The log indicates all is well.
>>>>
>>>> Do you know what version the fix to this will be in? I'm not looking
>>>> forward to dealing with this on a reoccurring basis.
>>>>
>>>> -John
>>>>
>>>> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
>>>>
>>>>> Yes, this is a known concern, and it should be fixed with recent
>> commits.
>>>>> In the meantime, you'll have to do a little manual cleanup.
>>>>>
>>>>> The problem you're running into is a corrupt message in the offsets
>>>> topic.
>>>>> We've seen this a lot. What you need to do is set the topic
>> configuration
>>>>> to remove the cleanup.policy config, and set retention.ms and
>> segment.ms
>>>>> to
>>>>> something reasonably low. I suggest using a value of 3 or 4 times your
>>>>> commit interval for consumers. Then wait until the log segments are
>>>> reaped
>>>>> (wait twice as long as the retention.ms you chose, to be safe). Once
>>>> this
>>>>> is done, you can set the topic configuration back the way it was
>> (remove
>>>>> segment.ms and retention.ms configs, and set cleanup.policy=compact).
&

Re: custom message handlers?

2015-09-28 Thread James Cheng

> On Sep 28, 2015, at 12:47 PM, Doug Tomm  wrote:
>
> hello,
>
> i've noticed the addition of the custom message handler feature in the latest 
> code; a very useful feature.  in what release will it be available, and when 
> might that be?  at present i am building kafka from source to get this 
> feature.
>

Doug, are you asking about custom message handling in Mirror Maker?

-James

> many thanks,
> doug
>






Re: Dealing with large messages

2015-10-05 Thread James Cheng
Here’s an article that Gwen wrote earlier this year on handling large messages 
in Kafka.

http://ingest.tips/2015/01/21/handling-large-messages-kafka/

-James
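
For reference, the configuration knobs that usually come into play with large
messages (a sketch; the values are illustrative and these are the 0.8.x names):

    # broker (server.properties)
    message.max.bytes=52428800          # per-message cap, ~50 MB here
    replica.fetch.max.bytes=52428800    # must be >= message.max.bytes or replication stalls

    # consumer
    fetch.message.max.bytes=52428800    # must be >= message.max.bytes or fetches stall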

> On Oct 5, 2015, at 11:20 AM, Pradeep Gollakota  wrote:
>
> Fellow Kafkaers,
>
> We have a pretty heavyweight legacy event logging system for batch
> processing. We're now sending the events into Kafka for realtime
> analytics. But we have some pretty large messages (> 40 MB).
>
> I'm wondering if any of you have use cases where you have to send large
> messages to Kafka and how you're dealing with them.
>
> Thanks,
> Pradeep






Re: New consumer client compatible with old broker

2015-10-15 Thread James Cheng

> On Oct 15, 2015, at 11:29 AM, tao xiao  wrote:
>
> Hi team,
>
> Does the new consumer client (the one in trunk) work with a 0.8.2.x broker? I
> am planning to use the new consumer in our development but don't want to
> upgrade the broker to the latest. Is it possible to do that?

Tao,

I recently tried using the console-consumer from trunk to read from a 0.8.2 
broker. It did not work. I tried the console-consumer both in normal mode 
(which uses old consumer) as well as --new-consumer mode. Neither of them 
worked.

https://twitter.com/lorax_james/status/652607463057829888

-James






Where is replication factor stored?

2015-10-16 Thread James Cheng
Hi,

Where is the replication factor for a topic stored? It isn't listed at 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper.
 But the kafka-topics --describe command returns something. Where is it finding 
that?

Thanks,
-James






Re: Where is replication factor stored?

2015-10-16 Thread James Cheng

> On Oct 16, 2015, at 1:19 PM, Guozhang Wang  wrote:
>
> Replication factor is stored as topic configs that are introduced since
> 0.8.1, you can find it in the wiki you mentioned.
>

Ah, I didn't notice the /config section.

But it still doesn't show the replication factor.

[zk: localhost:2181(CONNECTED) 3] get /config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","cleanup.policy":"compact"}}
cZxid = 0xc017a
ctime = Wed Aug 05 22:48:12 UTC 2015
mZxid = 0xc017a
mtime = Wed Aug 05 22:48:12 UTC 2015
pZxid = 0xc017a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0

I tried that for a number of different topics, and none of them have it.
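
(For completeness: the replica assignment itself, from which kafka-topics 
--describe derives the replication factor, appears to live under 
/brokers/topics/<topic>; the replication factor is just the length of each 
partition's replica list. Broker ids below are illustrative:)

[zk: localhost:2181(CONNECTED) 4] get /brokers/topics/__consumer_offsets
{"version":1,"partitions":{"0":[1],"1":[2],"2":[3], ...}}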

-James


> Guozhang
>
> On Fri, Oct 16, 2015 at 12:33 PM, James Cheng  wrote:
>
>> Hi,
>>
>> Where is the replication factor for a topic stored? It isn't listed at
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper.
>> But the kafka-topics --describe command returns something. Where is it
>> finding that?
>>
>> Thanks,
>> -James
>>
>>
>> 
>>
>>
>
>
>
> --
> -- Guozhang






0.8.2 high level consumer with one-time-use group.id's?

2015-12-15 Thread James Cheng
When using the 0.8.2 high level consumer, what is the impact of creating many 
one-time use groupIds and checkpointing offsets using those?

I have a use case where upon every boot, I want to consume an entire topic from 
the very beginning, all partitions. We are using the high level consumer for 
convenience in handling leader discovery and rebalancing, but we do not need 
consumer group functionality.

We do not need checkpointing of offsets to allow continuing after a restart of 
our application, since we want to re-consume the stream upon restarts. However, 
it appears that if you do *not* checkpoint, then when there is an intermittent 
disconnect, the consumer will restart at the beginning of the topic. I haven't 
yet traced down why this happens.

We were thinking of simply creating a new consumer group id upon every reboot, 
but this seems messy, leaving around a lot of unused consumer group ids. A 
couple questions:

1) What resources does a groupId use, when it is active (a consumer using it) 
and when it is inactive (no consumers using it)?

The only resources I can identify are:
* kafka/zookeeper using it for group membership (only when the group is active)
* disk storage for most recent offset in zookeeper (only the most recent is 
stored per partition)
* disk storage for all offsets in kafka (all checkpoints stored, but there is 
log compaction)
* in-memory storage for most recent offset in kafka, for lookups.

2) Are old non-active groupId's ever deleted?

Thanks,
-James






Re: kafka-connect-jdbc: ids, timestamps, and transactions

2015-12-18 Thread James Cheng
Mark, what database are you using?

If you are using MySQL...



There is a not-yet-finished Kafka MySQL Connector at 
https://github.com/wushujames/kafka-mysql-connector. It tails the MySQL binlog, 
and so will handle the situation you describe.

But, as I mentioned, I haven't finished it yet.

If you are using MySQL and don't specifically need/want Kafka Connect, then 
there are a bunch of other options. There is a list of them at 
https://github.com/wushujames/mysql-cdc-projects. But, I'd recommend using the 
Kafka Connect framework, since it was built for this exact purpose.



-James

> On Dec 18, 2015, at 12:08 PM, Mark Drago  wrote:
>
> Ewen,
>
> Thanks for the reply.  We'll proceed while keeping all of your points in
> mind.  I looked around for a more focused forum for the jdbc connector
> before posting here but didn't come across the confluent-platform group.
> I'll direct any more questions about the jdbc connector there.  I'll also
> close the github issue with a link to this thread.
>
> Thanks again,
> Mark.
>
> On Wed, Dec 16, 2015 at 9:51 PM Ewen Cheslack-Postava 
> wrote:
>
>> Mark,
>>
>> There are definitely limitations to using JDBC for change data capture.
>> Using a database-specific implementation, especially if you can read
>> directly off the database's log, will be able to handle more situations
>> like this. Cases like the one you describe are difficult to address
>> efficiently working only with simple queries.
>>
>> The JDBC connector offers a few different modes for handling incremental
>> queries. One of them uses both a timestamp and a unique ID, which will be
>> more robust to issues like these. However, even with both, you can still
>> come up with variants that can cause issues like the one you describe. You
>> also have the option of using a custom query which might help if you can do
>> something smarter by making assumptions about your table, but for now
>> that's pretty limited for constructing incremental queries since the
>> connector doesn't provide a way to track offset columns with custom
>> queries. I'd like to improve the support for this in the future, but at
>> some point it starts making sense to look at database-specific connectors.
>>
>> (By the way, this gets even messier once you start thinking about the
>> variety of different isolation levels people may be using...)
>>
>> -Ewen
>>
>> P.S. Where to ask these questions is a bit confusing since Connect is part
>> of Kafka. In general, for specific connectors I'd suggest asking on the
>> corresponding mailing list for the project, which in the case of the JDBC
>> connector would be the Confluent Platform mailing list here:
>> https://groups.google.com/forum/#!forum/confluent-platform
>>
>> On Wed, Dec 16, 2015 at 5:27 AM, Mark Drago  wrote:
>>
>>> I had asked this in a github issue but I'm reposting here to try and get
>> an
>>> answer from a wider audience.
>>>
>>> Has any thought gone into how kafka-connect-jdbc will be impacted by SQL
>>> transactions committing IDs and timestamps out-of-order?  Let me give an
>>> example with two connections.
>>>
>>> 1: begin transaction
>>> 1: insert (get id 1)
>>> 2: begin transaction
>>> 2: insert (get id 2)
>>> 2: commit (recording id 2)
>>> kafka-connect-jdbc runs and thinks it has handled everything through id 2
>>> 1: commit (recording id 1)
>>>
>>> This would result in kafka-connect-jdbc missing id 1. The same thing
>> could
>>> happen with timestamps. I've read through some of the kafka-connect-jdbc
>>> code and I think it may be susceptible to this problem, but I haven't run
>>> it or verified that it would be an issue. Has this come up before? Are
>>> there plans to deal with this situation?
>>>
>>> Obviously something like bottled-water for postgresql would handle this
>>> nicely as it would get the changes once they're committed.
>>>
>>>
>>> Thanks for any insight,
>>>
>>> Mark.
>>>
>>>
>>> Original github issue:
>>> https://github.com/confluentinc/kafka-connect-jdbc/issues/27
>>>
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>






Re: how to reset kafka offset in zookeeper

2015-12-19 Thread James Cheng
This page describes what Kafka stores in Zookeeper:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

It looks like the info for a particular consumer groupId is stored at:

/consumers/<groupId>/

According to 
https://community.cloudera.com/t5/Cloudera-Labs/Kafka-Parcels/td-p/20392, 
Cloudera's default config is to put a zookeeper chroot of "/kafka"

So you might have to look at 

/kafka/consumers/<groupId>/

-James
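
A sketch of the recursive delete Todd mentions below (zkCli/zookeeper-shell
syntax; "rmr" was later renamed "deleteall" in ZooKeeper, and the group name
and /kafka chroot are illustrative):

    bin/zookeeper-shell.sh localhost:2181
    rmr /kafka/consumers/my-group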


> On Dec 19, 2015, at 3:14 PM, Todd Palino  wrote:
> 
> There’s no simple command. You’ll need to use either zookeeper-shell.sh or
> zkCli.sh or something similar that lets you explore and edit Zookeeper and
> do a recursive delete on the group name in the consumers tree. I’m not sure
> how Cloudera’s interface differs, however, or if they provide a separate
> tool for deleting a consumer group.
> 
> -Todd
> 
> 
> On Sat, Dec 19, 2015 at 11:34 AM, Akhilesh Pathodia <
> pathodia.akhil...@gmail.com> wrote:
> 
>> What is the command to delete a group from Zookeeper? I don't find a
>> /consumer/<group> directory. I am using Cloudera; is there any place on Cloudera
>> Manager where I can delete the group?
>> 
>> Thanks
>> 
>> On Sat, Dec 19, 2015 at 11:47 PM, Todd Palino  wrote:
>> 
>>> If what you want to do is reset to smallest, all you need to do is stop
>> the
>>> consumer, delete the group from Zookeeper, and restart the consumer. It
>>> will automatically create the group again.
>>> 
>>> You only need to export the offsets first if you later need to reset to
>>> where you were in the partitions.
>>> 
>>> -Todd
>>> 
>>> On Saturday, December 19, 2015, Akhilesh Pathodia <
>>> pathodia.akhil...@gmail.com> wrote:
>>> 
 What is the process for deleting the consumer group from zookeeper?
>>> Should
 I export offset, delete and then import?
 
 Thanks,
 Akhilesh
 
 On Fri, Dec 18, 2015 at 11:32 PM, Todd Palino >>> > wrote:
 
> Yes, that’s right. It’s just work for no real gain :)
> 
> -Todd
> 
> On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći <
>>> marko.bon...@sematext.com
 >
> wrote:
> 
>> Hmm, I guess you're right Todd :)
>> Just to confirm, you meant that, while you're changing the exported
 file
> it
>> might happen that one of the segment files becomes eligible for
>>> cleanup
> by
>> retention, which would then make the imported offsets out of range?
>> 
>> Marko Bonaći
>> Monitoring | Alerting | Anomaly Detection | Centralized Log
>>> Management
>> Solr & Elasticsearch Support
>> Sematext  | Contact
>> 
>> 
>> On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino >>> > wrote:
>> 
>>> That works if you want to set to an arbitrary offset, Marko.
>>> However
 in
>> the
>>> case the OP described, wanting to reset to smallest, it is better
>>> to
> just
>>> delete the consumer group and start the consumer with
 auto.offset.reset
>> set
>>> to smallest. The reason is that while you can pull the current
 smallest
>>> offsets from the brokers and set them in Zookeeper for the
>>> consumer,
 by
>> the
>>> time you do that the smallest offset is likely no longer valid.
>>> This
>> means
>>> you’re going to resort to the offset reset logic anyways.
>>> 
>>> -Todd
>>> 
>>> 
>>> On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći <
> marko.bon...@sematext.com 
>>> 
>>> wrote:
>>> 
 You can also do this:
 1. stop consumers
 2. export offsets from ZK
 3. make changes to the exported file
 4. import offsets to ZK
 5. start consumers
 
 e.g.
 bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group
 group-name
 --output-file /tmp/zk-offsets --zkconnect localhost:2181
 bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
 /tmp/zk-offsets --zkconnect localhost:2181
 
 Marko Bonaći
 Monitoring | Alerting | Anomaly Detection | Centralized Log
> Management
 Solr & Elasticsearch Support
 Sematext  | Contact
 
 
 On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil <
>>> jens.ran...@tink.se
 >
>>> wrote:
 
> Hi,
> 
> I noticed that a consumer in the new consumer API supports
 setting
>> the
> offset for a partition to beginning. I assume doing so also
>>> would
>>> update
> the offset in Zookeeper eventually.
> 
> Cheers,
> Jens
> 
> On Friday, December 18, 2015, Akhilesh Pathodia <
> pathodia.akhil...@gmail.com >
> wrote:
> 
>> Hi,
>> 
>> I want to reset the kafka offset in zookeeper so that the
> consumer
>>> will
>> start reading messages

Re: Kafka + ZooKeeper on the same hardware?

2016-01-18 Thread James Cheng

> On Jan 18, 2016, at 12:21 PM, Dick Davies  wrote:
>
> Started an Ansible playbook using the Confluent platform RPM distro,
> and it seems that it co-locates zookeepers
> on the brokers.
>
> So I'm assuming it's fine (at least on 0.9.x for the reasons Todd mentioned).
>
> Does anyone know if the Confluent distro is supposed to be production-ready?
> I switched our test setup over to the Confluent distro to get
> RPM-based installs etc and got a 3-node
> cluster up in HA, but there were a few things I had to roll myself
> (init scripts etc) which I'd have expected
> in a full release.
>
> (sorry if this is OT, I don't know of a specific Confluent mailing list).
>

Confluent-specific mailing list is linked off of here:
http://www.confluent.io/developer#documentation

-James

>
>
> On 14 January 2016 at 18:31, Todd Palino  wrote:
>> I’d say it depends on load and usage. It can definitely be done, and we’ve
>> done it here in places, though we don’t anymore. Part of the luxury of
>> being able to get the hardware we want. In general, it’s probably easier to
>> do with 0.9 and Kafka-committed offsets, because the consumers don’t need
>> to talk to ZK as much. It’s probably even easier with the new
>> consumer, but I can’t speak to that at all.
>>
>> One of the gotchas is that ZK really should have its transaction log on an
>> isolate device so that sequential writes do not require seeks. This could
>> be a separate disk or an SSD drive. An example of a really bad place to put
>> it would be on the same device as your Kafka log segments :) Depending on
>> your load, it may not be critical to use a separate device.
>>
>> As Gwen noted, it all comes down to load. Your availability will be fine,
>> you just need to figure out if the services can share the load.
>>
>> -Todd
>>
>>
>> On Thu, Jan 14, 2016 at 9:25 AM, Gwen Shapira  wrote:
>>
>>> It depends on load :)
>>> As long as there is no contention, you are fine.
>>>
>>> On Thu, Jan 14, 2016 at 6:06 AM, Erik Forsberg  wrote:
>>>
 Hi!

 Pondering how to configure Kafka clusters and avoid having too many
 machines to manage... Would it be recommended to run, say, a 3-node Kafka
 cluster where you also run your 3 node zookeeper cluster on the same
 machines?

 I guess the answer is that "it depends on load", but would be interested
 in any opinions on this anyway.

 Thanks!
 \EF

>>>
>>
>>
>>
>> --
>> *—-*
>> *Todd Palino*
>> Staff Site Reliability Engineer
>> Data Infrastructure Streaming
>>
>>
>>
>> linkedin.com/in/toddpalino






Re: Offset storage issue with kafka(0.8.2.1)

2016-01-27 Thread James Cheng

> On Jan 27, 2016, at 8:25 PM, Sivananda Reddys Thummala Abbigari 
>  wrote:
>
> Hi,
>
> # *Kafka Version*: 0.8.2.1
>
> # *My consumer.propeties have the following properties*:
>exclude.internal.topics=false
>offsets.storage=kafka
>dual.commit.enabled=false
>
> # With the above configuration the offsets should be stored in kafka
> instead of zookeeper but I see that offsets are stored in
> zookeeper(verified using "get path" command).
>
> # I can see that "__consumer_offsets" topic is created but when I run
> "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> __consumer_offsets --from-beginning" nothing is returned which means the
> offsest data is not stored in the  "__consumer_offsets" topic
>
> I want to store the offsets only in kafka. Could you please let me know if
> I am missing anything?.
>
> Thank you,
> Siva.

Siva,
I think you need to pass exclude.internal.topics=false to 
kafka-console-consumer.sh. Also, the messages in the __consumer_offsets topic 
are stored in a binary (I think) encoded format, so you'll need to specify a 
message formatter.

From slide 32 of this deck
http://www.slideshare.net/jjkoshy/offset-management-in-kafka:

./bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 
localhost:2181 --formatter "kafka.server.OffsetManager$OffsetMessageFormatter" 
--consumer.config consumer.properties

And make sure your consumer.properties has exclude.internal.topics=false.

-James






Re: Accumulating data in Kafka Connect source tasks

2016-01-28 Thread James Cheng

> On Jan 28, 2016, at 5:06 PM, Ewen Cheslack-Postava  wrote:
>
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
>
> Finally, I'd love to know which DB you're reading the transaction log from
> and if you're planning on open sourcing the connector:)
>

+1! I'd like to know what DB you're working on too!

-James

> -Ewen
>
> On Thu, Jan 28, 2016 at 6:12 AM, Randall Hauch  wrote:
>
>> Rather than leave this thread so open ended, perhaps I can narrow down to
>> what I think is the best approach. These accumulations are really just
>> additional information from the source that don’t get written to the normal
>> topics. Instead, each change to the accumulated state can be emitted as
>> source records on a dedicated topic. That is very straightforward with the
>> existing Kafka Connect.
>>
>> The challenge I’m struggling with is how a task can/should, upon startup,
>> *consume* that stream to rebuild its state. I can set up my own Kafka
>> consumer for that topic, but IIUC now my connector config has to include
>> much of the same information included in the Kafka Connect workers
>> configuration.
>>
>> Am I just missing how a connector can see the worker configuration
>> properties? Or is there a way that Kafka Connect can help me create a Kafka
>> consumer?
>>
>> Best regards,
>>
>> Randall Hauch
>>
>> On January 28, 2016 at 12:11:07 AM, Randall Hauch (rha...@gmail.com)
>> wrote:
>> I’m creating a custom Kafka Connect source connector, and I’m running into
>> a situation for which Kafka Connect doesn’t seem to provide a solution out
>> of the box. I thought I’d first post to the users list in case I’m just
>> missing a feature that’s already there.
>>
>> My connector’s SourceTask implementation is reading a relational database
>> transaction log. That log contains schema changes and row changes, and the
>> row changes include a reference to the table and the row val

Re: Accumulating data in Kafka Connect source tasks

2016-01-29 Thread James Cheng

> On Jan 29, 2016, at 7:06 AM, Randall Hauch  wrote:
>
> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (e...@confluent.io) 
> wrote:
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
> I agree that ideally connectors would be stateless, or at least have no need 
> for maintaining state across restarts. Unfortunately, that’s not always 
> possible.
>
> Reading the log but using the current schema does pose a problem if/when the 
> schema has evolved since the point in the log that we’re currently reading. 
> This is far more of an issue if you’re playing catch up and there’s been 
> non-compatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it 
> writes an event in the log that includes (a) a table identifier and (b) the 
> row values in column-order. There is no other information. Column renames 
> might be okay, but adding or removing columns will likely result in 
> mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector parses 
> these as part of processing and builds up the schema state as it goes along. 
> This works beautifully, with the only issue being how to persist and recover 
> this after restarts.
>
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Hmm, it sounds like hammering accumulated state into the offsets could be 
> pretty problematic and potentially risky, especially if the state has very 
> different size and frequency characteristics than the offsets.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
> Having a way to get the worker config would be awesome, and IMO it's a nice 
> minimalistic approach. If you think this is a good idea, I can log a JIRA and 
> take it to the dev list. I’m willing to work on it, too.
>
> I’m starting to think that storing state on a separate dedicated topic is the 
> best option, at least for me. First, connector tasks can easily record their 
> state by simply adding more Sou

Re: MongoDB Kafka Connect driver

2016-01-29 Thread James Cheng
Not sure if this will help anything, but just throwing it out there.

The Maxwell and mypipe projects both do CDC from MySQL and support 
bootstrapping. The way they do it is kind of "eventually consistent".

1) At time T1, record coordinates of the end of the binlog as of T1.
2) At time T2, do a full dump of the database into Kafka.
3) Connect back to the binlog at the coordinates recorded in step #1, and emit 
all those records into Kafka.

As Jay mentioned, MySQL supports full row images. At the start of step #3, the 
kafka topic contains all rows as of time T2. It is possible that during step 
#3 you will emit rows that changed between T1 and T2. From the point of 
view of the consumer of the kafka topic, they would see rows that went "back in 
time". However, as step #3 progresses, and the consumer keeps reading, those 
rows would eventually converge down to their final state.

Maxwell: https://github.com/zendesk/maxwell
mypipe: https://github.com/mardambey/mypipe

Does that idea help in any way? Btw, a reason it is done this way is that it is 
"difficult" to do #1 and #2 above in a coordinated way without locking the 
database or adding additional outside dependencies (LVM snapshots being a 
specific one).

Btw, I glanced at some docs about the MongoDB oplog. It seems that each oplog 
entry contains
1) A way to identify the document that the change applies to.
2) A series of mongodb commands (set, unset) to alter the document in #1 to 
become the new document.

Thoughts:
For #1, does it identify a particular "version" of a document? (I don't know 
much about mongodb). If so, you might be able to use it to determine whether the 
change should even be applied to the object.
For #2, doesn't that mean you'll need to "understand" mongodb's syntax and 
commands? Although maybe it is simply sets/unsets/deletes, in which case it's 
maybe pretty simple.

-James

> On Jan 29, 2016, at 9:39 AM, Jay Kreps  wrote:
>
> Also, most database provide a "full logging" option that let's you capture
> the whole row in the log (I know Oracle and MySQL have this) but it sounds
> like Mongo doesn't yet. That would be the ideal solution.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps  wrote:
>
>> Ah, agreed. This approach is actually quite common in change capture,
>> though. For many use cases getting the final value is actually preferable
>> to getting intermediates. The exception is usually if you want to do
>> analytics on something like number of changes.
>>
>> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava 
>> wrote:
>>
>>> Jay,
>>>
>>> You can query after the fact, but you're not necessarily going to get the
>>> same value back. There could easily be dozens of changes to the document
>>> in
>>> the oplog so the delta you see may not even make sense given the current
>>> state of the document. Even if you can apply it the delta, you'd still be
>>> seeing data that is newer than the update. You can of course take this
>>> shortcut, but it won't give correct results. And if the data has been
>>> deleted since then, you won't even be able to write the full record... As
>>> far as I know, the way the op log is exposed won't let you do something
>>> like pin a query to the state of the db at a specific point in the op log
>>> and you may be reading from the beginning of the op log, so I don't think
>>> there's a way to get correct results by just querying the DB for the full
>>> documents.
>>>
>>> Strictly speaking you don't need to get all the data in memory, you just
>>> need a record of the current set of values somewhere. This is what I was
>>> describing following those two options -- if you do an initial dump to
>>> Kafka, you could track only offsets in memory and read back full values as
>>> needed to apply deltas, but this of course requires random reads into your
>>> Kafka topic (but may perform fine in practice depending on the workload).
>>>
>>> -Ewen
>>>
>>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:
>>>
 Hey Ewen, how come you need to get it all in memory for approach (1)? I
 guess the obvious thing to do would just be to query for the record
 after-image when you get the diff--e.g. just read a batch of changes and
 multi-get the final values. I don't know how bad the overhead of this
>>> would
 be...batching might reduce it a fair amount. The guarantees for this are
 slightly different than the pure oplog too (you get the current value
>>> not
 every necessarily every intermediate) but that should be okay for most
 uses.

 -Jay

 On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>>> e...@confluent.io>
 wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it
 does
> such a nice job of cleanly exposing the op log.
>
> Regarding the problem of only getting deltas, unfortunate

Re: at-least-once delivery

2016-01-30 Thread James Cheng

> On Jan 30, 2016, at 4:21 AM, Franco Giacosa  wrote:
>
> Sorry, this solved my questions: "Setting a value greater than zero will
> cause the client to resend any record whose send fails with a potentially
> transient error. Note that this retry is no different than if the client
> resent the record upon receiving the error. Allowing retries will
> potentially change the ordering of records because if two records are sent
> to a single partition, and the first fails and is retried but the second
> succeeds, then the second record may appear first."
>

Franco,

Also, you can avoid the message reordering issue in that description by setting 
max.in.flight.requests.per.connection to 1.

This slide deck has good guidelines on the types of things you are talking 
about:
http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844
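
A minimal producer config sketch for that no-reordering setup (these are standard
Java producer properties; the values are illustrative):

acks=all
retries=3
max.in.flight.requests.per.connection=1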

-James

> 2016-01-30 13:18 GMT+01:00 Franco Giacosa :
>
>> Hi,
>>
>> At-least-once delivery comes in part from network failures and
>> the retries (which may generate duplicates), right?
>>
>> In the event of a duplicate (there was an error but the first message
>> landed OK on partition P1), will the producer recalculate the partition
>> on the retry? Is this done automatically?
>>
>> If the partition doesn't change on the retry and there is only 1 producer,
>> will the duplicate be written next to the original? I mean, if I poll(),
>> will they come one after the other?
>>
>>
>>






Re: Detecting broker version programmatically

2016-02-04 Thread James Cheng

> On Feb 4, 2016, at 8:28 PM, Manikumar Reddy  wrote:
>
> Currently it is available through a JMX MBean. It is not available via the
> wire protocol/requests.
>

The name of the JMX MBean is kafka.server:type=app-info,id=4

Not sure what the id=4 means.
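
If you want to read it without a JMX console, the JmxTool bundled with Kafka can
query it. A sketch, assuming JMX is enabled on the broker at port 9999:

bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name kafka.server:type=app-info,id=4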

-James

> Pending JIRAs related to this:
> https://issues.apache.org/jira/browse/KAFKA-2061
>
> On Fri, Feb 5, 2016 at 4:31 AM,  wrote:
>
>> Is there a way to detect the broker version (even at a high level 0.8 vs
>> 0.9) using the kafka-clients Java library?
>>
>> --
>> Best regards,
>> Marko
>> www.kafkatool.com
>>
>>






Re: Discrepancy between JMX OfflinePartitionCount and kafka-topics.sh?

2016-02-09 Thread James Cheng
I ran into kind of a similar discrepancy, but about UnderReplicatedPartitions.

kafka-topics.sh and zookeeper were saying that we had underreplicated 
partitions.

But JMX said that there were none. I took one of the partitions that ZK was 
saying was under-replicated and I ran DumpLogSegments on the leader broker and 
the broker that was out of the ISR, and it showed that the partition was fully 
replicated to the follower.
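
For reference, that check was along these lines (a sketch; the segment path is
hypothetical):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka/mytopic-0/00000000000000000000.log --print-data-log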

So JMX+DumpLogSegments disagreed with kafka-topics.sh+zookeeper.

So that is the opposite scenario of yours.

Mine was on 0.9.0.

-James


> On Feb 9, 2016, at 12:02 PM, Eric Ogren  wrote:
>
> Hello -
>
> I am seeing that the node in our Kafka cluster currently elected as
> controller is reporting 1 offline partition via JMX
> (kafka.controller:type=KafkaController,name=OfflinePartitionsCount).
> However, when I use kaka-topics to find the offline partition
> (bin/kafka-topics.sh
> --zookeeper localhost:2181 --unavailable-partitions --describe), nothing is
> dumped out. We also haven't been seeing any errors around unavailable
> partitions in any of our producers.
>
> Is there a good way to introspect the controller's state and figure out
> what it thinks the offline partition is?
>
> This is for Kafka 0.8.2.1.
>
> thanks
> Eric






Questions about unclean leader election and "Halting because log truncation is not allowed"

2016-02-25 Thread James Cheng
Hi,

I ran into a scenario where one of my brokers would continually shut down, with 
the error message:
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log 
truncation is not allowed for topic test, Current leader 1's latest offset 0 is 
less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)

I managed to reproduce it with the following scenario:
1. Start broker1, with unclean.leader.election.enable=false
2. Start broker2, with unclean.leader.election.enable=false

3. Create topic, single partition, with replication-factor 2.
4. Write data to the topic. (Commands for these two steps are sketched below,
after the recovery note.)

5. At this point, both brokers are in the ISR. Broker1 is the partition leader.

6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
dropped out of ISR. Broker1 is still the leader. I can still write data to the 
partition.

7. Shutdown Broker1. Hard or controlled, doesn't matter.

8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
full hardware replacement)

9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
because broker1 is down. At this point, the partition is offline. Can't write 
to it.

10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
to join ISR, and immediately halts with the error message:
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log 
truncation is not allowed for topic test, Current leader 1's latest offset 0 is 
less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)

I am able to recover by setting unclean.leader.election.enable=true on my 
brokers.
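
For reference, steps 3 and 4 were along these lines (a sketch, using the scripts
that ship with Kafka and assuming a broker on localhost:9092):

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test \
  --partitions 1 --replication-factor 2
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test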

I'm trying to understand a couple things:
* Is my scenario a valid supported one, or is it along the lines of "don't ever 
do that"?
* In step 10, why is broker1 allowed to resume leadership even though it has no 
data?
* In step 10, why is it necessary to stop the entire broker due to one 
partition that is in this state? Wouldn't it be possible for the broker to 
continue to serve traffic for all the other topics, and just mark this one as 
unavailable?
* Would it make sense to allow an operator to manually specify which broker 
they want to become the new master? This would give me more control over how 
much data loss I am willing to handle. In this case, I would want broker2 to 
become the new master. Or, is that possible and I just don't know how to do it?
* Would it be possible to make unclean.leader.election.enable to be a per-topic 
configuration? This would let me control how much data loss I am willing to 
handle.

Btw, the comment in the source code for that error message indicates:
https://github.com/apache/kafka/blob/01aeea7c7bca34f1edce40116b7721335938b13b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L164-L166

  // Prior to truncating the follower's log, ensure that doing so is not 
disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration 
for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot 
be elected if disallowed by the broker configuration.

But I don't believe that happened. I never changed the configuration. But I did 
venture into "unclean leader election" territory, so I'm not sure if the 
comment still applies.

Thanks,
-James







Re: Kafka Rest Proxy

2016-03-01 Thread James Cheng
Jan,

I don't use the rest proxy, but Confluent has a mailing list where you can 
probably get more info:

Here's the direct link: 
https://groups.google.com/forum/#!forum/confluent-platform

And it is linked off of here: http://www.confluent.io/developer#documentation

-James

> On Mar 1, 2016, at 3:25 AM, Jan Omar  wrote:
>
> Hey guys,
>
> Is someone using the kafka rest proxy from confluent?
>
> We have an issue, that all messages for a certain topic end up in the same 
> partition. Has anyone faced this issue before? We're not using a custom   
> partitioner class, so it's using the default partitioner. We're sending 
> messages without a specific partition and without a key, like this:
>
> curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data 
> '{"records":[{"value":{"foo":"bar"}}]}' "http://x.x.x.x:8082/topics/testme";
>
> and yet for some reason every message ends up in partition 11...
>
> It's a test topic with 30 partitions on 3 brokers and our rest (producer) 
> config is very simple:
>
> id=1
> zookeeper.connect=zookeeper-four.acc:2181...etc
>
> Any help would be appreciated.
>
> Thanks!
>
>






Unavailable partitions (Leader: -1 and ISR is empty) and we can't figure out how to get them back online

2016-03-01 Thread James Cheng
Hi,

We have 44 partitions in our cluster that are unavailable. kafka-topics.sh is 
reporting them with Leader: -1, and with no brokers in the ISR. Zookeeper says 
that broker 5 should be the partition leader for this topic partition. These 
are topics with replication-factor 1. Most of the topics have little to no data 
in them, so they are low-traffic topics. We currently cannot produce to them. 
And I can't find anything in the logs that seems to explain why the broker is 
not taking the partitions back online. Can anyone help?
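
For reference, the kafka-topics.sh report came from a describe along these lines
(a sketch):

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --unavailable-partitions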

Relevant log lines are attached below.

Questions:
* What does Leader: -1 mean?
* Why doesn't the broker take the partition back online?
* Is there more debugging/logging that I can turn on?
* unclean.leader.election.enable=false right now, although during a previous 
boot of the broker, we set it to true to get some partitions back online. These 
ones never came back online.

Thanks,
-James

in zookeeper
---
$ get /brokers/topics/the.topic.name
{"version":1,"partitions":{"0":[5]}}

server.log

[2016-03-01 06:29:13,869] WARN Found an corrupted index file, 
/TivoData/kafka/the.topic.name-0/.index, deleting and 
rebuilding index... (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Recovering unflushed segment 0 in log 
the.topic.name-0. (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Completed load of log the.topic.name-0 with log 
end offset 0 (kafka.log.Log)


state-change.log
-
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,498] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 0 (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,695] TRACE Broker 5 
received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 correlation id 1 from controller 2 epoch 20 for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,957] TRACE Broker 5 
handling LeaderAndIsr request correlationId 1 from controller 2 epoch 20 
starting the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:23,531] ERROR Broker 5 
received LeaderAndIsrRequest with correlation id 1 from controller 2 epoch 20 
for partition [the.topic.name,0] but cannot become follower since the new 
leader -1 is unavailable. (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,075] TRACE Broker 5 
completed LeaderAndIsr request correlationId 1 from controller 2 epoch 20 for 
the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,458] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 2 (state.change.logger)

state-change.log on the controller:
[2016-03-01 06:34:15,077] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 2 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,145] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 5 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,200] TRACE Broker 2 cached leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 9144 (state.change.logger)
[2016-03-01 06:34:15,276] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 4 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,418] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 1 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,484] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 3 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,585] TRACE Controller 2 epoch 20 changed state of replica 
5 for partition [the.topic.name,0] from OfflineReplica to OnlineReplica 
(state.change.logger)
[2016-03-01 06:34:15,606] TRACE Controller 2 epoch 20 sending become-follower 
LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to 
broker 5 for partition [the.topic.name,0] (state.change.logger)
[2016-03-01 06

Re: About the number of partitions

2016-03-02 Thread James Cheng
Kim,

Here's a good blog post from Confluent with advice on how to choose the number 
of partitions.

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

-James


> On Mar 1, 2016, at 4:11 PM, BYEONG-GI KIM  wrote:
>
> Hello.
>
> I have questions about how many partitions are optimal while using kafka.
> As far as I know, even if there are multiple consumers that belong to a
> consumer group, say *group_A*, only one consumer can receive a given Kafka
> message produced by a producer if there is a single partition. So, as a result,
> multiple partitions are required in order to distribute messages across all
> the consumers in group_A.
>
> Is that right?
>
> I'm considering developing several kafka consumer applications, e.g.,
> message saver, message analyzer, etc., so a message from a producer must be
> consumed by those kinds of consumers.
>
> Any advice and help would be really appreciated.
>
> Thanks in advance!
>
> Best regards
>
> Kim






Re: Writing a Producer from Scratch

2016-03-03 Thread James Cheng
Stephen,

There is a mailing list for kafka client developers that you may find useful: 
https://groups.google.com/forum/#!forum/kafka-clients

The d...@kafka.apache.org mailing list might also 
be a good resource: http://kafka.apache.org/contact.html

Lastly, do you have any way to do HTTP calls on your platform? There exist some 
REST servers that you speak HTTP to and then they will produce to Kafka on your 
behalf. Here is one: http://docs.confluent.io/2.0.1/kafka-rest/docs/index.html
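
A minimal sketch of producing through such a REST proxy (assuming the v1 API on
localhost:8082 and a topic named test):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value":{"hello":"world"}}]}' \
  http://localhost:8082/topics/test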

-James

On Mar 3, 2016, at 2:47 AM, Hopson, Stephen 
mailto:stephen.hop...@gb.unisys.com>> wrote:

Hi,
Not sure if this is the right forum for this question, but if it not I’m sure 
someone will direct me to the proper one.
Also, I am new to Kafka (but not new to computers).

I want to write a kafka producer client for a Unisys OS 2200 mainframe. I need 
to write it in C, and since I have no access to Windows / Unix / Linux 
libraries, I have to develop the interface at the lowest level.

So far, I have downloaded a kafka server with associated zookeeper 
(kafka_2.10-0.8.2.2). Note I have downloaded the Windows version and have it running 
on my laptop, successfully tested on the same laptop with the provided producer 
and consumer clients.

I have developed code to open a TCP session to the kafka server, which appears 
to work, and I have attempted to send a metadata request, which does not appear 
to work. When I say it does not appear to work, I mean that I send the message 
and then I sit on a receive, which eventually times out (I do seem to get one 
character, 0235 octal, in the receive buffer). The message format I am using 
is the one described by the excellent document by Jay Kreps / Gwen Shapira at 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol. 
However, it is not clear which Kafka versions these message formats apply to.

Can anybody offer me any advice or suggestions as to how to progress?

PS is the CRC mandatory in the Producer messages?
Many thanks in advance.

Stephen Hopson | Infrastructure Architect | Enterprise Solutions
Unisys | +44 (0)1908 805010 | +44 (0)7557 303321 | 
stephen.hop...@gb.unisys.com









Re: Uneven GC behavior between nodes

2016-03-05 Thread James Cheng
Your partitions are balanced, but is your data being evenly written across all 
the partitions? How are you producing data? Are you producing them with keys? 
Is it possible that the majority of the messages are being written to just a few 
partitions, and so the brokers for those partitions are seeing more load than 
the others?
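
One quick way to check for skew is to compare per-partition log-end offsets. A
sketch, assuming a topic named mytopic (--time -1 asks for the latest offsets):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic mytopic --time -1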

-James

> On Mar 4, 2016, at 9:15 AM, Cees de Groot  wrote:
>
> We're seeing something funny in one of our production clusters that I
> cannot explain away. Everything works fine, but as we're ramping up on
> Kafka, I really want to get at the root cause before we push a ton of
> traffic through it :)
>
> We have 6 nodes over three DCs in the cluster. Currently it's running a
> light load of two topics, one with small (KB) messages, one with variable
> sized (KB-MB) messages, both with 64 partitions and 3 replicas. All topics,
> including __consumer_offsets, have been rebalanced with a script we wrote
> to make sure that the replicas are spread out over the three datacenters
> and that leadership is evenly balanced, so we can continue to operate if we
> lose one DC. Producers use Consul to find an initial broker (round-robin
> through the local DC), Consumers use the 0.9.0.1 client.
>
> The funny thing is that in each DC, one broker graphs "normal" JVM heap
> behavior - a sawtooth of the expected garbage creation/collection cycle.
> The other one essentially stays flat. The flat-lining brokers also show
> less incoming traffic when graphing the OS' received bytes. Everything else
> - incoming, outgoing messages, etcetera, shows up as essentially the same
> on the graphs.
>
> I've been digging around for a bit, but can't find anything obvious that
> would cause the differences in memory pressure. Assuming that Kafka brokers
> pre-allocate buffers, I'd expect not too much garbage being generated. Is
> the flatline the expected behavior and the sawtooth the unexpected one?
> What could cause the difference?
>
> Thanks for any pointers :-)
>
>
> --
>
> *Cees de Groot*
> PRINCIPAL SOFTWARE ENGINEER
> pagerduty.com
> c...@pagerduty.com 
> +1(416)435-4085
>






Re: Questions about unclean leader election and "Halting because log truncation is not allowed"

2016-03-15 Thread James Cheng
Anthony,

I filed https://issues.apache.org/jira/browse/KAFKA-3410 to track this.

-James

> On Feb 25, 2016, at 2:16 PM, Anthony Sparks  
> wrote:
>
> Hello James,
>
> We received this exact same error this past Tuesday (we are on 0.8.2).  To
> answer at least one of your bullet points -- this is a valid scenario. We
> had the same questions, I'm starting to think this is a bug -- thank you
> for the reproducing steps!
>
> I looked over the Release Notes to see if maybe there were some fixes in
> newer versions -- this bug fix looked the most related:
> https://issues.apache.org/jira/browse/KAFKA-2143
>
> Thank you,
>
> Tony
>
> On Thu, Feb 25, 2016 at 3:46 PM, James Cheng  wrote:
>
>> Hi,
>>
>> I ran into a scenario where one of my brokers would continually shutdown,
>> with the error message:
>> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting
>> because log truncation is not allowed for topic test, Current leader 1's
>> latest offset 0 is less than replica 2's latest offset 151
>> (kafka.server.ReplicaFetcherThread)
>>
>> I managed to reproduce it with the following scenario:
>> 1. Start broker1, with unclean.leader.election.enable=false
>> 2. Start broker2, with unclean.leader.election.enable=false
>>
>> 3. Create topic, single partition, with replication-factor 2.
>> 4. Write data to the topic.
>>
>> 5. At this point, both brokers are in the ISR. Broker1 is the partition
>> leader.
>>
>> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2
>> gets dropped out of ISR. Broker1 is still the leader. I can still write
>> data to the partition.
>>
>> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
>>
>> 8. rm -rf the log directory of broker1. (This simulates a disk replacement
>> or full hardware replacement)
>>
>> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed
>> because broker1 is down. At this point, the partition is offline. Can't
>> write to it.
>>
>> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2
>> attempts to join ISR, and immediately halts with the error message:
>> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting
>> because log truncation is not allowed for topic test, Current leader 1's
>> latest offset 0 is less than replica 2's latest offset 151
>> (kafka.server.ReplicaFetcherThread)
>>
>> I am able to recover by setting unclean.leader.election.enable=true on my
>> brokers.
>>
>> I'm trying to understand a couple things:
>> * Is my scenario a valid supported one, or is it along the lines of "don't
>> ever do that"?
>> * In step 10, why is broker1 allowed to resume leadership even though it
>> has no data?
>> * In step 10, why is it necessary to stop the entire broker due to one
>> partition that is in this state? Wouldn't it be possible for the broker to
>> continue to serve traffic for all the other topics, and just mark this one
>> as unavailable?
>> * Would it make sense to allow an operator to manually specify which
>> broker they want to become the new master? This would give me more control
>> over how much data loss I am willing to handle. In this case, I would want
>> broker2 to become the new master. Or, is that possible and I just don't
>> know how to do it?
>> * Would it be possible to make unclean.leader.election.enable to be a
>> per-topic configuration? This would let me control how much data loss I am
>> willing to handle.
>>
>> Btw, the comment in the source code for that error message indicates:
>>
>> https://github.com/apache/kafka/blob/01aeea7c7bca34f1edce40116b7721335938b13b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L164-L166
>>
>>  // Prior to truncating the follower's log, ensure that doing so is
>> not disallowed by the configuration for unclean leader election.
>>  // This situation could only happen if the unclean election
>> configuration for a topic changes while a replica is down. Otherwise,
>>  // we should never encounter this situation since a non-ISR leader
>> cannot be elected if disallowed by the broker configuration.
>>
>> But I don't believe that happened. I never changed the configuration. But
>> I did venture into "unclean leader election" territory, so I'm not sure if
>> the comment still applies.
>>
>> Thanks,
>> -James
>>
>>
>>
>> 
>>
>>

What happens if controlled shutdown can't complete within controlled.shutdown.max.retries attempts?

2016-03-20 Thread James Cheng
The broker has the following parameters related to controlled shutdown:

controlled.shutdown.enable: Enable controlled shutdown of the server.
    (boolean, default: true, importance: medium)
controlled.shutdown.max.retries: Controlled shutdown can fail for multiple
    reasons. This determines the number of retries when such a failure happens.
    (int, default: 3, importance: medium)
controlled.shutdown.retry.backoff.ms: Before each retry, the system needs
    time to recover from the state that caused the previous failure (controller
    failover, replica lag, etc.). This config determines the amount of time to
    wait before retrying. (long, default: 5000, importance: medium)


If the broker attempts controlled shutdown and then fails, and this happens 
controlled.shutdown.max.retries number of times, what happens? Does it go back 
into active service and continue participating in the cluster, as if controlled 
shutdown never happened? Or does it do an uncontrolled shutdown?

Thanks,
-James






Re: [ANNOUNCE] New committer: Damian Guy

2017-06-09 Thread James Cheng
Congrats Damian!

-James

> On Jun 9, 2017, at 1:34 PM, Guozhang Wang  wrote:
> 
> Hello all,
> 
> 
> The PMC of Apache Kafka is pleased to announce that we have invited Damian
> Guy as a committer to the project.
> 
> Damian has made tremendous contributions to Kafka. He has not only
> contributed a lot to the Streams API, but has also been involved in many
> other areas like the producer and consumer clients, broker-side
> coordinators (group coordinator and the ongoing transaction coordinator).
> He has contributed more than 100 patches so far, and has been driving 6
> KIP contributions.
> 
> More importantly, Damian has been a very prolific reviewer of open PRs and
> has been actively participating in community activities such as the mailing
> lists and Stack Overflow questions. Through his code contributions and reviews,
> Damian has demonstrated good judgement on system design and code quality,
> especially thorough unit test coverage. We believe he will make a great
> addition to the committers of the community.
> 
> 
> Thank you for your contributions, Damian!
> 
> 
> -- Guozhang, on behalf of the Apache Kafka PMC



Re: Slow Consumer Group Startup

2017-06-13 Thread James Cheng
Bryan,

This sounds related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
 and https://issues.apache.org/jira/browse/KAFKA-4925.

-James

> On Jun 13, 2017, at 7:02 AM, Bryan Baugher  wrote:
> 
> The topics already exist prior to starting any of the consumers
> 
> On Mon, Jun 12, 2017 at 9:35 PM J Pai  wrote:
> 
>> When are the topics on which these consumer groups consume, created?
>> 
>> -Jaikiran
>> On 13-Jun-2017, at 3:18 AM, Bryan Baugher  wrote:
>> 
>> Hi everyone,
>> 
>> We are currently experiencing slow startup times for our consumer groups
>> (16-32 processes for a hundred or more partitions) in the range of minutes
>> (3-15 minutes), where little to no messages are consumed before suddenly
>> everything just starts working at full speed.
>> 
>> I'm currently using Kafka 0.9.0.1 but we are in the middle of upgrading to
>> Kafka 0.10.2.1. We are also using the newer Kafka consumer API and group
>> management on a simple Apache Storm topology. We don't make use of Storm's
>> kafka spout but instead wrote a simple one ourselves.
>> 
>> Using the kafka AdminClient I was able to poll for consumer group summary
>> information. What I've found is that the group seems to sit
>> in PreparingRebalance state for minutes before finally becoming Stable
>> which then everything starts processing quickly. I've also enabled debug
>> logging around the consumer's coordinator classes but didn't see anything
>> to indicate the issue.
>> 
>> I'm hoping that just upgrading to 0.10 or tweaking how we use our consumer
>> in Apache Storm will fix the problem, but are there any pointers on things I
>> should look at or try?
>> 
>> Bryan
>> 
>> 



Re: mirroring Kafka while preserving the order

2017-06-29 Thread James Cheng
MirrorMaker acts as a consumer+producer. So it will consume from the source 
topic and produce to the destination topic. That means that the destination 
partition is chosen using the same technique as the normal producer:

* If the source record has a key, the key will be hashed and the hash will be 
used to choose a partition. If the source partition was chosen using a 
different hashing algorithm or a custom partitioner, then you may end up 
writing to a different destination partition than the one the record came from.
* If the source record does NOT have a key, then the destination partition will 
be chosen randomly.

MirrorMaker supports custom message handlers. You can use those to map the 
source partition to the destination partition, which will allow you to avoid 
the above two problems.  Here's an example of how to do it. 
https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler
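
A minimal sketch of such a handler, assuming the 0.10.x-era handler interface 
(kafka.tools.MirrorMaker.MirrorMakerMessageHandler; the exact signature has 
varied across versions):

    import java.util.Collections;
    import java.util.List;

    import kafka.consumer.BaseConsumerRecord;
    import kafka.tools.MirrorMaker;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionPreservingHandler implements MirrorMaker.MirrorMakerMessageHandler {
        @Override
        public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
            // Pin the destination partition to the source partition instead of
            // letting the producer re-hash the key or pick a random partition.
            // This assumes source and destination topics have the same
            // partition count.
            return Collections.singletonList(new ProducerRecord<>(
                record.topic(), record.partition(), record.key(), record.value()));
        }
    }

You would put the compiled class on the classpath and point MirrorMaker at it 
via --message.handler PartitionPreservingHandler.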

-James

Sent from my iPhone

> On Jun 29, 2017, at 9:57 AM, Tom Bentley  wrote:
> 
> I believe so. You need to be careful that the mirror maker producer doesn't
> reorder messages; in particular if retries > 0 then
> max.in.flight.requests.per.connection must be 1. If
> retries=0 then it doesn't matter what max.in.flight.requests.per.connection
> is.
> 
> 
> 
>> On 29 June 2017 at 05:52, Sunil Parmar  wrote:
>> 
>> Is it possible to configure mirror maker using message handler to preserve
>> the order of messages in each topic partition? In the particular use case
>> we're looking at both source and destination topics have same number of
>> partitions.
>> 
>> Sunil
>> 


Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
I'm not sure what the "official" recommendation is. At TiVo, we *do* run all 
our mirrormakers near the target cluster. It works fine for us, but we're still 
fairly inexperienced, so I'm not sure how strong of a data point we should be.

I think the thought process is, if you are mirroring from a source cluster to a 
target cluster where there is a WAN between the two, then whichever request 
goes across the WAN has a higher chance of intermittent failure than the one 
over the LAN. That means that if mirrormaker is near the source cluster, the 
produce request over the WAN to the target cluster may fail. If the mirrormaker 
is near the target cluster, then the fetch request over the WAN to the source 
cluster may fail.

Failed fetch requests don't have much impact on data replication; they just 
delay it. Whereas a failure during a produce request may introduce duplicates.

Becket Qin from LinkedIn did a presentation on tuning producer performance at a 
meetup last year, and I remember he specifically talked about producing over a 
WAN as one of the cases where you have to tune settings. Maybe that 
presentation will give more ideas about what to look at. 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600

-James

Sent from my iPhone

> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian  
> wrote:
> 
> The literature suggests running the MM on the target cluster when possible 
> (with the exception of when encryption is required for transferred data).
> I am wondering if this is still the recommended approach when mirroring 
> from multiple clusters to a single cluster (i.e. multiple MM instances).
> Is there anything in particular (metric, specification, etc.) to consider 
> before making a decision?
> 
> Thanks.
> --Vahid
> 
> 


Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
Answers inline below.

-James

Sent from my iPhone

> On Jul 7, 2017, at 1:18 AM, Vahid S Hashemian  
> wrote:
> 
> James,
> 
> Thanks for sharing your thoughts and experience.
> Could you please also confirm whether
> - you do any encryption for the mirrored data?
Not at the Kafka level. The data goes over a VPN.

> - you have a many-to-one mirroring similar to what I described?
> 

Yes, we mirror multiple source clusters to a single target cluster. We have a 
topic naming convention where our topics are prefixed with their cluster name, 
so as long as we follow that convention, each source topic gets mirrored to a 
unique target topic. That is, we try not to have multiple mirrormakers writing 
to a single target topic. 

Our topic names in the target cluster get prefixed with the string "mirror." 
And then we never mirror topics that start with "mirror." This prevents us from 
creating mirroring loops.
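
A sketch of how that convention can be enforced on the MirrorMaker command 
line, relying on the whitelist being a Java regex (the negative lookahead 
excludes topics that already carry the "mirror." prefix; the file names are 
hypothetical):

    bin/kafka-mirror-maker.sh \
        --consumer.config source-cluster.consumer.properties \
        --producer.config target-cluster.producer.properties \
        --whitelist '^(?!mirror\.).*'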

> Thanks.
> --Vahid
> 
> 
> 
> From:   James Cheng 
> To: users@kafka.apache.org
> Cc: dev 
> Date:   07/06/2017 12:37 PM
> Subject:Re: Mirroring multiple clusters into one
> 
> 
> 
> I'm not sure what the "official" recommendation is. At TiVo, we *do* run 
> all our mirrormakers near the target cluster. It works fine for us, but 
> we're still fairly inexperienced, so I'm not sure how strong of a data 
> point we should be.
> 
> I think the thought process is, if you are mirroring from a source cluster 
> to a target cluster where there is a WAN between the two, then whichever 
> request goes across the WAN has a higher chance of intermittent failure 
> than the one over the LAN. That means that if mirrormaker is near the 
> source cluster, the produce request over the WAN to the target cluster may 
> fail. If the mirrormaker is near the target cluster, then the fetch 
> request over the WAN to the source cluster may fail.
> 
> Failed fetch requests don't have much impact on data replication; they just 
> delay it. Whereas a failure during a produce request may introduce 
> duplicates.
> 
> Becket Qin from LinkedIn did a presentation on tuning producer performance 
> at a meetup last year, and I remember he specifically talked about 
> producing over a WAN as one of the cases where you have to tune settings. 
> Maybe that presentation will give more ideas about what to look at. 
> https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
> 
> 
> -James
> 
> Sent from my iPhone
> 
>> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian 
>  wrote:
>> 
>> The literature suggests running the MM on the target cluster when possible 
>> (with the exception of when encryption is required for transferred data).
>> I am wondering if this is still the recommended approach when mirroring 
>> from multiple clusters to a single cluster (i.e. multiple MM instances).
>> Is there anything in particular (metric, specification, etc.) to consider 
>> before making a decision?
>> 
>> Thanks.
>> --Vahid
>> 
>> 
> 
> 
> 
> 


Re: Consumer offsets partitions size much bigger than others

2017-07-18 Thread James Cheng
It's possible that the log-cleaning thread has crashed. That is the thread that 
implements log compaction.

Look in the log-cleaner.log file in your kafka debuglog directory to see if 
there is any indication that it has crashed (error messages, stack traces, etc).

What version of kafka are you using? 0.10 and prior had some bugs in the 
log-cleaner thread that might sometimes cause it to crash. Those were fixed in 
later versions, but it's always possible there might still be more bugs there.

I notice that your __consumer_offsets topic only has replication-factor=1. How 
many brokers are in your cluster? You should increase the replication factor to 
3. 

Older versions of kafka would try to auto-create the __consumer_offsets topic 
with replication-factor 3, but if there were fewer than 3 brokers in the 
cluster, they would simply use the number of brokers in the cluster. That means 
that if your cluster had only 1 broker running at the time the topic was 
auto-created, it would be created with replication-factor 1. This has been 
fixed in later brokers: they will always create the topic with the specified 
number of replicas, or throw loud errors if you don't have enough brokers.
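
A sketch of how to raise the replication factor with the partition 
reassignment tool, assuming brokers with ids 1, 2, and 3 exist (the JSON needs 
one entry per partition; only partition 0 is shown):

    # increase-rf.json
    {"version":1,"partitions":[
      {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}
    ]}

    bin/kafka-reassign-partitions.sh --zookeeper x.x.x.x:2181 \
        --reassignment-json-file increase-rf.json --execute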

-James

> On Jul 18, 2017, at 8:44 AM, Luciano Afranllie  
> wrote:
> 
> Hi
> 
> One of our Kafka brokers was running out of disk space and when we checked
> the file size in the kafka log dir we observed the following
> 
> $ du -h . --max-depth=2 | grep '__consumer_offsets'
> 4.0K    ./kafka-logs/__consumer_offsets-16
> 4.0K    ./kafka-logs/__consumer_offsets-40
> 35G     ./kafka-logs/__consumer_offsets-44
> 4.0K    ./kafka-logs/__consumer_offsets-8
> 4.0K    ./kafka-logs/__consumer_offsets-38
> 4.0K    ./kafka-logs/__consumer_offsets-20
> 4.0K    ./kafka-logs/__consumer_offsets-34
> 4.0K    ./kafka-logs/__consumer_offsets-18
> 4.0K    ./kafka-logs/__consumer_offsets-32
> 251G    ./kafka-logs/__consumer_offsets-14
> 4.0K    ./kafka-logs/__consumer_offsets-4
> 4.0K    ./kafka-logs/__consumer_offsets-26
> 4.0K    ./kafka-logs/__consumer_offsets-12
> 4.0K    ./kafka-logs/__consumer_offsets-30
> 4.0K    ./kafka-logs/__consumer_offsets-6
> 4.0K    ./kafka-logs/__consumer_offsets-2
> 4.0K    ./kafka-logs/__consumer_offsets-24
> 4.0K    ./kafka-logs/__consumer_offsets-36
> 4.0K    ./kafka-logs/__consumer_offsets-46
> 4.0K    ./kafka-logs/__consumer_offsets-42
> 4.0K    ./kafka-logs/__consumer_offsets-22
> 4.0K    ./kafka-logs/__consumer_offsets-0
> 4.0K    ./kafka-logs/__consumer_offsets-28
> 4.0K    ./kafka-logs/__consumer_offsets-10
> 4.0K    ./kafka-logs/__consumer_offsets-48
> 
> As you can see, two of the log directories (partitions 44 and 14) have a huge
> size. Do you have a hint to understand what could be happening here? Maybe
> for some reason these partitions are not being compacted?
> 
> By the way, this is the description of the __consumer_offsets topic.
> 
> # ./bin/kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic
> __consumer_offsets
> Topic:__consumer_offsets   PartitionCount:50   ReplicationFactor:1
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed
>Topic: __consumer_offsets   Partition: 0Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 1Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 2Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 3Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 4Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 5Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 6Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 7Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 8Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 9Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 10   Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 11   Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 12   Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 13   Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 14   Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 15   Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 16   Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 17   Leader: 2
> Replicas: 2 Isr: 2
>Topic: __consumer_offsets   Partition: 18   Leader: 1
> Replicas: 1 Isr: 1
>Topic: __consumer_offsets   Partition: 19   Leader: 

Re: Tuning up mirror maker for high thruput

2017-07-22 Thread James Cheng
Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka 
producer. One scenario that he described was tuning for situations where you 
had high network latency. See slides at 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
 and video at https://youtu.be/oQe7PpDDdzA

-James

Sent from my iPhone

> On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
> 
> We're trying to set up mirror maker to mirror data from EU dc to US dc. The
> network delay is ~150 ms. In a recent test, we realized that mirror maker is
> not keeping up with the load and has a lag trending upward all the time.
> 
> What are configurations that can be tuned up to make it work for the higher
> throughput ?
> How to decide number of producer and consumer threads ? ( number of topic
> partitions / brokers ? )
> 
> 
> *Environment* ( both source and destination cluster )
> 
> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
> 
> queue.size = 1
> queue.byte.size = 100MB
> 
> 2 brokers on source, 3 brokers on destination
> 
> 
> *Mirror maker configs :*
> 
> Producer properties :
> request.timeout.ms=12
> block.on.buffer.full=TRUE
> retries=20
> acks=all
> 
> 
> Consumer properties:
> request.timeout.ms=12
> auto.offset.reset=latest
> enable.auto.commit=false
> 
> We've configured 4 producer and consumer threads.
> There is no security set up as of now so it's all PLAINTEXT.
> 
> We have 4 topics whitelisted to sync from EU to US. Only one of them
> is high throughput. We also have a message handler to strip some
> sensitive information on the way from EU to US, but it only acts on a
> low-throughput topic; the message handler still processes the other
> topics but lets them pass through.
> 
> Thanks,
> Sunil Parmar


Re: Tuning up mirror maker for high thruput

2017-07-24 Thread James Cheng
Todd,

I have a question about the OS/broker tuning that you are talking about on the 
source cluster. Aside from mirrormaker (which you say should be running in the 
remote destination datacenter), presumably there will be other consumers in the 
source datacenter as well. How does the OS/broker tuning affect those consumers 
that are close to the source datacenter? Will they continue to function well?

-James
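
For concreteness, a sketch of the settings Todd describes below (-1 defers to 
the OS-level buffer tuning from the linked article):

    # broker config on the source (high-latency) cluster
    socket.send.buffer.bytes=-1
    socket.receive.buffer.bytes=-1

    # mirror maker consumer.properties
    receive.buffer.bytes=-1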

> On Jul 23, 2017, at 7:16 AM, Todd Palino  wrote:
> 
> One of the best pieces of advice I can offer is that you really need to run
> the mirror maker in the same physical/network location as the Kafka cluster
> you are producing to. Latency on the consumer side can be more easily
> absorbed than latency on the producer side, as to assure that we have
> proper message ordering and reliability, we need to restrict in flight
> batches to 1. So that means that our produce connection is contstrained to
> be very thin, and latency makes a huge impact. Meanwhile, on the consume
> side we’re fetching large batches of messages, many at a time, so
> round-trip latency has less of an impact. I really can’t stress this
> enough. We set up some mirror makers in the opposite configuration for
> security reasons, and it’s been a huge problem even with tuning.
> 
> In addition to this, you will want to assure that your OS (and then the
> mirror maker and broker) tuning is taking into account the latency. Here’s
> a good reference for the OS side (for Linux):
> http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/
> 
> Once you have the OS tuned, you’ll need to adjust the broker tuning on the
> clusters you are consuming from, since that is the high latency side. The
> configuration for that is socket.send.buffer.bytes, and it probably makes
> sense to set this to -1 (which means use the OS configuration). You can do
> the same with socket.receive.buffer.bytes, but it’s not as critical with
> this setup. On the mirror maker, the configuration is on the consumer side,
> and it’s called receive.buffer.bytes. Again, you can set this to -1 to use
> the OS configuration. Make sure to restart the applications after making
> all these changes, of course.
> 
> -Todd
> 
> 
> On Sat, Jul 22, 2017 at 1:27 AM, James Cheng  wrote:
> 
>> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
>> producer. One scenario that he described was tuning for situations where
>> you had high network latency. See slides at https://www.slideshare.net/
>> mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
>> and video at https://youtu.be/oQe7PpDDdzA
>> 
>> -James
>> 
>> Sent from my iPhone
>> 
>>> On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
>>> 
>>> We're trying to set up mirror maker to mirror data from EU dc to US dc. The
>>> network delay is ~150 ms. In a recent test, we realized that mirror maker is
>>> not keeping up with the load and has a lag trending upward all the time.
>>> 
>>> What are configurations that can be tuned up to make it work for the
>> higher
>>> throughput ?
>>> How to decide number of producer and consumer threads ? ( number of topic
>>> partitions / brokers ? )
>>> 
>>> 
>>> *Environment* ( both source and destination cluster )
>>> 
>>> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
>>> 
>>> queue.size = 1
>>> queue.byte.size = 100MB
>>> 
>>> 2 brokers on source, 3 brokers on destination
>>> 
>>> 
>>> *Mirror maker configs :*
>>> 
>>> Producer properties :
>>> request.timeout.ms=12
>>> block.on.buffer.full=TRUE
>>> retries=20
>>> acks=all
>>> 
>>> 
>>> Consumer properties:
>>> request.timeout.ms=12
>>> auto.offset.reset=latest
>>> enable.auto.commit=false
>>> 
>>> We've configured 4 producer and consumer threads.
>>> There is no security set up as of now so it's all PLAINTEXT.
>>> 
>>> We have 4 topics whitelisted to sync from EU to US. Only one of them
>>> is high throughput. We also have a message handler to strip some
>>> sensitive information on the way from EU to US, but it only acts on a
>>> low-throughput topic; the message handler still processes the other
>>> topics but lets them pass through.
>>> 
>>> Thanks,
>>> Sunil Parmar
>> 
> 
> 
> 
> -- 
> *Todd Palino*
> Senior Staff Engineer, Site Reliability
> Data Infrastructure Streaming
> 
> 
> 
> linkedin.com/in/toddpalino



Re: Consumer group metadata retention

2017-07-26 Thread James Cheng
The offsets.retention.minutes value (1440 = 24 hours = 1 day) is a broker level 
configuration, and can't be changed dynamically during runtime. You would have 
to modify the broker configurations, and restart the brokers.
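
For example, to keep committed offsets for 7 days instead of 1, you would set 
this on every broker and do a rolling restart (a sketch):

    # server.properties
    offsets.retention.minutes=10080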

-James

> On Jul 25, 2017, at 9:43 PM, Raghu Angadi  wrote:
> 
> I am writing an exactly-once Kafka sink for Apache Beam.
> In order to avoid duplicates due to retries, it stores a sequential id and
> producer signature in consumer group metadata, and commits it atomically
> with messages (using sendOffsetsToTransaction()).
> 
> I have a couple of clarification questions on partition metadata associated
> with a consumer group.
> 
>   - Looks like a partition number could be larger than number of
>   partitions for a topic. Is this formally supported (which is great!)?
>  - E.g. I was able to set and read partition metadata for partition
>  #20 for a topic with 4 partitions (though `console-consumer-group` didn't
>  quite work.).
>   - This is critical information for a sink. But looks like the metadata
>   gets purged in 24 hours (server config) if it is inactive. Is there a way
>   to set a longer TTL programatically?
> 
> Thanks.
> Raghu.



Re: Improving Kafka State Store performance

2017-09-16 Thread James Cheng
In addition to the measurements that you are doing yourself, Kafka Streams also 
has its own metrics. They are exposed via JMX, if you have that enabled:

http://kafka.apache.org/documentation/#monitoring 


If you set metrics.recording.level="debug", you can see a bunch of metrics 
around the state stores. Stuff like put-latency-avg, for example.

See http://kafka.apache.org/documentation/#kafka_streams_store_monitoring 
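
A sketch of turning that on in a Streams application (the application id and 
bootstrap server are hypothetical; METRICS_RECORDING_LEVEL_CONFIG is the 
constant for the metrics.recording.level key):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // expose the debug-level metrics, including per-state-store latencies
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "debug");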


-James

> On Sep 16, 2017, at 6:14 AM, dev loper  wrote:
> 
> Hi Kafka Streams Users,
> 
> I am trying to improve the performance of the Kafka Streams persistent
> state store. In our application we are using the Kafka Streams Processor
> API with a persistent state store. My application performs well when it
> starts up, but over a period of time the performance deteriorates. I
> am computing certain results in a computeAnalytics method, and this method
> is not taking time at all. This method is called within both process and
> punctuate, and I store the updated object back to the store. Over time it
> takes a huge amount of time to complete the punctuate pass, and I can see
> that the majority of the time is spent storing the records and iterating
> over the records. The record count is just 2500 per partition. I am not
> sure where I am going wrong and how I can improve the performance.
> 
> Below is one such sample log record.
> 
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605 total time taken to retrieve Records :: 12787 Total time Taken :: 40394
> 
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
> 
> MyProcessor() {
> 
>   process(Key objectkey, Update eventupdate) {
>     long timestamp = context.timestamp();
>     AnalyticeObj storeobj = store.get(objectkey);
> 
>     if (storeobj == null) {
>       storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
>     } else {
>       storeobj.update(eventupdate, timestamp);
>     }
>     storeobj = storeobj.computeAnalytics();
> 
>     store.put(objectkey, storeobj);
>     context.commit();
>   }
> 
>   // Every 5 seconds
>   punctuate(long timestamp) {
>     long startTime = System.currentTimeMillis();
>     long totalTimeTakenToProcessRecords = 0;
>     long totalTimeTakenToStoreRecords = 0;
>     long counter = 0;
>     KeyValueIterator iter = this.visitStore.all();
>     while (iter.hasNext()) {
>       KeyValue entry = iter.next();
> 
>       if (AnalyticeObj.hasExpired(timestamp)) {
>         store.remove(entry.key);
>       } else {
>         long processStartTime = System.currentTimeMillis();
>         AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
>         totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> 
>         long storeStartTime = System.currentTimeMillis();
>         store.put(entry.key, storeobj);
>         totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
>       }
>       counter++;
>     }
>     logger.info(" Time Metrics for punctuate"
>         + " for TimeStamp :: " + timestamp
>         + " processed Records :: " + counter
>         + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
>         + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
>         + " total time taken to retrieve Records :: "
>         + (System.currentTimeMillis() - (startTime + totalTimeTakenToProcessRecords + totalTimeTakenToStoreRecords))
>         + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
>   }
> }



Re: Kafka Internals Video/Blog

2017-09-20 Thread James Cheng
This recent meetup had a presentation of the internals of the Kafka Controller. 

https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/242656767/ 


The video is not yet available, but hopefully will be soon.

-James

> On Sep 20, 2017, at 9:49 AM, Raghav  wrote:
> 
> Hi
> 
> Just wondering if there is any video/blog that goes over Kafka Internal and
> under the hood design and implementation details. I am a newbie and I would
> like to dabble with the code and understand design of it. Just wondering if
> there is any video, blog etc that goes over it ?
> 
> Thanks.
> 
> -- 
> Raghav



Re: Metrics: committed offset, client version

2017-09-20 Thread James Cheng
KIP-188 is expected to be in the upcoming 1.0.0 release. It will add 
client-side JMX metrics that show the client version number.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks
 


-James



> On Sep 20, 2017, at 2:37 AM, Stas Chizhov  wrote:
> 
> Hi!
> 
> I am wondering if there are broker/client metrics for:
> - client version (to keep track of clients that needs an upgrade)
> - committed offsets (to detect situations when commits fail systematically
> with everything else being ok)
> 
> Thank you,
> Stanislav.



Re: In which scenarios would "INVALID_REQUEST" be returned for "Offset Request"

2017-09-24 Thread James Cheng
Your client library might be sending a message that is too old or too new for 
your broker to understand.

What version is your Kafka client library, and what version is your broker?

-James

Sent from my iPhone

> On Sep 22, 2017, at 4:09 PM, Vignesh  wrote:
> 
> Hi,
> 
> In which scenarios would we get "INVALID_REQUEST" for a Version 1 "Offset
> Request"  (https://kafka.apache.org/protocol#The_Messages_Offsets)  ?
> 
> I searched for INVALID_REQUEST in https://github.com/apache/kafka and below
> is the only file that seems related.
> 
> https://github.com/apache/kafka/blob/96ba21e0dfb1a564d5349179d844f020abf1e08b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
> 
> Here, I see that invalid request is returned only on duplicate topic
> partition. Is that the only reason?
> 
> The description for the error is broader though.
> 
> "
> This most likely occurs because of a request being malformed by the client
> library or the message was sent to an incompatible broker. See the broker
> logs for more details.
> 
> "
> 
> Thanks,
> Vignesh.


How do I instantiate a metrics reporter in Kafka Streams, with custom config?

2017-11-01 Thread James Cheng
Hi, we have a KafkaStreams app. We specify a custom metric reporter by doing:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-broker1:9092");
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter");
config.put("custom-key-for-metric-reporter", "value");

Previously, our metric reporter would get passed the properties object upon 
instantiation, and would retrieve its custom config from it. It appears that in 
recent releases, in order to apply my metric reporter to the consumer, I have 
to specify my config as "consumer.custom-key-for-metric-reporter", and for the 
producer, as "producer.custom-key-for-metric-reporter". If I don't prefix it 
with "consumer." or "producer.", it appears it gets stripped out of the 
properties object that is passed to my metric reporter when the 
consumer/producer gets initialized, and so my metric reporter can't get its 
config.

That means that if I have a metrics reporter and I want to collect producer and 
consumer metrics, as well as kafka-streams metrics, I have to specify my 
custom config 3 times:
1) consumer.custom-key-for-metric-reporter
2) producer.custom-key-for-metric-reporter
3) custom-key-for-metric-reporter
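
A sketch of what that triple registration looks like, using StreamsConfig's 
consumerPrefix()/producerPrefix() helpers (com.mycompany.MetricReporter and 
the custom key are the hypothetical names from above):

    Properties config = new Properties();
    config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter");
    // the same key, supplied three times, so that the streams-level,
    // consumer, and producer instances of the reporter each see it
    config.put("custom-key-for-metric-reporter", "value");
    config.put(StreamsConfig.consumerPrefix("custom-key-for-metric-reporter"), "value");
    config.put(StreamsConfig.producerPrefix("custom-key-for-metric-reporter"), "value");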

Is that behavior as designed or is that a bug? What is the desired behavior for 
non-recognized keys in the properties object?

And actually, for the metrics.reporter class itself, am I supposed to specify 
it as:

metrics.reporter

or

metric.reporters
producer.metric.reporters
consumer.metric.reporters

Thanks,
-James



Re: [ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread James Cheng
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data between
> systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams
> of data.
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> 
> A big thank you for the following 108 contributors to this release!
> 
> Abhishek Mendhekar, Xi Hu, Andras Beni, Andrey Dyachkov, Andy Chambers,
> Apurva Mehta, Armin Braun, Attila Kreiner, Balint Molnar, Bart De Vylder,
> Ben Stopford, Bharat Viswanadham, Bill Bejeck, Boyang Chen, Bryan Baugher,
> Colin P. Mccabe, Koen De Groote, Dale Peakall, Damian Guy, Dana Powers,
> Dejan Stojadinović, Derrick Or, Dong Lin, Zhendong Liu, Dustin Cote,
> Edoardo Comar, Eno Thereska, Erik Kringen, Erkan Unal, Evgeny Veretennikov,
> Ewen Cheslack-Postava, Florian Hussonnois, Janek P, Gregor Uhlenheuer,
> Guozhang Wang, Gwen Shapira, Hamidreza Afzali, Hao Chen, Jiefang He, Holden
> Karau, Hooman Broujerdi, Hugo Louro, Ismael Juma, Jacek Laskowski, Jakub
> Scholz, James Cheng, James Chien, Jan Burkhardt, Jason Gustafson, Jeff
> Chao, Jeff Klukas, Jeff Widman, Jeremy Custenborder, Jeyhun Karimov,
> Jiangjie Qin, Joel Dice, Joel Hamill, Jorge Quilcate Otoya, Kamal C, Kelvin
> Rutt, Kevin Lu, Kevin Sweeney, Konstantine Karantasis, Perry Lee, Magnus
> Edenhill, Manikumar Reddy, Manikumar Reddy O, Manjula Kumar, Mariam John,
> Mario Molina, Matthias J. Sax, Max Zheng, Michael Andre Pearce, Michael
> André Pearce, Michael G. Noll, Michal Borowiecki, Mickael Maison, Nick
> Pillitteri, Oleg Prozorov, Onur Karaman, Paolo Patierno, Pranav Maniar,
> Qihuang Zheng, Radai Rosenblatt, Alex Radzish, Rajini Sivaram, Randall
> Hauch, Richard Yu, Robin Moffatt, Sean McCauliff, Sebastian Gavril, Siva
> Santhalingam, Soenke Liebau, Stephane Maarek, Stephane Roset, Ted Yu,
> Thibaud Chardonnens, Tom Bentley, Tommy Becker, Umesh Chaudhary, Vahid
> Hashemian, Vladimír Kleštinec, Xavier Léauté, Xianyang Liu, Xin Li, Linhua
> Xin
> 
> 
> We welcome your help and feedback. For more information on how to report
> problems, and to get involved, visit the project website at
> http://kafka.apache.org/
> 
> 
> 
> 
> Thanks,
> Guozhang Wang



Re: [ANNOUNCE] New committer: Onur Karaman

2017-11-06 Thread James Cheng
Congrats Onur! Well deserved!

-James

> On Nov 6, 2017, at 9:24 AM, Jun Rao  wrote:
> 
> Hi, everyone,
> 
> The PMC of Apache Kafka is pleased to announce a new Kafka committer Onur
> Karaman.
> 
> Onur's most significant work is the improvement of Kafka controller, which
> is the brain of a Kafka cluster. Over time, we have accumulated quite a few
> correctness and performance issues in the controller. There have been
> attempts to fix controller issues in isolation, which would make the code
> base more complicated without a clear path of solving all problems. Onur is
> the one who took a holistic approach, by first documenting all known
> issues, writing down a new design, coming up with a plan to deliver the
> changes in phases and executing on it. At this point, Onur has completed
> the two most important phases: making the controller single threaded and
> changing the controller to use the async ZK api. The former fixed multiple
> deadlocks and race conditions. The latter significantly improved the
> performance when there are many partitions. Experimental results show that
> Onur's work reduced the controlled shutdown time by a factor of 100
> and the controller failover time by a factor of 3.
> 
> Congratulations, Onur!
> 
> Thanks,
> 
> Jun (on behalf of the Apache Kafka PMC)



Re: Insanely long recovery time with Kafka 0.11.0.2

2018-01-11 Thread James Cheng
We saw this as well, when updating from 0.10.1.1 to 0.11.0.1.

Have you restarted your brokers since then? Did it take 8h to start up again, 
or did it take its normal 45 minutes?

I don't think it's related to the crash/recovery. Rather, I think it's due to 
the upgrade from 0.10.1.1 to 0.11.0.1.

I think 0.11.0.0 introduced some new files in the log directories (maybe the 
.snapshot files?). The first time 0.11.0.0 (or newer) started up, it scanned 
the entire .log files to create... something. It scanned *all* the segments, 
not just the most recent ones. I think that's why it took so long. I think 
normally log recovery only looks at the most recent segments.

We noticed this only on the FIRST boot when running 0.11+. From then on, 
startups were our normal length of time.

In your https://pastebin.com/tZqze4Ya, I see a line like:
[2018-01-05 13:43:34,776] INFO Completed load of log webapi-event-1 with 2 log 
segments, log start offset 1068104 and log end offset 1236587 in 9547 ms 
(kafka.log.Log)

That line says that that partition took 9547ms (9.5 seconds) to load/recover. 
We had large partitions that took 30 minutes to recover, on that first boot. 
When I used strace to see what I/O the broker was doing, it was reading ALL the 
segments for the partitions.

-James



> On Jan 11, 2018, at 10:56 AM, Vincent Rischmann  wrote:
> 
> If anyone else has any idea, I'd love to hear it.
> 
> Meanwhile, I'll resume upgrading my brokers and hope it doesn't crash and/or 
> take so much time for recovery.
> 
> On Sat, Jan 6, 2018, at 7:25 PM, Vincent Rischmann wrote:
>> Hi,
>> 
>> just to clarify: this is the cause of the crash 
>> https://pastebin.com/GuF60kvF in the broker logs, which is why I 
>> referenced https://issues.apache.org/jira/browse/KAFKA-4523
>> 
>> I had this crash some time ago and yesterday was in the process of 
>> upgrading my brokers to 0.11.0.2 in part to address this bug, 
>> unfortunately while stopping this particular broker it crashed.
>> 
>> What I don't understand is why the recovery time after upgrading was so 
>> high. A couple of month ago when a broker crashed due to this bug and 
>> recovered it didn't take nearly as long. In fact, I never had a recovery 
>> that long on any broker, even when they suffered a kernel panic or power 
>> failure.
>> 
>> We have quite a bit of data, however as I said with 0.10.1.1 it "only" 
>> took around 45 minutes. The broker did do a lot of I/O while recovering 
>> (to the point where even just running ls was painfully slow) but afair 
>> it was the same every time a broker recovered. Volume of data hasn't 
>> changed much since the last crash with 0.10.1.1, in fact I removed a lot 
>> of data recently.
>> 
>> Our brokers are running with 3 SATA disks in raid 0 (using mdadm), while 
>> recovering yesterday atop reported around 200MB/s of read throughput. 
>> 
>> Here are some graphs from our monitoring:
>> 
>> - CPU usage https://vrischmann.me/files/fr3/cpu.png
>> - Disk IO https://vrischmann.me/files/fr3/disk_io.png and 
>> https://vrischmann.me/files/fr3/disk_usage.png
>> 
>> On Sat, Jan 6, 2018, at 4:53 PM, Ted Yu wrote:
>>> Ismael:
>>> We're on the same page.
>>> 
>>> 0.11.0.2 was released on 17 Nov 2017.
>>> 
>>> By 'recently' in my previous email I meant the change was newer.
>>> 
>>> Vincent:
>>> Did the machine your broker ran on experience power issue ?
>>> 
>>> Cheers
>>> 
>>> On Sat, Jan 6, 2018 at 7:36 AM, Ismael Juma  wrote:
>>> 
 Hi Ted,
 
 The change you mention is not part of 0.11.0.2.
 
 Ismael
 
 On Sat, Jan 6, 2018 at 3:31 PM, Ted Yu  wrote:
 
> bq. WARN Found a corrupted index file due to requirement failed: Corrupt
> index found, index file
> (/data/kafka/data-processed-15/54942918.index)
> 
> Can you search backward for 54942918.index in the log to see if
> we can find the cause for corruption?
> 
> This part of the code was recently changed by:
> 
> KAFKA-6324; Change LogSegment.delete to deleteIfExists and harden log
> recovery
> 
> Cheers
> 
> On Sat, Jan 6, 2018 at 7:18 AM, Vincent Rischmann 
> wrote:
> 
>> Here's an excerpt just after the broker started:
>> https://pastebin.com/tZqze4Ya
>> 
>> After more than 8 hours of recovery the broker finally started. I haven't
>> read through all 8 hours of log but the parts I looked at are like the
>> pastebin.
>> 
>> I'm not seeing much in the log cleaner logs either, they look normal. We
>> have a couple of compacted topics but it seems only the consumer offsets
>> topic is ever compacted (the other topics don't have much traffic).
>> 
>> On Sat, Jan 6, 2018, at 12:02 AM, Brett Rann wrote:
>>> What do the broker logs say its doing during all that time?
>>> 
>>> There are some consumer offset / log cleaner bugs which caused us
>>> similar log delays. That was easily visible by

Re: [ANNOUNCE] New committer: Matthias J. Sax

2018-01-12 Thread James Cheng
Congrats, Matthias!! Well deserved!

-James

> On Jan 12, 2018, at 2:59 PM, Guozhang Wang  wrote:
> 
> Hello everyone,
> 
> The PMC of Apache Kafka is pleased to announce Matthias J. Sax as our
> newest Kafka committer.
> 
> Matthias has made tremendous contributions to the Kafka Streams API since
> early 2016. His footprint has been all over the place in Streams: in the past
> two years he has been the main driver on improving the join semantics
> inside the Streams DSL, summarizing all their shortcomings and bridging the
> gaps; he has also been working extensively on the exactly-once semantics of
> Streams, leveraging the transactional messaging feature in 0.11.0. In
> addition, Matthias has been very active in community activity that goes
> beyond the mailing list: he has gotten close to 1000 upvotes and 100
> helpful flags on SO for answering almost all questions about Kafka Streams.
> 
> Thank you for your contribution and welcome to Apache Kafka, Matthias!
> 
> 
> 
> Guozhang, on behalf of the Apache Kafka PMC



Re: [VOTE] KIP-247: Add public test utils for Kafka Streams

2018-01-18 Thread James Cheng
+1 (non-binding)

-James

Sent from my iPhone

> On Jan 17, 2018, at 6:09 PM, Matthias J. Sax  wrote:
> 
> Hi,
> 
> I would like to start the vote for KIP-247:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
> 
> 
> -Matthias
> 


Re: [ANNOUNCE] New Kafka PMC Member: Rajini Sivaram

2018-01-18 Thread James Cheng
Congrats Rajini!

-James

Sent from my iPhone

> On Jan 17, 2018, at 10:48 AM, Gwen Shapira  wrote:
> 
> Dear Kafka Developers, Users and Fans,
> 
> Rajini Sivaram became a committer in April 2017.  Since then, she remained
> active in the community and contributed major patches, reviews and KIP
> discussions. I am glad to announce that Rajini is now a member of the
> Apache Kafka PMC.
> 
> Congratulations, Rajini and looking forward to your future contributions.
> 
> Gwen, on behalf of Apache Kafka PMC


Re: [ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-06 Thread James Cheng
Congrats, everyone! Thanks for driving the release, Ewen!

-James

> On Mar 6, 2018, at 1:22 PM, Guozhang Wang  wrote:
> 
> Ewen, thanks for driving the release!!
> 
> 
> Guozhang
> 
> On Tue, Mar 6, 2018 at 1:14 PM, Ewen Cheslack-Postava  wrote:
> 
>> The Apache Kafka community is pleased to announce the release for Apache
>> Kafka
>> 1.0.1.
>> 
>> This is a bugfix release for the 1.0 branch that was first released with
>> 1.0.0 about 4 months ago. We've fixed 49 issues since that release. Most of
>> these are non-critical, but in aggregate these fixes will have significant
>> impact. A few of the more significant fixes include:
>> 
>> * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
>> plugins
>> * KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
>> down conversion
>> * KAFKA-6269: KTable state restore fails after rebalance
>> * KAFKA-6190: GlobalKTable never finishes restoring when consuming
>> transactional messages
>> * KAFKA-6529: Stop file descriptor leak when client disconnects with
>> staged receives
>> * KAFKA-6238: Issues with protocol version when applying a rolling upgrade
>> to 1.0.0
>> 
>> 
>> All of the changes in this release can be found in the release notes:
>> 
>> 
>> https://dist.apache.org/repos/dist/release/kafka/1.0.1/RELEASE_NOTES.html
>> 
>> 
>> 
>> You can download the source release from:
>> 
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka-1.0.1-src.tgz
>> 
>> 
>> and binary releases from:
>> 
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka_2.11-1.0.1.tgz
>> (Scala 2.11)
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka_2.12-1.0.1.tgz
>> (Scala 2.12)
>> 
>> 
>> ---
>> 
>> 
>> Apache Kafka is a distributed streaming platform with four core APIs:
>> 
>> 
>> ** The Producer API allows an application to publish a stream of records to
>> one or more Kafka topics.
>> 
>> 
>> ** The Consumer API allows an application to subscribe to one or more
>> topics and process the stream of records produced to them.
>> 
>> 
>> ** The Streams API allows an application to act as a stream processor,
>> consuming an input stream from one or more topics and producing an output
>> stream to one or more output topics, effectively transforming the input
>> streams to output streams.
>> 
>> 
>> ** The Connector API allows building and running reusable producers or
>> consumers that connect Kafka topics to existing applications or data
>> systems. For example, a connector to a relational database might capture
>> every change to a table.
>> 
>> 
>> 
>> With these APIs, Kafka can be used for two broad classes of application:
>> 
>> 
>> ** Building real-time streaming data pipelines that reliably get data
>> between systems or applications.
>> 
>> 
>> ** Building real-time streaming applications that transform or react to the
>> streams of data.
>> 
>> 
>> 
>> Apache Kafka is in use at large and small companies worldwide, including
>> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
>> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>> 
>> 
>> 
>> A big thank you for the following 36 contributors to this release!
>> 
>> Alex Good, Andras Beni, Andy Bryant, Arjun Satish, Bill Bejeck, Colin P.
>> Mccabe, Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, Daniel
>> Wojda, Dong Lin, Edoardo Comar, Ewen Cheslack-Postava, Filipe Agapito,
>> fredfp, Guozhang Wang, huxihx, Ismael Juma, Jason Gustafson, Jeremy
>> Custenborder, Jiangjie (Becket) Qin, Joel Hamill, Konstantine Karantasis,
>> lisa2lisa, Logan Buckley, Manjula K, Matthias J. Sax, Nick Chiu, parafiend,
>> Rajini Sivaram, Randall Hauch, Robert Yokota, Ron Dagostino, tedyu,
>> Yaswanth Kumar, Yu.
>> 
>> 
>> We welcome your help and feedback. For more information on how to
>> report problems,
>> and to get involved, visit the project website at http://kafka.apache.org/
>> 
>> 
>> Thank you!
>> Ewen
>> 
> 
> 
> 
> -- 
> -- Guozhang



Re: [ANNOUNCE] New Committer: Dong Lin

2018-03-28 Thread James Cheng
Congrats, Dong!

-James

> On Mar 28, 2018, at 10:58 AM, Becket Qin  wrote:
> 
> Hello everyone,
> 
> The PMC of Apache Kafka is pleased to announce that Dong Lin has accepted
> our invitation to be a new Kafka committer.
> 
> Dong started working on Kafka about four years ago, since which he has
> contributed numerous features and patches. His work on Kafka core has been
> consistent and important. Among his contributions, most noticeably, Dong
> developed JBOD (KIP-112, KIP-113) to handle disk failures and to reduce
> overall cost, and added the deleteDataBefore() API (KIP-107) to allow users to
> actively remove old messages. Dong has also been active in the community,
> participating in KIP discussions and doing code reviews.
> 
> Congratulations and looking forward to your future contribution, Dong!
> 
> Jiangjie (Becket) Qin, on behalf of Apache Kafka PMC



Re: [ANNOUNCE] Apache Kafka 1.1.0 Released

2018-03-29 Thread James Cheng
Thanks Damian and Rajini for running the release! Congrats and good job 
everyone!

-James

Sent from my iPhone

> On Mar 29, 2018, at 2:27 AM, Rajini Sivaram  wrote:
> 
> The Apache Kafka community is pleased to announce the release for
> 
> Apache Kafka 1.1.0.
> 
> 
> Kafka 1.1.0 includes a number of significant new features.
> 
> Here is a summary of some notable changes:
> 
> 
> ** Kafka 1.1.0 includes significant improvements to the Kafka Controller
> 
>   that speed up controlled shutdown. ZooKeeper session expiration edge
> cases
> 
>   have also been fixed as part of this effort.
> 
> 
> ** Controller improvements also enable more partitions to be supported on a
> 
>   single cluster. KIP-227 introduced incremental fetch requests, providing
> 
>   more efficient replication when the number of partitions is large.
> 
> 
> ** KIP-113 added support for replica movement between log directories to
> 
>   enable data balancing with JBOD.
> 
> 
> ** Some of the broker configuration options like SSL keystores can now be
> 
>   updated dynamically without restarting the broker. See KIP-226 for
> details
> 
>   and the full list of dynamic configs.
> 
> 
> ** Delegation token based authentication (KIP-48) has been added to Kafka
> 
>   brokers to support large number of clients without overloading Kerberos
> 
>   KDCs or other authentication servers.
> 
> 
> ** Several new features have been added to Kafka Connect, including header
> 
>   support (KIP-145), SSL and Kafka cluster identifiers in the Connect REST
> 
>   interface (KIP-208 and KIP-238), validation of connector names (KIP-212)
> 
>   and support for topic regex in sink connectors (KIP-215). Additionally,
> 
>   the default maximum heap size for Connect workers was increased to 2GB.
> 
> 
> ** Several improvements have been added to the Kafka Streams API, including
> 
>   reducing repartition topic partitions footprint, customizable error
> 
>   handling for produce failures and enhanced resilience to broker
> 
>   unavailability.  See KIPs 205, 210, 220, 224 and 239 for details.
> 
> 
> All of the changes in this release can be found in the release notes:
> 
> 
> 
> https://dist.apache.org/repos/dist/release/kafka/1.1.0/RELEASE_NOTES.html
> 
> 
> 
> 
> You can download the source release from:
> 
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka-1.1.0-src.tgz
> 
> 
> 
> and binary releases from:
> 
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz
> 
> (Scala 2.11)
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.12-1.1.0.tgz
> 
> (Scala 2.12)
> 
> 
> --
> 
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> 
> 
> ** The Producer API allows an application to publish a stream of records to
> 
> one or more Kafka topics.
> 
> 
> 
> ** The Consumer API allows an application to subscribe to one or more
> 
> topics and process the stream of records produced to them.
> 
> 
> 
> ** The Streams API allows an application to act as a stream processor,
> 
> consuming an input stream from one or more topics and producing an output
> 
> stream to one or more output topics, effectively transforming the input
> 
> streams to output streams.
> 
> 
> 
> ** The Connector API allows building and running reusable producers or
> 
> consumers that connect Kafka topics to existing applications or data
> 
> systems. For example, a connector to a relational database might capture
> 
> every change to a table.
> 
> 
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> 
> between systems or applications.
> 
> 
> 
> ** Building real-time streaming applications that transform or react to the
> 
> streams of data.
> 
> 
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> 
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> 
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> 
> 
> 
> A big thank you for the following 120 contributors to this release!
> 
> 
> Adem Efe Gencer, Alex Good, Andras Beni, Andy Bryant, Antony Stubbs,
> 
> Apurva Mehta, Arjun Satish, bartdevylder, Bill Bejeck, Charly Molter,
> 
> Chris Egerton, Clemens Valiente, cmolter, Colin P. Mccabe,
> 
> Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, dan norwood,
> 
> Daniel Wojda, Derrick Or, Dmitry Minkovsky, Dong Lin, Edoardo Comar,
> 
> ekenny, Elyahou, Eugene Sevastyanov, Ewen Cheslack-Postava, Filipe Agapito,
> 
> fredfp, Gavrie Philipson, Gunnar Morling, Guozhang Wang, hmcl, Hugo Louro,
> 
> huxi, huxihx, Igor Kostiakov, Ismael Juma, Ivan Babrou, Jacek Laskowski,
> 
> Jakub Scholz, Jason Gustafson, Jeff Klukas, Jeff Widman, Jeremy
> Custenborder,
> 
> Jeyhun Karimov, Jiangjie (Becket) Qin, Jiangjie Qin, Jimin Hsie

Re: Multi-threaded consumer?

2016-03-22 Thread James Cheng
Here's a good introductory blog post on the 0.9.0 consumer:

http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

It shows the basics of using the consumer, as well as a section where they 
launch 3 threads, each with one consumer, to consume a single topic.
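
A minimal sketch of that pattern against the 0.9 consumer API (the topic, 
group, and broker names are hypothetical; clean shutdown via wakeup() is 
omitted for brevity):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerLoop implements Runnable {
        private final KafkaConsumer<String, String> consumer;

        ConsumerLoop(Properties props, String topic) {
            // One KafkaConsumer per thread: the consumer is not thread-safe
            // and must never be shared across threads.
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                }
            } finally {
                consumer.close();
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Three threads, three consumers: the group coordinator splits
            // the topic's partitions across the group members.
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 3; i++)
                executor.submit(new ConsumerLoop(props, "my-topic"));
        }
    }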

-James

> On Mar 22, 2016, at 5:21 PM, BYEONG-GI KIM  wrote:
>
> Hello.
>
> I'd like to know how to implement a multi-threaded consumer, which retrieves
> message(s) from a topic per thread.
>
> I read the Kafka Consumer 0.9.0.1 API document from
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,
> and I copied and pasted the example source code from here. And I tried to
> manage threads via ThreadPoolTaskExecutor, but an exception occurred while
> executing multiple threads.
>
> The API document mentioned that multi-threaded access should be properly
> synchronized, but I think the example code is missing something needed for
> that. My understanding of Kafka is probably lacking, so I guess I may be
> using the API and functions of kafka wrongly...
>
>
> Could you give me a good sample code how to develop the multi-threaded
> consumer or any advice? Do I need to use commitSync() method somewhere to
> solve the problem?
>
> Thanks in advance!
>
> Best regards
>
> bgkim






Re: kafka 0.9.0.1: FATAL exception on startup

2016-03-22 Thread James Cheng
Hi, we ran into this problem too. The only way we were able to bypass this was 
by stopping Kafka and deleting the log directory of the affected partition. 
Which means, we lost data for that partition on this broker.
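
A sketch of that workaround (destructive: this broker's copy of the 
partition is deleted; a replica on another broker can re-populate it if one 
exists). The data dir and partition directory name are hypothetical:

    bin/kafka-server-stop.sh
    rm -rf /data/kafka-logs/mytopic-0    # directory of the affected partition
    bin/kafka-server-start.sh -daemon config/server.properties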

-James

> On Mar 8, 2016, at 1:07 AM, Anatoly Deyneka  wrote:
>
> Hi,
>
> I need your advice how to start server in the next situation:
> It fails on startup with FATAL error:
> [2016-03-07 16:30:53,495] FATAL Fatal error during KafkaServerStartable 
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> kafka.common.InvalidOffsetException: Attempt to append an offset (211046544) 
> to position 40048 no larger than the last offset appended (211046546) to 
> xyz/000210467262.index.
>at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>at kafka.log.LogSegment.recover(LogSegment.scala:188)
>at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>at kafka.log.Log.loadSegments(Log.scala:160)
>at kafka.log.Log.<init>(Log.scala:90)
>at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>at java.lang.Thread.run(Thread.java:744)
>
> http://stackoverflow.com/questions/35849673/kafka-0-9-0-1-fails-with-fatal-exception-on-startup
>
> Thanks,
> Anatoly






Re: Reg. Partition Rebalancing

2016-03-29 Thread James Cheng

> On Mar 29, 2016, at 10:33 AM, Todd Palino  wrote:
>
> There’s two things that people usually mean when they talk about
> rebalancing.
>
> One is leader reelection, or preferred replica election, which is sometimes
> confusingly referred to as “leader rebalance”. This is when we ask the
> controller in the cluster to pick the preferred replica for all partitions
> and change which broker is the leader (as appropriate). This is very useful
> when you have to take a broker offline for maintenance, as you can quickly
> make it take over leadership for partitions once it is back online. The
> controller picks the preferred leader pretty simply: the replica list is an
> array, and the controller picks the first broker listed in the array that
> is currently in sync as preferred. This means that the PLE is
> deterministic, and will always give you the same partition leadership
> (assuming the replicas are in sync).
>
> There is an admin CLI command to trigger the preferred replica election
> manually. There is also a broker configuration
> “auto.leader.rebalance.enable” which you can set to have the broker
> automatically perform the PLE when needed. DO NOT USE THIS OPTION. There
> are serious performance issues when doing so, especially on larger
> clusters. It needs some development work that has not been fully identified
> yet.
>

Todd,

What do you mean specifically by "serious performance issues"? I know that if 
you enable "auto.leader.rebalance.enable", then the broker(s) will do the 
reassignment whenever they want, at a time you can't predict. And the 
reassignment can move a lot of data around the cluster, and therefore it is 
undesirable to move so much data around at unpredictable times.

Is that the main performance issue you were talking about, or was there 
something else?

-James


> The other thing we mean by rebalance is partition rebalancing, or changing
> which brokers are replicas for a given partition to spread out the
> partitions according to some algorithm. This is something that you want to
> do when you add a broker to a cluster (or remove it), to redistribute load
> within the cluster. It’s also useful periodically to make sure you have a
> good spread of load, especially as topics change patterns (like ramping new
> features).
>
> While there are admin tools to perform partition reassignments, the brokers
> are not yet that great about rebalancing partitions. There is also
> currently no automated way of doing this, which is OK because it involves
> moving a lot of data around. Internally at LinkedIn we have some scripts we
> use for more intelligently balancing partitions to assure even balances
> based on a number of criteria. I’m hoping to have more to say about this
> later this week.
>
> -Todd
>
>
> On Tue, Mar 29, 2016 at 7:27 AM, Srikanth Chandika 
> wrote:
>
>> Hi,
>>
>> I am new to Kafka and I am testing all the options in Kafka.
>> I am confused about the re-balancing?
>> How and where to configure the re-balancing option?
>>
>> Regards,
>> Srikanth
>>
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino






Re: Consumers disappearing form __consumer_offsets

2016-04-11 Thread James Cheng
This may be related to offsets.retention.minutes.

offsets.retention.minutes: log retention window in minutes for the offsets topic

It defaults to 1440 minutes = 24 hours.

-James
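
One way to check whether a group's committed offsets are still present is the
AdminClient, from clients newer than this thread; the broker address and group
id below are placeholders. Note that offsets.retention.minutes is a
broker-level setting in server.properties, not a per-topic override.

  import java.util.Map;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  public class CheckGroupOffsets {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
          try (AdminClient admin = AdminClient.create(props)) {
              // If the group has been idle past offsets.retention.minutes, this
              // map comes back empty: the broker has expired its offsets.
              Map<TopicPartition, OffsetAndMetadata> offsets =
                  admin.listConsumerGroupOffsets("my-group") // placeholder group id
                       .partitionsToOffsetAndMetadata().get();
              offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
          }
      }
  }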

> On Apr 11, 2016, at 1:36 PM, Morellato, Wanny  
> wrote:
>
> Hi,
>
> I am trying to figure out why some of my consumers disappear from the list
> of active consumers…
> This is happening in my QA environment where sometimes no messages get 
> published over the weekend.
>
> I am wondering if it is related to the default 24 hours 
> log.cleaner.delete.retention.ms
> If that is the case… what would be the best way to increase that value just 
> for the __consumer_offsets topic?
>
>
> Thanks
>
> Wanny






Re: unknown (kafka) offsets after restart

2016-05-06 Thread James Cheng
Is the log compaction thread working correctly? The offsets are stored in a
log-compacted topic, and we have seen issues where the log cleaner thread dies,
after which the offsets topic just grows forever, which means it takes a long
time to read the topic back in.

You can look in the log-cleaner.log debug log file to see if there are any
error messages there.

-James


> On May 6, 2016, at 6:28 AM, Jörg Wagner  wrote:
> 
> After looking into this a bit more, we found out that the OffsetManager is
> single threaded, and due to our setup (few, powerful servers: rather bad for
> Kafka, I know..) it seems we are limiting ourselves severely by using Kafka
> offsets.
> 
> Any more insight is still welcome of course.
> 
> 
>  Forwarded Message 
> Subject:  unknown (kafka) offsets after restart
> Date: Fri, 6 May 2016 14:12:24 +0200
> From: Jörg Wagner 
> Reply-To: users@kafka.apache.org
> To:   users@kafka.apache.org
> 
> 
> 
> We're using Kafka 0.8.2 and are puzzled by the offset behaviour when
> they are stored in kafka topics.
> 
> Upon restart of the Kafka cluster (e.g. due to reconfiguration), it can
> happen that the offsets are unknown, which stops consumers from consuming
> because they don't know their offsets.
> 
> kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group
> --zookeeper localhost:2181
> Could not fetch offset for [topic,19] due to
> kafka.common.OffsetsLoadInProgressException.
> 
> Group   Topic   Pid  Offset   logSize  Lag      Owner
> group   topic   0    unknown  326606   unknown  none
> 
> I currently have no other solution to this than to wait.. but it takes a
> very long time (hours.. the servers are hopelessly underutilized!), I
> would be grateful for any advice.
> 
> Thanks
> Jörg
> 
> 
> 



Do consumer offsets stored in zookeeper ever get cleaned up?

2016-05-19 Thread James Cheng
I know that when offsets get stored in Kafka, they get cleaned up based on the 
offsets.retention.minutes config setting. This happens when using the new 
consumer, or when using the old consumer with offsets.storage=kafka.

If using the old consumer where offsets are stored in Zookeeper, do old offsets 
ever get removed?

Thanks,
-James



Will segments on no-traffic topics get deleted/compacted?

2016-05-19 Thread James Cheng
Time-based log retention only happens on old log segments. And log compaction 
only happens on old segments as well.

Currently, I believe segments only roll whenever a new record is written to the 
log. That is, during the write of the new record is when the current segment is 
evaluated to see if it should be rolled. Is that true?

That means that if there is *no* traffic on a topic, the messages on disk
may persist past the log retention time, or past the log compaction time. Is 
that the case? If so, is there any way to trigger rolling of a segment without 
active traffic on the topic?

Thanks!
-James



Re: Will segments on no-traffic topics get deleted/compacted?

2016-05-24 Thread James Cheng
I ran some tests, and also looked through the code. log.roll.ms isn't 
sufficient.

Segments appear to get rolled only if:

- log.roll.ms or segment.ms has elapsed for the segment, or
- the segment has grown past log.segment.bytes

But those rules are only evaluated when a new record is appended to the log. So
it appears that for inactive topics, segments will never roll.

See 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L383
and
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L617-L634

Tom, does that sound right?

-James
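
One workaround, not from this thread and offered only as a sketch: schedule a
tiny "heartbeat" produce to otherwise-idle topics so the append-time roll
check gets a chance to fire. The topic name and key below are assumptions; on
a compacted topic a null value is a tombstone, so the heartbeat leaves no
lasting data behind.

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class SegmentRollHeartbeat {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              // Run on a schedule shorter than log.roll.ms / segment.ms so old
              // segments become eligible for retention or compaction.
              producer.send(new ProducerRecord<>("my-idle-topic", "__heartbeat__", null)).get();
          }
      }
  }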


> On May 20, 2016, at 6:05 AM, Tom Crayford  wrote:
> 
> Hi there,
> 
> The missing piece is the config `log.roll.hours` or its alternative
> `log.roll.ms`. Log segments are by default rolled once a week, regardless of
> activity, but you can tune that down as you like.
> 
> Thanks
> 
> Tom Crayford
> Heroku Kafka
> 
> On Fri, May 20, 2016 at 12:49 AM, James Cheng  wrote:
> 
>> Time-based log retention only happens on old log segments. And log
>> compaction only happens on old segments as well.
>> 
>> Currently, I believe segments only roll whenever a new record is written
>> to the log. That is, during the write of the new record is when the current
>> segment is evaluated to see if it should be rolled. Is that true?
>> 
>> That means that if there is *no* traffic on a topic, the messages on
>> disk may persist past the log retention time, or past the log compaction
>> time. Is that the case? If so, is there any way to trigger rolling of a
>> segment without active traffic on the topic?
>> 
>> Thanks!
>> -James
>> 
>> 



Re: 10MB message

2016-06-15 Thread James Cheng
Igor,

This article talks about what to think about when putting large messages into
Kafka: http://ingest.tips/2015/01/21/handling-large-messages-kafka/

The summary is that Kafka is not optimized for handling large messages, but if 
you really want to, it's possible to do it.

That website is having issues right now, so it may take a (long) while to load. 
I've notified the website owner to let her know.

-James
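
For reference, a sketch of the producer side of what the article covers. The
values are illustrative only, and the broker (message.max.bytes,
replica.fetch.max.bytes) and consumer fetch-size settings have to be raised
in step, or the brokers will reject the large records:

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.common.serialization.ByteArraySerializer;

  public class LargeMessageProducer {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
          // Default max.request.size is 1 MB; allow ~10 MB records plus headroom.
          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 11 * 1024 * 1024);
          // Compression (see the reply quoted below) shrinks what crosses the wire.
          props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
          try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
              // send() the large records here
          }
      }
  }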

> On Jun 14, 2016, at 1:33 AM, R Krishna  wrote:
> 
> There are options to compress on the wire and in the topic.
> 
> On Tue, May 31, 2016 at 8:35 AM, Igor Kravzov 
> wrote:
> 
>> In our system some data can be as big as 10MB.
>> Is it OK to send a 10 MB message through Kafka?  What configuration
>> parameters should I check/set?
>> It is going to be one topic with one consumer - Apache NiFi GetKafka
>> processor.
>> Is one partition enough?
>> 
> 
> 
> 
> -- 
> Radha Krishna, Proddaturi
> 253-234-5657



Re: Halting because log truncation is not allowed for topic __consumer_offsets

2016-06-26 Thread James Cheng
Peter, can you add some of your observations to those JIRAs? You seem to have a 
good understanding of the problem. Maybe there is something that can be 
improved in the codebase to prevent this from happening, or reduce the impact 
of it.

Wanny, you might want to add a "me too" to the JIRAs as well.

FYI, I was the original filer of KAFKA-3410. My repro only talks about
the case where a broker was completely wiped, but we've seen it happen during 
normal operation as well.

-James

Sent from my iPhone
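
For context while reading Peter's analysis below, the settings this thread
revolves around, as a hedged sketch (values illustrative, not a
recommendation):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.ProducerConfig;

  public class DurabilityConfig {
      public static void main(String[] args) {
          // Topic/broker side (server.properties or per-topic overrides):
          //   min.insync.replicas=2                  acks=all needs >= 2 in-sync replicas
          //   unclean.leader.election.enable=false   refuse leaders that may have lost data
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
          // With acks=all the leader acknowledges only after the full in-sync
          // replica set has the write, which is how a follower can briefly sit
          // ahead of the leader's high water mark (the window Peter describes).
          props.put(ProducerConfig.ACKS_CONFIG, "all");
      }
  }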

> On Jun 26, 2016, at 1:41 PM, Peter Davis  wrote:
> 
> We have seen this several times and it's quite frustrating.  It seems to 
> happen due to the fact that the leader for a partition writes to followers 
> ahead of committing itself, especially for a topic like __consumer_offsets 
> that is written with acks=all.  If a brief network interruption occurs (as 
> seems to happen quite regularly for us in a virtualized environment), for a 
> low-to-medium-throughput topic like __consumer_offsets, the follower may 
> recover "too quickly" -- it has more messages than the leader because it was 
> written ahead, but recovers before enough more messages are written to the 
> leader and remaining ISR such that the leader gains a higher high water mark. 
>  So the replica halts due to a supposed unclean leader election.  Usually, 
> just waiting a minute before restarting the halted broker solves the issue: 
> more messages have been written, the leader has a higher HW, and the replica 
> will happily truncate itself and recover.  At least, that is my theory -- 
> it's been a journey understanding Kafka's details well enough!
> 
> This happens with 0.10.0 and occurs even with min.insync.replicas=2 (majority 
> of 3 replicas).  In fact the problem can be amplified by setting min.isr: if 
> fewer than minimum replicas are available, then it can be impossible to write 
> more messages to the leader as above, so the only way to recover is to delete 
> data files from the halted follower.  Similar for very low-throughput topics. 
>  At the same time, without min.insync.replicas enforcing a quorum, the risk 
> of a true unclean leader election or data loss is increased -- a double edged 
> sword!
> 
> It seems related to https://issues.apache.org/jira/browse/KAFKA-3410 or 
> https://issues.apache.org/jira/browse/KAFKA-3861 but happens even under 
> normal conditions (no data loss required!)
> 
> Anyone else have suggestions?  **Brokers halting due to a simple network 
> hiccup is, shall we say, not good!**
> 
> -Peter
> 
> 
>> On Jun 25, 2016, at 12:28 PM, Morellato, Wanny  
>> wrote:
>> 
>> Hi all,
>> 
>> My kafka brokers (0.9.0.1) are refusing to restart and they return the 
>> following error
>> 
>> Halting because log truncation is not allowed for topic __consumer_offsets, 
>> Current leader 11's latest offset 79445540 is less than replica 13's latest 
>> offset 79445565 (kafka.server.ReplicaFetcherThread)
>> 
>> Deleting the topic __consumer_offsets from those servers seems to fix the
>> problem…
>> 
>> From what I understand this should result in some duplicate delivery…
>> If that is the case, is it possible to configure kafka in a way that it will 
>> automatically recover from this type of failure?
>> 
>> Thanks
>> 
>> Wanny


Re: kafka + autoscaling groups fuckery

2016-07-03 Thread James Cheng
Charity,

I'm not sure about the specific problem you are having, but about Kafka on AWS, 
Netflix did a talk at a meetup about their Kafka installation on AWS. There 
might be some useful information in there. There is a video stream as well as 
slides, and maybe you can get in touch with the speakers. Look in the comment 
section for links to the slides and video. 

Kafka at Netflix
http://www.meetup.com//http-kafka-apache-org/events/220355031/?showDescription=true

There's also a talk about running Kafka on Mesos, which might be relevant.

Kafka on Mesos
http://www.meetup.com//http-kafka-apache-org/events/222537743/?showDescription=true

-James

Sent from my iPhone

> On Jul 2, 2016, at 5:15 PM, Charity Majors  wrote:
> 
> Gwen, thanks for the response.
> 
>> 1.1 Your life may be a bit simpler if you have a way of starting a new
>> broker with the same ID as the old one - this means it will
>> automatically pick up the old replicas and you won't need to
>> rebalance. Makes life slightly easier in some cases.
> 
> Yeah, this is definitely doable, I just don't *want* to do it.  I really
> want all of these to share the same code path: 1) rolling all nodes in an
> ASG to pick up a new AMI, 2) hardware failure / unintentional node
> termination, 3) resizing the ASG and rebalancing the data across nodes.
> 
> Everything but the first one means generating new node IDs, so I would
> rather just do that across the board.  It's the solution that really fits
> the ASG model best, so I'm reluctant to give up on it.
> 
> 
>> 1.2 Careful not to rebalance too many partitions at once - you only
>> have so much bandwidth and currently Kafka will not throttle
>> rebalancing traffic.
> 
> Nod, got it.  This is def something I plan to work on hardening once I have
> the basic nut of things working (or if I've had to give up on it and accept
> a lesser solution).
> 
> 
>> 2. I think your rebalance script is not rebalancing the offsets topic?
>> It still has a replica on broker 1002. You have two good replicas, so
>> you are nowhere near disaster, but make sure you get this working
>> too.
> 
> Yes, this is another problem I am working on in parallel.  The Shopify
> sarama library uses the
> __consumer_offsets topic, but it does *not* let you rebalance or resize the
> topic when consumers connect, disconnect, or restart.
> 
> "Note that Sarama's Consumer implementation does not currently support
> automatic consumer-group rebalancing and offset tracking"
> 
> I'm working on trying to get the sarama-cluster to do something here.  I
> think these problems are likely related, I'm not sure wtf you are
> *supposed* to do to rebalance this god damn topic.  It also seems like we
> aren't using a consumer group which sarama-cluster depends on to rebalance
> a topic.  I'm still pretty confused by the 0.9 "consumer group" stuff.
> 
> Seriously considering downgrading to the latest 0.8 release, because
> there's a massive gap in documentation for the new stuff in 0.9 (like
> consumer groups) and we don't really need any of the new features.
> 
>> A common work-around is to configure the consumer to handle the "offset
>> out of range" exception by jumping to the last offset available in the
>> log. This is the behavior of the Java client, and it would have saved
>> your consumer here. Go client looks very low level, so I don't know
>> how easy it is to do that.
> 
> Erf, this seems like it would almost guarantee data loss.  :(  Will check
> it out tho.
> 
>> If I were you, I'd retest your ASG scripts without the auto leader
>> election - since your own scripts can / should handle that.
> 
> Okay, this is straightforward enough.  Will try it.  And will keep trying
> to figure out how to balance the __consumer_offsets topic, since I
> increasingly think that's the key to this giant mess.
> 
> If anyone has any advice there, massively appreciated.
> 
> Thanks,
> 
> charity.
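
For completeness: the Java-client behavior described above (jumping to the
last available offset on an "offset out of range" error) is the
auto.offset.reset setting. A minimal sketch, with the broker address and
group id as placeholders:

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  public class OffsetResetExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          // When the committed offset no longer exists on the broker:
          //   "latest"   - jump to the end (keeps consuming, but silently skips
          //                records, the data-loss concern raised above)
          //   "earliest" - reread from the start of the log
          //   "none"     - surface the out-of-range error to the application
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              // subscribe and poll here
          }
      }
  }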


Re: Read all records from a Topic.

2016-07-13 Thread James Cheng
Jean-Baptiste,

I wrote a blog post recently on this exact subject.

https://logallthethings.com/2016/06/28/how-to-read-to-the-end-of-a-kafka-topic/

Let me know if you find it useful.

-James

Sent from my iPhone
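
Roughly, the approach from the post: snapshot the end offsets first, then
poll until the consumer's position reaches them on every partition. A minimal
sketch against the newer Java consumer; the Streams code quoted below
predates the consumer's endOffsets() API, hence its seekToEnd/position dance.
The topic name and the restore() hook are assumptions:

  import java.time.Duration;
  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  public class ReadToEnd {
      static void readToEnd(KafkaConsumer<String, String> consumer) {
          List<TopicPartition> parts =
              Collections.singletonList(new TopicPartition("state-topic", 0)); // placeholder
          consumer.assign(parts);
          consumer.seekToBeginning(parts);

          // Snapshot where the log ends right now; records produced after this
          // point are deliberately not waited for.
          Map<TopicPartition, Long> end = consumer.endOffsets(parts);

          while (parts.stream().anyMatch(tp -> consumer.position(tp) < end.get(tp))) {
              for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(100))) {
                  restore(rec.key(), rec.value()); // hypothetical state-rebuild hook
              }
          }
      }

      static void restore(String key, String value) {
          // rebuild in-memory state here
      }
  }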

> On Jul 13, 2016, at 7:16 AM, g...@netcourrier.com wrote:
> 
> Hi,
> 
> 
> I'm using a compacted Kafka Topic to save the state of my application. When 
> the application crashes/restarts I can restore its state by reading the Kafka 
> topic.
> 
> 
> 
> However I need to read it completely, especially up to the most recent
> record, to be sure to restore all data.
> 
> 
> 
> Is there a standard way to do that? I've checked the Kafka Streams code and
> found that the class ProcessorStateManager seems to be doing something 
> similar.
> 
> 
> 
> It first gets the last offset by doing:
> 
> // calculate the end offset of the partition
> // TODO: this is a bit hacky to first seek then position to get the end offset
> restoreConsumer.seekToEnd(singleton(storePartition));
> long endOffset = restoreConsumer.position(storePartition);
> 
> 
> 
> Then it polls the records until reaching the end offset (there is also
> another limit, but I think it is related to another use case).
> 
> 
> 
> I guess it works, but the TODO message makes me wonder if it is a good 
> solution and if it will continue to work in future releases.
> 
> 
> 
> Thanks for your help,
> 
> 
> 
> Jean-Baptiste
> 
> 
> 
> 
> 


Re: [ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-10-31 Thread James Cheng
Congrats, Becket!

-James

> On Oct 31, 2016, at 10:35 AM, Joel Koshy  wrote:
> 
> The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to join as a
> committer and we are pleased to announce that he has accepted!
> 
> Becket has made significant contributions to Kafka over the last two years.
> He has been deeply involved in a broad range of KIP discussions and has
> contributed several major features to the project. He recently completed
> the implementation of a series of improvements (KIP-31, KIP-32, KIP-33) to
> Kafka’s message format that address a number of long-standing issues such
> as avoiding server-side re-compression, better accuracy for time-based log
> retention, log roll and time-based indexing of messages.
> 
> Congratulations Becket! Thank you for your many contributions. We are
> excited to have you on board as a committer and look forward to your
> continued participation!
> 
> Joel



Re: [ANNOUNCE] Apache Kafka 2.0.0 Released

2018-07-30 Thread James Cheng
Congrats and great job, everyone! Thanks Rajini for driving the release!

-James

Sent from my iPhone

> On Jul 30, 2018, at 3:25 AM, Rajini Sivaram  wrote:
> 
> The Apache Kafka community is pleased to announce the release for
> 
> Apache Kafka 2.0.0.
> 
> 
> 
> 
> 
> This is a major release and includes significant new features from
> 
> 40 KIPs. It contains fixes and improvements from 246 JIRAs, including
> 
> fixes for a few critical bugs. Here is a summary of some notable changes:
> 
> ** KIP-290 adds support for prefixed ACLs, simplifying access control
> management in large secure deployments. Bulk access to topics,
> consumer groups or transactional ids with a prefix can now be granted
> using a single rule. Access control for topic creation has also been
> improved to enable access to be granted to create specific topics or
> topics with a prefix.
> 
> ** KIP-255 adds a framework for authenticating to Kafka brokers using
> OAuth2 bearer tokens. The SASL/OAUTHBEARER implementation is
> customizable using callbacks for token retrieval and validation.
> 
> ** Host name verification is now enabled by default for SSL connections
> to ensure that the default SSL configuration is not susceptible to
> man-in-the-middle attacks. You can disable this verification for
> deployments where validation is performed using other mechanisms.
> 
> ** You can now dynamically update SSL trust stores without broker restart.
> You can also configure security for broker listeners in ZooKeeper before
> starting brokers, including SSL key store and trust store passwords and
> JAAS configuration for SASL. With this new feature, you can store sensitive
> password configs in encrypted form in ZooKeeper rather than in cleartext
> in the broker properties file.
> 
> ** The replication protocol has been improved to avoid log divergence
> between leader and follower during fast leader failover. We have also
> improved resilience of brokers by reducing the memory footprint of
> message down-conversions. By using message chunking, both memory
> usage and memory reference time have been reduced to avoid
> OutOfMemory errors in brokers.
> 
> ** Kafka clients are now notified of throttling before any throttling is
> applied when quotas are enabled. This enables clients to distinguish between
> network errors and large throttle times when quotas are exceeded.
> 
> ** We have added a configuration option for the Kafka consumer to avoid
> indefinite blocking in the consumer.
> 
> ** We have dropped support for Java 7 and removed the previously
> deprecated Scala producer and consumer.
> 
> ** Kafka Connect includes a number of improvements and features.
> KIP-298 enables you to control how errors in connectors, transformations
> and converters are handled by enabling automatic retries and controlling the
> number of errors that are tolerated before the connector is stopped. More
> contextual information can be included in the logs to help diagnose problems
> and problematic messages consumed by sink connectors can be sent to a
> dead letter queue rather than forcing the connector to stop.
> 
> ** KIP-297 adds a new extension point to move secrets out of connector
> configurations and integrate with any external key management system.
> The placeholders in connector configurations are only resolved before
> sending the configuration to the connector, ensuring that secrets are stored
> and managed securely in your preferred key management system and
> not exposed over the REST APIs or in log files.
> 
> ** We have added a thin Scala wrapper API for our Kafka Streams DSL,
> which provides better type inference and better type safety during compile
> time. Scala users can have less boilerplate in their code, notably regarding
> Serdes with new implicit Serdes.
> 
> ** Message headers are now supported in the Kafka Streams Processor API,
> allowing users to add and manipulate headers read from the source topics
> and propagate them to the sink topics.
> 
> ** Windowed aggregations performance in Kafka Streams has been largely
> improved (sometimes by an order of magnitude) thanks to the new
> single-key-fetch API.
> 
> ** We have further improved unit testability of Kafka Streams with the
> kafka-streams-test-utils artifact.
> 
> 
> 
> 
> 
> All of the changes in this release can be found in the release notes:
> 
> https://www.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
> 
> 
> 
> 
> 
> You can download the source and binary release (Scala 2.11 and Scala 2.12)
> from:
> 
> https://kafka.apache.org/downloads#2.0.0
> 
> 
> 
> 
> ---
> 
> 
> 
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> 
> 
> ** The Producer API allows an application to publish a stream of records to
> 
> one or more Kafka topics.
> 
> 
> 
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
