Hi,
> As far as I understand, log retention time in Kafka will delete messages
> that are older than the retention time.
Log retention applies to log segment files. In Kafka, each topic can have
multiple partitions, and each partition's data is stored in multiple log
segment files.
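For example, retention is governed by segment-level broker settings like these
(an illustrative sketch; the values shown are the defaults):
# delete segments whose data is older than 7 days
log.retention.hours=168
# roll a new segment once the current one reaches 1 GB
log.segment.bytes=1073741824
# how often the broker checks for segments eligible for deletion
log.retention.check.interval.ms=300000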
> say i h
Thanks Jay. The producer metrics from jconsole are quite helpful.
I've switched to the new producer and ran the producer benchmark with:
*/usr/lib/kafka/bin/kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance topic1 5 1000 -1
acks=1 bootstrap.servers=node1:9092,node2:9092,node
Hi,
We are using an async producer to send data to a Kafka cluster. The event rate
at peak is around 250 events/second, of size 25KB each.
In the producer code base we have added specific debug statements to capture
the time taken to create a producer, create a keyed message with a byte payload
&
Thank you, Guozhang. All clear now.
On Thu, Nov 20, 2014 at 1:29 AM, Guozhang Wang wrote:
> Hi Shlomi,
>
> By just using "kafka-topics.sh --zookeeper localhost:2181 --alter --topic
> test_topic --partitions 8" the controller will auto assign replicas to the
> newly added partitions, but will not t
Hi all,
I had a question about TTL changes to a topic’s configuration. When I make
the change and restart Kafka, will Kafka apply the changes retroactively to
all existing messages on disk, or only to new messages entering the topic?
Regards
Parag
Hi all,
This question is related to node recovery. Say, there is a hardware failure
and the node cannot be brought up. But I can access the hard drives and recover
the data. Is it possible for me to stash the contents of the /tmp/kafka-logs/
onto another instance and have it be a replacemen
Hello,
We're looking into using Kafka for an improved version of a system, and the
question of how to scale Kafka came up. Specifically, we want to try to make
the system scale as transparently as possible. The concern was that if we go
from N to N*2 consumers that we would have some that are s
Great. There is a single I/O thread per producer client that does the sending,
so it could be that either the sender thread or the thread calling send() is
just pegged. One way to dive in and see what is happening is to add the
command line option
-agentlib:hprof=cpu=samples,depth=10
This will tell us where t
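For example, running the perf tool with the profiler attached might look like
this (a sketch; kafka-run-class.sh passes KAFKA_OPTS through to the JVM):
KAFKA_OPTS="-agentlib:hprof=cpu=samples,depth=10" \
/usr/lib/kafka/bin/kafka-run-class.sh \
org.apache.kafka.clients.tools.ProducerPerformance topic1 5 1000 -1 acks=1 \
bootstrap.servers=node1:9092,node2:9092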
Hi,
We ran into the corrupted index problem and the broker fails to start. I have
followed the resolved JIRAs; the issue is not similar to those, and the fixes
didn’t apply. Full details are in the JIRA.
Also, when the index is deleted, it gets created again, and it too is
corrupted.
https:/
Yes, this will work. You will want to configure the new instance with the
same node id as the failed instance.
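An illustrative server.properties sketch for the replacement instance (the
placeholder id and the default log dir are assumptions):
# must match the broker.id of the failed instance
broker.id=<failed-node-id>
# point at the recovered data files
log.dirs=/tmp/kafka-logs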
-Jay
On Thu, Nov 20, 2014 at 10:06 AM, Parag Shah wrote:
> Hi all,
>
> This question is related to node recovery. Say, there is a hardware
> failure and the node cannot be brought
queue.enqueue.timeout.ms only applies if the producer is configured in
async mode.
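For example (an illustrative sketch of the relevant settings):
producer.type=async
# -1 blocks indefinitely when the queue is full; 0 enqueues immediately or
# drops if full; a positive value blocks up to that many ms, then drops
queue.enqueue.timeout.ms=-1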
Thanks,
Jun
On Thu, Nov 20, 2014 at 5:01 AM, Devendra Tagare <
devendra.tag...@pubmatic.com> wrote:
> Hi,
>
> We are using an async producer to send data to a kafka cluster.The event
> rate at peak is around 250
The retention config change will be applied to existing data too.
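For example, retention can also be changed per topic (a sketch; the topic name
and value are illustrative):
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic \
--config retention.ms=86400000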
Thanks,
Jun
On Thu, Nov 20, 2014 at 10:00 AM, Parag Shah wrote:
> Hi all,
>
> I had a question about TTL changes to a topic’s configuration. When I
> make the change and restart kafka, will kafka apply the changes to all
Hi all,
I'm a beginner with Kafka, and currently I'm stuck on how to send a KeyedMessage
from a producer. I would like to design a partition
function to route messages based on the key, but the producer cannot send
the KeyedMessage, and I get this exception:
java.lang.ClassCastException: [B cannot b
I've been looking at the new producer api with anticipation, but have not
fired it up yet.
One question I have is that it looks like there's no longer a 'batch' send mode
(and I get that this is all now handled internally, e.g. you send
individual messages, that then get collated and batched up and se
Hi Haoming,
Take a look at the code here
https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
for your partKey: it should be a string, and when converting it into a
byte array you can use partKey.getBytes("UTF8")
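A minimal sketch of the idea with the Scala producer API (the broker address
is an assumption; DefaultEncoder is the default byte-array serializer):
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties
props.put("metadata.broker.list", "localhost:9092") // illustrative broker
// DefaultEncoder passes Array[Byte] through untouched
props.put("serializer.class", "kafka.serializer.DefaultEncoder")
val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
val msg = new KeyedMessage[Array[Byte], Array[Byte]](
  "test_topic", "2".getBytes("UTF8"), "payload".getBytes("UTF8"))
producer.send(msg)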
-Harsha
On Thu, Nov 20, 2014, a
Hi Harsha,
Thanks for the suggestion!
I have checked this link before, and I tried to create the partition key like
the following:
val hardKey = "2"
val parkey = hardKey.getBytes("UTF8")
But I still get the same exception. I also tried "UTF-8" instead of "UTF8", but no
luck...
Reg
Internally it works as you describe: there is only one CountDownLatch per
batch sent; each of the futures is just a wrapper around that.
It is true that if you accumulate thousands of futures in a list that may
be a fair number of objects you are retaining, and there will be some work
involved in
Also, in (key: Key, value: Val, topic: Option[String]), the "value" should be
a string converted to a byte array.
Can you send an example of your key and value data?
On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> Hi Harsha,
>
> Thanks for suggestion!
>
> I have checked this link before, an
Hi Harsha,
I just tried hard-coding a string message, then converting the message to a
byte array, but no luck...
The following is how my program works:
Create a hardcoded key, which is a String, then convert it to a byte array,
iterate over the network messages, and send them one by one:
networkelement
It would be helpful to see the full stack trace. Also, how have you
instantiated your Producer class? Did you set a value for
"serializer.class" in the property?
/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.steal
Hi Jun,
Do you want me to file a Jira ticket for a feature: a notification (in both the
new consumer API and the old consumer) that a consumer stream is dying, so the
application can try to restart it programmatically? I understand this is due
to network or ZK cluster instability.
Let me know if you have alt
I guess it would make the api less clean, but I can imagine a sendBatch
method that returns a single Future that completes only when all
messages in the batch have finished. The callback info could then contain
info about the success/exceptions encountered by each sub-group of
messages. An
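A sketch of the idea (a hypothetical helper built on the new producer's
per-record futures, not an existing API method):
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Send a batch and block until every record in it has been acked;
// a per-record failure surfaces as an exception from get().
def sendBatch(producer: KafkaProducer[Array[Byte], Array[Byte]],
              records: Seq[ProducerRecord[Array[Byte], Array[Byte]]]): Seq[RecordMetadata] = {
  val futures: Seq[Future[RecordMetadata]] = records.map(r => producer.send(r))
  futures.map(_.get())
}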
Hi Joe,
You reminded me: maybe I included the incorrect serializer.
Here is how I created the producer:
And(s"a synchronous Kafka producer app that writes to the topic
$inputTopic")
val producerApp = {
val config = {
val c = new Properties
c.put("producer.ty
OK, here is the hprof output
CPU SAMPLES BEGIN (total = 202493) Fri Nov 21 08:07:51 2014
rank self accum count trace method
1 39.30% 39.30% 79585 300923 java.net.SocketInputStream.socketRead0
2 20.62% 59.92% 41750 300450 java.net.PlainSocketImpl.socketAccept
3 9.52% 69.45% 192
Yes, that was what I was thinking: you don't need to set the serializer
class if you want Array[Byte]; that is the default. Remove the line
c.put("key.serializer.class",
"kafka.serializer.StringEncoder") and you should either see it work or have to
work through the next issue, hopefully the former =8^)
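A sketch of what the corrected config might look like (the broker list is an
assumption; the serializer line is optional since DefaultEncoder is the default):
val c = new Properties
c.put("producer.type", "sync")
c.put("metadata.broker.list", "localhost:9092") // illustrative
// no key.serializer.class line: keys stay as Array[Byte]
c.put("serializer.class", "kafka.serializer.DefaultEncoder")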
Hi Joe,
I just tried it, and it works! Now I need to think about how to design the
partition function.
Thanks!
Haoming
> Date: Thu, 20 Nov 2014 20:44:29 -0500
> Subject: Re: Partition Key Cannot be Send Out by Producer
> From: joe.st...@stealth.ly
> To: users@kafka.apache.org
>
> Yes, that was what I was
Hi Parag
Just to expand on Jun’s comment: log retention and deletion happen at the
segment level, not the message level. Because it’s at the segment level, I
would avoid using the term TTL, as that normally applies to individual items.
Every log.retention.check.interval.ms (default 5 minute
Can you just monitor the consumer byte/message/fetch rate?
Thanks,
Jun
On Thu, Nov 20, 2014 at 5:31 PM, Bhavesh Mistry
wrote:
> HI Jun,
>
> Do you want me to request Jira ticket for feature a notification for new
> consumer API and old consumer feature that consumer stream is dying. So
> appl
So I suspect that the bottleneck is actually in the writer thread (the one
calling send()), not the I/O thread. You could verify this by checking the
JMX stats which will give the amount of time the I/O thread spends waiting.
But since epollWait shows up first, that is the I/O thread waiting for wor
If you plan ahead of time with enough partitions then you won't fall into
an issue of backed up consumers when you scale them up.
If you have 100 partitions 20 consumers can read from them (each could read
from 5 partitions). You can scale up to 100 consumers (one for each
partition) as the upper
While it’s good to plan ahead for growth, Kafka will still let you add more
partitions to a topic
https://kafka.apache.org/081/ops.html#basic_ops_modify_topic. This will
rebalance the hashing if you are partitioning by your key, and consumers will
probably end up with different partitions, but
You need to be thoughtful about adding more partitions. This is paramount
if you are doing semantic partitioning in which case adding more partitions
could break things downstream.
If you average, let's say, 100,000 messages per second and at full tilt
consumer 1:1 for each partition you can process
Meant to say "burst 1,000,000 messages per second on those X partitions for
10 minutes"
On Fri, Nov 21, 2014 at 12:12 AM, Joe Stein wrote:
> You need to be thoughtful about adding more partitions. This is paramount
> if you are doing semantic partitioning in which case adding more partitions
> c
Hi,
We are using an async producer.
The producer properties are:
producer.type= async
queue.enqueue.timeout.ms=20
send.buffer.bytes=1024000
topic.metadata.refresh.interval.ms=3
Please find more details in the mail thread below.
Regards,
Dev
From: Devendra