Re: Cannot run producer from local windows
Did you set the host.name property as described at https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F ? When accessing brokers from outside AWS, host.name should be set to a public domain/IP. This also means that all brokers would need a public domain/IP. On 4 October 2013 11:46, Yi Jiang wrote: > Hi, everyone > I am currently installed Kafka cluster in EC2, I am sure all ports have > been opened, but I want to kick some data into kafka in cloud, and i even > cannot run the producer from my local. It always throw the exception with > "failed after 3 retries". But there is no any problem when I run it in the > cluster in cloud. I have nothing in my local server, no zookeeper etc, I am > wondering if its the problem that I am missing zookeeper in my local > windows? Can someone tell me how to resolve this issue? > > Sent from my iPhone
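For reference, a minimal sketch of a 0.8 Java producer pointed at brokers by their public names, assuming host.name on each broker is set to a hostname that resolves from outside AWS; the EC2 hostnames, port, and topic below are placeholders. Note that the 0.8 producer only needs metadata.broker.list, so no local ZooKeeper is required on the client machine.

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RemoteProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Brokers must be reachable under these names from outside AWS, which is why
        // host.name on each broker has to be a publicly resolvable domain/IP.
        // The EC2 hostnames below are placeholders.
        props.put("metadata.broker.list",
                "ec2-xx-xx-xx-xx.compute-1.amazonaws.com:9092,ec2-yy-yy-yy-yy.compute-1.amazonaws.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-topic", "hello from outside EC2"));
        producer.close();
    }
}
```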
Kafka stops accepting messages for existing topics after a hostname change and restart
Because Kafka was detecting localhost.domain as the hostname, I commented out the line "127.0.0.1 localhost.localdomain localhost" and added "127.0.0.1 ip-10-0-1-20.localdomain" in /etc/hosts. When I restart Kafka (by issuing kill -15 pid), writes to existing topics fail and I see several of the following errors: [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with correlation id 12868451 from client on partition [consumertracking.gnomes.4002,5] failed due to Partition [consumertracking.gnomes.4002,5] doesn't exist on 1 (kafka.server.KafkaApis) I have checked the Kafka data directory and all files are intact. What could have caused this issue and how can it be fixed?
RE: Too many open files
Hi Mark, Sorry for the delay. We're not using a load balancer if it's what you mean by LB. After applying the change I mentioned last time (the netfilter thing), I couldn't see any improvement. We even restart kafka, but since the restart, I saw connection count slowly getting higher. Best regards, Nicolas Berthet -Original Message- From: Mark [mailto:static.void@gmail.com] Sent: Saturday, September 28, 2013 12:35 AM To: users@kafka.apache.org Subject: Re: Too many open files No, this is all within the same DC. I think the problem has to do with the LB. We've upgraded our producers to point directory to a node for testing and after running it all night, I don't see any more connections then there are supposed to be. Can I ask which LB are you using? We are using A10's On Sep 26, 2013, at 6:41 PM, Nicolas Berthet wrote: > Hi Mark, > > I'm using centos 6.2. My file limit is something like 500k, the value is > arbitrary. > > One of the thing I changed so far are the TCP keepalive parameters, it had > moderate success so far. > > net.ipv4.tcp_keepalive_time > net.ipv4.tcp_keepalive_intvl > net.ipv4.tcp_keepalive_probes > > I still notice an abnormal number of ESTABLISHED connections, I've > been doing some search and came over this page > (http://www.lognormal.com/blog/2012/09/27/linux-tcpip-tuning/) > > I'll change the "net.netfilter.nf_conntrack_tcp_timeout_established" as > indicated there, it looks closer to the solution to my issue. > > Are you also experiencing the issue in a cross data center context ? > > Best regards, > > Nicolas Berthet > > > -Original Message- > From: Mark [mailto:static.void@gmail.com] > Sent: Friday, September 27, 2013 6:08 AM > To: users@kafka.apache.org > Subject: Re: Too many open files > > What OS settings did you change? How high is your huge file limit? > > > On Sep 25, 2013, at 10:06 PM, Nicolas Berthet > wrote: > >> Jun, >> >> I observed similar kind of things recently. (didn't notice before >> because our file limit is huge) >> >> I have a set of brokers in a datacenter, and producers in different data >> centers. >> >> At some point I got disconnections, from the producer perspective I had >> something like 15 connections to the broker. On the other hand on the broker >> side, I observed hundreds of connections from the producer in an ESTABLISHED >> state. >> >> We had some default settings for the socket timeout on the OS level, which >> we reduced hoping it would prevent the issue in the future. I'm not sure if >> the issue is from the broker or OS configuration though. I'm still keeping >> the broker under observation for the time being. >> >> Note that, for clients in the same datacenter, we didn't see this issue, the >> socket count matches on both ends. >> >> Nicolas Berthet >> >> -Original Message- >> From: Jun Rao [mailto:jun...@gmail.com] >> Sent: Thursday, September 26, 2013 12:39 PM >> To: users@kafka.apache.org >> Subject: Re: Too many open files >> >> If a client is gone, the broker should automatically close those broken >> sockets. Are you using a hardware load balancer? >> >> Thanks, >> >> Jun >> >> >> On Wed, Sep 25, 2013 at 4:48 PM, Mark wrote: >> >>> FYI if I kill all producers I don't see the number of open files drop. >>> I still see all the ESTABLISHED connections. >>> >>> Is there a broker setting to automatically kill any inactive TCP >>> connections? >>> >>> >>> On Sep 25, 2013, at 4:30 PM, Mark wrote: >>> Any other ideas? On Sep 25, 2013, at 9:06 AM, Jun Rao wrote: > We haven't seen any socket leaks with the java producer. 
If you > have >>> lots > of unexplained socket connections in established mode, one > possible >>> cause > is that the client created new producer instances, but didn't > close the >>> old > ones. > > Thanks, > > Jun > > > On Wed, Sep 25, 2013 at 6:08 AM, Mark >>> wrote: > >> No. We are using the kafka-rb ruby gem producer. >> https://github.com/acrosa/kafka-rb >> >> Now that you asked that question I need to ask. Is there a >> problem with the java producer? >> >> Sent from my iPhone >> >>> On Sep 24, 2013, at 9:01 PM, Jun Rao wrote: >>> >>> Are you using the java producer client? >>> >>> Thanks, >>> >>> Jun >>> >>> On Tue, Sep 24, 2013 at 5:33 PM, Mark >> wrote: Our 0.7.2 Kafka cluster keeps crashing with: 2013-09-24 17:21:47,513 - [kafka-acceptor:Acceptor@153] - Error in acceptor java.io.IOException: Too many open The obvious fix is to bump up the number of open files but I'm >>> wondering if there is a leak on the Kafka side and/or our application side. We currently have the ul
Is 30 a too high partition number?
I am using Kafka as a buffer for data streaming in from various sources. Since it is time series data, I generate the message key by combining the source ID and the minute in the timestamp. This means I can have at most 60 partitions per topic (as each source has its own topic). I have set num.partitions to 30 (60/2) for each topic in the broker config. I don't have a very good reason for picking 30 as the default number of partitions per topic, but I wanted it to be a high number so that I can achieve high parallelism during in-stream processing. I am worried that a high number like 30 (the default configuration had it as 2) can negatively impact Kafka performance in terms of message throughput or memory consumption. I understand that this can lead to many files on disk, but I am thinking of dealing with that by having multiple directories on the same disk if I run into issues. My question to the community: am I prematurely optimizing the partition number, given that right now even 5 partitions seem sufficient, and will I run into unwanted issues because of it? Or is 30 an OK number of partitions to use?
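To make the concern above concrete, here is a small sketch of how 60 keys of the form sourceId-minute would spread over 30 partitions. It assumes hash partitioning along the lines of the default partitioner (roughly hash(key) mod numPartitions); the source ID is made up.

```java
public class KeySpreadSketch {
    // Roughly what a hash partitioner does: map a key to one of numPartitions buckets.
    // Masking with 0x7fffffff keeps the hash non-negative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 30;
        String sourceId = "source-42";             // hypothetical source ID
        for (int minute = 0; minute < 60; minute++) {
            String key = sourceId + "-" + minute;  // 60 distinct keys per source
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, numPartitions));
        }
        // With only 60 distinct keys and 30 partitions, some partitions get two keys
        // and others may get none; the key space, not num.partitions, is the real
        // ceiling on parallelism here.
    }
}
```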
Re: Too many open files
Hi Nicolas, we did run into a similar issue here (lots of ESTABLISHED connections on the brokers, but non on the consumers/producers). In our case, it was a firewall issue where connections that were inactive for more than a certain time were silently dropped by the firewall (but no TCP RST was sent) and only one side of the connection noticed the drop. Maybe that helps Flo On 2013-09-26 9:41 PM, Nicolas Berthet wrote: > Hi Mark, > > I'm using centos 6.2. My file limit is something like 500k, the value is > arbitrary. > > One of the thing I changed so far are the TCP keepalive parameters, it had > moderate success so far. > > net.ipv4.tcp_keepalive_time > net.ipv4.tcp_keepalive_intvl > net.ipv4.tcp_keepalive_probes > > I still notice an abnormal number of ESTABLISHED connections, I've been doing > some search and came over this page > (http://www.lognormal.com/blog/2012/09/27/linux-tcpip-tuning/) > > I'll change the "net.netfilter.nf_conntrack_tcp_timeout_established" as > indicated there, it looks closer to the solution to my issue. > > Are you also experiencing the issue in a cross data center context ? > > Best regards, > > Nicolas Berthet > > > -Original Message- > From: Mark [mailto:static.void@gmail.com] > Sent: Friday, September 27, 2013 6:08 AM > To: users@kafka.apache.org > Subject: Re: Too many open files > > What OS settings did you change? How high is your huge file limit? > > > On Sep 25, 2013, at 10:06 PM, Nicolas Berthet > wrote: > >> Jun, >> >> I observed similar kind of things recently. (didn't notice before >> because our file limit is huge) >> >> I have a set of brokers in a datacenter, and producers in different data >> centers. >> >> At some point I got disconnections, from the producer perspective I had >> something like 15 connections to the broker. On the other hand on the broker >> side, I observed hundreds of connections from the producer in an ESTABLISHED >> state. >> >> We had some default settings for the socket timeout on the OS level, which >> we reduced hoping it would prevent the issue in the future. I'm not sure if >> the issue is from the broker or OS configuration though. I'm still keeping >> the broker under observation for the time being. >> >> Note that, for clients in the same datacenter, we didn't see this issue, the >> socket count matches on both ends. >> >> Nicolas Berthet >> >> -Original Message- >> From: Jun Rao [mailto:jun...@gmail.com] >> Sent: Thursday, September 26, 2013 12:39 PM >> To: users@kafka.apache.org >> Subject: Re: Too many open files >> >> If a client is gone, the broker should automatically close those broken >> sockets. Are you using a hardware load balancer? >> >> Thanks, >> >> Jun >> >> >> On Wed, Sep 25, 2013 at 4:48 PM, Mark wrote: >> >>> FYI if I kill all producers I don't see the number of open files drop. >>> I still see all the ESTABLISHED connections. >>> >>> Is there a broker setting to automatically kill any inactive TCP >>> connections? >>> >>> >>> On Sep 25, 2013, at 4:30 PM, Mark wrote: >>> Any other ideas? On Sep 25, 2013, at 9:06 AM, Jun Rao wrote: > We haven't seen any socket leaks with the java producer. If you > have >>> lots > of unexplained socket connections in established mode, one possible >>> cause > is that the client created new producer instances, but didn't close > the >>> old > ones. > > Thanks, > > Jun > > > On Wed, Sep 25, 2013 at 6:08 AM, Mark >>> wrote: > >> No. We are using the kafka-rb ruby gem producer. 
>> https://github.com/acrosa/kafka-rb >> >> Now that you asked that question I need to ask. Is there a problem >> with the java producer? >> >> Sent from my iPhone >> >>> On Sep 24, 2013, at 9:01 PM, Jun Rao wrote: >>> >>> Are you using the java producer client? >>> >>> Thanks, >>> >>> Jun >>> >>> On Tue, Sep 24, 2013 at 5:33 PM, Mark >> wrote: Our 0.7.2 Kafka cluster keeps crashing with: 2013-09-24 17:21:47,513 - [kafka-acceptor:Acceptor@153] - Error in acceptor java.io.IOException: Too many open The obvious fix is to bump up the number of open files but I'm >>> wondering if there is a leak on the Kafka side and/or our application side. We currently have the ulimit set to a generous 4096 but obviously we are hitting this ceiling. What's a recommended value? We are running rails and our Unicorn workers are connecting to our >>> Kafka cluster via round-robin load balancing. We have about 1500 workers to >> that would be 1500 connections right there but they should be split across >> our 3 nodes. Instead Netstat shows thousands of connections that look like >> this: >>
Re: Kafka stops accepting messages for existing topics after a hostname change and restart
When a broker starts up, it receives a LeaderAndIsrRequest from the controller broker telling the broker which partitions it should host and either lead or follow those partitions. If clients send requests to the broker before it has received this request, it throws this error you see. Did you restart the broker very quickly? You can mitigate this issue by using controlled shutdown to stop a broker. Thanks, Neha On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" wrote: > Because Kafka was detecting localhost.domain as hostname, I commented out > the line "127.0.0.1 localhost.localdomain localhost" and added > "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka > (issues kill -15 pid), writes to existing topics are failing and I see > several of the following errors: > > [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with > correlation id 12868451 from client on partition > [consumertracking.gnomes.4002,5] failed due to Partition > [consumertracking.gnomes.4002,5] doesn't exist on 1 > (kafka.server.KafkaApis) > > > I have checked the kafka data directory and all files are intact. What > could have caused this issue and how can it be fixed? >
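As a complement to controlled shutdown, the producer can be made more tolerant of the brief window after a fast restart by retrying with a backoff. A hedged sketch using the 0.8 producer settings message.send.max.retries and retry.backoff.ms; the values and broker list are illustrative, not recommendations.

```java
import java.util.Properties;

import kafka.producer.ProducerConfig;

public class RetryTolerantProducerConfig {
    public static ProducerConfig build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Retry a few times with a backoff so a broker that was restarted quickly
        // has time to receive its LeaderAndIsrRequest before the producer gives up.
        props.put("message.send.max.retries", "5"); // illustrative value
        props.put("retry.backoff.ms", "500");       // illustrative value
        return new ProducerConfig(props);
    }
}
```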
Re: Is 30 a too high partition number?
You probably want to think of this in terms of number of partitions on a single broker, instead of per topic since I/O is the limiting factor in this case. Another factor to consider is total number of partitions in the cluster as Zookeeper becomes a limiting factor there. 30 partitions is not too large provided the total number of partitions doesn't exceed roughly couple thousand. To give you an example, some of our clusters are 16 nodes big and some of the topics on those clusters have 30 partitions. Thanks, Neha On Oct 4, 2013 4:15 AM, "Aniket Bhatnagar" wrote: > I am using kafka as a buffer for data streaming in from various sources. > Since its a time series data, I generate the key to the message by > combining source ID and minute in the timestamp. This means I can utmost > have 60 partitions per topic (as each source has its own topic). I have > set num.partitions to be 30 (60/2) for each topic in broker config. I don't > have a very good reason to pick 30 as default number of partitions per > topic but I wanted it to be a high number so that I can achieve high > parallelism during in-stream processing. I am worried that having a high > number like 30 (default configuration had it as 2), it can negatively > impact kafka performance in terms of message throughput or memory > consumption. I understand that this can lead to many files per partition > but I am thinking of dealing with it by having multiple directories on the > same disk if at all I run into issues. > > My question to the community is that am I prematurely attempting to > optimizing the partition number as right now even a partition number of 5 > seems sufficient and hence will run into unwanted issues? Or is 30 an Ok > number to use for number of partitions? >
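A back-of-the-envelope sketch of the per-broker and per-cluster numbers suggested above; every figure below is illustrative and should be replaced with your own topic count, replication factor, and broker count.

```java
public class PartitionBudgetSketch {
    public static void main(String[] args) {
        int topics = 60;             // one topic per source (illustrative)
        int partitionsPerTopic = 30; // num.partitions
        int replicationFactor = 2;   // illustrative
        int brokers = 16;            // e.g. a 16-node cluster, as in the reply

        int totalPartitions = topics * partitionsPerTopic;
        int totalReplicas = totalPartitions * replicationFactor;
        int replicasPerBroker = (int) Math.ceil((double) totalReplicas / brokers);

        System.out.println("Total partitions in cluster: " + totalPartitions);
        System.out.println("Total partition replicas:    " + totalReplicas);
        System.out.println("Approx. replicas per broker: " + replicasPerBroker);
        // Keep the cluster total to roughly a couple thousand partitions (ZooKeeper
        // becomes the limit there) and watch the per-broker count, since each replica
        // means open segment files and I/O on that broker.
    }
}
```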
RE: is it possible to commit offsets on a per stream basis?
This will improve efficiency on the client side greatly. And multiple threads don't have to synchronize before committing offsets. Thanks, Jason. Regards, Libo -Original Message- From: Jason Rosenberg [mailto:j...@squareup.com] Sent: Thursday, October 03, 2013 4:13 PM To: users@kafka.apache.org Subject: Re: is it possible to commit offsets on a per stream basis? I added a comment/suggestion to: https://issues.apache.org/jira/browse/KAFKA-966 Basically to expose an api for marking an offset for commit, such that the auto-commit would only commit offsets up to the last message 'markedForCommit', and not the last 'consumed' offset, which may or may not have succeeded. This way, consumer code can just call 'markForCommit()' after successfully processing each message successfully. Does that make sense? On Mon, Sep 9, 2013 at 5:21 PM, Yu, Libo wrote: > Thanks, Neha. That number of connections formula is very helpful. > > Regards, > > Libo > > > -Original Message- > From: Neha Narkhede [mailto:neha.narkh...@gmail.com] > Sent: Monday, September 09, 2013 12:17 PM > To: users@kafka.apache.org > Subject: Re: is it possible to commit offsets on a per stream basis? > > Memory might become an issue if all the connectors are part of the > same process. But this is easily solvable by distributing the > connectors over several machines. > Number of connections would be (# of connectors) * (# of brokers) and > will proportionately increase with the # of connectors. > > Thanks, > Neha > > > On Mon, Sep 9, 2013 at 9:08 AM, Yu, Libo wrote: > > > If one connector is used for a single stream, when there are many > > topics/streams, will that cause any performance issue, e.g. too many > > connections or too much memory or big latency? > > > > Regards, > > > > Libo > > > > > > -Original Message- > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com] > > Sent: Sunday, September 08, 2013 12:46 PM > > To: users@kafka.apache.org > > Subject: Re: is it possible to commit offsets on a per stream basis? > > > > That should be fine too. > > > > > > > > > > On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg > wrote: > > > > > To be clear, it looks like I forgot to add to my question, that I > > > am asking about creating multiple connectors, within the same > > > consumer process (as I realize I can obviously have multiple > > > connectors running on multiple hosts, etc.). But I'm guessing > > > that should be > fine too? > > > > > > Jason > > > > > > > > > > > > > > > On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede > > > > > >wrote: > > > > > > > >> Can I create multiple connectors, and have each use the same > > > > >> Regex > > > > for the TopicFilter? Will each connector share the set of > > > > available topics? Is this safe to do? > > > > > > > > >> Or is it necessary to create mutually non-intersecting > > > > >> regex's for > > > each > > > > connector? > > > > > > > > As long as each of those consumer connectors share the same > > > > group id, > > > Kafka > > > > consumer rebalancing should automatically re-distribute the > > > > topic/partitions amongst the consumer connectors/streams evenly. > > > > > > > > Thanks, > > > > Neha > > > > > > > > > > > > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg > > > > > > > wrote: > > > > > > > > > Will this work if we are using a TopicFilter, that can map to > > > > > multiple topics. Can I create multiple connectors, and have > > > > > each use the same > > > > Regex > > > > > for the TopicFilter? Will each connector share the set of > > > > > available topics? 
Is this safe to do? > > > > > > > > > > Or is it necessary to create mutually non-intersecting regex's > > > > > for each connector? > > > > > > > > > > It seems I have a similar issue. I have been using auto > > > > > commit mode, > > > but > > > > > it doesn't guarantee that all messages committed have been > > > > > successfully processed (seems a change to the connector itself > > > > > might expose a way to > > > > use > > > > > auto offset commit, and have it never commit a message until > > > > > it is processed). But that would be a change to the > > > > > ZookeeperConsumerConnectorEssentially, it would be great > > > > > if after processing each message, we could mark the message as > > > > > 'processed', and > > > > thus > > > > > use that status as the max offset to commit when the auto > > > > > offset commit background thread wakes up each time. > > > > > > > > > > Jason > > > > > > > > > > > > > > > On Thu, Aug 29, 2013 at 11:58 AM, Yu, Libo > wrote: > > > > > > > > > > > Thanks, Neha. That is a great answer. > > > > > > > > > > > > Regards, > > > > > > > > > > > > Libo > > > > > > > > > > > > > > > > > > -Original Message- > > > > > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com] > > > > > > Sent: Thursday, August 29, 2013 1:55 PM > > > > > > To: users@kafka.apache.org > > > > > > Subject: Re: is it possible to commit o
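For context on why a per-stream (or per-message) commit API is being asked for: with the 0.8 high-level consumer, the closest you can get today is to disable auto-commit and call the connector-wide commitOffsets() yourself after processing. A hedged sketch below; the ZooKeeper address, group id, and topic are placeholders, and note that commitOffsets() commits offsets for every stream owned by the connector, not just the one being processed.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // hypothetical
        props.put("group.id", "manual-commit-group");     // hypothetical
        props.put("auto.commit.enable", "false");         // we decide when to commit

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test-topic", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("test-topic").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            process(record.message());
            // Commits the current offsets of *all* streams owned by this connector,
            // not just this one -- which is exactly the limitation the thread is about.
            connector.commitOffsets();
        }
    }

    private static void process(byte[] message) {
        // application-specific handling
    }
}
```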
producer API thread safety
Hi team, Is it possible to use a single producer with more than one thread? I am not sure if its send() is thread safe. Regards, Libo
Re: producer API thread safety
The send() is thread safe, so the short answer would be yes. On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo wrote: > Hi team, > > Is it possible to use a single producer with more than one threads? I am > not sure > If its send() is thread safe. > > Regards, > > Libo > > -- -- Guozhang
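A minimal sketch of what that looks like in practice: one producer instance shared by several threads, each calling send() concurrently. The broker address and topic are placeholders. Sharing one instance (and closing it once) also avoids the socket build-up described in the "Too many open files" thread, where new producer instances were created without closing the old ones.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SharedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // hypothetical broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // One producer instance shared by all threads; send() is thread safe.
        final Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int threadId = i;
            pool.submit(new Runnable() {
                public void run() {
                    producer.send(new KeyedMessage<String, String>(
                            "test-topic", "message from thread " + threadId));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}
```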
Re: Kafka stops accepting messages for existing topics after a hostname change and restart
I did restart broker very quickly. I saw similar errors for about 5 mins and that's when I decided to shutdown all kafka brokers and start them one by one. That seems to have enabled writes in kafka instantly after brokers were back up. How do I do a controlled shutdown? The kafka shutdown script doesnt seem to work for me and I assumed the kill -15 command should trigger a shutdown in kafka (I saw messages like kafka shutdown in the logs too) On 4 Oct 2013 18:55, "Neha Narkhede" wrote: > When a broker starts up, it receives a LeaderAndIsrRequest from the > controller broker telling the broker which partitions it should host and > either lead or follow those partitions. If clients send requests to the > broker before it has received this request, it throws this error you see. > Did you restart the broker very quickly? > > You can mitigate this issue by using controlled shutdown to stop a broker. > > Thanks, > Neha > On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" > wrote: > > > Because Kafka was detecting localhost.domain as hostname, I commented out > > the line "127.0.0.1 localhost.localdomain localhost" and > added > > "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka > > (issues kill -15 pid), writes to existing topics are failing and I see > > several of the following errors: > > > > [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with > > correlation id 12868451 from client on partition > > [consumertracking.gnomes.4002,5] failed due to Partition > > [consumertracking.gnomes.4002,5] doesn't exist on 1 > > (kafka.server.KafkaApis) > > > > > > I have checked the kafka data directory and all files are intact. What > > could have caused this issue and how can it be fixed? > > >
Re: Kafka stops accepting messages for existing topics after a hostname change and restart
Controlled shutdown is described here - https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown On Fri, Oct 4, 2013 at 10:18 AM, Aniket Bhatnagar < aniket.bhatna...@gmail.com> wrote: > I did restart broker very quickly. I saw similar errors for about 5 mins > and that's when I decided to shutdown all kafka brokers and start them one > by one. That seems to have enabled writes in kafka instantly after brokers > were back up. > > How do I do a controlled shutdown? The kafka shutdown script doesnt seem > to work for me and I assumed the kill -15 command should trigger a shutdown > in kafka (I saw messages like kafka shutdown in the logs too) > On 4 Oct 2013 18:55, "Neha Narkhede" wrote: > > > When a broker starts up, it receives a LeaderAndIsrRequest from the > > controller broker telling the broker which partitions it should host and > > either lead or follow those partitions. If clients send requests to the > > broker before it has received this request, it throws this error you see. > > Did you restart the broker very quickly? > > > > You can mitigate this issue by using controlled shutdown to stop a > broker. > > > > Thanks, > > Neha > > On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" > > wrote: > > > > > Because Kafka was detecting localhost.domain as hostname, I commented > out > > > the line "127.0.0.1 localhost.localdomain localhost" and > > added > > > "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka > > > (issues kill -15 pid), writes to existing topics are failing and I see > > > several of the following errors: > > > > > > [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with > > > correlation id 12868451 from client on partition > > > [consumertracking.gnomes.4002,5] failed due to Partition > > > [consumertracking.gnomes.4002,5] doesn't exist on 1 > > > (kafka.server.KafkaApis) > > > > > > > > > I have checked the kafka data directory and all files are intact. What > > > could have caused this issue and how can it be fixed? > > > > > >
testing issue with reliable sending
All, I'm having an issue with an integration test I've setup. This is using 0.8-beta1. The test is to verify that no messages are dropped (or the sender gets an exception thrown back if failure), while doing a rolling restart of a cluster of 2 brokers. The producer is configured to use 'request.required.acks' = '1'. The servers are set up to run locally on localhost, on different ports, and different data dirs. The producer connects with a metadata brokerlist like: "localhost:2024,localhost:1025" (so no vip). The servers are set up with a default replication factor of 2. The servers have controlled shutdown enabled, as well. The producer code looks like this: ... Producer producer = getProducer(); try { KeyedMessage msg = new KeyedMessage(topic, message); producer.send(msg); return true; } catch (RuntimeException e) { logger.warn("Error sending data to kafka", e); return false; } ... The test sends groups of messages at each stage of the test (e.g. servers up, first server going down, first server coming up, second server going down, etc.). Then a consumer connects and consumes all the messages, to make sure they all arrived ok. It seems intermittently, a single message gets dropped, right after one of the servers starts going down. It doesn't happen always, seems to happen 1 out of every 20 test runs or so. Here's some sample output. I see the exception inside the producer code, but I don't see the producer.send method ever having an exception thrown back out to the caller (the log line "Error sending data to kafka" is never triggered). What's interesting, is that it looks like the exceptions are happening on message 3, but when the consumer subsequently consumes back all the messages in the broker cluster, it seems message 2 (and not message 3) is missing: ... ... 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3 7394 [kafka-request-handler-5] WARN state.change.logger - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. 
Latest known controller epoch is 3 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026 kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3 at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114) at kafka.server.KafkaApis.handle(KafkaApis.scala:72) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:724) 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090 java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387) at kafka.utils.Utils$.read(Utils.scala:394) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 80
RE: producer API thread safety
Great. Thanks. Regards, Libo -Original Message- From: Guozhang Wang [mailto:wangg...@gmail.com] Sent: Friday, October 04, 2013 12:27 PM To: users@kafka.apache.org Subject: Re: producer API thread safety The send() is thread safe, so the short answer would be yes. On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo wrote: > Hi team, > > Is it possible to use a single producer with more than one threads? I > am not sure If its send() is thread safe. > > Regards, > > Libo > > -- -- Guozhang
Re: testing issue with reliable sending
The occasional single message loss could happen since required.request.acks=1 and the leader is shut down before the follower gets a chance to copy the message. Can you try your test with num acks set to -1 ? Thanks, Neha On Oct 4, 2013 1:21 PM, "Jason Rosenberg" wrote: > All, > > I'm having an issue with an integration test I've setup. This is using > 0.8-beta1. > > The test is to verify that no messages are dropped (or the sender gets an > exception thrown back if failure), while doing a rolling restart of a > cluster of 2 brokers. > > The producer is configured to use 'request.required.acks' = '1'. > > The servers are set up to run locally on localhost, on different ports, and > different data dirs. The producer connects with a metadata brokerlist > like: "localhost:2024,localhost:1025" (so no vip). The servers are set > up with a default replication factor of 2. The servers have controlled > shutdown enabled, as well. > > The producer code looks like this: > ... > Producer producer = getProducer(); > try { > KeyedMessage msg = new KeyedMessage(topic, > message); > producer.send(msg); > return true; > } catch (RuntimeException e) { > logger.warn("Error sending data to kafka", e); > return false; > } > ... > > The test sends groups of messages at each stage of the test (e.g. servers > up, first server going down, first server coming up, second server going > down, etc.). Then a consumer connects and consumes all the messages, to > make sure they all arrived ok. > > It seems intermittently, a single message gets dropped, right after one of > the servers starts going down. It doesn't happen always, seems to happen 1 > out of every 20 test runs or so. Here's some sample output. I see the > exception inside the producer code, but I don't see the producer.send > method ever having an exception thrown back out to the caller (the log line > "Error sending data to kafka" is never triggered). > > What's interesting, is that it looks like the exceptions are happening on > message 3, but when the consumer subsequently consumes back all the > messages in the broker cluster, it seems message 2 (and not message 3) is > missing: > > ... > ... > 7136 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 3, message: 98 > 7150 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 3, message: 99 > 7163 [Thread-2] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Shutting down server2 > 7163 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 4, message: 0 > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting > down KafkaServer > 7176 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 4, message: 1 > 7189 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 4, message: 2 > 7203 [Thread-1] INFO > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > Sending message: test-stage: 4, message: 3 > 7394 [kafka-request-handler-5] WARN state.change.logger - Broker > 1946108683 received update metadata request with correlation id 7 from an > old controller 178709090 with epoch 2. 
Latest known controller epoch is 3 > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - > [KafkaApi-1946108683] error when handling request > > Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] > -> > > (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026 > kafka.common.ControllerMovedException: Broker 1946108683 received update > metadata request with correlation id 7 from an old controller 178709090 > with epoch 2. Latest known controller epoch is 3 > at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114) > at kafka.server.KafkaApis.handle(KafkaApis.scala:72) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) > at java.lang.Thread.run(Thread.java:724) > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN > kafka.controller.RequestSendThread - > [Controller-178709090-to-broker-178709090-send-thread], Controller > 178709090 fails to send a request to broker 178709090 > java.nio.channels.AsynchronousCloseException > at > > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) > at > java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387) > at kafka.utils.Utils$.read(Utils.scala:394
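A hedged sketch of the configuration change suggested above: with request.required.acks set to -1, the leader only acknowledges once the in-sync replicas have the message, so a message acked just before a controlled shutdown has already reached the follower. The broker list and retry values are illustrative.

```java
import java.util.Properties;

import kafka.producer.ProducerConfig;

public class AcksAllProducerConfig {
    public static ProducerConfig build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // -1: the leader waits for the full in-sync replica set to acknowledge the
        // write before responding to the producer.
        props.put("request.required.acks", "-1");
        // Retries help the producer ride out the leader change during a rolling
        // restart; they can also produce the occasional duplicate, as noted in the
        // follow-up to this thread.
        props.put("message.send.max.retries", "3"); // illustrative value
        props.put("retry.backoff.ms", "300");       // illustrative value
        return new ProducerConfig(props);
    }
}
```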
Hi, Can anyone tell me why I cannot produce any message to remote kafka server from my local machine
It is very weird. I have a Kafka cluster in EC2, and there is no problem producing messages from one of the nodes using the same producer. But when I move the producer to my local machine at home, it gives me the error below: Failed to send messages after 3 tries. Can anyone tell me how to fix this issue? I have opened all ports on my machine at home, and the security group is also open for the Kafka server and ZooKeeper in EC2. Everything else is fine, but I just cannot send any message to the Kafka server. Please help me. Thanks everyone! Jacky
Re: testing issue with reliable sending
Neha, I'm not sure I understand. I would have thought that if the leader acknowledges receipt of a message, and is then shut down cleanly (with controlled shutdown enabled), that it would be able to reliably persist any in memory buffered messages (and replicate them), before shutting down. Shouldn't this be part of the contract? It should be able to make sure this happens before shutting down, no? I would understand a message dropped if it were a hard shutdown. I'm not sure then how to implement reliable delivery semantics, while allowing a rolling restart of the broker cluster (or even to tolerate a single node failure, where one node might be down for awhile and need to be replaced or have a disk repaired). In this case, if we need to use required.request.acks=-1, that will pretty much prevent any successful message producing while any of the brokers for a partition is unavailable. So, I don't think that's an option. (Not to mention the performance degradation). Is there not a way to make this work more reliably with leader only acknowledgment, and clean/controlled shutdown? My test does succeed, as expected, with acks = -1, at least for the 100 or so iterations I've let it run so far. It does on occasion send duplicates (but that's ok with me). Jason On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede wrote: > The occasional single message loss could happen since > required.request.acks=1 and the leader is shut down before the follower > gets a chance to copy the message. Can you try your test with num acks set > to -1 ? > > Thanks, > Neha > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" wrote: > > > All, > > > > I'm having an issue with an integration test I've setup. This is using > > 0.8-beta1. > > > > The test is to verify that no messages are dropped (or the sender gets an > > exception thrown back if failure), while doing a rolling restart of a > > cluster of 2 brokers. > > > > The producer is configured to use 'request.required.acks' = '1'. > > > > The servers are set up to run locally on localhost, on different ports, > and > > different data dirs. The producer connects with a metadata brokerlist > > like: "localhost:2024,localhost:1025" (so no vip). The servers are set > > up with a default replication factor of 2. The servers have controlled > > shutdown enabled, as well. > > > > The producer code looks like this: > > ... > > Producer producer = getProducer(); > > try { > > KeyedMessage msg = new KeyedMessage(topic, > > message); > > producer.send(msg); > > return true; > > } catch (RuntimeException e) { > > logger.warn("Error sending data to kafka", e); > > return false; > > } > > ... > > > > The test sends groups of messages at each stage of the test (e.g. servers > > up, first server going down, first server coming up, second server going > > down, etc.). Then a consumer connects and consumes all the messages, to > > make sure they all arrived ok. > > > > It seems intermittently, a single message gets dropped, right after one > of > > the servers starts going down. It doesn't happen always, seems to > happen 1 > > out of every 20 test runs or so. Here's some sample output. I see the > > exception inside the producer code, but I don't see the producer.send > > method ever having an exception thrown back out to the caller (the log > line > > "Error sending data to kafka" is never triggered). 
> > > > What's interesting, is that it looks like the exceptions are happening on > > message 3, but when the consumer subsequently consumes back all the > > messages in the broker cluster, it seems message 2 (and not message 3) is > > missing: > > > > ... > > ... > > 7136 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 3, message: 98 > > 7150 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 3, message: 99 > > 7163 [Thread-2] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Shutting down server2 > > 7163 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 4, message: 0 > > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - > Shutting > > down KafkaServer > > 7176 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 4, message: 1 > > 7189 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 4, message: 2 > > 7203 [Thread-1] INFO > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - > > Sending message: test-stage: 4, message: 3 > > 7394 [kafka-request-handler-5] WARN state.change.logger - Broker > > 1946108683 received update metadata request with correlation id 7 from an > > old controller 178709090 with epoch 2.