[ 
https://issues.apache.org/jira/browse/KAFKA-640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Swapnil Ghike updated KAFKA-640:
--------------------------------

    Attachment: kafka-640.patch

This happened because ClientId.validate(clientId) in SimpleConsumer rejects 
the "." characters in the clientId passed from ReplicaFetcherThread. 
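
To illustrate the failure, here is a minimal sketch of a check that accepts
only the characters named in the exception message (ASCII alphanumerics, _
and -). The object and method names are hypothetical, and this is not the
actual ClientId.validate code; it only shows why an IP address in the
clientId cannot pass.

```scala
object ClientIdSketch {
  // Legal characters as stated in the exception message:
  // ASCII alphanumerics, '_' and '-'. The '.' is deliberately absent.
  private val legalChars = "[a-zA-Z0-9_-]"
  private val fullMatch = (legalChars + "*").r

  /** Returns true only if every character of clientId is legal. */
  def isValid(clientId: String): Boolean =
    fullMatch.pattern.matcher(clientId).matches()
}
```

With this check, "replica-fetcher" is accepted, while
"replica-fetcher-host_127.0.0.1-port_9092" is rejected because of the dots
in the IP address.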

This patch also fixes another bug: AbstractFetcherThread would create a 
SimpleConsumer and pass "%s-host_%s-port_%s" as the clientId to it. 
SimpleConsumer would then append the host-port string to the clientId a 
second time while instantiating FetchRequestAndResponseStats. 
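
The double append can be reproduced with a small sketch. The helper name
withHostPort is hypothetical, and the base id "replica-fetcher" is inferred
from the clientId in the exception message; only the "%s-host_%s-port_%s"
format comes from the description above.

```scala
object DoubleAppendSketch {
  // Hypothetical helper that appends the host-port string to an id,
  // using the "%s-host_%s-port_%s" format mentioned above.
  def withHostPort(id: String, host: String, port: Int): String =
    "%s-host_%s-port_%s".format(id, host, port)

  val host = "127.0.0.1"
  val port = 9092

  // What the fetcher thread handed to the consumer as the clientId.
  val passedClientId: String = withHostPort("replica-fetcher", host, port)

  // What the consumer then used for its stats: the suffix appears twice.
  val statsId: String = withHostPort(passedClientId, host, port)
}
```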

Changes: 
- Do not include the host-port string when initializing 
AbstractFetcherThread.clientId in ReplicaFetcherThread; the host and port are 
already passed through the sourceBroker argument. This new clientId is passed 
to SimpleConsumer in AbstractFetcherThread and should validate successfully.
- Pass clientId + host-port string while instantiating *Stats in 
AbstractFetcherThread and SimpleConsumer. Since the host-port string gives 
information about the client, I think it should be OK to append it to clientId 
rather than create a separate case class like ClientIdAndTopic. 
- Pass clientId + host-port string while instantiating FetchRequestBuilder in 
AbstractFetcherThread.
- Pass clientId + host-port string to the constructors of ProducerRequestStats, 
FetchRequestAndResponseStats, FetcherStats and FetcherLagStats to maintain 
uniformity with passing clientId + host-port to FetchRequestBuilder in 
AbstractFetcherThread.doWork().
- Removed a redundant line in ClientIdTest.scala.
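
Taken together, the changes keep the validated clientId free of the
host-port string and build the combined id once, only for stats and request
builders. The following is a rough sketch of that idea, not the patch itself:
Broker, FetcherIds and metricId are hypothetical names, and the validation
is a stand-in for ClientId.validate.

```scala
object FetcherIdsSketch {
  // Stand-in for ClientId.validate: same legal character set as before the patch.
  def validate(clientId: String): Unit =
    require(clientId.matches("[a-zA-Z0-9_-]*"), "ClientId " + clientId + " is illegal")

  // Hypothetical stand-in for the sourceBroker argument.
  final case class Broker(host: String, port: Int)

  final class FetcherIds(val clientId: String, sourceBroker: Broker) {
    // Only the bare clientId is validated, so '.' in the host is never checked.
    validate(clientId)

    // Built exactly once and reused for every *Stats object and request builder.
    val metricId: String =
      "%s-host_%s-port_%s".format(clientId, sourceBroker.host, sourceBroker.port)
  }
}
```

Because the combined id is never validated, the format of the host-port
string has to stay fixed within the code, as noted below.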

The validation criteria for the clientId string that comes from the client 
are unchanged. Ideally I would like to validate the clientId that *includes* 
the host-port string, but that would require adding '.' to the set of legal 
characters, which would be inconsistent with the legal characters for Topic. 
Instead, we can keep the same set of legal characters and take care that the 
host-port string doesn't change format within the code.
                
> System Test Failures : kafka.common.InvalidClientIdException in broker log4j 
> messages
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-640
>                 URL: https://issues.apache.org/jira/browse/KAFKA-640
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: John Fung
>              Labels: replication-testing
>         Attachments: kafka-640.patch
>
>
> * To reproduce the issue, download and build the latest Kafka 0.8 branch and 
> execute this command: "<kafka_home>/system_test $ python -B 
> system_test_runner.py"
> * The following exception is found in the broker log4j messages in most 
> System Test cases: 
> [2012-11-29 09:06:21,322] WARN No previously checkpointed highwatermark value 
> found for topic test_1 partition 1. Returning 0 as the highwatermark 
> (kafka.server.HighwaterMarkCheckpoint)
> [2012-11-29 09:06:21,326] INFO [Kafka Log on Broker 1], Truncated log segment 
> /tmp/kafka_server_1_logs/test_1-1/00000000000000000000.log to target offset 0 
> (kafka.log.Log)
> [2012-11-29 09:06:21,333] ERROR Replica Manager on Broker 1: Error processing 
> leaderAndISR request LeaderAndIsrRequest(1,,1000,Map((test_1,1) -> 
> PartitionStateInfo(LeaderIsrAndControllerEpoch({ 
> "ISR":"2,3,1","leader":"2","leaderEpoch":"0" },1),3), (test_1,0) -> 
> PartitionStateInfo(LeaderIsrAndControllerEpoch({ 
> "ISR":"1,2,3","leader":"1","leaderEpoch":"0" 
> },1),3)),Set(id:2,creatorId:127.0.0.1-1354208764997,host:127.0.0.1,port:9092, 
> id:1,creatorId:127.0.0.1-1354208760105,host:127.0.0.1,port:9091),1) 
> (kafka.server.ReplicaManager)
> kafka.common.InvalidClientIdException: ClientId 
> replica-fetcher-host_127.0.0.1-port_9092 is illegal, contains a character 
> other than ASCII alphanumerics, _ and -
>         at kafka.utils.ClientId$.validate(ClientIdAndTopic.scala:36)
>         at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:81)
>         at kafka.server.AbstractFetcherThread.<init>(AbstractFetcherThread.scala:44)
>         at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:26)
>         at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:26)
>         at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:44)
>         at kafka.cluster.Partition.makeFollower(Partition.scala:190)
>         at kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeFollower(ReplicaManager.scala:236)
>         at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:201)
>         at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:191)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:191)
>         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:129)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:662)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
