I might have created a build from trunk rather than the 2.4 branch, but will confirm.
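One way to confirm which build actually made it into the image is to print the version and git commit id baked into the kafka-clients jar on the MM2 classpath; a minimal sketch (the class name here is just illustrative; AppInfoParser is the same helper Kafka uses when it logs "Kafka version: ..."):

    import org.apache.kafka.common.utils.AppInfoParser;

    // Prints the version and commit id recorded in the kafka-clients jar that is
    // actually on the classpath, which should distinguish a trunk build from a 2.4 build.
    public class PrintKafkaBuild {
        public static void main(String[] args) {
            System.out.println("Kafka version:  " + AppInfoParser.getVersion());
            System.out.println("Kafka commitId: " + AppInfoParser.getCommitId());
        }
    }

Running that with the same classpath as the MM2 process (or inspecting the version properties file inside the clients jar) should settle which branch the build came from.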
On Thu, Oct 24, 2019 at 4:44 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote: > The above may not be an issue as in it just uses the returned class > loader to resolve the Connector I think . What is not obvious, why it does > not go ahead and consume .. > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,571] INFO refreshing known > target topics took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Started > MirrorSourceConnector with 120 topic-partitions. > (org.apache.kafka.connect.mirror.MirrorSourceConnector:121) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Starting > MirrorSourceConnector took 160 ms. > (org.apache.kafka.connect.mirror.MirrorSourceConnector:122) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,577] INFO Finished > creating connector MirrorSourceConnector > (org.apache.kafka.connect.runtime.Worker:272) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,578] ERROR Plugin class > loader for connector: > 'org.apache.kafka.connect.mirror.MirrorSourceConnector' was not found. > Returning: > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621 > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,579] INFO > SourceConnectorConfig values: > > [mm2-dev-749469cf68-vpm2l] config.action.reload = restart > > [mm2-dev-749469cf68-vpm2l] connector.class = > org.apache.kafka.connect.mirror.MirrorSourceConnector > > [mm2-dev-749469cf68-vpm2l] errors.log.enable = false > > [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false > > [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000 > > [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0 > > [mm2-dev-749469cf68-vpm2l] errors.tolerance = none > > [mm2-dev-749469cf68-vpm2l] header.converter = null > > [mm2-dev-749469cf68-vpm2l] key.converter = null > > [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector > > [mm2-dev-749469cf68-vpm2l] tasks.max = 48 > > [mm2-dev-749469cf68-vpm2l] transforms = [] > > [mm2-dev-749469cf68-vpm2l] value.converter = null > > [mm2-dev-749469cf68-vpm2l] > (org.apache.kafka.connect.runtime.SourceConnectorConfig:347) > > [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,580] INFO > EnrichedConnectorConfig values: > > [mm2-dev-749469cf68-vpm2l] config.action.reload = restart > > [mm2-dev-749469cf68-vpm2l] connector.class = > org.apache.kafka.connect.mirror.MirrorSourceConnector > > [mm2-dev-749469cf68-vpm2l] errors.log.enable = false > > [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false > > [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000 > > [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0 > > [mm2-dev-749469cf68-vpm2l] errors.tolerance = none > > [mm2-dev-749469cf68-vpm2l] header.converter = null > > [mm2-dev-749469cf68-vpm2l] key.converter = null > > [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector > > [mm2-dev-749469cf68-vpm2l] tasks.max = 48 > > [mm2-dev-749469cf68-vpm2l] transforms = [] > > [mm2-dev-749469cf68-vpm2l] value.converter = null > > [mm2-dev-749469cf68-vpm2l] > (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347) > > > > And from there on nothing.. > > > > > On Thu, Oct 24, 2019 at 3:02 PM Vishal Santoshi <vishal.santo...@gmail.com> > wrote: > >> Hey Ryanne, >> >> Seeing the below ERROR in the logs and then, it seems the >> process does not consume ( it does not exit with any errors ) . And this is >> intermittent. As in do it enough times. 
that does relaunch :) Is this >> something a known bug >> >> [mm2-dev-58bf5df684-ln9k2] [2019-10-24 18:41:03,067] ERROR >> Plugin class loader for connector: >> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. >> Returning: >> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621 >> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) >> >> >> >> >> >> >> On Mon, Oct 21, 2019 at 5:16 PM Ryanne Dolan <ryannedo...@gmail.com> >> wrote: >> >>> Vishal, the number of tasks created per source->target herder is >>> determined >>> by both tasks.max and the total number of topic-partitions being >>> replicated. In order to use all 12 worker nodes, you'd need tasks.max >= >>> 12 >>> and number of topic-partitions >= 12. From previous emails it sounds like >>> you have a small number of topic-partitions total (i.e. a small number of >>> topics with a small number of partitions per topic), so I'm guessing >>> that's >>> the reason you aren't seeing more tasks being created. >>> >>> Ryanne >>> >>> On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi < >>> vishal.santo...@gmail.com> >>> wrote: >>> >>> > Here is what I see >>> > >>> > * The max tasks are a a cap on a Connector across the cluster. If >>> have 8 >>> > VMs but 8 max tasks my assumption that there would be 8 * 8 = 72 task >>> > threads was >>> > wring. The logs showed that the partitions were consumed by 8 threads >>> on >>> > the 8 VMs ( 1 per VM ) which was highly un optimal. When I scaled the >>> > VMs to 12, it did not matter, as the max tasks still prevented any >>> further >>> > distribution. >>> > >>> > * If I cancel/resume the cluster with a max task of 48 ( keeping the >>> same >>> > job name and thus connector definition the max tasks does not >>> change, as >>> > in >>> > it seems to keep the same number of max task threads limit ( as in 8 >>> ) >>> > >>> > * I can bring down a VM and see the task migrate to a free VM but the >>> > overall count of task threads remain the same. >>> > >>> > >>> > In essence, the num of tasks is a cap on threads in the cluster per >>> > connector, A connector is a source->sink pair that spans a cluster. >>> Thus if >>> > we have a >>> > A->B DAG and max tasks of 8, then there will be no more that 8 Source >>> > Tasks ( threads ) no matter how big the cluster is, It thus makes >>> sense to >>> > over provision ( within limits of a single VM ) on the max tasks to >>> allow >>> > for adding more VMs for scale up..... >>> > >>> > >>> > >>> > On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi < >>> vishal.santo...@gmail.com >>> > > >>> > wrote: >>> > >>> > > I misspoke >>> > > >>> > > >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to >>> the the >>> > > 8 VMs. I then upscaled to 12 VMs and the tasks *have not *migrated >>> as I >>> > > would expect . >>> > > >>> > > >>> > > >>> > > >>> > > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi < >>> > vishal.santo...@gmail.com> >>> > > wrote: >>> > > >>> > >> OK, You will have to explain :) >>> > >> >>> > >> I had 12 VMs with 8 cpus and 8 max tasks. I thought let me give a >>> CPU >>> > to >>> > >> each task, which I presumed is a java thread ( even though I know >>> the >>> > >> thread would be mostly ip bound ). . I saw the issue I pointed up. >>> > >> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the >>> the >>> > >> 8 VMs. 
I then upscaled to 12 VMs and the tasks migrated as I would >>> > expect >>> > >> .* >>> > >> >>> > >> I know that a VM will have MirrorSourceConnector and >>> > >> MirrorHeartbeatConnector tasks up till tasks.max. So a few >>> questions >>> > >> >>> > >> >>> > >> >>> > >> * When we say there are 48 max tasks, are we saying there are 48 >>> > threads >>> > >> ( in fact 96, each for the 2 groups above, worst case + 2 ) ? >>> > >> * When we talk about Connector, are we talking about a JVM process, >>> as >>> > in >>> > >> a Connector is a JVM process ? >>> > >> * Why larger number of tasks.max help the spread ? As in I would >>> > assume >>> > >> there are up till 8 tasks ( or 16 ) per VM but how that should not >>> have >>> > >> prevented re assignment on a scale up ( as it clearly did ) ? >>> > >> >>> > >> The reason I ask is that I plan to run mm2 cluster on k8s and I >>> want to >>> > >> make sure that I use the version of JVM that is more docker friendly >>> > vis a >>> > >> vis, how many cpus it believes it has and as explained here >>> > >> >>> > >>> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54 >>> > >> >>> > >> >>> > >> >>> > >> >>> > >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <ryannedo...@gmail.com >>> > >>> > >> wrote: >>> > >> >>> > >>> What is tasks.max? Consider bumping to something like 48 if you're >>> > >>> running >>> > >>> on a dozen nodes. >>> > >>> >>> > >>> Ryanne >>> > >>> >>> > >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi < >>> > vishal.santo...@gmail.com >>> > >>> > >>> > >>> wrote: >>> > >>> >>> > >>> > Hey Ryanne, >>> > >>> > >>> > >>> > >>> > >>> > I see a definite issue. I am doing an intense test >>> and I >>> > >>> bring >>> > >>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating >>> about >>> > 1200 >>> > >>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and >>> are >>> > >>> > staggered as they come up..I see a fraction of these nodes not >>> > >>> assigned any >>> > >>> > replication....There is plenty to go around. ( more then a >>> couple of >>> > >>> > thousand partitions ) . is there something I am missing.... As >>> in >>> > my >>> > >>> > current case 5 of the 12 VMs are idle.. >>> > >>> > >>> > >>> > Vishal >>> > >>> > >>> > >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi < >>> > >>> vishal.santo...@gmail.com >>> > >>> > > >>> > >>> > wrote: >>> > >>> > >>> > >>> > > Oh sorry a. COUNTER... is more like it.... >>> > >>> > > >>> > >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi < >>> > >>> vishal.santo...@gmail.com >>> > >>> > > >>> > >>> > > wrote: >>> > >>> > > >>> > >>> > >> Will do >>> > >>> > >> One more thing the age/latency metrics seem to be >>> analogous as >>> > >>> in >>> > >>> > >> they seem to be calculated using similar routines. I would >>> think >>> > a >>> > >>> > metric >>> > >>> > >> tracking >>> > >>> > >> the number of flush failures ( as a GAUGE ) given >>> > >>> > >> offset.flush.timeout.ms would be highly beneficial. >>> > >>> > >> >>> > >>> > >> Regards.. >>> > >>> > >> >>> > >>> > >> >>> > >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan < >>> > >>> ryannedo...@gmail.com> >>> > >>> > >> wrote: >>> > >>> > >> >>> > >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers" >>> > >>> earlier, as >>> > >>> > >>> the >>> > >>> > >>> consumer is not created by the worker, but the task. 
>>> > >>> > >>> >>> > >>> > >>> I suppose the put() could be changed to putIfAbsent() here to >>> > >>> enable >>> > >>> > this >>> > >>> > >>> property to be changed. Maybe submit a PR? >>> > >>> > >>> >>> > >>> > >>> Ryanne >>> > >>> > >>> >>> > >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi < >>> > >>> > >>> vishal.santo...@gmail.com> >>> > >>> > >>> wrote: >>> > >>> > >>> >>> > >>> > >>> > Hmm ( I did both ) >>> > >>> > >>> > >>> > >>> > >>> > another->another_test.enabled = true >>> > >>> > >>> > >>> > >>> > >>> > another->another_test.topics = act_post >>> > >>> > >>> > >>> > >>> > >>> > another->another_test.emit.heartbeats.enabled = false >>> > >>> > >>> > >>> > >>> > >>> > another->another_test.consumer.auto.offset.reset = latest >>> > >>> > >>> > >>> > >>> > >>> > another->another_test.sync.topic.acls.enabled = false >>> > >>> > >>> > >>> > >>> > >>> > another.consumer.auto.offset.reset = latest >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > When I grep for the ConsumerConfig ( and there are 8 >>> instances, >>> > >>> this >>> > >>> > >>> topic >>> > >>> > >>> > has 4 partitions ) >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > [2019-10-17 14:01:21,879] INFO ConsumerConfig values: >>> > >>> > >>> > >>> > >>> > >>> > allow.auto.create.topics = true >>> > >>> > >>> > >>> > >>> > >>> > auto.commit.interval.ms = 5000 >>> > >>> > >>> > >>> > >>> > >>> > *auto.offset.reset* = earliest >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > I am now using the 2.4 branch from kafka trunk >>> > >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > This code change works and makes sense.. I think all other >>> > >>> settings >>> > >>> > >>> will be >>> > >>> > >>> > fine ( as can be overridden ) but for the 2 below.. >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > *--- >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* >>> > >>> > >>> > >>> > >>> > >>> > *+++ >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* >>> > >>> > >>> > >>> > >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig >>> extends >>> > >>> > >>> > AbstractConfig { >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); >>> > >>> > >>> > >>> > >>> > >>> > props.put("enable.auto.commit", "false"); >>> > >>> > >>> > >>> > >>> > >>> > - props.put("auto.offset.reset", "earliest"); >>> > >>> > >>> > >>> > >>> > >>> > + props.put("auto.offset.reset", "latest"); >>> > >>> > >>> > >>> > >>> > >>> > return props; >>> > >>> > >>> > >>> > >>> > >>> > } >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > Regards. >>> > >>> > >>> > >>> > >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan < >>> > >>> ryannedo...@gmail.com> >>> > >>> > >>> > wrote: >>> > >>> > >>> > >>> > >>> > >>> > > Vishal, you should be able to override the properties >>> passed >>> > >>> to the >>> > >>> > >>> > > internal workers using properties like >>> > >>> > >>> A->B.consumer.auto.offset.reset or >>> > >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file. 
>>> > >>> Certain >>> > >>> > >>> > top-level >>> > >>> > >>> > > properties like tasks.max are honored without the A->B >>> or A >>> > >>> prefix, >>> > >>> > >>> but >>> > >>> > >>> > > auto.offset.reset is not one of them. >>> > >>> > >>> > > >>> > >>> > >>> > > Ryanne >>> > >>> > >>> > > >>> > >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi < >>> > >>> > >>> > vishal.santo...@gmail.com >>> > >>> > >>> > > > >>> > >>> > >>> > > wrote: >>> > >>> > >>> > > >>> > >>> > >>> > > > Hey Ryanne, >>> > >>> > >>> > > > >>> > >>> > >>> > > > >>> > >>> > >>> > > > How do I override auto.offset.reset = latest for >>> > >>> consumers >>> > >>> > >>> through >>> > >>> > >>> > > > mm2.properties. I have tried straight up . >>> > auto.offset.reset >>> > >>> and >>> > >>> > >>> > > consumer. >>> > >>> > >>> > > > auto.offset.reset but it defaults to earliest.. I do >>> have >>> > a >>> > >>> > query >>> > >>> > >>> in >>> > >>> > >>> > > > another thread but though you might know off hand.. >>> > >>> > >>> > > > >>> > >>> > >>> > > > I would imagine there is some way in general of >>> overriding >>> > >>> > >>> consumer and >>> > >>> > >>> > > > producer configs through mm2.properties in MM2 ? >>> > >>> > >>> > > > >>> > >>> > >>> > > > Regards. >>> > >>> > >>> > > > >>> > >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi < >>> > >>> > >>> > > vishal.santo...@gmail.com >>> > >>> > >>> > > > > >>> > >>> > >>> > > > wrote: >>> > >>> > >>> > > > >>> > >>> > >>> > > > > Thank you so much for all your help. Will keep you >>> > posted >>> > >>> on >>> > >>> > >>> tests I >>> > >>> > >>> > > > do.. >>> > >>> > >>> > > > > I hope this is helpful to other folks too.. >>> > >>> > >>> > > > > >>> > >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan < >>> > >>> > >>> ryannedo...@gmail.com> >>> > >>> > >>> > > > > wrote: >>> > >>> > >>> > > > > >>> > >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as >>> > legacy >>> > >>> > >>> > > MirrorMaker. >>> > >>> > >>> > > > >> You >>> > >>> > >>> > > > >> can follow >>> > >>> https://issues.apache.org/jira/browse/KAFKA-6080 >>> > >>> > for >>> > >>> > >>> > > updates >>> > >>> > >>> > > > >> on >>> > >>> > >>> > > > >> exactly-once semantics in Connect. >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >> Ryanne >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi < >>> > >>> > >>> > > > >> vishal.santo...@gmail.com> >>> > >>> > >>> > > > >> wrote: >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >> > >> You are correct. I'm working on a KIP and >>> PoC to >>> > >>> > >>> introduce >>> > >>> > >>> > > > >> > transactions to >>> > >>> > >>> > > > >> > >> Connect for this exact purpose :) >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > That is awesome. Any time frame ? >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > In the mean time the SLA as of now >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > 1. It is conceivable that we flush the producer >>> to the >>> > >>> > target >>> > >>> > >>> > > cluster >>> > >>> > >>> > > > >> but >>> > >>> > >>> > > > >> > fail to offset commit. If there was a restart >>> before >>> > the >>> > >>> > next >>> > >>> > >>> > > > successful >>> > >>> > >>> > > > >> > offset commit, there will be duplicates and a >>> part >>> > of >>> > >>> data >>> > >>> > >>> is >>> > >>> > >>> > > > >> replayed ( >>> > >>> > >>> > > > >> > at least once ) ? >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > 2. 
The same can be said about partial flushes, >>> though >>> > >>> am >>> > >>> > not >>> > >>> > >>> sure >>> > >>> > >>> > > > about >>> > >>> > >>> > > > >> > how kafka addresses flush ( Is a flush either >>> success >>> > >>> or a >>> > >>> > >>> > failure, >>> > >>> > >>> > > > and >>> > >>> > >>> > > > >> > nothing in between ) >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > Thanks.. >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan < >>> > >>> > >>> > > ryannedo...@gmail.com> >>> > >>> > >>> > > > >> > wrote: >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress. >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the >>> > producer >>> > >>> and >>> > >>> > >>> > setting >>> > >>> > >>> > > > the >>> > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic >>> OR >>> > >>> do we >>> > >>> > >>> use >>> > >>> > >>> > > > >> > > > transactions here ? >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to >>> > >>> introduce >>> > >>> > >>> > > > >> transactions >>> > >>> > >>> > > > >> > to >>> > >>> > >>> > > > >> > > Connect for this exact purpose :) >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz >>> num.tasks=4 ), >>> > >>> and I >>> > >>> > >>> have >>> > >>> > >>> > 2 >>> > >>> > >>> > > > >> topics >>> > >>> > >>> > > > >> > > with >>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as >>> in >>> > >>> there >>> > >>> > are >>> > >>> > >>> 4 >>> > >>> > >>> > > > consumer >>> > >>> > >>> > > > >> > > groups >>> > >>> > >>> > > > >> > > > ( on CG per thread ) ... >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > Some details here: >>> > >>> > >>> > > > >> > > - tasks.max controls the maximum number of tasks >>> > >>> created >>> > >>> > per >>> > >>> > >>> > > > Connector >>> > >>> > >>> > > > >> > > instance. Both MirrorSourceConnector and >>> > >>> > >>> > MirrorCheckpointConnector >>> > >>> > >>> > > > >> will >>> > >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but >>> > >>> > >>> > > > MirrorHeartbeatConnector >>> > >>> > >>> > > > >> > only >>> > >>> > >>> > > > >> > > ever creates a single task. Moreover, there >>> cannot >>> > be >>> > >>> more >>> > >>> > >>> tasks >>> > >>> > >>> > > > than >>> > >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or >>> > >>> consumer >>> > >>> > >>> groups >>> > >>> > >>> > > (for >>> > >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two >>> > topics >>> > >>> with >>> > >>> > >>> one >>> > >>> > >>> > > > >> partition >>> > >>> > >>> > > > >> > > each and 1 consumer group total, you'll have two >>> > >>> > >>> > > > MirrorSourceConnector >>> > >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and >>> one >>> > >>> > >>> > > > >> > MirrorCheckpointConnector >>> > >>> > >>> > > > >> > > tasks, for a total of four. And that's in one >>> > >>> direction >>> > >>> > >>> only: if >>> > >>> > >>> > > you >>> > >>> > >>> > > > >> have >>> > >>> > >>> > > > >> > > multiple source->target herders enabled, each >>> will >>> > >>> create >>> > >>> > >>> tasks >>> > >>> > >>> > > > >> > > independently. >>> > >>> > >>> > > > >> > > - There are no consumer groups in MM2, >>> technically. 
>>> > >>> The >>> > >>> > >>> Connect >>> > >>> > >>> > > > >> framework >>> > >>> > >>> > > > >> > > uses the Coordinator API and internal topics to >>> > divide >>> > >>> > tasks >>> > >>> > >>> > among >>> > >>> > >>> > > > >> > workers >>> > >>> > >>> > > > >> > > -- not a consumer group per se. The MM2 >>> connectors >>> > >>> use the >>> > >>> > >>> > > assign() >>> > >>> > >>> > > > >> API, >>> > >>> > >>> > > > >> > > not the subscribe() API, so there are no >>> consumer >>> > >>> groups >>> > >>> > >>> there >>> > >>> > >>> > > > >> either. In >>> > >>> > >>> > > > >> > > fact, they don't commit() either. This is nice, >>> as >>> > it >>> > >>> > >>> > eliminates a >>> > >>> > >>> > > > >> lot of >>> > >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker has >>> been >>> > >>> > plagued >>> > >>> > >>> > with. >>> > >>> > >>> > > > >> With >>> > >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of >>> > >>> workers >>> > >>> > >>> changes >>> > >>> > >>> > or >>> > >>> > >>> > > > >> when >>> > >>> > >>> > > > >> > the >>> > >>> > >>> > > > >> > > assignments change (e.g. new topics are >>> discovered). >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > Ryanne >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal >>> Santoshi < >>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com> >>> > >>> > >>> > > > >> > > wrote: >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > > > Hey Ryanne, >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > The test was on topics that had a >>> 7 day >>> > >>> > >>> retention. >>> > >>> > >>> > > > Which >>> > >>> > >>> > > > >> > > > generally implies that the batch size for >>> flush is >>> > >>> > pretty >>> > >>> > >>> > high ( >>> > >>> > >>> > > > >> till >>> > >>> > >>> > > > >> > the >>> > >>> > >>> > > > >> > > > consumption becomes current ). The >>> > >>> > >>> offset.flush.timeout.ms >>> > >>> > >>> > > > >> defaults >>> > >>> > >>> > > > >> > to >>> > >>> > >>> > > > >> > > 5 >>> > >>> > >>> > > > >> > > > seconds and the code will not send in the >>> offsets >>> > >>> if the >>> > >>> > >>> flush >>> > >>> > >>> > > is >>> > >>> > >>> > > > >> not >>> > >>> > >>> > > > >> > > > complete. Increasing that time out did solve >>> the >>> > >>> "not >>> > >>> > >>> sending >>> > >>> > >>> > > the >>> > >>> > >>> > > > >> > offset >>> > >>> > >>> > > > >> > > to >>> > >>> > >>> > > > >> > > > topic" issue. >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) ) >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > 1. It seems though that flushing the flushing >>> the >>> > >>> > >>> producer and >>> > >>> > >>> > > > >> setting >>> > >>> > >>> > > > >> > > the >>> > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic >>> OR >>> > >>> do we >>> > >>> > >>> use >>> > >>> > >>> > > > >> > > > transactions here ? >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > 2. 
I see >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> WorkerSourceTask{id=MirrorHeartbeatConnector-0} >>> > >>> > flushing >>> > >>> > >>> > 956435 >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} >>> > >>> flushing >>> > >>> > >>> 356251 >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> WorkerSourceTask{id=MirrorCheckpointConnector-2} >>> > >>> > >>> flushing 0 >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> WorkerSourceTask{id=MirrorCheckpointConnector-3} >>> > >>> > >>> flushing 0 >>> > >>> > >>> > > > >> > outstanding >>> > >>> > >>> > > > >> > > > messages >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz >>> num.tasks=4 ), >>> > >>> and I >>> > >>> > >>> have >>> > >>> > >>> > 2 >>> > >>> > >>> > > > >> topics >>> > >>> > >>> > > > >> > > with >>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as >>> in >>> > >>> there >>> > >>> > are >>> > >>> > >>> 4 >>> > >>> > >>> > > > consumer >>> > >>> > >>> > > > >> > > groups >>> > >>> > >>> > > > >> > > > ( on CG per thread ) ... >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > THANKS A LOT >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > Vishal. >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan < >>> > >>> > >>> > > > ryannedo...@gmail.com >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> > > > wrote: >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > > > > timed out >>> > >>> > >>> > > > >> > > > > while waiting for producer to flush >>> outstanding >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if >>> Connect >>> > was >>> > >>> > >>> unable to >>> > >>> > >>> > > > send >>> > >>> > >>> > > > >> > > records >>> > >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if >>> > >>> > >>> > min.in-sync.replicas >>> > >>> > >>> > > > were >>> > >>> > >>> > > > >> > > > > misconfigured. Given some data seems to >>> arrive, >>> > >>> it's >>> > >>> > >>> > possible >>> > >>> > >>> > > > that >>> > >>> > >>> > > > >> > > > > everything is configured correctly but with >>> too >>> > >>> much >>> > >>> > >>> latency >>> > >>> > >>> > > to >>> > >>> > >>> > > > >> > > > > successfully commit within the default >>> timeouts. >>> > >>> You >>> > >>> > may >>> > >>> > >>> > want >>> > >>> > >>> > > to >>> > >>> > >>> > > > >> > > increase >>> > >>> > >>> > > > >> > > > > the number of tasks substantially to achieve >>> > more >>> > >>> > >>> > parallelism >>> > >>> > >>> > > > and >>> > >>> > >>> > > > >> > > > > throughput. >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > Ryanne >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal >>> Santoshi < >>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > wrote: >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > > Aah no.. this is more to it. Note sure if >>> > >>> related >>> > >>> > to >>> > >>> > >>> the >>> > >>> > >>> > > > above. 
>>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114 >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > Is timing out based on >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133 >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR >>> > >>> > >>> > > > >> > > > > > >>> WorkerSourceTask{id=MirrorSourceConnector-0} >>> > >>> Failed >>> > >>> > to >>> > >>> > >>> > > flush, >>> > >>> > >>> > > > >> timed >>> > >>> > >>> > > > >> > > out >>> > >>> > >>> > > > >> > > > > > while waiting for producer to flush >>> > outstanding >>> > >>> > 36478 >>> > >>> > >>> > > messages >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423) >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal >>> > Santoshi >>> > >>> < >>> > >>> > >>> > > > >> > > > > vishal.santo...@gmail.com >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > wrote: >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > > I think this might be it.. Could you >>> > confirm. >>> > >>> It >>> > >>> > >>> seems >>> > >>> > >>> > to >>> > >>> > >>> > > be >>> > >>> > >>> > > > >> on >>> > >>> > >>> > > > >> > the >>> > >>> > >>> > > > >> > > > > path >>> > >>> > >>> > > > >> > > > > > > to commit the offsets.. but not sure... 
>>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR >>> Scheduler >>> > for >>> > >>> > >>> > > > >> > MirrorSourceConnector >>> > >>> > >>> > > > >> > > > > > caught >>> > >>> > >>> > > > >> > > > > > > exception in scheduled task: syncing >>> topic >>> > >>> ACLs >>> > >>> > >>> > > > >> > > > > > > >>> > >>> (org.apache.kafka.connect.mirror.Scheduler:102) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException: >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > org.apache.kafka.common.errors.SecurityDisabledException: >>> > >>> > >>> > > No >>> > >>> > >>> > > > >> > > > Authorizer >>> > >>> > >>> > > > >> > > > > > is >>> > >>> > >>> > > > >> > > > > > > configured on the broker >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > 
>>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> >>> > >>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > >>> > >>> > >>> >>> > >>> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> >>> > >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > 
>>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > at >>> > >>> java.lang.Thread.run(Thread.java:748) >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > Caused by: >>> > >>> > >>> > > > >> > > >>> > >>> org.apache.kafka.common.errors.SecurityDisabledException: >>> > >>> > >>> > > > >> > > > No >>> > >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne >>> > Dolan >>> > >>> < >>> > >>> > >>> > > > >> > > ryannedo...@gmail.com >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > > > > wrote: >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >> > I do not have a single record in the >>> > >>> offsets >>> > >>> > >>> topic >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are >>> > correct >>> > >>> > that >>> > >>> > >>> > > without >>> > >>> > >>> > > > >> > records >>> > >>> > >>> > > > >> > > > in >>> > >>> > >>> > > > >> > > > > > that >>> > >>> > >>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. >>> The >>> > >>> > offsets >>> > >>> > >>> > should >>> > >>> > >>> > > > be >>> > >>> > >>> > > > >> > > stored >>> > >>> > >>> > > > >> > > > > > >> periodically and whenever the >>> connectors >>> > >>> > gracefully >>> > >>> > >>> > > > shutdown >>> > >>> > >>> > > > >> or >>> > >>> > >>> > > > >> > > > > restart. >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >> Is it possible the topics don't have >>> > required >>> > >>> > ACLs >>> > >>> > >>> or >>> > >>> > >>> > > > >> something? >>> > >>> > >>> > > > >> > > > Also >>> > >>> > >>> > > > >> > > > > > >> note: >>> > >>> > >>> > > > >> > > > > > >> Connect wants the offsets topic to >>> have a >>> > >>> large >>> > >>> > >>> number >>> > >>> > >>> > of >>> > >>> > >>> > > > >> > > partitions >>> > >>> > >>> > > > >> > > > > and >>> > >>> > >>> > > > >> > > > > > >> to >>> > >>> > >>> > > > >> > > > > > >> be compacted. Though I can't imagine >>> either >>> > >>> would >>> > >>> > >>> > prevent >>> > >>> > >>> > > > >> > commits >>> > >>> > >>> > > > >> > > > from >>> > >>> > >>> > > > >> > > > > > >> being sent. 
>>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >> Ryanne >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal >>> > >>> Santoshi >>> > >>> > < >>> > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com> >>> > >>> > >>> > > > >> > > > > > >> wrote: >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >> > 2nd/restore issue ( I think I need >>> to >>> > >>> solve >>> > >>> > the >>> > >>> > >>> > > offsets >>> > >>> > >>> > > > >> topic >>> > >>> > >>> > > > >> > > > issue >>> > >>> > >>> > > > >> > > > > > >> > before I go with the scale up and >>> down >>> > >>> issue ) >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead >>> and >>> > >>> created >>> > >>> > >>> the >>> > >>> > >>> > > > offsets >>> > >>> > >>> > > > >> > > topic. >>> > >>> > >>> > > > >> > > > > The >>> > >>> > >>> > > > >> > > > > > >> > status of the cluster ( destination >>> ) is >>> > >>> thus >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers >>> Spread >>> > >>> > %Brokers >>> > >>> > >>> Skew >>> > >>> > >>> > > > >> %Brokers >>> > >>> > >>> > > > >> > > > > Leader >>> > >>> > >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated >>> %Leader >>> > >>> > >>> SizeProducer >>> > >>> > >>> > > > >> > > > > > Message/SecSummed >>> > >>> > >>> > > > >> > > > > > >> > Recent Offsets >>> > >>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > >>> > >>> > > > >> > > > > > >> > s8k.act_search_page >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842 >>> > >>> > >>> > > > >> > > > > > >> > s8k.act_reach >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529 >>> > >>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > 
> > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10 >>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > >>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > < >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13 >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > You can see . 
that we have the 5 ( I >>> > >>> created >>> > >>> > >>> bot the >>> > >>> > >>> > > > >> offsets, >>> > >>> > >>> > > > >> > > to >>> > >>> > >>> > > > >> > > > be >>> > >>> > >>> > > > >> > > > > > >> safe >>> > >>> > >>> > > > >> > > > > > >> > for the below ) >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics = >>> > >>> > >>> act_search_page|act_reach* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> *s8k->s8k_test.emit.heartbeats.enabled = >>> > >>> false* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> *s8k_test->s8k.emit.heartbeats.enabled = >>> > >>> false* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor >>> = 3* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *replication.factor = 3* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *replication.policy.separator = .* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > *tasks.max = 4* >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > What seems strange is that I do not >>> have >>> > a >>> > >>> > single >>> > >>> > >>> > > record >>> > >>> > >>> > > > in >>> > >>> > >>> > > > >> > the >>> > >>> > >>> > > > >> > > > > > offsets >>> > >>> > >>> > > > >> > > > > > >> > topic.. Is that normal ? I would >>> > imagine >>> > >>> that >>> > >>> > >>> > > without a >>> > >>> > >>> > > > >> > > record, >>> > >>> > >>> > > > >> > > > > > there >>> > >>> > >>> > > > >> > > > > > >> is >>> > >>> > >>> > > > >> > > > > > >> > no way that a restore would >>> happen.... >>> > And >>> > >>> that >>> > >>> > >>> is >>> > >>> > >>> > > > obvious >>> > >>> > >>> > > > >> > when >>> > >>> > >>> > > > >> > > I >>> > >>> > >>> > > > >> > > > > > >> restart >>> > >>> > >>> > > > >> > > > > > >> > the mm2 instance... Find the >>> screenshot >>> > >>> > >>> attached. 
In >>> > >>> > >>> > > > >> essence >>> > >>> > >>> > > > >> > the >>> > >>> > >>> > > > >> > > > > > latency >>> > >>> > >>> > > > >> > > > > > >> > avg lag is reset \when the mm2 >>> instance >>> > is >>> > >>> > reset >>> > >>> > >>> > > > >> indicating no >>> > >>> > >>> > > > >> > > > > restore >>> > >>> > >>> > > > >> > > > > > >> but >>> > >>> > >>> > > > >> > > > > > >> > restart from EARLIEST... I must be >>> > missing >>> > >>> some >>> > >>> > >>> thing >>> > >>> > >>> > > > >> simple >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM >>> Ryanne >>> > >>> Dolan < >>> > >>> > >>> > > > >> > > > ryannedo...@gmail.com >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > > >> > wrote: >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you >>> > must >>> > >>> set >>> > >>> > >>> > > tasks.max >>> > >>> > >>> > > > to >>> > >>> > >>> > > > >> > > > > something >>> > >>> > >>> > > > >> > > > > > >> above >>> > >>> > >>> > > > >> > > > > > >> >> 1 (the default) in order to achieve >>> any >>> > >>> > >>> parallelism. >>> > >>> > >>> > > > This >>> > >>> > >>> > > > >> > > > property >>> > >>> > >>> > > > >> > > > > is >>> > >>> > >>> > > > >> > > > > > >> >> passed along to the internal Connect >>> > >>> workers. >>> > >>> > >>> It's >>> > >>> > >>> > > > >> > unfortunate >>> > >>> > >>> > > > >> > > > that >>> > >>> > >>> > > > >> > > > > > >> >> Connect >>> > >>> > >>> > > > >> > > > > > >> >> is not smart enough to default this >>> > >>> property >>> > >>> > to >>> > >>> > >>> the >>> > >>> > >>> > > > >> number of >>> > >>> > >>> > > > >> > > > > > workers. >>> > >>> > >>> > > > >> > > > > > >> I >>> > >>> > >>> > > > >> > > > > > >> >> suspect that will improve before >>> long. >>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> >> For the second issue, is it >>> possible you >>> > >>> are >>> > >>> > >>> missing >>> > >>> > >>> > > the >>> > >>> > >>> > > > >> > > offsets >>> > >>> > >>> > > > >> > > > > > >> topic? It >>> > >>> > >>> > > > >> > > > > > >> >> should exist alongside the config >>> and >>> > >>> status >>> > >>> > >>> topics. >>> > >>> > >>> > > > >> Connect >>> > >>> > >>> > > > >> > > > should >>> > >>> > >>> > > > >> > > > > > >> create >>> > >>> > >>> > > > >> > > > > > >> >> this topic, but there are various >>> > reasons >>> > >>> this >>> > >>> > >>> can >>> > >>> > >>> > > fail, >>> > >>> > >>> > > > >> e.g. >>> > >>> > >>> > > > >> > > if >>> > >>> > >>> > > > >> > > > > the >>> > >>> > >>> > > > >> > > > > > >> >> replication factor is >>> misconfigured. You >>> > >>> can >>> > >>> > try >>> > >>> > >>> > > > creating >>> > >>> > >>> > > > >> > this >>> > >>> > >>> > > > >> > > > > topic >>> > >>> > >>> > > > >> > > > > > >> >> manually or changing >>> > >>> > >>> > > offsets.storage.replication.factor. 
>>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> >> Ryanne >>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal >>> > >>> Santoshi >>> > >>> > < >>> > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com> >>> > >>> > >>> > > > >> > > > > > >> >> wrote: >>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> >> > Using >>> > >>> > >>> > > > >> > > > >>> > >>> > https://github.com/apache/kafka/tree/trunk/connect/mirror >>> > >>> > >>> > > > >> > > > > > as a >>> > >>> > >>> > > > >> > > > > > >> >> > guide, >>> > >>> > >>> > > > >> > > > > > >> >> > I have build from source the >>> > >>> origin/KIP-382 >>> > >>> > of >>> > >>> > >>> > > > >> > > > > > >> >> > >>> https://github.com/apache/kafka.git. >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2 >>> > >>> different >>> > >>> > >>> nodes ( >>> > >>> > >>> > > they >>> > >>> > >>> > > > >> are >>> > >>> > >>> > > > >> > > > > actually >>> > >>> > >>> > > > >> > > > > > >> >> pods on >>> > >>> > >>> > > > >> > > > > > >> >> > k8s but that should not matter ). >>> They >>> > >>> share >>> > >>> > >>> the >>> > >>> > >>> > > > >> > > mm2.properties >>> > >>> > >>> > > > >> > > > > > file >>> > >>> > >>> > > > >> > > > > > >> and >>> > >>> > >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics >>> > with >>> > >>> 8 >>> > >>> > >>> > partitions >>> > >>> > >>> > > > in >>> > >>> > >>> > > > >> > > total. >>> > >>> > >>> > > > >> > > > > > That >>> > >>> > >>> > > > >> > > > > > >> >> seems >>> > >>> > >>> > > > >> > > > > > >> >> > to be the way to create a >>> standalone >>> > mm2 >>> > >>> > >>> cluster. >>> > >>> > >>> > I >>> > >>> > >>> > > do >>> > >>> > >>> > > > >> not >>> > >>> > >>> > > > >> > > > > however >>> > >>> > >>> > > > >> > > > > > >> see( >>> > >>> > >>> > > > >> > > > > > >> >> at >>> > >>> > >>> > > > >> > > > > > >> >> > least the mbeans do not show ) any >>> > >>> attempt >>> > >>> > to >>> > >>> > >>> > > > rebalance. >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >>> > >>> >>> > >>> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process >>> > >>> > >>> > > > >> > > > > > >> >> > mbeans >>> > >>> > >>> > > > >> > > > > > >> >> > are all on a single node >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2 >>> > >>> nodes ( >>> > >>> > >>> hard >>> > >>> > >>> > stop >>> > >>> > >>> > > > ans >>> > >>> > >>> > > > >> > > start >>> > >>> > >>> > > > >> > > > ). 
>>> > >>> > >>> > > > >> > > > > > The >>> > >>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to be >>> > >>> reset to >>> > >>> > >>> the >>> > >>> > >>> > > > >> earliest, >>> > >>> > >>> > > > >> > as >>> > >>> > >>> > > > >> > > if >>> > >>> > >>> > > > >> > > > > it >>> > >>> > >>> > > > >> > > > > > >> is a >>> > >>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also >>> > obvious >>> > >>> from >>> > >>> > >>> the >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > "record-age-ms-avg|replication-latency-ms-avg" >>> > >>> > >>> > > > >> > > > > > >> >> > which I track through the restart. >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > This implies that >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing >>> is >>> > not >>> > >>> > >>> working. I >>> > >>> > >>> > > > >> cannot >>> > >>> > >>> > > > >> > > scale >>> > >>> > >>> > > > >> > > > > up >>> > >>> > >>> > > > >> > > > > > or >>> > >>> > >>> > > > >> > > > > > >> >> down >>> > >>> > >>> > > > >> > > > > > >> >> > by adding nodes to the mm2 >>> cluster or >>> > >>> > removing >>> > >>> > >>> > them. >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not >>> working. >>> > >>> If >>> > >>> > the >>> > >>> > >>> MM2 >>> > >>> > >>> > > > >> cluster >>> > >>> > >>> > > > >> > is >>> > >>> > >>> > > > >> > > > > > brought >>> > >>> > >>> > > > >> > > > > > >> >> down, >>> > >>> > >>> > > > >> > > > > > >> >> > it does not start mirroring from >>> the >>> > >>> last >>> > >>> > >>> known >>> > >>> > >>> > > > state. I >>> > >>> > >>> > > > >> > see >>> > >>> > >>> > > > >> > > > the, >>> > >>> > >>> > > > >> > > > > > >> >> > state/config topics etc created as >>> > >>> > expected.. 
>>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty >>> mimimal >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *clusters = a , b* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> >>> dr2* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = >>> false* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = >>> false* >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ? >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> > Thanks >>> > >>> > >>> > > > >> > > > > > >> >> > >>> > >>> > >>> > > > >> > > > > > >> >> >>> > >>> > >>> > > > >> > > > > > >> > >>> > >>> > >>> > > > >> > > > > > >> >>> > >>> > >>> > > > >> > > > > > > >>> > >>> > >>> > > > >> > > > > > >>> > >>> > >>> > > > >> > > > > >>> > >>> > >>> > > > >> > > > >>> > >>> > >>> > > > >> > > >>> > >>> > >>> > > > >> > >>> > >>> > >>> > > > >> >>> > >>> > >>> > > > > >>> > >>> > >>> > > > >>> > >>> > >>> > > >>> > >>> > >>> > >>> > >>> > >>> >>> > >>> > >> >>> > >>> > >>> > >>> >>> > >> >>> > >>> >>
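For anyone who finds this thread later, here is the rough mm2.properties the discussion above converges on. This is a sketch rather than a verified config: the prefixed consumer override only takes effect with the putIfAbsent change to MirrorConnectorConfig discussed earlier, and offset.flush.timeout.ms is the Connect worker setting that had to be raised above its 5 second default so large flushes can complete (it may need a cluster prefix depending on how the worker picks it up).

    clusters = s8k, s8k_test
    s8k.bootstrap.servers = ...
    s8k_test.bootstrap.servers = ...

    s8k->s8k_test.enabled = true
    s8k->s8k_test.topics = act_search_page|act_reach
    s8k->s8k_test.emit.heartbeats.enabled = false
    # avoids the "No Authorizer is configured on the broker" errors above
    s8k->s8k_test.sync.topic.acls.enabled = false
    # only effective with the putIfAbsent change to MirrorConnectorConfig
    s8k->s8k_test.consumer.auto.offset.reset = latest

    # give each source->target herder enough tasks to spread across all workers
    tasks.max = 48
    replication.factor = 3
    offsets.storage.replication.factor = 3
    # illustrative value; raised from the 5s default so large flushes can complete
    offset.flush.timeout.ms = 60000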