Hey,

> Do you see any timeouts in the logs?

No, at least no exceptions related to timeouts. I see exceptions like InstanceAlreadyExistsException and SecurityDisabledException.
But I do see the issues below, which might be related... Is it expected to see these ClassLoader errors?

[2019-12-27 11:34:48,056] INFO Started MirrorCheckpointConnector with 871 consumer groups. (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:69)
[2019-12-27 11:34:48,060] INFO Finished creating connector MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
[2019-12-27 11:34:48,060] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
[2019-12-27 11:34:48,060] INFO SourceConnectorConfig values:
    config.action.reload = restart
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = MirrorCheckpointConnector
    tasks.max = 1
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
[2019-12-27 11:34:48,062] INFO EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = MirrorCheckpointConnector
    tasks.max = 1
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
[2019-12-27 11:34:48,063] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
[2019-12-27 11:34:48,066] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] Handling task config update by restarting tasks [MirrorCheckpointConnector-0, MirrorSourceConnector-0] (org.apache.kafka.connect.runtime.distributed.DistributedHerder:574)
[2019-12-27 11:34:48,066] INFO Stopping task MirrorCheckpointConnector-0 (org.apache.kafka.connect.runtime.Worker:704)
[2019-12-27 11:34:48,611] INFO Stopping DistributedHerder-connect-1 took 544 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:99)
[2019-12-27 11:34:48,611] INFO Stopping task MirrorSourceConnector-0 (org.apache.kafka.connect.runtime.Worker:704)
[2019-12-27 11:34:48,616] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:34:48,617] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:34:48,617] INFO [Producer clientId=connector-producer-MirrorCheckpointConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2019-12-27 11:34:50,587] INFO syncing topic configs took 2930 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2019-12-27 11:34:53,612] ERROR Graceful stop of task MirrorSourceConnector-0 failed. (org.apache.kafka.connect.runtime.Worker:736)
[2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
[2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2019-12-27 11:34:53,613] ERROR Failed to fetch offsets from namespace MirrorSourceConnector: (org.apache.kafka.connect.storage.OffsetStorageReaderImpl:113)
org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-12-27 11:34:53,616] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:34:53,617] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:34:53,617] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
    ... 21 more
[2019-12-27 11:34:53,621] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2019-12-27 11:34:53,622] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
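For what it's worth, the roughly five seconds between "Stopping task MirrorSourceConnector-0" and "Graceful stop of task MirrorSourceConnector-0 failed" above matches the Connect worker default task.shutdown.graceful.timeout.ms = 5000, so if the task genuinely needs longer to stop, raising that might avoid the task being killed. This is only a sketch, and it assumes the dedicated connect-mirror-maker.sh driver passes worker-level properties from mm2.properties through to the underlying Connect workers, which I have not verified on 2.4.0:

# hypothetical addition to mm2.properties: allow more time for graceful task shutdown
task.shutdown.graceful.timeout.ms = 30000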
After a while these exceptions stop and I only see the logs below, while nothing really happens (well, minimal traffic on the destination broker, probably only some cluster maintenance):

[2019-12-27 11:35:17,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:17,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 25 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:35:19,154] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:19,154] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:19,455] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 301 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:35:53,718] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:53,718] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:53,772] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:53,773] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:17,593] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 11 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:19,456] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:19,456] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:19,483] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 27 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:53,719] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:53,719] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Finished commitOffsets successfully in 192 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
then ...

[2019-12-27 11:44:47,642] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

and then the usual logs from WorkerSourceTask... for some minutes, and then another SecurityDisabledException:

[2019-12-27 11:54:47,643] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

and only after another minute does it start to subscribe to the topics:

[2019-12-27 11:55:11,298] INFO [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): Ninja ...
... <listing a lot of topics here>
[2019-12-27 11:55:11,318] INFO Starting with 2303 previously uncommitted partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
[2019-12-27 11:55:11,319] INFO [Consumer clientId=consumer-7, groupId=null] Seeking to offset 0 for partition Ninja...-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
...
[2019-12-27 11:55:11,719] INFO task-thread-MirrorSourceConnector-0 replicating 2710 topic-partitions eucmain->euwbackup: [Ninja...

And then I see some real traffic towards the destination cluster; I guess that is when the mirroring really starts.
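Since no Authorizer is configured on the broker (hence the SecurityDisabledException from the "syncing topic ACLs" task), turning ACL sync off should at least silence those errors, and the roughly ten-minute gap before the consumer subscribes lines up with MM2's default topic refresh interval of 600 seconds. This is only a sketch, assuming the standard MirrorMaker 2 property names and that they can be scoped per flow like the rest of my config:

# hypothetical additions to mm2.properties
# skip ACL syncing, since the brokers have no Authorizer configured
eucmain->euwbackup.sync.topic.acls.enabled = false
# refresh the replicated topic list more often than the 10-minute default
eucmain->euwbackup.refresh.topics.interval.seconds = 60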
Peter

On Wed, 11 Dec 2019 at 20:26, Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Hey Peter. Do you see any timeouts in the logs? The internal scheduler
> will time out each task after 60 seconds by default, which might not be
> long enough to finish some of the bootstrap tasks in your case. My team
> has observed that behavior in super-flaky environments, e.g. when
> connectivity drops during bootstrapping, in which case
> MirrorSourceConnector can get into a funky state. This resolves when it
> refreshes its state after a while. The default refresh interval of 10
> minutes seems to jibe with your observations.
>
> My team patched our internal MM2 build months ago to force bootstrapping
> to complete correctly. I can share the patch, and if it helps we can
> raise a PR.
>
> Ryanne
>
> On Mon, Dec 9, 2019 at 5:28 AM Péter Sinóros-Szabó
> <peter.sinoros-sz...@transferwise.com.invalid> wrote:
>
> > Hi,
> >
> > I am experimenting with MirrorMaker 2 in 2.4.0-rc3. It seems to start
> > up fine, connects to both source and destination, and creates new
> > topics... But it does not start to actually mirror the messages until
> > about 12 minutes after MM2 was started. I would expect it to start
> > mirroring within a few seconds after startup.
> >
> > The source cluster has about 2800 partitions; the destination cluster
> > is empty. Both clusters are in AWS but in different regions.
> >
> > What may cause the 12 minutes delay?
> >
> > Config is:
> > ---
> > clusters = eucmain, euwbackup
> > eucmain.bootstrap.servers = test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> > eucmain->euwbackup.enabled = true
> > eucmain->euwbackup.topics = .*
> > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> > eucmain->euwbackup.rename.topics = false
> > replication.policy.separator = __
> > eucmain.client.id = mm2
> >
> > I do not see any serious errors in the logs that I would think of as a
> > cause of this.
> >
> > Thanks,
> > Peter
> >
--
- Sini