[ https://issues.apache.org/jira/browse/KAFKA-640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Swapnil Ghike updated KAFKA-640: -------------------------------- Attachment: kafka-640.patch This happened because ClientId.validate(clientId) in SimpleConsumer did not validate "." in the clientId passed from ReplicaFetcherThread. This patch fixes another bug - AbstractFetcherThread would create SimpleConsumer and pass "%s-host_%s-port_%s" as the clientId to the SimpleConsumer. SimpleConsumer would append the host-port string again to the clientId while instantiating FetchRequestAndResponseStats. Changes: - The fix is to not include the host-port string while initializing AbstractFetcherThread.clientId in ReplicaFetcherThread, the host and port are already passed through the sourceBroker argument. This new clientId is passed to SimpleConsumer in AbstractFetcherThread and it should validate successfully. - Pass clientId + host-port string while instantiating *Stats in AbstractFetcherThread and SimpleConsumer. Since the host-port string gives information about the client, I think it should be ok to append it to clientId and not create a separate case class like ClientIdAndTopic. - Pass clientId + host-port string while instantiating FetchRequestBuilder in AbstractFetcherThread. - Pass clientId + host-port string to the constructors of ProducerRequestStats, FetchRequestAndResponseStats, FetcherStats and FetcherLagStats to maintain uniformity with passing clientId + host-port to FetchRequestBuilder in AbstractFetcherThread.doWork(). - Removed a line in ClientIdTest.scala, it was redundant. The validation criteria for clientId string that comes from the client is unchanged. Ideally I would like to validate the clientId that *includes* the host-port string, but that would require an introduction of '.' in the legal characters set which would be inconsistent with legal chars for Topic. Instead, we can maintain the same legal chars set and take care that the host-port string doesn't change format within the code. > System Test Failures : kafka.common.InvalidClientIdException in broker log4j > messages > ------------------------------------------------------------------------------------- > > Key: KAFKA-640 > URL: https://issues.apache.org/jira/browse/KAFKA-640 > Project: Kafka > Issue Type: Bug > Reporter: John Fung > Labels: replication-testing > Attachments: kafka-640.patch > > > * To reproduce the issue, download and build the latest Kafka 0.8 branch and > execute this command: "<kafka_home>/system_test $ python -B > system_test_runner.py" > * The following exception is found in the broker log4j messages in most > System Test cases: > [2012-11-29 09:06:21,322] WARN No previously checkpointed highwatermark value > found for topic test_1 partition 1. Returning 0 as the highwatermark > (kafka.server.HighwaterMarkCheckpoint) > [2012-11-29 09:06:21,326] INFO [Kafka Log on Broker 1], Truncated log segment > /tmp/kafka_server_1_logs/test_1-1/00000000000000000000.log to target offset 0 > (kafka.log.Log) > [2012-11-29 09:06:21,333] ERROR Replica Manager on Broker 1: Error processing > leaderAndISR request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> > PartitionStateInfo(LeaderIsrAndControllerEpoch({ > "ISR":"2,3,1","leader":"2","leaderEpoch":"0" },1),3), (test_1,0) -> > PartitionStateInfo(LeaderIsrAndControllerEpoch({ > "ISR":"1,2,3","leader":"1","leaderEpoch":"0" > },1),3)),Set(id:2,creatorId:127.0.0.1-1354208764997,host:127.0.0.1,port:9092, > id:1,creatorId:127.0.0.1-1354208760105,host:127.0.0.1,port:9091),1) > (kafka.server.ReplicaManager) > kafka.common.InvalidClientIdException: ClientId > replica-fetcher-host_127.0.0.1-port_9092 is illegal, contains a character > other than ASCII alphanumerics, _ and - > at kafka.utils.ClientId$.validate(ClientIdAndTopic.scala:36) > at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:81) > at > kafka.server.AbstractFetcherThread.<init>(AbstractFetcherThread.scala:44) > at > kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:26) > at > kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:26) > at > kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:44) > at kafka.cluster.Partition.makeFollower(Partition.scala:190) > at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeFollower(ReplicaManager.scala:236) > at > kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:201) > at > kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:191) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) > at > kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:191) > at > kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:129) > at kafka.server.KafkaApis.handle(KafkaApis.scala:60) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41) > at java.lang.Thread.run(Thread.java:662) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira