Hello, I am struggling with how to design a robust implementation of a
producer.
My use case is quite simple:
I want to process a relatively big stream (~8MB/s) with Storm. Kafka will be
used as an intermediary between the stream and Storm. The stream is sent to a
specific server on a specific
What exactly are you struggling with? Your question is too broad. What you want
to do is eminently possible; I have done it myself from scratch.
Philip
> On Jan 30, 2014, at 6:00 AM, Thibaud Chardonnens wrote:
>
> Hello, I am struggling with how to design a robust implementation of a
> producer.
Why go with a fancy multithreaded producer architecture? Why not rely on a
simple python/perl/whatever implementation and let a scalable web server
handle the threading issues?
Thanks for your reply, but I am missing something: how do you push the data to
a specific topic in your example? Through which client?
On Jan 30, 2014, at 3:16 PM, Tom Brown wrote:
> Why go with a fancy multithreaded producer architecture? Why not rely on a
> simple python/perl/whatever implementation
Thanks for your quick answer.
Yes, sorry, it's probably too broad, but my main question was whether there are
any best practices for building a robust, fault-tolerant producer that
guarantees no data will be dropped while listening on the port.
From my point of view the producer will be the most critical
Well, you could start by looking at the Kafka Producer source code for some
ideas. We have built plenty of solid software on that.
As to your goal of building something solid, robust, and critical: all I
can say is that you then need to keep your producer as simple as possible -- the
simpler it is, the
Thibaud,
Sounds like one of your issues will be upstream of Kafka. Robust and UDP
aren't something I usually think of together unless you have additional
bookkeeping to detect and request lost messages. 8MB/s shouldn't be much of
a problem unless the messages are very small and looking for individ
This is on the 0.8.0-beta1 code base. So if I understand right, moving to
0.8.0 should not cause the above problem? Is there a specific change in 0.8.0
that fixes this behaviour?
Paresh
Thibaud, I wouldn't say this is a 'robust' solution, but the Wikimedia
Foundation uses a piece of software we wrote called udp2log. We are in the
process of replacing it with more robust direct Kafka producers, but it has
worked for us in the interim. udp2log is a C++ daemon that listens for
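
For context, a minimal Java sketch of the general pattern described here (a
daemon that listens for UDP datagrams and hands them to a Kafka producer).
This is not udp2log; the class name, port, topic, and broker list below are
placeholders, and the 0.8 producer API is assumed:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical minimal UDP -> Kafka relay, loosely analogous to the pattern above.
public class UdpRelay {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] values
        Producer<String, byte[]> producer =
                new Producer<String, byte[]>(new ProducerConfig(props));

        DatagramSocket socket = new DatagramSocket(8649); // placeholder port
        byte[] buf = new byte[65536];
        while (true) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            socket.receive(packet); // blocks until a datagram arrives
            byte[] payload = Arrays.copyOf(packet.getData(), packet.getLength());
            producer.send(new KeyedMessage<String, byte[]>("raw-stream", payload));
        }
    }
}

A single-threaded loop like this will drop datagrams whenever send() blocks
for longer than the kernel's UDP receive buffer can absorb, which is exactly
the robustness concern raised earlier in the thread.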
Yes, you should implement the callback handler in the
kafka.javaapi.producer.async package. Internally, we wrap that callback with a
Scala callback. When instantiating the producer, you need to provide two type
parameters, the first for the key and the second for the value. Make sure the
second one is of type byte[].
Thanks,
Jun
On W
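
As a rough illustration of the two type parameters Jun mentions (not an
official example; the broker, topic, and key below are placeholders),
instantiating a 0.8 producer with a String key and a byte[] value might look
like this:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTypesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");                   // placeholder broker
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // String keys
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");    // byte[] values

        // Two type parameters: the first for the key, the second (byte[]) for the value.
        Producer<String, byte[]> producer =
                new Producer<String, byte[]>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, byte[]>(
                "my-topic", "some-key", "payload".getBytes()));
        producer.close();
    }
}

With DefaultEncoder configured for the value, whatever byte[] you pass through
is written to the log as-is.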
Not sure if 0.8.0 solved that particular problem, but it's more stable. So,
it's worth a try.
Thanks,
Jun
On Thu, Jan 30, 2014 at 7:35 AM, paresh shah wrote:
> This is on the 0.8.0-beta1 code base. So if I understand right, moving to
> 0.8.0 should not cause the above problem? Is there a spec
Jun, we've been using Kafka for more than two years. Both the key and the
value are of type String. That doesn't seem to be the problem.
I just can't start the application when setting the callback handler, which
I tried with both String and byte[].
The reason I want to use the handler is to send metrics
We are seeing the following errors in our logs:
[2014-01-30 15:18:40,736] 2134373909 [kafka-request-handler-3] ERROR
kafka.server.KafkaApis - [KafkaApi-10881778] Error while fetching metadata for
partition [las_01_scsRawHits,0]
then they are always preceded or followed, within the same second
I hadn't noticed the UDP requirement before; that does complicate things,
and unless you're in absolute control of the network path, some data loss
is virtually guaranteed. Are you allowed to have more than one
"collector/producer" machine so that if one fails you won't be stuck?
If you can ha
One more thing: using the Scala callback handler from Java code seems to
work, but I'm having a hard time creating Scala Seqs from Java code to make
my handler compatible with the Scala signature.
On Thu, Jan 30, 2014 at 8:10 AM, Patricio Echagüe wrote:
> Jun, we've been using Kafka for more than
I think this is a good point and you are not the first person to bring it
up.
I am not hugely knowledgeable about Java 8, so any feedback would be useful.
In the producer I think the biggest impact is that the Callback can be
implemented with a lambda instead of an anonymous class, which will be much nicer
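
As an illustration only (this is a sketch against the new producer API in
org.apache.kafka.clients.producer as it eventually shipped; config keys and
class names may differ from the draft under discussion here, and the broker,
topic, and key are placeholders), the same send with an anonymous class versus
a lambda:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackStyles {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>("my-topic", "some-key", "payload".getBytes());

        // Pre-Java-8: the callback as an anonymous inner class.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) exception.printStackTrace();
            }
        });

        // Java 8: the same callback as a lambda.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) exception.printStackTrace();
        });

        producer.close();
    }
}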
Hi Jun,
I ran new tests today.
Settings: 2 Kafka brokers, 1 Zookeeper node, 4 client machines.
Creating a new topic with 1 partition for each client process.
Commands to create a topic looked like this one:
bin/kafka-create-topic.sh --partition 1 --replica 2 --zookeeper
10.0.0.4:2181 -
Hi,
Why oh why can I not start Kafka 0.8? I am on a machine with 512 MB of RAM
on Digital Ocean. What does one have to do to get Kafka to work?
root@do-kafka-sf-development-20140130051956: export
KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
root@do-kafka-sf-development-20140130
I think this may be more a general Java thing. Can you try running any Java
class with the same command-line options you are using for Kafka and
confirm that it also doesn't work?
-Jay
On Thu, Jan 30, 2014 at 11:23 AM, David Montgomery <
davidmontgom...@gmail.com> wrote:
> Hi,
Hi,
I am going through the tools documentation for our own ops team. While
doing so, I noticed the following note from the add-partition tool:
***Please note that this tool can only be used to add partitions when data
for a topic does not use a key.***
This message left me a bit dubious as to w
Hi,
This is a dedicated machine on DO. But I can say I did not have a
problem with Kafka 0.7.
I just upgraded the machine to 1 GB on Digital Ocean. Same error.
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
root@do-kafka-sf-development-20140130195343:/etc/supervisor/conf.d#
/var/lib/kafka-0.8.0-src/bin/kaf
Are you sure the Java opts are being set as you expect?
On Jan 30, 2014 12:41 PM, "David Montgomery"
wrote:
> Hi,
>
> This is a dedicated machine on DO. But I can say I did not have a
> problem with Kafka 0.7.
> I just upgraded the machine to 1 GB on Digital Ocean. Same error.
>
> export KAFKA_HEAP
Yeah this is confusing.
What they are trying to say is that adding partitions doesn't move data
that is already in existing partitions. I.e., say you have 10 partitions
(0-9), each containing 1 GB of data, and you add an 11th partition. When this
is complete, partitions 0-9 will contain the exact same
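
To make the implication for keyed topics concrete, here is a toy Java
illustration (a hypothetical hash-based partitioner, not Kafka's exact code):
the partition a given key maps to generally changes when the partition count
changes, so adding partitions breaks "same key always lands in the same
partition" for existing keys:

import java.util.Arrays;

public class PartitionRemapDemo {
    // Toy stand-in for hash-mod partitioning; Kafka's actual hash function differs.
    static int partitionFor(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes();
        System.out.println(partitionFor(key, 10)); // partition chosen with 10 partitions
        System.out.println(partitionFor(key, 11)); // usually a different partition with 11
    }
}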
Hi Neha,
I am almost sure the root of the problem is not on the client side. I ran tests
with different Kafka client library implementations and got similar results. In
my tests I "saturated" servers with load coming from 40 processes running on 4
different hosts, so blocking producers even wit
Yep, I got that after digging a bit, but thanks for the additional wording.
I would update the wiki with this explanation.
I also think it would be wise to add notes to other commands that will be
replaced by TopicCommand (kafka-topics.sh) so people know where the
commands
are after they migrate
Update:
There was no way to make it work with the javaapi one.
I made it work using the Scala interface from Java. The code looks a bit
ugly as I had to create a Scala Seq from Java.
For example:

@Override
public Seq<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> queueItem) {
    if (queueItem == null) {
        r
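
The generic parameters in the snippet above were stripped by the mail archive,
so the signature is reconstructed approximately; assuming the handler is
parameterized on the message type, the awkward part Patricio mentions
(building a scala.collection.Seq from Java) can be done with JavaConversions.
A minimal sketch, assuming a Scala 2.8-2.10 era standard library on the
classpath:

import java.util.ArrayList;
import java.util.List;

import scala.collection.JavaConversions;
import scala.collection.Seq;

public class SeqHelper {
    // Wraps a Java list as a Scala Buffer, which is itself a Seq.
    public static <T> Seq<T> toSeq(List<T> items) {
        return JavaConversions.asScalaBuffer(items);
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<String>();
        items.add("a");
        items.add("b");
        System.out.println(toSeq(items)); // prints something like Buffer(a, b)
    }
}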
Sure. However, we are actively working on a new producer API for Kafka. It
would be great if you could take a look at it and provide feedback:
https://www.mail-archive.com/dev@kafka.apache.org/msg07187.html
Thanks,
Neha
On Thu, Jan 30, 2014 at 2:28 PM, Patricio Echagüe wrote:
> Update:
>
> There w
One downside to the 1A proposal is that without a Partitioner interface we
can't really package up and provide common partitioner implementations.
Examples of these would be (rough sketches follow below):
1. HashPartitioner - The default hash partitioning
2. RoundRobinPartitioner - Just round-robins over partitions
3. ConnectionM
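
For illustration, minimal sketches of the first two in Java. The Partitioner
interface shape below is hypothetical (its existence is exactly what is being
debated), so treat the signatures as placeholders:

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical interface shape, for illustration only.
interface Partitioner {
    int partition(byte[] key, int numPartitions);
}

// 1. Hash partitioning over the serialized key.
class HashPartitioner implements Partitioner {
    public int partition(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }
}

// 2. Round-robin over partitions, ignoring the key.
class RoundRobinPartitioner implements Partitioner {
    private final AtomicLong counter = new AtomicLong();
    public int partition(byte[] key, int numPartitions) {
        return (int) (counter.getAndIncrement() % numPartitions);
    }
}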
With option 1A, if we increase the # of partitions on a topic, how will the
producer find out about newly created partitions? Do we expect the producer to
periodically call getCluster()?
As for the ZK dependency, one of the goals of the client rewrite is to reduce
dependencies so that one can implement the client in l
Does it preclude those various implementations? I.e., it could become
a producer config:
default.partitioner.strategy="minimize-connections"/"roundrobin" - and
so on; and implement those partitioners internally in the producer.
Not as clear as a .class config, but it accomplishes the same effect
no
+ dev
(this thread has become a bit unwieldy)
On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy wrote:
> Does it preclude those various implementations? i.e., it could become
> a producer config:
> default.partitioner.strategy="minimize-connections"/"roundrobin" - and
> so on; and implement those par
I thought a bit about it and I think the getCluster() thing was overly
simplistic, because we try to only maintain metadata about the current set
of topics the producer cares about, so the cluster might not have the
partitions for the topic the user cares about. I think actually what we
need is a new
Joel--
Yeah, we could theoretically retain a neutered Partitioner interface that
only had access to the byte[] key, not the original object (which we no
longer have). Ideologically, most partitioning should really happen based on
the byte[], not the original object, to retain multi-language compatibili