Hey Peter.

> No Authorizer is configured on the broker

You'll need to disable ACL sync to avoid this error. It's harmless, though.
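Something like this in your mm2.properties should turn it off. I'm going from
memory on the exact property name (it's the ACL-sync flag in the MM2 connector
configs), so double-check it against your build; the flow-scoped prefix is just
how I'd scope it to this replication flow:

  # stop MM2 from trying to describe and sync ACLs on a cluster with no authorizer
  eucmain->euwbackup.sync.topic.acls.enabled = false
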
> Failed to fetch offsets

I have not encountered this. Is it possible one of the clusters is/was
unreachable? Are any of the clusters using Kerberos or SSL that may be
misconfigured?

> Plugin class loader...

I'll look into this.

Ryanne

On Fri, Dec 27, 2019, 7:44 AM Péter Sinóros-Szabó
<peter.sinoros-sz...@transferwise.com.invalid> wrote:

> Hey,
>
> > Do you see any timeouts in the logs?
>
> No. At least no exceptions related to timeouts. I see exceptions like
> InstanceAlreadyExistsException and SecurityDisabledException.
>
> But I do see the issues below, which might be related... Is it expected to
> see ClassLoader exceptions?
>
> [2019-12-27 11:34:48,056] INFO Started MirrorCheckpointConnector with 871 consumer groups. (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:69)
> [2019-12-27 11:34:48,060] INFO Finished creating connector MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> [2019-12-27 11:34:48,060] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2019-12-27 11:34:48,060] INFO SourceConnectorConfig values:
>     config.action.reload = restart
>     connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
>     errors.log.enable = false
>     errors.log.include.messages = false
>     errors.retry.delay.max.ms = 60000
>     errors.retry.timeout = 0
>     errors.tolerance = none
>     header.converter = null
>     key.converter = null
>     name = MirrorCheckpointConnector
>     tasks.max = 1
>     transforms = []
>     value.converter = null
> (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
> [2019-12-27 11:34:48,062] INFO EnrichedConnectorConfig values:
>     config.action.reload = restart
>     connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
>     errors.log.enable = false
>     errors.log.include.messages = false
>     errors.retry.delay.max.ms = 60000
>     errors.retry.timeout = 0
>     errors.tolerance = none
>     header.converter = null
>     key.converter = null
>     name = MirrorCheckpointConnector
>     tasks.max = 1
>     transforms = []
>     value.converter = null
> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> [2019-12-27 11:34:48,063] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2019-12-27 11:34:48,066] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] Handling task config update by restarting tasks [MirrorCheckpointConnector-0, MirrorSourceConnector-0] (org.apache.kafka.connect.runtime.distributed.DistributedHerder:574)
> [2019-12-27 11:34:48,066] INFO Stopping task MirrorCheckpointConnector-0 (org.apache.kafka.connect.runtime.Worker:704)
> [2019-12-27 11:34:48,611] INFO Stopping DistributedHerder-connect-1 took 544 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:99)
> [2019-12-27 11:34:48,611] INFO Stopping task MirrorSourceConnector-0 (org.apache.kafka.connect.runtime.Worker:704)
> [2019-12-27 11:34:48,616] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:34:48,617] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:34:48,617] INFO [Producer clientId=connector-producer-MirrorCheckpointConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
> [2019-12-27 11:34:50,587] INFO syncing topic configs took 2930 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2019-12-27 11:34:53,612] ERROR Graceful stop of task MirrorSourceConnector-0 failed. (org.apache.kafka.connect.runtime.Worker:736)
> [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1, groupId=eucmain-mm2] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
> [2019-12-27 11:34:53,613] ERROR Failed to fetch offsets from namespace MirrorSourceConnector: (org.apache.kafka.connect.storage.OffsetStorageReaderImpl:113)
> org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
>         at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>         at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>         at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2019-12-27 11:34:53,616] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:34:53,617] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:34:53,617] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
>         at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>         at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>         at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
>         ... 21 more
> [2019-12-27 11:34:53,621] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
> [2019-12-27 11:34:53,622] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
>
> After a while these exceptions stop and I only see these logs, while nothing
> really happens (well, minimal traffic on the destination broker, probably
> only some cluster maintenance):
>
> [2019-12-27 11:35:17,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:17,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 25 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:35:19,154] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:19,154] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:19,455] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 301 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:35:53,718] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:53,718] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:53,772] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:53,773] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:17,582] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:17,593] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 11 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:19,456] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:19,456] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:19,483] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 27 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:53,719] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:53,719] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Finished commitOffsets successfully in 192 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
>
> then ...
>
> [2019-12-27 11:44:47,642] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>
> and then the usual logs from WorkerSourceTask... for some minutes, and then
> another Security... exception:
>
> [2019-12-27 11:54:47,643] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>
> and only after another minute does it start to subscribe to the topics:
>
> [2019-12-27 11:55:11,298] INFO [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): Ninja ... ... <listing a lot of topics here>
> [2019-12-27 11:55:11,318] INFO Starting with 2303 previously uncommitted partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
> [2019-12-27 11:55:11,319] INFO [Consumer clientId=consumer-7, groupId=null] Seeking to offset 0 for partition Ninja...-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
> ...
> [2019-12-27 11:55:11,719] INFO task-thread-MirrorSourceConnector-0 replicating 2710 topic-partitions eucmain->euwbackup: [Ninja...
>
> And then I see some real traffic towards the destination cluster; I guess
> that is when it really starts mirroring.
>
> Peter
>
> On Wed, 11 Dec 2019 at 20:26, Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > Hey Peter. Do you see any timeouts in the logs? The internal scheduler
> > will time out each task after 60 seconds by default, which might not be
> > long enough to finish some of the bootstrap tasks in your case. My team
> > has observed that behavior in super-flaky environments, e.g. when
> > connectivity drops during bootstrapping, in which case
> > MirrorSourceConnector can get into a funky state. This resolves when it
> > refreshes its state after a while. The default refresh interval of 10
> > minutes seems to jibe with your observations.
> >
> > My team patched our internal MM2 build months ago to force bootstrapping
> > to complete correctly. I can share the patch, and if it helps we can
> > raise a PR.
> >
> > Ryanne
> >
> > On Mon, Dec 9, 2019 at 5:28 AM Péter Sinóros-Szabó
> > <peter.sinoros-sz...@transferwise.com.invalid> wrote:
> >
> > > Hi,
> > >
> > > I am experimenting with MirrorMaker 2 in 2.4.0-rc3. It seems to start
> > > up fine, connects to both source and destination, and creates new
> > > topics... But it does not start to actually mirror messages until
> > > about 12 minutes after MM2 was started. I would expect it to start
> > > mirroring within a few seconds of startup.
> > >
> > > The source cluster has about 2800 partitions; the destination cluster
> > > is empty. Both clusters are in AWS but in different regions.
> > >
> > > What may cause the 12-minute delay?
> > >
> > > Config is:
> > > ---
> > > clusters = eucmain, euwbackup
> > > eucmain.bootstrap.servers = test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> > > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> > > eucmain->euwbackup.enabled = true
> > > eucmain->euwbackup.topics = .*
> > > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> > > eucmain->euwbackup.rename.topics = false
> > > replication.policy.separator = __
> > > eucmain.client.id = mm2
> > >
> > > I do not see any serious errors in the logs that I would think of as a
> > > cause of this.
> > >
> > > Thanks,
> > > Peter
> > >
> >
> --
> - Sini
>
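
P.S. For reference, the knobs I mentioned in my earlier reply look roughly like
this in mm2.properties. I'm quoting the property names and defaults from memory
(they come from the MM2 connector configs), so treat this as a sketch and
double-check them against your build:

  # admin/scheduler task timeout, 1 minute by default
  eucmain->euwbackup.admin.timeout.ms = 60000

  # refresh and sync intervals, 10 minutes by default
  eucmain->euwbackup.refresh.topics.interval.seconds = 600
  eucmain->euwbackup.refresh.groups.interval.seconds = 600
  eucmain->euwbackup.sync.topic.configs.interval.seconds = 600

Shortening the refresh intervals might narrow the window where a bad bootstrap
leaves the connector idle, but that would be a workaround rather than a fix.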