[ https://issues.apache.org/jira/browse/KAFKA-4961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Di Shang updated KAFKA-4961:
----------------------------
Description:
We are running a cluster of 3 brokers and using mirrormaker to replicate a topic to a different 3-broker cluster. Occasionally we find that when the source cluster is under heavy load with lots of messages coming in, mirrormaker will crash with SchemaException.

{noformat}
27 Mar 2017 19:02:22.030 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) Disconnecting from node 5 due to request timeout.
27 Mar 2017 19:02:22.032 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) Disconnecting from node 7 due to request timeout.
27 Mar 2017 19:02:22.033 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@96db60c9, request=RequestSend(header={api_key=1,api_version=1,correlation_id=76978,client_id=dx-stg02-wdc04-0}, 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), createdTimeMs=1490641301465, sendTimeMs=1490641301465) with correlation id 76978 due to node 5 being disconnected 27 Mar 2017 19:02:22.035 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.037 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@fcb8c50d, request=RequestSend(header={api_key=1,api_version=1,correlation_id=76980,client_id=dx-stg02-wdc04-0}, 
body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), createdTimeMs=1490641301466, sendTimeMs=1490641301466) with correlation id 76980 due to node 7 being disconnected 27 Mar 2017 19:02:22.038 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.039 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient maybeUpdate(line:619) Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=88357,client_id=dx-stg02-wdc04-0}, body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1490641342039, sendTimeMs=0) to node 6 27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5d530ce8, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88355,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88355 due to node 5 being disconnected 27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@8a4a0afc, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88356,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88356 due to node 7 being disconnected 27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.044 [mirrormaker-thread-0] FATAL kafka.tools.MirrorMaker$MirrorMakerThread fatal(line:116) [mirrormaker-thread-0] Mirror maker thread failure due to org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading 
field 'partition_responses': Error reading field 'record_set': java.lang.IllegalArgumentException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) ~[kafka-clients-0.9.0.1.jar:?] at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.receive(MirrorMaker.scala:526) ~[kafka_2.11-0.9.0.1.jar:?] at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:395) [kafka_2.11-0.9.0.1.jar:?] 27 Mar 2017 19:02:22.046 [mirrormaker-thread-0] INFO kafka.tools.MirrorMaker$MirrorMakerThread info(line:68) [mirrormaker-thread-0] Flushing producer. 27 Mar 2017 19:02:22.046 [mirrormaker-thread-0] INFO kafka.tools.MirrorMaker$MirrorMakerThread info(line:68) [mirrormaker-thread-0] Committing consumer offsets. 
27 Mar 2017 19:02:22.047 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@f37de83b, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88358,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), createdTimeMs=1490641342041, sendTimeMs=0) with correlation id 88358 due to node 5 being disconnected 27 Mar 2017 19:02:22.047 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.048 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request 
ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@10fcee02, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88359,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), createdTimeMs=1490641342041, sendTimeMs=0) with correlation id 88359 due to node 7 being disconnected 27 Mar 2017 19:02:22.048 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.049 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle(line:518) Committed offset ... 
27 Mar 2017 19:02:22.053 [mirrormaker-thread-0] INFO kafka.tools.MirrorMaker$MirrorMakerThread info(line:68) [mirrormaker-thread-0] Shutting down consumer connectors.
{noformat}

All the brokers and mirrormaker are running 0.9.0.1. There are 50~100 clients producing messages to the source cluster; some of them use the 0.9 client and some are on 0.8.

We have a process that automatically restarts mirrormaker 30s after it crashes, but it simply crashes again with the same error within 1~10 min. The only workaround we have found so far is to wait a few hours for the load on the source cluster to go down, after which the mirror works again. Although there is still a massive number of messages in the source cluster waiting to be replicated, mirrormaker seems to handle those just fine.

We also see the DisconnectException whenever the SchemaException occurs, but we are not sure whether the two are related.

was:

We are running a cluster of 3 brokers and using mirrormaker to replicate a topic to a different 3-broker cluster. Occasionally we find that when the source cluster is under heavy load with lots of messages coming in, mirrormaker will crash with SchemaException.

{noformat}
27 Mar 2017 19:02:22.030 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) Disconnecting from node 5 due to request timeout.
27 Mar 2017 19:02:22.032 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) Disconnecting from node 7 due to request timeout. 
27 Mar 2017 19:02:22.033 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@96db60c9, request=RequestSend(header={api_key=1,api_version=1,correlation_id=76978,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), createdTimeMs=1490641301465, sendTimeMs=1490641301465) with correlation id 76978 due to node 5 being disconnected 27 Mar 2017 19:02:22.035 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.037 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request 
ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@fcb8c50d, request=RequestSend(header={api_key=1,api_version=1,correlation_id=76980,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), createdTimeMs=1490641301466, sendTimeMs=1490641301466) with correlation id 76980 due to node 7 being disconnected 27 Mar 2017 19:02:22.038 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.039 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.NetworkClient maybeUpdate(line:619) Sending metadata request ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=88357,client_id=dx-stg02-wdc04-0}, body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1490641342039, sendTimeMs=0) to node 6 27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5d530ce8, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88355,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88355 due to node 5 being disconnected 27 Mar 2017 19:02:22.040 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed 
org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient onComplete(line:376) Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@8a4a0afc, request=RequestSend(header={api_key=1,api_version=1,correlation_id=88356,client_id=dx-stg02-wdc04-0}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), createdTimeMs=1490641342039, sendTimeMs=0) with correlation id 88356 due to node 7 being disconnected 27 Mar 2017 19:02:22.041 [mirrormaker-thread-0] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch failed org.apache.kafka.common.errors.DisconnectException: null 27 Mar 2017 
19:02:22.044 [mirrormaker-thread-0] FATAL kafka.tools.MirrorMaker$MirrorMakerThread fatal(line:116) [mirrormaker-thread-0] Mirror maker thread failure due to org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading field 'record_set': java.lang.IllegalArgumentException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) ~[kafka-clients-0.9.0.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) ~[kafka-clients-0.9.0.1.jar:?] at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.receive(MirrorMaker.scala:526) ~[kafka_2.11-0.9.0.1.jar:?] at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:395) [kafka_2.11-0.9.0.1.jar:?] {noformat} All the brokers and mirrormaker are running 0.9.0.1, there are 50~100 clients that are producing messages in the source cluster, some of them are using the 0.9 client and some are on 0.8. 
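
Two numbers in the log above can be read off with a little arithmetic: how long the first cancelled FETCH requests had been outstanding when they were cancelled, and how much record data a single FETCH response could carry at most. The sketch below only restates values printed in the log. The 40000 ms figure is an assumption (the report does not state the configured `request.timeout.ms`), and the helper names are hypothetical. It is a reading aid, not a diagnosis of the SchemaException.

```python
# Values copied from the log excerpt in the description.
FETCH_CREATED_MS = 1490641301465     # createdTimeMs of FETCH correlation id 76978 (node 5)
METADATA_CREATED_MS = 1490641342039  # createdTimeMs of the metadata request logged at 19:02:22.039
PARTITIONS_NODE_5 = 18               # partitions listed in the FETCH request to node 5
PARTITIONS_NODE_7 = 20               # partitions listed in the FETCH request to node 7
MAX_BYTES_PER_PARTITION = 1048576    # max_bytes on every partition entry

# Assumption: the consumer runs with request.timeout.ms = 40000.
# The report does not state the configured value.
ASSUMED_REQUEST_TIMEOUT_MS = 40000


def elapsed_ms(created_ms, reference_ms):
    """Milliseconds between request creation and a later reference timestamp."""
    return reference_ms - created_ms


def worst_case_fetch_bytes(partitions, max_bytes_per_partition):
    """Upper bound on record data in one FETCH response: every partition returns max_bytes."""
    return partitions * max_bytes_per_partition


if __name__ == "__main__":
    waited = elapsed_ms(FETCH_CREATED_MS, METADATA_CREATED_MS)
    print("FETCH 76978 outstanding for about", waited, "ms")
    print("exceeds assumed timeout:", waited > ASSUMED_REQUEST_TIMEOUT_MS)
    print("node 5 worst case:", worst_case_fetch_bytes(PARTITIONS_NODE_5, MAX_BYTES_PER_PARTITION), "bytes")
    print("node 7 worst case:", worst_case_fetch_bytes(PARTITIONS_NODE_7, MAX_BYTES_PER_PARTITION), "bytes")
```

Under these assumptions, the first cancelled request had been waiting roughly 40.6 s, and one response could hold up to 18 MiB (node 5) or 20 MiB (node 7) of record data. Whether that is connected to the SchemaException is not established by the report.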
> Mirrormaker crash with org.apache.kafka.common.protocol.types.SchemaException
> -----------------------------------------------------------------------------
>
> Key: KAFKA-4961
> URL: https://issues.apache.org/jira/browse/KAFKA-4961
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.1
> Reporter: Di Shang
> Labels: mirror-maker
> 27 Mar 2017 19:02:22.047 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient > onComplete(line:376) Cancelled FETCH request > ClientRequest(expectResponse=true, > callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@f37de83b, > > request=RequestSend(header={api_key=1,api_version=1,correlation_id=88358,client_id=dx-stg02-wdc04-0}, > > body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), > createdTimeMs=1490641342041, sendTimeMs=0) with correlation id 88358 due to > node 5 being disconnected > 27 Mar 2017 19:02:22.047 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch > failed > org.apache.kafka.common.errors.DisconnectException: null > 27 Mar 2017 19:02:22.048 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient > onComplete(line:376) Cancelled 
FETCH request > ClientRequest(expectResponse=true, > callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@10fcee02, > > request=RequestSend(header={api_key=1,api_version=1,correlation_id=88359,client_id=dx-stg02-wdc04-0}, > > body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), > createdTimeMs=1490641342041, sendTimeMs=0) with correlation id 88359 due to > node 7 being disconnected > 27 Mar 2017 19:02:22.048 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch > failed > org.apache.kafka.common.errors.DisconnectException: null > 27 Mar 2017 19:02:22.049 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator > handle(line:518) Committed offset ... 
> 27 Mar 2017 19:02:22.053 [mirrormaker-thread-0] INFO kafka.tools.MirrorMaker$MirrorMakerThread info(line:68) [mirrormaker-thread-0] Shutting down consumer connectors.
> {noformat}
> All the brokers and mirrormaker are running 0.9.0.1. There are 50~100 clients producing messages in the source cluster; some of them use the 0.9 client and some are on 0.8.
> We have a process that automatically restarts mirrormaker 30s after it crashes, but it simply crashes again with the same error within 1~10 min. The only solution we have found so far is to wait a few hours for the source cluster load to go down, after which the mirror works again. Although there is still a massive number of messages in the source cluster waiting to be replicated, mirrormaker seems to handle those just fine.
> We also saw the DisconnectException whenever the SchemaException occurred; we are not sure whether they are related.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)