Re: Hi, Can anyone tell me why I cannot produce any message to remote kafka server from my local machine
Hello Jacky,

Have you read this FAQ:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F

Guozhang

On Fri, Oct 4, 2013 at 10:41 PM, Jiang Jacky wrote:

> It is very weird. I have a kafka cluster in EC2, and there is no problem
> producing messages from one of the nodes with the same producer. But when
> I move the producer to my local machine at home, it gives me the below
> error:
>
>   Failed to send messages after 3 tries.
>
> Can anyone tell me how to fix this issue? I have opened all ports on my
> machine at home, and the security group is also open for the kafka server
> and zookeeper in EC2. Everything is fine, but I just cannot send any
> message to the kafka server.
> Please help me.
> Thanks everyone!
>
> Jacky

--
-- Guozhang
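For context, the linked FAQ entry is about the hostname each broker registers in ZooKeeper: producers fetch metadata from one broker and then connect to whatever address each broker advertised, so a producer outside EC2 fails if the brokers registered their internal EC2 hostnames. A minimal sketch of the relevant 0.8-era server.properties setting (the hostname below is an illustrative placeholder, not a value from this thread):

    # server.properties (sketch; hostname is illustrative)
    broker.id=0
    port=9092
    # The address the broker registers in ZooKeeper and hands out in
    # metadata responses. On EC2 this defaults to the internal hostname,
    # which is unreachable from outside AWS.
    host.name=ec2-xx-xx-xx-xx.compute-1.amazonaws.com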
Re: testing issue with reliable sending
Shouldn't this be part of the contract? It should be able to make sure
this happens before shutting down, no?

The leader writes messages to its local log, and the replicas then consume
messages from the leader and write those to their local logs. If you set
request.required.acks=1, the ack is sent to the producer only after the
leader has written the messages to its local log. What you are asking for
is part of the contract if request.required.acks=-1.

In this case, if we need to use request.required.acks=-1, that will pretty
much prevent any successful message producing while any of the brokers for
a partition is unavailable. So, I don't think that's an option. (Not to
mention the performance degradation.)

You can implement reliable delivery semantics while allowing a rolling
restart of brokers by setting request.required.acks=-1. When one of the
replicas is shut down, the ISR shrinks to remove the replica being shut
down, and the messages are committed using the new ISR.

Thanks,
Neha

On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg wrote:

> Neha,
>
> I'm not sure I understand. I would have thought that if the leader
> acknowledges receipt of a message, and is then shut down cleanly (with
> controlled shutdown enabled), it would be able to reliably persist any
> in-memory buffered messages (and replicate them) before shutting down.
> Shouldn't this be part of the contract? It should be able to make sure
> this happens before shutting down, no?
>
> I would understand a message being dropped if it were a hard shutdown.
>
> I'm not sure then how to implement reliable delivery semantics while
> allowing a rolling restart of the broker cluster (or even tolerating a
> single node failure, where one node might be down for a while and need
> to be replaced or have a disk repaired). In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is
> unavailable. So, I don't think that's an option. (Not to mention the
> performance degradation.)
>
> Is there not a way to make this work more reliably with leader-only
> acknowledgment and clean/controlled shutdown?
>
> My test does succeed, as expected, with acks = -1, at least for the 100
> or so iterations I've let it run so far. It does on occasion send
> duplicates (but that's ok with me).
>
> Jason
>
> On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede wrote:
>
> > The occasional single message loss could happen since
> > request.required.acks=1 and the leader is shut down before the
> > follower gets a chance to copy the message. Can you try your test
> > with num acks set to -1?
> >
> > Thanks,
> > Neha
> > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" wrote:
> >
> > > All,
> > >
> > > I'm having an issue with an integration test I've set up. This is
> > > using 0.8-beta1.
> > >
> > > The test is to verify that no messages are dropped (or that the
> > > sender gets an exception thrown back on failure) while doing a
> > > rolling restart of a cluster of 2 brokers.
> > >
> > > The producer is configured to use 'request.required.acks' = '1'.
> > >
> > > The servers are set up to run locally on localhost, on different
> > > ports, and with different data dirs. The producer connects with a
> > > metadata broker list like: "localhost:2024,localhost:1025" (so no
> > > vip). The servers are set up with a default replication factor of
> > > 2, and have controlled shutdown enabled as well.
> > >
> > > The producer code looks like this:
> > > ...
> > > Producer<String, T> producer = getProducer();
> > > try {
> > >   KeyedMessage<String, T> msg = new KeyedMessage<String, T>(topic, message);
> > >   producer.send(msg);
> > >   return true;
> > > } catch (RuntimeException e) {
> > >   logger.warn("Error sending data to kafka", e);
> > >   return false;
> > > }
> > > ...
> > >
> > > The test sends groups of messages at each stage of the test (e.g.
> > > servers up, first server going down, first server coming up, second
> > > server going down, etc.). Then a consumer connects and consumes all
> > > the messages, to make sure they all arrived ok.
> > >
> > > It seems that intermittently a single message gets dropped, right
> > > after one of the servers starts going down. It doesn't always
> > > happen; it seems to happen about 1 out of every 20 test runs or so.
> > > Here's some sample output. I see the exception inside the producer
> > > code, but I don't see the producer.send method ever having an
> > > exception thrown back out to the caller (the log line "Error
> > > sending data to kafka" is never triggered).
> > >
> > > What's interesting is that it looks like the exceptions are
> > > happening on message 3, but when the consumer subsequently consumes
> > > back all the messages in the broker cluster, it seems message 2
> > > (and not message 3) is missing:
> > >
> > > ...
> > > ...
> > > 7136 [Thread-1] INFO
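For anyone reproducing this, the snippet above elides the producer setup. A self-contained sketch against the 0.8 Java producer API follows; getProducer(), the topic name, and the use of String messages are assumptions, while the broker list matches the test description above:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerSketch {
        // Hypothetical stand-in for the getProducer() helper in the test.
        static Producer<String, String> getProducer() {
            Properties props = new Properties();
            // The two local brokers from the test setup (no vip).
            props.put("metadata.broker.list", "localhost:2024,localhost:1025");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // Leader-only ack, the setting under discussion; -1 would
            // wait for the full ISR instead.
            props.put("request.required.acks", "1");
            return new Producer<String, String>(new ProducerConfig(props));
        }

        public static void main(String[] args) {
            Producer<String, String> producer = getProducer();
            try {
                producer.send(new KeyedMessage<String, String>("test-topic", "message-1"));
            } finally {
                producer.close();
            }
        }
    }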
Re: testing issue with reliable sending
Thanks for the explanation, Neha. Still holding out hope.

So, if request.required.acks=-1, how does the leader confirm that the
other brokers have consumed the message before acking to the producer?
Does the leader just wait for the followers in the ISR to consume? Or
does the leader have a way to push, or ping, the followers to consume?

Couldn't that mechanism be used during a clean shutdown, even if the
messages were initially produced with acks=1? That is, when shutting
down, get acks from all ISR members for each partition before shutting
down.

I'm just a bit leery about using -1 across the board, because of the
performance hit (but for now it seems the best option to use for
reliable sending).

A separate question: can request.required.acks be set to a higher
positive integer, say "2", to indicate that 2 of, say, 3 replicas have
acked? ("request.required.acks" in the name would seem to indicate
this.) I'm not saying I'd want to use this (we are hoping to use only a
replication factor of 2).

Jason

On Sat, Oct 5, 2013 at 1:00 PM, Neha Narkhede wrote:

> Shouldn't this be part of the contract? It should be able to make sure
> this happens before shutting down, no?
>
> The leader writes messages to its local log, and the replicas then
> consume messages from the leader and write those to their local logs.
> If you set request.required.acks=1, the ack is sent to the producer
> only after the leader has written the messages to its local log. What
> you are asking for is part of the contract if request.required.acks=-1.
>
> In this case, if we need to use request.required.acks=-1, that will
> pretty much prevent any successful message producing while any of the
> brokers for a partition is unavailable. So, I don't think that's an
> option. (Not to mention the performance degradation.)
>
> You can implement reliable delivery semantics while allowing a rolling
> restart of brokers by setting request.required.acks=-1. When one of the
> replicas is shut down, the ISR shrinks to remove the replica being shut
> down, and the messages are committed using the new ISR.
>
> Thanks,
> Neha
>
> On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg wrote:
>
> > Neha,
> >
> > I'm not sure I understand. I would have thought that if the leader
> > acknowledges receipt of a message, and is then shut down cleanly
> > (with controlled shutdown enabled), it would be able to reliably
> > persist any in-memory buffered messages (and replicate them) before
> > shutting down. Shouldn't this be part of the contract? It should be
> > able to make sure this happens before shutting down, no?
> >
> > I would understand a message being dropped if it were a hard
> > shutdown.
> >
> > I'm not sure then how to implement reliable delivery semantics while
> > allowing a rolling restart of the broker cluster (or even tolerating
> > a single node failure, where one node might be down for a while and
> > need to be replaced or have a disk repaired). In this case, if we
> > need to use request.required.acks=-1, that will pretty much prevent
> > any successful message producing while any of the brokers for a
> > partition is unavailable. So, I don't think that's an option. (Not
> > to mention the performance degradation.)
> >
> > Is there not a way to make this work more reliably with leader-only
> > acknowledgment and clean/controlled shutdown?
> >
> > My test does succeed, as expected, with acks = -1, at least for the
> > 100 or so iterations I've let it run so far. It does on occasion
> > send duplicates (but that's ok with me).
> >
> > Jason
> >
> > On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede wrote:
> >
> > > The occasional single message loss could happen since
> > > request.required.acks=1 and the leader is shut down before the
> > > follower gets a chance to copy the message. Can you try your test
> > > with num acks set to -1?
> > >
> > > Thanks,
> > > Neha
> > > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" wrote:
> > >
> > > > All,
> > > >
> > > > I'm having an issue with an integration test I've set up. This
> > > > is using 0.8-beta1.
> > > >
> > > > The test is to verify that no messages are dropped (or that the
> > > > sender gets an exception thrown back on failure) while doing a
> > > > rolling restart of a cluster of 2 brokers.
> > > >
> > > > The producer is configured to use 'request.required.acks' = '1'.
> > > >
> > > > The servers are set up to run locally on localhost, on different
> > > > ports, and with different data dirs. The producer connects with
> > > > a metadata broker list like: "localhost:2024,localhost:1025" (so
> > > > no vip). The servers are set up with a default replication
> > > > factor of 2, and have controlled shutdown enabled as well.
> > > >
> > > > The producer code looks like this:
> > > > ...
> > > > Producer<String, T> producer = getProducer();
> > > > try {
> > > >   KeyedMessage<String, T> msg = new KeyedMessage<String, T>(topic,
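To make the trade-off being debated concrete, the change is a single producer property. A sketch using the 0.8 property names (whether values greater than 1 mean "N replicas must ack" is exactly Jason's open question, so only the two settings discussed in the thread are shown):

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:2024,localhost:1025");
    props.put("serializer.class", "kafka.serializer.StringEncoder");

    // acks=1: leader-only ack. Fast, but a message acked by the leader
    // can be lost if the leader shuts down before a follower fetches it.
    props.put("request.required.acks", "1");

    // acks=-1: the leader responds only after every replica currently
    // in the ISR has the message. During a controlled shutdown the ISR
    // shrinks, so produce requests keep succeeding (possibly with
    // duplicates on retry, as observed in the test above).
    // props.put("request.required.acks", "-1");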
Re: Hi, Can anyone tell me why I cannot produce any message to remote kafka server from my local machine
Hi, I tried setting host.name in server.properties, but it doesn't work.
I believe it is a network security issue. I created a new instance in the
same security group, without kafka or zookeeper, and it does work: it can
still produce to the kafka server. But when I switch to another EC2
account and create the same instance, it cannot produce to the kafka
server. I noticed that there is no outbound port setting in the security
group configuration of the kafka server's EC2 instance. Do we have to set
an outbound port in the firewall? Do you guys know which outbound port
should be opened for the kafka server?

Thanks

2013/10/5 Guozhang Wang

> Hello Jacky,
>
> Have you read this FAQ:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F
>
> Guozhang
>
> On Fri, Oct 4, 2013 at 10:41 PM, Jiang Jacky wrote:
>
> > It is very weird. I have a kafka cluster in EC2, and there is no
> > problem producing messages from one of the nodes with the same
> > producer. But when I move the producer to my local machine at home,
> > it gives me the below error:
> >
> >   Failed to send messages after 3 tries.
> >
> > Can anyone tell me how to fix this issue? I have opened all ports on
> > my machine at home, and the security group is also open for the kafka
> > server and zookeeper in EC2. Everything is fine, but I just cannot
> > send any message to the kafka server.
> > Please help me.
> > Thanks everyone!
> >
> > Jacky
>
> --
> -- Guozhang
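On the outbound-port question: EC2 security groups are stateful, so reply traffic for an allowed inbound connection does not need a separate outbound rule, which makes outbound rules an unlikely culprit here. A quick way to separate a network problem from a Kafka problem is to test raw TCP reachability from the machine that cannot produce. A minimal sketch (the hostname and port are placeholders for the broker's public address and listener port):

    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class PortCheck {
        public static void main(String[] args) throws Exception {
            // Placeholders: substitute the broker's public hostname/port.
            String host = "ec2-xx-xx-xx-xx.compute-1.amazonaws.com";
            int port = 9092;
            try (Socket socket = new Socket()) {
                // Fails with an exception if a security group or
                // firewall blocks the connection.
                socket.connect(new InetSocketAddress(host, port), 5000);
                System.out.println("TCP connect succeeded");
            }
        }
    }

If this connects but producing still fails, the likely cause is the FAQ issue above: the broker is advertising its internal EC2 hostname in the metadata it returns to clients.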
Managing Millions of Partitions in Kafka
Initially, I thought dynamic topic creation could be used to maintain
per-user data in Kafka. Then I read that partitions can and should be
used for this instead.

If a partition is to be mapped to a user, can there be a million, or even
a billion, partitions in a cluster? How does one go about designing such
a model? Can the replication tool be used to assign, say, partitions
1 - 10,000 to replica 1, and 10,001 - 20,000 to replica 2?

If not, since there is a ulimit on the file system, should one model it
based on a replica/topic/partition approach? Say users 1-10,000 go on
topic 10k-1, which has 10,000 partitions, and users 10,001-20,000 go on
topic 10k-2, which has 10,000 partitions.

Simply put, how can a million stateful data points be handled? (I deduced
that a userid-to-partition-number mapping can be done via a partitioner,
but unless a server can be configured to handle only a given set of
partitions, with a range-based notation, it is almost impossible to
handle a large dataset. Is it that Kafka can only handle a limited set of
stateful data right now?)

http://stackoverflow.com/questions/17205561/data-modeling-with-kafka-topics-and-partitions

Btw, why does Kafka have to keep each partition open? Can't a partition
be opened for read/write only when needed?

Thanks in advance!
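On the userid-to-partition mapping mentioned above: with the 0.8 producer that mapping is exactly what a pluggable partitioner does. A minimal sketch (the class name and hashing scheme are illustrative, and the Partitioner signature varies slightly across 0.8.x releases):

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    // Deterministically maps a user id (the message key) onto the
    // topic's partitions. This gives a stable user -> partition
    // assignment, not a partition per user.
    public class UserIdPartitioner implements Partitioner<String> {

        // The 0.8 producer instantiates partitioners reflectively and
        // passes its config through this constructor.
        public UserIdPartitioner(VerifiableProperties props) {}

        @Override
        public int partition(String userId, int numPartitions) {
            // Mask the sign bit so the result is always non-negative.
            return (userId.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

    // Enabled via producer config, e.g.:
    //   props.put("partitioner.class", "UserIdPartitioner");

Note that this is the many-users-to-few-partitions design the linked Stack Overflow thread recommends, rather than one partition per user: each partition costs the brokers open file handles and replication state, which is why partition counts in the millions are impractical.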