HI, I'm building a safe Kafka cluster using SASL_PLAINTEXT。 It can create the topic, but not producing and consuming data。
The following is an related log : less controller.log [2016-03-08 11:43:44,164] INFO [Controller 0]: Controller starting up (kafka.controller.KafkaController) [2016-03-08 11:43:44,218] INFO [Controller 0]: Broker 0 starting become controller state transition (kafka.controller.KafkaController) [2016-03-08 11:43:44,223] INFO [Controller 0]: Controller 0 incremented epoch to 1 (kafka.controller.KafkaController) [2016-03-08 11:43:44,224] DEBUG [Controller 0]: Registering IsrChangeNotificationListener (kafka.controller.KafkaController) [2016-03-08 11:43:44,241] INFO [Controller 0]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController) [2016-03-08 11:43:44,242] INFO [Controller 0]: Partitions that completed preferred replica election: (kafka.controller.KafkaController) [2016-03-08 11:43:44,243] INFO [Controller 0]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController) [2016-03-08 11:43:44,246] INFO [Controller 0]: Partitions being reassigned: Map() (kafka.controller.KafkaController) [2016-03-08 11:43:44,246] INFO [Controller 0]: Partitions already reassigned: List() (kafka.controller.KafkaController) [2016-03-08 11:43:44,248] INFO [Controller 0]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController) [2016-03-08 11:43:44,257] INFO [Controller 0]: List of topics to be deleted: (kafka.controller.KafkaController) [2016-03-08 11:43:44,258] INFO [Controller 0]: List of topics ineligible for deletion: (kafka.controller.KafkaController) [2016-03-08 11:43:44,262] INFO [Controller 0]: Currently active brokers in the cluster: Set() (kafka.controller.KafkaController) [2016-03-08 11:43:44,262] INFO [Controller 0]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) [2016-03-08 11:43:44,263] INFO [Controller 0]: Current list of topics in the cluster: Set() (kafka.controller.KafkaController) [2016-03-08 11:43:44,267] INFO [Replica state machine on controller 0]: Started replica state machine with initial state -> Map() (kafka.controller.ReplicaStateMachine) [2016-03-08 11:43:44,273] INFO [Partition state machine on Controller 0]: Started partition state machine with initial state -> Map() (kafka.controller.PartitionStateMachine) [2016-03-08 11:43:44,274] INFO [Controller 0]: Broker 0 is ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController) [2016-03-08 11:43:44,276] INFO [Controller 0]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController) [2016-03-08 11:43:44,277] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine) [2016-03-08 11:43:44,283] INFO [Controller 0]: starting the partition rebalance scheduler (kafka.controller.KafkaController) [2016-03-08 11:43:44,284] INFO [Controller 0]: Controller startup complete (kafka.controller.KafkaController) [2016-03-08 11:43:44,422] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2016-03-08 11:43:44,538] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 0, deleted brokers: , all live brokers: 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2016-03-08 11:43:44,539] DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 0 (kafka.controller.ControllerChannelManager) [2016-03-08 11:43:44,555] INFO [Controller-0-to-broker-0-send-thread], Starting (kafka.controller.RequestSendThread) [2016-03-08 11:43:44,555] INFO [Controller 0]: New broker startup callback for 0 (kafka.controller.KafkaController) [2016-03-08 11:43:44,604] WARN [Controller-0-to-broker-0-send-thread], Controller 0's connection to broker Node(0, kafka-001.vm 9095) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to Node(0, kafka-001.vm, 9095) failed at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$1.apply(NetworkClientBlockingOps.scala:62) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$1.apply(NetworkClientBlockingOps.scala:58) at kafka.utils.NetworkClientBlockingOps$$anonfun$kafka$utils$NetworkClientBlockingOps$$pollUntil$extension$2.apply(NetworkClientBlockingOps.scala:106) at kafka.utils.NetworkClientBlockingOps$$anonfun$kafka$utils$NetworkClientBlockingOps$$pollUntil$extension$2.apply(NetworkClientBlockingOps.scala:105) at kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:105) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:58) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:225) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:172) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) less server.log [2016-03-08 11:43:43,440] INFO starting (kafka.server.KafkaServer) [2016-03-08 11:43:43,446] INFO Connecting to zookeeper on kafka-001.vm:2182 (kafka.server.KafkaServer) [2016-03-08 11:43:43,940] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager) [2016-03-08 11:43:43,946] INFO Loading logs. (kafka.log.LogManager) [2016-03-08 11:43:43,955] INFO Logs loading complete. (kafka.log.LogManager) [2016-03-08 11:43:44,020] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2016-03-08 11:43:44,022] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2016-03-08 11:43:44,028] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2016-03-08 11:43:44,104] INFO Awaiting socket connections on kafka-001.vm:9095. (kafka.network.Acceptor) [2016-03-08 11:43:44,107] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer) [2016-03-08 11:43:44,132] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-03-08 11:43:44,134] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-03-08 11:43:44,211] INFO Creating /controller (is it secure? true) (kafka.utils.ZKCheckedEphemeral) [2016-03-08 11:43:44,217] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-08 11:43:44,217] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2016-03-08 11:43:44,299] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator) [2016-03-08 11:43:44,301] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-03-08 11:43:44,302] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-03-08 11:43:44,303] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator) [2016-03-08 11:43:44,308] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-03-08 11:43:44,353] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2016-03-08 11:43:44,354] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2016-03-08 11:43:44,361] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2016-03-08 11:43:44,374] INFO Creating /brokers/ids/0 (is it secure? true) (kafka.utils.ZKCheckedEphemeral) [2016-03-08 11:43:44,378] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-08 11:43:44,379] INFO Registered broker 0 at path /brokers/ids/0 with addresses: SASL_PLAINTEXT -> EndPoint(kafka-001.vm,9095,SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-08 11:43:44,396] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [2016-03-08 11:43:44,419] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2016-03-08 11:53:44,298] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager) How to solve this problem? Hope to get your help。 Thank you very much! ---------------------------------- 张现忠 | moose 陌陌科技 | 技术部 E-mail: zhang.xianzh...@immomo.com<mailto:wu.chun...@immomo.com>