Di Shang created KAFKA-4961:
-------------------------------

             Summary: Mirrormaker 
                 Key: KAFKA-4961
                 URL: https://issues.apache.org/jira/browse/KAFKA-4961
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.1
            Reporter: Di Shang


We are running a cluster of 3 brokers and using mirrormaker to replicate a 
topic to a different 3-broker cluster. Occasionally we find that when the 
source cluster is under heavy load with lots of messages coming in, mirrormaker 
will crash with SchemaException. 

{quote}
27 Mar 2017 19:02:22.030 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) 
Disconnecting from node 5 due to request timeout.
27 Mar 2017 19:02:22.032 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) 
Disconnecting from node 7 due to request timeout.
27 Mar 2017 19:02:22.033 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@96db60c9,
 
request=RequestSend(header={api_key=1,api_version=1,correlation_id=76978,client_id=dx-stg02-wdc04-0},
 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}),
 createdTimeMs=1490641301465, sendTimeMs=1490641301465) with correlation id 
76978 due to node 5 being disconnected
27 Mar 2017 19:02:22.035 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch 
failed
org.apache.kafka.common.errors.DisconnectException: null
27 Mar 2017 19:02:22.037 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@fcb8c50d,
 
request=RequestSend(header={api_key=1,api_version=1,correlation_id=76980,client_id=dx-stg02-wdc04-0},
 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}),
 createdTimeMs=1490641301466, sendTimeMs=1490641301466) with correlation id 
76980 due to node 7 being disconnected
27 Mar 2017 19:02:22.038 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch 
failed
org.apache.kafka.common.errors.DisconnectException: null
27 Mar 2017 19:02:22.039 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.NetworkClient maybeUpdate(line:619) Sending metadata 
request ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=88357,client_id=dx-stg02-wdc04-0},
 body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1490641342039, 
sendTimeMs=0) to node 6
27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5d530ce8,
 
request=RequestSend(header={api_key=1,api_version=1,correlation_id=88355,client_id=dx-stg02-wdc04-0},
 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}),
 createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88355 due to 
node 5 being disconnected
27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch 
failed
org.apache.kafka.common.errors.DisconnectException: null
27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@8a4a0afc,
 
request=RequestSend(header={api_key=1,api_version=1,correlation_id=88356,client_id=dx-stg02-wdc04-0},
 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}),
 createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88356 due to 
node 7 being disconnected
27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch 
failed
org.apache.kafka.common.errors.DisconnectException: null
27 Mar 2017 19:02:22.044 [mirrormaker-thread-0] FATAL 
kafka.tools.MirrorMaker$MirrorMakerThread fatal(line:116) 
[mirrormaker-thread-0] Mirror maker thread failure due to 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading field 'partition_responses': Error reading field 
'record_set': java.lang.IllegalArgumentException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) 
~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
 ~[kafka-clients-0.9.0.1.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) 
~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
 ~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
 ~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
 ~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
 ~[kafka-clients-0.9.0.1.jar:?]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
~[kafka-clients-0.9.0.1.jar:?]
        at 
kafka.tools.MirrorMaker$MirrorMakerNewConsumer.receive(MirrorMaker.scala:526) 
~[kafka_2.11-0.9.0.1.jar:?]
        at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:395) 
[kafka_2.11-0.9.0.1.jar:?]
{quote}

All the brokers and mirrormaker are running 0.9.0.1, there are 50~100 clients 
that are producing messages in the source cluster, some of them are using the 
0.9 client and some are on 0.8. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to