How to design a robust producer?

2014-01-30 Thread Thibaud Chardonnens
Hello — I am struggling with how to design a robust implementation of a producer. My use case is quite simple: I want to process a relatively big stream (~8MB/s) with Storm. Kafka will be used as an intermediary between the stream and Storm. The stream is sent to a specific server on a specific
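
For orientation, a minimal producer against the 0.8 Java API might look like the sketch below. The broker list, topic name, and ack/retry settings are placeholders, not recommendations from the thread:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "-1");   // wait for all in-sync replicas
            props.put("message.send.max.retries", "3"); // retry transient failures

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("stream-topic", "one line of the stream"));
            producer.close();
        }
    }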

Re: How to design a robust producer?

2014-01-30 Thread Philip O'Toole
What exactly are you struggling with? Your question is too broad. What you want to do is eminently possible; I have done it myself from scratch. Philip > On Jan 30, 2014, at 6:00 AM, Thibaud Chardonnens wrote: > > Hello — I am struggling with how to design a robust implementation of a > pro

Re: How to design a robust producer?

2014-01-30 Thread Tom Brown
Why go with a fancy multithreaded producer architecture? Why not rely on a simple python/perl/whatever implementation and let a scalable web server handle the threading issues?

Re: How to design a robust producer?

2014-01-30 Thread Thibaud Chardonnens
Thanks for your reply, but I am missing something: how do you push the data to a specific topic in your example? Through which client? On Jan 30, 2014, at 15:16, Tom Brown wrote: > Why go with a fancy multithreaded producer architecture? Why not rely on a > simple python/perl/whatever impleme

Re: How to design a robust producer?

2014-01-30 Thread Thibaud Chardonnens
Thanks for your quick answer. Yes, sorry, it's probably too broad, but my main question was whether there are any best practices for building a robust, fault-tolerant producer that guarantees no data will be dropped while listening on the port. From my point of view the producer will be the most critic

Re: How to design a robust producer?

2014-01-30 Thread Philip O'Toole
Well, you could start by looking at the Kafka Producer source code for some ideas. We have built plenty of solid software on that. As to your goal of building something solid, robust, and critical, all I can say is that you need to keep your Producer as simple as possible -- the simpler it is, the

Re: How to design a robust producer?

2014-01-30 Thread Clark Breyman
Thibaud, Sounds like one of your issues will be upstream of Kafka. Robust and UDP aren't something I usually think of together unless you have additional bookkeeping to detect and request lost messages. 8MB/s shouldn't be much of a problem unless the messages are very small and looking for individ
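
One concrete form that "additional bookkeeping" can take is stamping each datagram with a sequence number so the receiver can at least detect gaps. A minimal sketch, with the port and wire format invented for illustration:

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.nio.ByteBuffer;

    public class SeqUdpReceiver {
        public static void main(String[] args) throws Exception {
            DatagramSocket sock = new DatagramSocket(9999); // illustrative port
            byte[] buf = new byte[65535];
            long expected = 0;
            while (true) {
                DatagramPacket pkt = new DatagramPacket(buf, buf.length);
                sock.receive(pkt);
                // first 8 bytes of each datagram carry the sender's sequence number
                long seq = ByteBuffer.wrap(pkt.getData(), 0, 8).getLong();
                if (seq != expected) {
                    System.err.printf("gap: expected %d, got %d%n", expected, seq);
                }
                expected = seq + 1;
                // bytes 8..pkt.getLength() are the payload to hand to the Kafka producer
            }
        }
    }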

Problems encountered during the consumer shutdown.

2014-01-30 Thread paresh shah
This is on the 0.8.0-beta1 code base. So if I understand right, moving to 0.8.0 should not cause the above problem? Is there a specific change in 0.8.0 that fixes this behaviour? Paresh

Re: How to design a robust producer?

2014-01-30 Thread Andrew Otto
Thibaud, I wouldn't say this is a 'robust' solution, but the Wikimedia Foundation uses a piece of software we wrote called udp2log. We are in the process of replacing it with more robust direct Kafka producers, but it has worked for us in the interim. udp2log is a C++ daemon that listens f

Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-30 Thread Jun Rao
Yes, you should implement the handler in the kafka.javaapi.producer.async package. Internally, we wrap that callback with a Scala callback. When instantiating the producer, you need to provide 2 types, the first one for the key and the second one for the value. Make sure the second one is of type byte[]. Thanks, Jun On W
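
A sketch of the typing Jun describes, with a String key and a byte[] value (broker address, topic, and key are placeholders):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TypedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");                 // placeholder
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");    // passes byte[] through
            props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // String keys

            Producer<String, byte[]> producer =
                    new Producer<String, byte[]>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, byte[]>("some-topic", "a-key",
                    "payload".getBytes()));
            producer.close();
        }
    }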

Re: Problems encountered during the consumer shutdown.

2014-01-30 Thread Jun Rao
Not sure if 0.8.0 solved that particular problem, but it's more stable. So, it's worth a try. Thanks, Jun On Thu, Jan 30, 2014 at 7:35 AM, paresh shah wrote: > This is on the 0.8.0-beta1 code base. So if I understand right, moving to > 0.8.0 should not cause the above problem? Is there a spec

Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-30 Thread Patricio Echagüe
Jun, we've been using Kafka for more than two years. Both the key and the value are of type String. That doesn't seem to be the problem. I just can't start the application when setting the callback handler, which I tried with both String and byte[]. The reason I want to use the handler is to send metrics

Metadata error always correlates with LeaderAndIsr request

2014-01-30 Thread Marek Dolgos
We are seeing the following errors in our logs: [2014-01-30 15:18:40,736] 2134373909 [kafka-request-handler-3] ERROR kafka.server.KafkaApis - [KafkaApi-10881778] Error while fetching metadata for partition [las_01_scsRawHits,0] then they are always preceded or followed, within the same second

Re: How to design a robust producer?

2014-01-30 Thread Tom Brown
I hadn't noticed the UDP requirement before; that does complicate things, and unless you're in absolute control of the network path, some data loss is virtually guaranteed. Are you allowed to have more than one "collector/producer" machine so that if one fails you won't be stuck? If you can ha

Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-30 Thread Patricio Echagüe
One more thing: using the Scala callback handler from Java code seems to work, but I'm having a hard time creating Scala Seqs from Java code to make my handler compatible with the Scala signature. On Thu, Jan 30, 2014 at 8:10 AM, Patricio Echagüe wrote: > Jun, we've been using Kafka for more than

Re: Java 8 influence on next-generation Java producer and consumer APIs

2014-01-30 Thread Jay Kreps
I think this is a good point and you are not the first person to bring it up. I am not hugely knowledgeable about Java 8, so any feedback would be useful. In the producer I think the biggest impact is that the Callback can be implemented with a lambda instead of an anonymous class, which will be much nice
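
To make the point concrete, here is the before/after being described, written against the new-producer Callback shape under discussion in this thread (producer and record are assumed to be in scope, so treat it as a sketch):

    // Java 7: anonymous class
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) exception.printStackTrace();
        }
    });

    // Java 8: the same callback as a lambda
    producer.send(record, (metadata, exception) -> {
        if (exception != null) exception.printStackTrace();
    });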

RE: Kafka performance test: "--request-num-acks -1" kills throughput

2014-01-30 Thread Michael Popov
Hi Jun, I ran new tests today. Settings: 2 Kafka brokers, 1 Zookeeper node, 4 client machines. Creating a new topic with 1 partition for each client process. Commands to create a topic looked like this one: bin/kafka-create-topic.sh --partition 1 --replica 2 --zookeeper 10.0.0.4:2181 -

There is insufficient memory for the Java Runtime Environment to continue.

2014-01-30 Thread David Montgomery
Hi, why oh why can I not start Kafka 0.8? I am on a machine with 512 MB of RAM on Digital Ocean. What does one have to do to get Kafka to work? root@do-kafka-sf-development-20140130051956: export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" root@do-kafka-sf-development-20140130

Re: There is insufficient memory for the Java Runtime Environment to continue.

2014-01-30 Thread Jay Kreps
I think this may be more of a general Java thing. Can you try running any Java class with the same command line options you are using for Kafka and confirm whether that also fails? -Jay On Thu, Jan 30, 2014 at 11:23 AM, David Montgomery < davidmontgom...@gmail.com> wrote: > Hi, why oh why can I not
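
A trivial class for the test Jay suggests; if the JVM cannot reserve the requested heap, it will fail before main() runs (class name is arbitrary):

    // Compile and run with the same flags used for Kafka, e.g.:
    //   javac HeapTest.java && java -Xmx256M -Xms128M HeapTest
    public class HeapTest {
        public static void main(String[] args) {
            Runtime rt = Runtime.getRuntime();
            System.out.printf("JVM started; max heap = %d MB%n",
                    rt.maxMemory() / (1024 * 1024));
        }
    }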

Add partitions command note

2014-01-30 Thread Marc Labbe
Hi, I am going through the tools documentation for our own ops team. While doing so, I noticed the following note from the add-partitions tool: ***Please note that this tool can only be used to add partitions when data for a topic does not use a key.*** This message left me a bit dubious as to w

Re: There is insufficient memory for the Java Runtime Environment to continue.

2014-01-30 Thread David Montgomery
Hi, This is a dedicated machine on DO. But I can say I did not have a problem with Kafka 0.7. I just upgraded the machine to 1 GB on Digital Ocean. Same error. export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" root@do-kafka-sf-development-20140130195343:/etc/supervisor/conf.d# /var/lib/kafka-0.8.0-src/bin/kaf

Re: There is insufficient memory for the Java Runtime Environment to continue.

2014-01-30 Thread Benjamin Black
Are you sure the java opts are being set as you expect? On Jan 30, 2014 12:41 PM, "David Montgomery" wrote: > Hi, > > This is a dedicated machine on DO. But I can say I did not have a > problem with Kafka 0.7. > I just upgraded the machine to 1 GB on Digital Ocean. Same error. > > export KAFKA_HEAP

Re: Add partitions command note

2014-01-30 Thread Jay Kreps
Yeah, this is confusing. What they are trying to say is that adding partitions doesn't move data that is already in existing partitions. I.e., say you have 10 partitions (0-9), each containing 1GB of data, and you add an 11th partition. When this is complete, partitions 0-9 will contain the exact same
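
A toy illustration of why keyed topics are affected: the default partitioner hashes the key modulo the partition count, so the mapping shifts when the count changes (key and counts are made up):

    public class PartitionShift {
        public static void main(String[] args) {
            String key = "user-42";
            int before = Math.abs(key.hashCode()) % 10; // partition chosen with 10 partitions
            int after  = Math.abs(key.hashCode()) % 11; // partition chosen after adding an 11th
            // The two generally differ, so old records for this key stay in one
            // partition while new records for the same key land in another.
            System.out.printf("before=%d after=%d%n", before, after);
        }
    }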

RE: Kafka performance test: "--request-num-acks -1" kills throughput

2014-01-30 Thread Michael Popov
Hi Neha, I am almost sure the root of the problem is not on the client side. I ran tests with different Kafka client library implementations and got similar results. In my tests I "saturated" servers with load coming from 40 processes running on 4 different hosts, so blocking producers even wit

Re: Add partitions command note

2014-01-30 Thread Marc Labbe
Yep, I got that after digging a bit, but thanks for the additional wording. I would update the wiki with this explanation. I also think it would be wise to add notes to other commands that will be replaced by TopicCommand (kafka-topics.sh) so people know where the commands are after they migrate

Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-30 Thread Patricio Echagüe
Update: There was no way to make it work with the javaapi one. I made it work using the Scala interface from Java. The code looks a bit ugly, as I had to create a Scala Seq from Java. For example: @Override public Seq<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> queueItem) { if (queueItem == null) { r
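
Filled out, the workaround might look roughly like this, using JavaConversions to bridge to the Scala Seq. The type parameter, null handling, and metrics hook are guesses at the elided code, and the QueueItem package follows the 0.7-era API:

    import java.util.ArrayList;
    import java.util.List;
    import kafka.producer.async.QueueItem; // 0.7-era API; adjust to your version
    import scala.collection.JavaConversions;
    import scala.collection.Seq;

    public class MetricsCallbackHandler<T> {
        // Would be declared to implement the Scala kafka.producer.async.CallbackHandler[T]
        // trait; its remaining methods (beforeEnqueue, afterEnqueue, etc.) are omitted here.
        public Seq<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> queueItem) {
            List<QueueItem<T>> batch = new ArrayList<QueueItem<T>>();
            if (queueItem != null) {
                // record a metric here, then pass the item through unchanged
                batch.add(queueItem);
            }
            return JavaConversions.asScalaBuffer(batch).toSeq();
        }
    }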

Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-30 Thread Neha Narkhede
Sure. However, we are actively working on a new producer API for Kafka. It would be good if you could take a look at it and provide feedback - https://www.mail-archive.com/dev@kafka.apache.org/msg07187.html Thanks, Neha On Thu, Jan 30, 2014 at 2:28 PM, Patricio Echagüe wrote: > Update: > > There w

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
One downside to the 1A proposal is that without a Partitioner interface we can't really package up and provide common partitioner implementations. Examples of these would be: 1. HashPartitioner - the default hash partitioning 2. RoundRobinPartitioner - just round-robins over partitions 3. ConnectionM
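
For illustration, the kind of interface and implementations being argued for might look like the sketch below. The names are hypothetical; whether such an interface should exist is exactly what the thread is debating:

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical pluggable partitioner.
    interface Partitioner {
        int partition(byte[] key, int numPartitions);
    }

    // 1. Hash partitioning: a stable key-to-partition mapping.
    class HashPartitioner implements Partitioner {
        public int partition(byte[] key, int numPartitions) {
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
    }

    // 2. Round-robin: spreads load evenly, ignoring the key.
    class RoundRobinPartitioner implements Partitioner {
        private final AtomicInteger counter = new AtomicInteger();
        public int partition(byte[] key, int numPartitions) {
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
    }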

Re: New Producer Public API

2014-01-30 Thread Jun Rao
With option 1A, if we increase the # of partitions on a topic, how will the producer find out about newly created partitions? Do we expect the producer to periodically call getCluster()? As for the ZK dependency, one of the goals of the client rewrite is to reduce dependencies so that one can implement the client in l

Re: New Producer Public API

2014-01-30 Thread Joel Koshy
Does it preclude those various implementations? I.e., it could become a producer config: default.partitioner.strategy="minimize-connections"/"roundrobin" - and so on - with those partitioners implemented internally in the producer. Not as clear as a .class config, but it accomplishes the same effect no
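
In code, Joel's alternative would reduce to a property (the name and values are copied from his message; this is a proposal in the thread, not a shipped Kafka config):

    Properties props = new Properties();
    // choose a built-in strategy by name instead of plugging in a class
    props.put("default.partitioner.strategy", "roundrobin"); // or "minimize-connections"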

Re: New Producer Public API

2014-01-30 Thread Joel Koshy
+ dev (this thread has become a bit unwieldy) On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy wrote: > Does it preclude those various implementations? i.e., it could become > a producer config: > default.partitioner.strategy="minimize-connections"/"roundrobin" - and > so on; and implement those par

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
I thought a bit about it, and I think the getCluster() thing was overly simplistic: because we try to maintain metadata only for the current set of topics the producer cares about, the cluster might not have the partitions for the topic the user cares about. I think actually what we need is a new

Re: New Producer Public API

2014-01-30 Thread Jay Kreps
Joel-- Yeah, we could theoretically retain a neutered Partitioner interface that only had access to the byte[] key, not the original object (which we no longer have). Ideologically, most partitioning should really happen based on the byte[], not the original object, to retain multi-language compatibili