Aah no.. there is more to it. Not sure if it is related to the above.

https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114


It is timing out based on
https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133

[2019-10-14 18:55:20,820] ERROR
WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out
while waiting for producer to flush outstanding 36478 messages
(org.apache.kafka.connect.runtime.WorkerSourceTask:423)
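
To check my own understanding of the mechanism, here is a rough sketch of
what I think is going on: the offset commit waits a bounded time for the
producer to acknowledge the outstanding records, and gives up if the wait
expires. I believe the bound is offset.flush.timeout.ms from WorkerConfig.
This is only a simplified model and not the actual Connect code; the class
and method names (FlushTimeoutSketch, flush, simulate) are made up for
illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model only, NOT Kafka Connect's real implementation.
// All names here are hypothetical and exist just for this illustration.
class FlushTimeoutSketch {

    // Waits up to timeoutMs for every outstanding record to be acknowledged.
    // Returns true if the flush completed, false if it timed out.
    static boolean flush(CountDownLatch outstanding, long timeoutMs) {
        try {
            return outstanding.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates `records` in-flight records of which only `acked` are
    // acknowledged before the flush starts waiting.
    static boolean simulate(int records, int acked, long timeoutMs) {
        CountDownLatch outstanding = new CountDownLatch(records);
        for (int i = 0; i < acked; i++) {
            outstanding.countDown();
        }
        return flush(outstanding, timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(simulate(3, 3, 50)); // true: everything was acked
        System.out.println(simulate(3, 2, 50)); // false: one record still outstanding
    }
}
```

If this model is right, a flush that times out would mean the offsets for
that round are not committed, which would fit with the empty offsets topic.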



On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I think this might be it. Could you confirm? It seems to be on the path
> to commit the offsets, but I am not sure.
>
> [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught
> exception in scheduled task: syncing topic ACLs
> (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
> configured on the broker
>         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>         at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No
> Authorizer is configured on the broker
>
> On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
>> > I do not have a single record in the offsets topic
>>
>> That's definitely not normal. You are correct that without records in that
>> topic, MM2 will restart from EARLIEST. The offsets should be stored
>> periodically and whenever the connectors gracefully shut down or restart.
>>
>> Is it possible the topics don't have required ACLs or something? Also note:
>> Connect wants the offsets topic to have a large number of partitions and to
>> be compacted. Though I can't imagine either would prevent commits from
>> being sent.
>>
>> Ryanne
>>
>> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <
>> vishal.santo...@gmail.com>
>> wrote:
>>
>> > 2nd/restore issue (I think I need to solve the offsets topic issue
>> > before I go on to the scale up and down issue)
>> >
>> > As you had indicated, I went ahead and created the offsets topic. The
>> > status of the (destination) cluster is thus:
>> >
>> > Topic                         | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size | Producer Message/Sec | Summed Recent Offsets
>> > s8k.checkpoints.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>> > s8k.act_search_page           | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 6675.30              | 4,166,842
>> > s8k.act_reach                 | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 20657.92             | 11,579,529
>> > mm2-status.s8k.internal       | 5            | 5         | 100              | 0              | 0                     | 3          | 0                  |             | 0.00                 | 10
>> > mm2-offsets.s8k_test.internal | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>> > mm2-offsets.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>> > mm2-configs.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 13
>> >
>> > You can see that we have the 5 (I created both the offsets topics, to be
>> > safe, for the config below):
>> >
>> > clusters = s8k, s8k_test
>> >
>> > s8k.bootstrap.servers = .....
>> > s8k_test.bootstrap.servers = ......
>> >
>> > # only allow replication dr1 -> dr2
>> > s8k->s8k_test.enabled = true
>> > s8k->s8k_test.topics = act_search_page|act_reach
>> > s8k->s8k_test.emit.heartbeats.enabled = false
>> >
>> > s8k_test->s8k.enabled = false
>> > s8k_test->s8k.emit.heartbeats.enabled = false
>> >
>> > s8k_test.replication.factor = 3
>> > s8k.replication.factor = 3
>> > offsets.storage.replication.factor = 3
>> > replication.factor = 3
>> > replication.policy.separator = .
>> > tasks.max = 4
>> >
>> >
>> >
>> >
>> > What seems strange is that I do not have a single record in the offsets
>> > topic. Is that normal? I would imagine that without a record, there is
>> > no way that a restore would happen, and that is obvious when I restart
>> > the mm2 instance. Find the screenshot attached. In essence, the average
>> > latency (lag) is reset when the mm2 instance is restarted, indicating no
>> > restore but a restart from EARLIEST. I must be missing something simple.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com>
>> > wrote:
>> >
>> >> Vishal, the first issue is easy: you must set tasks.max to something above
>> >> 1 (the default) in order to achieve any parallelism. This property is
>> >> passed along to the internal Connect workers. It's unfortunate that Connect
>> >> is not smart enough to default this property to the number of workers. I
>> >> suspect that will improve before long.
>> >>
>> >> For the second issue, is it possible you are missing the offsets topic? It
>> >> should exist alongside the config and status topics. Connect should create
>> >> this topic, but there are various reasons this can fail, e.g. if the
>> >> replication factor is misconfigured. You can try creating this topic
>> >> manually or changing offsets.storage.replication.factor.
>> >>
>> >> Ryanne
>> >>
>> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <
>> vishal.santo...@gmail.com>
>> >> wrote:
>> >>
>> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a
>> >> > guide, I have built the origin/KIP-382 branch of
>> >> > https://github.com/apache/kafka.git from source.
>> >> >
>> >> > I am seeing 2 issues:
>> >> >
>> >> > * I brought up 2 processes on 2 different nodes (they are actually pods on
>> >> > k8s, but that should not matter). They share the mm2.properties file and
>> >> > are replicating (1-way) 3 topics with 8 partitions in total. That seems
>> >> > to be the way to create a standalone mm2 cluster. I do not, however, see
>> >> > (at least the mbeans do not show) any attempt to rebalance. The
>> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>> >> > mbeans are all on a single node.
>> >> >
>> >> >
>> >> > * I restart the processes on the 2 nodes (hard stop and start). The
>> >> > offsets for replication seem to be reset to the earliest, as if it is a
>> >> > brand new mirroring. It is also obvious from the
>> >> > "record-age-ms-avg|replication-latency-ms-avg"
>> >> > which I track through the restart.
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > This implies that
>> >> >
>> >> >
>> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or down
>> >> > by adding nodes to the mm2 cluster or removing them.
>> >> >
>> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought down,
>> >> > it does not start mirroring from the last known state. I see the
>> >> > state/config topics etc. created as expected.
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > The mm2.properties is pretty minimal:
>> >> >
>> >> >
>> >> > clusters = a , b
>> >> >
>> >> > a.bootstrap.servers = k.....
>> >> > b.bootstrap.servers = k.....
>> >> >
>> >> > # only allow replication dr1 -> dr2
>> >> > a->b.enabled = true
>> >> > a->b.topics = act_search_page
>> >> > a->b.emit.heartbeats.enabled = false
>> >> >
>> >> > b->a..enabled = false
>> >> > b->a.emit.heartbeats.enabled = false
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > What do you think is the issue ?
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >>
>> >
>>
>
