Cluster of two nodes?
Hello, is there any value in running zookeeper and kafka in a cluster of two nodes? I.e. one instance of zookeeper and kafka on each node?
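One way to reason about the question above is the majority-quorum arithmetic that a ZooKeeper ensemble relies on: the ensemble stays available only while a strict majority of its servers is up. The sketch below is illustrative only; the class and method names are made up for this note and are not part of any ZooKeeper or Kafka API.

```java
// Sketch: majority-quorum arithmetic for an ensemble of n servers.
// QuorumMath is a hypothetical helper, not a ZooKeeper class.
final class QuorumMath {
    /** Smallest number of servers that forms a strict majority of n. */
    static int majority(int n) {
        return n / 2 + 1;
    }

    /** Number of servers that can fail while a majority is still up. */
    static int toleratedFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: majority=" + majority(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

By this arithmetic a two-server ensemble tolerates no failures, the same as a single server, while three servers tolerate one.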
kafka “stops working” after a large message is enqueued
I'm running kafka_2.11-0.9.0.0 with a Java-based producer and consumer. With messages of ~70 KB everything works fine. However, after the producer enqueues a larger, 70 MB message, Kafka appears to stop delivering messages to the consumer: not only is the large message not delivered, but neither are subsequent smaller messages. I know the producer succeeds because I use the Kafka callback for confirmation and I can see the messages in the Kafka message log.
Custom changes to the Kafka config:
    message.max.bytes=2
    replica.fetch.max.bytes=2
Consumer config:
    props.put("fetch.message.max.bytes", "2");
    props.put("max.partition.fetch.bytes", "2");
unable to delete a topic
Try as I might, I've found it impossible to delete a topic in Kafka 0.9.0. I set delete.topic.enable in config.properties. I tried kafka-topics.sh with the delete command. It responds with "topic marked for deletion", but after a 30-minute wait the topic was still there. Then I stopped Kafka and wiped out kafka-logs/*. I logged on to ZooKeeper and did "rmr /brokers/topics/". ZooKeeper reported the topics as gone. But when I bring Kafka back up, the topics pop up again out of nowhere.
Re: unable to delete a topic
Thanks. Setting auto.create.topics.enable=false made the difference.
On Wednesday, February 3, 2016 2:45 PM, John Holland wrote:
What I ended up doing, after having issues similar to the ones you're having, was:
- stop all the brokers
- rm -rf all the topic data across the brokers
- delete the topic node in ZK
- set auto.create.topics.enable=false in server.properties
- start the brokers up again
The topic stayed deleted this way. In my case the topic was misconfigured, so I created it again and then did a rolling restart after setting auto.create.topics.enable back to true. I'm wondering if I have a rogue service in my environment publishing to the topic in question.
On Wed, Feb 3, 2016 at 12:30 PM Tech Bolek wrote:
> Try as I might, I've found it impossible to delete a topic in Kafka 0.9.0.
> I set delete.topic.enable in config.properties.
> I tried kafka-topics.sh with the delete command. It responds with "topic
> marked for deletion", but after a 30-minute wait the topic was still there.
> Then I stopped Kafka and wiped out kafka-logs/*. I logged
> on to ZooKeeper and did "rmr /brokers/topics/". ZooKeeper
> reported the topics as gone. But when I bring Kafka back up, the topics
> pop up again out of nowhere.
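The two broker settings discussed in this thread can be checked before attempting the procedure above. The following is a minimal sketch that parses broker properties text with java.util.Properties and reports both flags. The class name and the sample text are hypothetical; the fallback values used when a key is absent (deletion off, auto-creation on) are what the 0.9 broker is understood to default to, and should be treated as an assumption to verify against the broker documentation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: inspect broker properties text for the two flags from this thread.
// TopicDeletionSettings is a hypothetical helper, not a Kafka class.
final class TopicDeletionSettings {
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** True only if delete.topic.enable is explicitly set to true (assumed default: false). */
    static boolean deletionEnabled(Properties p) {
        return Boolean.parseBoolean(p.getProperty("delete.topic.enable", "false").trim());
    }

    /** True unless auto.create.topics.enable is set to something other than true (assumed default: true). */
    static boolean autoCreateEnabled(Properties p) {
        return Boolean.parseBoolean(p.getProperty("auto.create.topics.enable", "true").trim());
    }

    public static void main(String[] args) {
        // Hypothetical file contents, for illustration only.
        Properties p = parse("delete.topic.enable=true\nauto.create.topics.enable=false\n");
        System.out.println("deletion enabled:    " + deletionEnabled(p));
        System.out.println("auto-create enabled: " + autoCreateEnabled(p));
    }
}
```

With auto-creation left on, any client that still produces to or fetches from the deleted topic can cause it to be recreated, which would be consistent with the topics reappearing after a restart.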
Re: kafka “stops working” after a large message is enqueued
Deleted the topic and recreated it (with max bytes set), but that did not help. What did help was increasing the Java heap size. I monitored the consumer with jstat and noticed 2 full garbage collection attempts right after publishing the large message. After that the consumer appeared dormant. Increasing the Java heap size allowed the consumer to consume the message. I still wonder why the consumer remained silent, i.e. no out-of-memory error or anything.
On Tuesday, February 2, 2016 8:35 PM, Joe Lawson wrote:
Make sure the topic is created after message max bytes is set.
On Feb 2, 2016 9:04 PM, "Tech Bolek" wrote:
> I'm running kafka_2.11-0.9.0.0 and a java-based producer/consumer. With
> messages ~70 KB everything works fine. However, after the producer enqueues
> a larger, 70 MB message, kafka appears to stop delivering the messages to
> the consumer. I.e. not only is the large message not delivered but also
> subsequent smaller messages. I know the producer succeeds because I use
> kafka callback for the confirmation and I can see the messages in the kafka
> message log.
> kafka config custom changes:
> message.max.bytes=2 replica.fetch.max.bytes=2
> consumer config:
> props.put("fetch.message.max.bytes", "2");
> props.put("max.partition.fetch.bytes", "2");
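Since the fix here turned out to be heap size, a rough sanity check before sending very large messages is to compare the configured fetch size against the JVM's maximum heap. The sketch below is only an estimate under stated assumptions: the sizes are hypothetical (the values in the post are not legible beyond "2"), bufferedChunks is a made-up parameter standing in for however many fetched chunks the consumer holds at once, and the "use at most half the heap" rule is an arbitrary safety margin, not a Kafka recommendation.

```java
// Sketch: does a given fetch size plausibly fit in the consumer's heap?
// FetchHeapCheck is a hypothetical helper, not a Kafka class.
final class FetchHeapCheck {
    /** Rough lower bound on bytes held while buffering fetched chunks for one partition. */
    static long estimatedBufferBytes(long fetchMaxBytes, int bufferedChunks) {
        return Math.multiplyExact(fetchMaxBytes, (long) bufferedChunks);
    }

    /** True if the estimate stays under half of the given heap (arbitrary safety margin). */
    static boolean likelyFits(long fetchMaxBytes, int bufferedChunks, long maxHeapBytes) {
        return estimatedBufferBytes(fetchMaxBytes, bufferedChunks) < maxHeapBytes / 2;
    }

    public static void main(String[] args) {
        long fetchMaxBytes = 100L * 1024 * 1024;          // hypothetical 100 MB fetch size
        int bufferedChunks = 2;                           // assumption, see lead-in
        long maxHeap = Runtime.getRuntime().maxMemory();  // heap limit of this JVM (-Xmx)
        System.out.println("max heap bytes: " + maxHeap);
        System.out.println("likely fits:    " + likelyFits(fetchMaxBytes, bufferedChunks, maxHeap));
    }
}
```

The estimate ignores further copies made while processing, such as the `new String(bytes)` conversion in a typical consumer loop, so the real requirement is higher.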
Kafka consumer becomes deaf after an hour or so
I have a consumer using the high-level API, based on the example from here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
It works fine, but after an hour or so of inactivity it stops responding to new messages. All I see in the log is:
    INFO [2016-02-23 17:06:10,070] org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 4002ms for sessionid 0x152b35436a9003d, closing socket connection and attempting reconnect
    INFO [2016-02-23 17:06:10,173] org.I0Itec.zkclient.ZkClient: zookeeper state changed (Disconnected)
    INFO [2016-02-23 17:06:11,223] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    INFO [2016-02-23 17:06:11,225] org.apache.zookeeper.ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
    INFO [2016-02-23 17:06:11,231] org.apache.zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x152b35436a9003d, negotiated timeout = 6000
    INFO [2016-02-23 17:06:11,231] org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
Init code:
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("group.id", "ONEGROUP");
    props.put("auto.commit.interval.ms", "1000");
    props.put("fetch.message.max.bytes", "2");
    props.put("max.partition.fetch.bytes", "2");
    ConsumerConfig cc = new ConsumerConfig(props);
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(cc);
    // One stream (and therefore one thread) for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("LISTENER1", new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(1);
    int threadNumber = 0;
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("LISTENER1");
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        threadExecutor.submit(new ProcessorThread(stream, threadNumber));
        threadNumber++;
    }
ProcessorThread:
    class ProcessorThread implements Runnable {
        private final KafkaStream<byte[], byte[]> stream;
        private final int threadNumber;
        public ProcessorThread(KafkaStream<byte[], byte[]> stream, int threadNumber) {
            this.stream = stream;
            this.threadNumber = threadNumber;
        }
        public void run() {
            // hasNext() blocks until a message arrives.
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                byte[] bytes = msg.message();
                String msgString = new String(bytes);
                /* process message */
            }
        }
    }
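The numbers in the log above can be reproduced with a little arithmetic, which may help when reading it. As far as I understand ZooKeeper's behaviour, the server clamps the requested session timeout to between 2 and 20 times its tickTime, and the client gives up on a silent connection after two thirds of the negotiated timeout. The sketch below assumes a tickTime of 3000 ms; that value is not stated in the post and is inferred from the "negotiated timeout = 6000" line. The class and method names are hypothetical.

```java
// Sketch: how a requested ZooKeeper session timeout becomes the values seen in the log.
// ZkSessionTimeout is a hypothetical helper, not a ZooKeeper class.
final class ZkSessionTimeout {
    /** Clamp the requested timeout to [2 * tickTime, 20 * tickTime]. */
    static int negotiated(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    /** Client-side read timeout: two thirds of the negotiated session timeout. */
    static int clientReadTimeout(int negotiatedMs) {
        return negotiatedMs * 2 / 3;
    }

    public static void main(String[] args) {
        int requested = 400;   // zookeeper.session.timeout.ms from the init code
        int tickTime = 3000;   // assumption, inferred from the log
        int negotiated = negotiated(requested, tickTime);
        System.out.println("negotiated timeout:  " + negotiated + " ms");
        System.out.println("client read timeout: " + clientReadTimeout(negotiated) + " ms");
    }
}
```

Under these assumptions the requested 400 ms is raised to 6000 ms, and the client reports a timeout after roughly 4000 ms of silence, which matches the "4002ms" in the first log line. This only explains the log values; it does not by itself show why the consumer stops receiving messages.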
kafka zookeeper: no route to host exception on connect. Strange address format?
Here is the scenario:
- My Kafka server and ZooKeeper are running and working fine on the remote server, as long as I launch the process on the same remote server.
- I don't have any connectivity issues between my local machine and the server. I can ssh and access all other applications on my remote server from my local machine.
- When trying to connect a client to the remote ZooKeeper directly from my local machine, a NoRouteToHost exception is thrown.
- The client logfile contains the following lines:
    org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=remotehost:2181 sessionTimeout=12 watcher=org.I0Itec.zkclient.ZkClient@62807a4d
    org.apache.zookeeper.ClientCnxn: Opening socket connection to server remotehost/192.13.12.1:2181. Will not attempt to authenticate using SASL (unknown error)
The URL I'm specifying when connecting to ZooKeeper is remotehost:2181. However, note the "remotehost/192.13.12.1:2181" in the log. It appears to resolve the hostname to the IP correctly and then slaps a forward slash and the IP onto the URL, which looks weird to me. Is this how it should resolve and pass the URL down to the socket?
Re: kafka zookeeper: no route to host exception on connect. Strange address format?
Found the issue. The ZooKeeper listen port was not open in the firewall.
On Monday, August 29, 2016 8:55 AM, Tech Bolek wrote:
Here is the scenario:
- My Kafka server and ZooKeeper are running and working fine on the remote server, as long as I launch the process on the same remote server.
- I don't have any connectivity issues between my local machine and the server. I can ssh and access all other applications on my remote server from my local machine.
- When trying to connect a client to the remote ZooKeeper directly from my local machine, a NoRouteToHost exception is thrown.
- The client logfile contains the following lines:
    org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=remotehost:2181 sessionTimeout=12 watcher=org.I0Itec.zkclient.ZkClient@62807a4d
    org.apache.zookeeper.ClientCnxn: Opening socket connection to server remotehost/192.13.12.1:2181. Will not attempt to authenticate using SASL (unknown error)
The URL I'm specifying when connecting to ZooKeeper is remotehost:2181. However, note the "remotehost/192.13.12.1:2181" in the log. It appears to resolve the hostname to the IP correctly and then slaps a forward slash and the IP onto the URL, which looks weird to me. Is this how it should resolve and pass the URL down to the socket?
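Two things from this thread can be checked with plain JDK classes. First, the "remotehost/192.13.12.1:2181" form appears to be simply how java.net.InetSocketAddress prints a resolved address (hostname, slash, IP literal, colon, port), rather than a string that is handed to the socket. Second, a firewall problem like the one found here can be spotted with a quick TCP connect attempt. The sketch below is illustrative; the class and method names are hypothetical, and the host and IP are the ones from the log.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Sketch: address formatting and a simple TCP reachability probe.
// ZkReachability is a hypothetical helper, not a ZooKeeper class.
final class ZkReachability {
    /** Builds a resolved socket address without a DNS lookup and returns its toString() form. */
    static String describe(String host, byte[] ipv4, int port) {
        try {
            InetAddress addr = InetAddress.getByAddress(host, ipv4);
            return new InetSocketAddress(addr, port).toString();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("bad address length", e);
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, unresolved hosts and "no route to host".
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] ip = {(byte) 192, 13, 12, 1};
        System.out.println(describe("remotehost", ip, 2181));
        System.out.println("reachable: " + canConnect("remotehost", 2181, 2000));
    }
}
```

Running the probe from the client machine against the ZooKeeper port separates "the port is blocked" from "the client is misconfigured" before digging into the Kafka or ZooKeeper logs.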