Hey Ryanne, I'm seeing the ERROR below in the logs, and after it the process stops consuming (it does not exit with any errors). This is intermittent: relaunch it enough times and it does come back up :) Is this a known bug?
[mm2-dev-58bf5df684-ln9k2] [2019-10-24 18:41:03,067] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) On Mon, Oct 21, 2019 at 5:16 PM Ryanne Dolan <ryannedo...@gmail.com> wrote: > Vishal, the number of tasks created per source->target herder is determined > by both tasks.max and the total number of topic-partitions being > replicated. In order to use all 12 worker nodes, you'd need tasks.max >= 12 > and number of topic-partitions >= 12. From previous emails it sounds like > you have a small number of topic-partitions total (i.e. a small number of > topics with a small number of partitions per topic), so I'm guessing that's > the reason you aren't seeing more tasks being created. > > Ryanne > > On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi <vishal.santo...@gmail.com > > > wrote: > > > Here is what I see > > > > * The max tasks are a a cap on a Connector across the cluster. If have 8 > > VMs but 8 max tasks my assumption that there would be 8 * 8 = 72 task > > threads was > > wring. The logs showed that the partitions were consumed by 8 threads on > > the 8 VMs ( 1 per VM ) which was highly un optimal. When I scaled the > > VMs to 12, it did not matter, as the max tasks still prevented any > further > > distribution. > > > > * If I cancel/resume the cluster with a max task of 48 ( keeping the > same > > job name and thus connector definition the max tasks does not change, > as > > in > > it seems to keep the same number of max task threads limit ( as in 8 ) > > > > * I can bring down a VM and see the task migrate to a free VM but the > > overall count of task threads remain the same. > > > > > > In essence, the num of tasks is a cap on threads in the cluster per > > connector, A connector is a source->sink pair that spans a cluster. Thus > if > > we have a > > A->B DAG and max tasks of 8, then there will be no more that 8 Source > > Tasks ( threads ) no matter how big the cluster is, It thus makes sense > to > > over provision ( within limits of a single VM ) on the max tasks to allow > > for adding more VMs for scale up..... > > > > > > > > On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi < > vishal.santo...@gmail.com > > > > > wrote: > > > > > I misspoke > > > > > > >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the > the > > > 8 VMs. I then upscaled to 12 VMs and the tasks *have not *migrated as > I > > > would expect . > > > > > > > > > > > > > > > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi < > > vishal.santo...@gmail.com> > > > wrote: > > > > > >> OK, You will have to explain :) > > >> > > >> I had 12 VMs with 8 cpus and 8 max tasks. I thought let me give a CPU > > to > > >> each task, which I presumed is a java thread ( even though I know the > > >> thread would be mostly ip bound ). . I saw the issue I pointed up. > > >> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the > the > > >> 8 VMs. I then upscaled to 12 VMs and the tasks migrated as I would > > expect > > >> .* > > >> > > >> I know that a VM will have MirrorSourceConnector and > > >> MirrorHeartbeatConnector tasks up till tasks.max. So a few questions > > >> > > >> > > >> > > >> * When we say there are 48 max tasks, are we saying there are 48 > > threads > > >> ( in fact 96, each for the 2 groups above, worst case + 2 ) ? 
> > >> * When we talk about Connector, are we talking about a JVM process, as > > in > > >> a Connector is a JVM process ? > > >> * Why larger number of tasks.max help the spread ? As in I would > > assume > > >> there are up till 8 tasks ( or 16 ) per VM but how that should not > have > > >> prevented re assignment on a scale up ( as it clearly did ) ? > > >> > > >> The reason I ask is that I plan to run mm2 cluster on k8s and I want > to > > >> make sure that I use the version of JVM that is more docker friendly > > vis a > > >> vis, how many cpus it believes it has and as explained here > > >> > > > https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54 > > >> > > >> > > >> > > >> > > >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <ryannedo...@gmail.com> > > >> wrote: > > >> > > >>> What is tasks.max? Consider bumping to something like 48 if you're > > >>> running > > >>> on a dozen nodes. > > >>> > > >>> Ryanne > > >>> > > >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi < > > vishal.santo...@gmail.com > > >>> > > > >>> wrote: > > >>> > > >>> > Hey Ryanne, > > >>> > > > >>> > > > >>> > I see a definite issue. I am doing an intense test and > I > > >>> bring > > >>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating about > > 1200 > > >>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and are > > >>> > staggered as they come up..I see a fraction of these nodes not > > >>> assigned any > > >>> > replication....There is plenty to go around. ( more then a couple > of > > >>> > thousand partitions ) . is there something I am missing.... As in > > my > > >>> > current case 5 of the 12 VMs are idle.. > > >>> > > > >>> > Vishal > > >>> > > > >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi < > > >>> vishal.santo...@gmail.com > > >>> > > > > >>> > wrote: > > >>> > > > >>> > > Oh sorry a. COUNTER... is more like it.... > > >>> > > > > >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi < > > >>> vishal.santo...@gmail.com > > >>> > > > > >>> > > wrote: > > >>> > > > > >>> > >> Will do > > >>> > >> One more thing the age/latency metrics seem to be analogous > as > > >>> in > > >>> > >> they seem to be calculated using similar routines. I would > think > > a > > >>> > metric > > >>> > >> tracking > > >>> > >> the number of flush failures ( as a GAUGE ) given > > >>> > >> offset.flush.timeout.ms would be highly beneficial. > > >>> > >> > > >>> > >> Regards.. > > >>> > >> > > >>> > >> > > >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan < > > >>> ryannedo...@gmail.com> > > >>> > >> wrote: > > >>> > >> > > >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers" > > >>> earlier, as > > >>> > >>> the > > >>> > >>> consumer is not created by the worker, but the task. > > >>> > >>> > > >>> > >>> I suppose the put() could be changed to putIfAbsent() here to > > >>> enable > > >>> > this > > >>> > >>> property to be changed. Maybe submit a PR? 
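(For reference, a sketch of how that block might read with the suggested change -- illustrative only, not an actual patch, against the same lines shown in the diff further down the thread:)

    props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); // user A->B.consumer.* overrides land here first
    props.put("enable.auto.commit", "false");                  // MM2 manages offsets itself, so this stays forced
    props.putIfAbsent("auto.offset.reset", "earliest");        // now only a default; a user-supplied value wins
    return props;

With putIfAbsent(), "earliest" becomes a fallback rather than an override, so a prefixed setting such as another->another_test.consumer.auto.offset.reset = latest would take effect.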
> > >>> > >>> > > >>> > >>> Ryanne > > >>> > >>> > > >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi < > > >>> > >>> vishal.santo...@gmail.com> > > >>> > >>> wrote: > > >>> > >>> > > >>> > >>> > Hmm ( I did both ) > > >>> > >>> > > > >>> > >>> > another->another_test.enabled = true > > >>> > >>> > > > >>> > >>> > another->another_test.topics = act_post > > >>> > >>> > > > >>> > >>> > another->another_test.emit.heartbeats.enabled = false > > >>> > >>> > > > >>> > >>> > another->another_test.consumer.auto.offset.reset = latest > > >>> > >>> > > > >>> > >>> > another->another_test.sync.topic.acls.enabled = false > > >>> > >>> > > > >>> > >>> > another.consumer.auto.offset.reset = latest > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > When I grep for the ConsumerConfig ( and there are 8 > instances, > > >>> this > > >>> > >>> topic > > >>> > >>> > has 4 partitions ) > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > [2019-10-17 14:01:21,879] INFO ConsumerConfig values: > > >>> > >>> > > > >>> > >>> > allow.auto.create.topics = true > > >>> > >>> > > > >>> > >>> > auto.commit.interval.ms = 5000 > > >>> > >>> > > > >>> > >>> > *auto.offset.reset* = earliest > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > I am now using the 2.4 branch from kafka trunk > > >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > This code change works and makes sense.. I think all other > > >>> settings > > >>> > >>> will be > > >>> > >>> > fine ( as can be overridden ) but for the 2 below.. > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > *--- > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* > > >>> > >>> > > > >>> > >>> > *+++ > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* > > >>> > >>> > > > >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig > extends > > >>> > >>> > AbstractConfig { > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > > > >>> > > > props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); > > >>> > >>> > > > >>> > >>> > > > >>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); > > >>> > >>> > > > >>> > >>> > props.put("enable.auto.commit", "false"); > > >>> > >>> > > > >>> > >>> > - props.put("auto.offset.reset", "earliest"); > > >>> > >>> > > > >>> > >>> > + props.put("auto.offset.reset", "latest"); > > >>> > >>> > > > >>> > >>> > return props; > > >>> > >>> > > > >>> > >>> > } > > >>> > >>> > > > >>> > >>> > > > >>> > >>> > Regards. > > >>> > >>> > > > >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan < > > >>> ryannedo...@gmail.com> > > >>> > >>> > wrote: > > >>> > >>> > > > >>> > >>> > > Vishal, you should be able to override the properties > passed > > >>> to the > > >>> > >>> > > internal workers using properties like > > >>> > >>> A->B.consumer.auto.offset.reset or > > >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file. > > >>> Certain > > >>> > >>> > top-level > > >>> > >>> > > properties like tasks.max are honored without the A->B or A > > >>> prefix, > > >>> > >>> but > > >>> > >>> > > auto.offset.reset is not one of them. 
> > >>> > >>> > > > > >>> > >>> > > Ryanne > > >>> > >>> > > > > >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi < > > >>> > >>> > vishal.santo...@gmail.com > > >>> > >>> > > > > > >>> > >>> > > wrote: > > >>> > >>> > > > > >>> > >>> > > > Hey Ryanne, > > >>> > >>> > > > > > >>> > >>> > > > > > >>> > >>> > > > How do I override auto.offset.reset = latest for > > >>> consumers > > >>> > >>> through > > >>> > >>> > > > mm2.properties. I have tried straight up . > > auto.offset.reset > > >>> and > > >>> > >>> > > consumer. > > >>> > >>> > > > auto.offset.reset but it defaults to earliest.. I do > have > > a > > >>> > query > > >>> > >>> in > > >>> > >>> > > > another thread but though you might know off hand.. > > >>> > >>> > > > > > >>> > >>> > > > I would imagine there is some way in general of > overriding > > >>> > >>> consumer and > > >>> > >>> > > > producer configs through mm2.properties in MM2 ? > > >>> > >>> > > > > > >>> > >>> > > > Regards. > > >>> > >>> > > > > > >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi < > > >>> > >>> > > vishal.santo...@gmail.com > > >>> > >>> > > > > > > >>> > >>> > > > wrote: > > >>> > >>> > > > > > >>> > >>> > > > > Thank you so much for all your help. Will keep you > > posted > > >>> on > > >>> > >>> tests I > > >>> > >>> > > > do.. > > >>> > >>> > > > > I hope this is helpful to other folks too.. > > >>> > >>> > > > > > > >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan < > > >>> > >>> ryannedo...@gmail.com> > > >>> > >>> > > > > wrote: > > >>> > >>> > > > > > > >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as > > legacy > > >>> > >>> > > MirrorMaker. > > >>> > >>> > > > >> You > > >>> > >>> > > > >> can follow > > >>> https://issues.apache.org/jira/browse/KAFKA-6080 > > >>> > for > > >>> > >>> > > updates > > >>> > >>> > > > >> on > > >>> > >>> > > > >> exactly-once semantics in Connect. > > >>> > >>> > > > >> > > >>> > >>> > > > >> Ryanne > > >>> > >>> > > > >> > > >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi < > > >>> > >>> > > > >> vishal.santo...@gmail.com> > > >>> > >>> > > > >> wrote: > > >>> > >>> > > > >> > > >>> > >>> > > > >> > >> You are correct. I'm working on a KIP and PoC > to > > >>> > >>> introduce > > >>> > >>> > > > >> > transactions to > > >>> > >>> > > > >> > >> Connect for this exact purpose :) > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > That is awesome. Any time frame ? > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > In the mean time the SLA as of now > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > 1. It is conceivable that we flush the producer to > the > > >>> > target > > >>> > >>> > > cluster > > >>> > >>> > > > >> but > > >>> > >>> > > > >> > fail to offset commit. If there was a restart before > > the > > >>> > next > > >>> > >>> > > > successful > > >>> > >>> > > > >> > offset commit, there will be duplicates and a part > > of > > >>> data > > >>> > >>> is > > >>> > >>> > > > >> replayed ( > > >>> > >>> > > > >> > at least once ) ? > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > 2. The same can be said about partial flushes, > though > > >>> am > > >>> > not > > >>> > >>> sure > > >>> > >>> > > > about > > >>> > >>> > > > >> > how kafka addresses flush ( Is a flush either > success > > >>> or a > > >>> > >>> > failure, > > >>> > >>> > > > and > > >>> > >>> > > > >> > nothing in between ) > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > Thanks.. 
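Concretely, the window described above sits between these two steps (a simplified sketch, not the actual Connect/WorkerSourceTask code; commitSourceOffsets() is a made-up placeholder for the write to the offsets topic):

    // records have already been handed to the target-cluster producer via producer.send(...)
    producer.flush();        // (1) wait for the replicated records to be acked by the target cluster
    commitSourceOffsets();   // (2) placeholder: append the source offsets to mm2-offsets.<cluster>.internal
    // A restart between (1) and (2) resumes from the last committed offsets and
    // re-sends records that were already flushed -- duplicates, i.e. at-least-once
    // until the Connect transactions work (KAFKA-6080) lands.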
> > >>> > >>> > > > >> > > > >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan < > > >>> > >>> > > ryannedo...@gmail.com> > > >>> > >>> > > > >> > wrote: > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress. > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the > > producer > > >>> and > > >>> > >>> > setting > > >>> > >>> > > > the > > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic OR > > >>> do we > > >>> > >>> use > > >>> > >>> > > > >> > > > transactions here ? > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to > > >>> introduce > > >>> > >>> > > > >> transactions > > >>> > >>> > > > >> > to > > >>> > >>> > > > >> > > Connect for this exact purpose :) > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 > ), > > >>> and I > > >>> > >>> have > > >>> > >>> > 2 > > >>> > >>> > > > >> topics > > >>> > >>> > > > >> > > with > > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in > > >>> there > > >>> > are > > >>> > >>> 4 > > >>> > >>> > > > consumer > > >>> > >>> > > > >> > > groups > > >>> > >>> > > > >> > > > ( on CG per thread ) ... > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > Some details here: > > >>> > >>> > > > >> > > - tasks.max controls the maximum number of tasks > > >>> created > > >>> > per > > >>> > >>> > > > Connector > > >>> > >>> > > > >> > > instance. Both MirrorSourceConnector and > > >>> > >>> > MirrorCheckpointConnector > > >>> > >>> > > > >> will > > >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but > > >>> > >>> > > > MirrorHeartbeatConnector > > >>> > >>> > > > >> > only > > >>> > >>> > > > >> > > ever creates a single task. Moreover, there cannot > > be > > >>> more > > >>> > >>> tasks > > >>> > >>> > > > than > > >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or > > >>> consumer > > >>> > >>> groups > > >>> > >>> > > (for > > >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two > > topics > > >>> with > > >>> > >>> one > > >>> > >>> > > > >> partition > > >>> > >>> > > > >> > > each and 1 consumer group total, you'll have two > > >>> > >>> > > > MirrorSourceConnector > > >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one > > >>> > >>> > > > >> > MirrorCheckpointConnector > > >>> > >>> > > > >> > > tasks, for a total of four. And that's in one > > >>> direction > > >>> > >>> only: if > > >>> > >>> > > you > > >>> > >>> > > > >> have > > >>> > >>> > > > >> > > multiple source->target herders enabled, each will > > >>> create > > >>> > >>> tasks > > >>> > >>> > > > >> > > independently. > > >>> > >>> > > > >> > > - There are no consumer groups in MM2, > technically. > > >>> The > > >>> > >>> Connect > > >>> > >>> > > > >> framework > > >>> > >>> > > > >> > > uses the Coordinator API and internal topics to > > divide > > >>> > tasks > > >>> > >>> > among > > >>> > >>> > > > >> > workers > > >>> > >>> > > > >> > > -- not a consumer group per se. The MM2 connectors > > >>> use the > > >>> > >>> > > assign() > > >>> > >>> > > > >> API, > > >>> > >>> > > > >> > > not the subscribe() API, so there are no consumer > > >>> groups > > >>> > >>> there > > >>> > >>> > > > >> either. In > > >>> > >>> > > > >> > > fact, they don't commit() either. 
This is nice, as > > it > > >>> > >>> > eliminates a > > >>> > >>> > > > >> lot of > > >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker has > been > > >>> > plagued > > >>> > >>> > with. > > >>> > >>> > > > >> With > > >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of > > >>> workers > > >>> > >>> changes > > >>> > >>> > or > > >>> > >>> > > > >> when > > >>> > >>> > > > >> > the > > >>> > >>> > > > >> > > assignments change (e.g. new topics are > discovered). > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > Ryanne > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi < > > >>> > >>> > > > >> > > vishal.santo...@gmail.com> > > >>> > >>> > > > >> > > wrote: > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > Hey Ryanne, > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > The test was on topics that had a 7 > day > > >>> > >>> retention. > > >>> > >>> > > > Which > > >>> > >>> > > > >> > > > generally implies that the batch size for flush > is > > >>> > pretty > > >>> > >>> > high ( > > >>> > >>> > > > >> till > > >>> > >>> > > > >> > the > > >>> > >>> > > > >> > > > consumption becomes current ). The > > >>> > >>> offset.flush.timeout.ms > > >>> > >>> > > > >> defaults > > >>> > >>> > > > >> > to > > >>> > >>> > > > >> > > 5 > > >>> > >>> > > > >> > > > seconds and the code will not send in the > offsets > > >>> if the > > >>> > >>> flush > > >>> > >>> > > is > > >>> > >>> > > > >> not > > >>> > >>> > > > >> > > > complete. Increasing that time out did solve the > > >>> "not > > >>> > >>> sending > > >>> > >>> > > the > > >>> > >>> > > > >> > offset > > >>> > >>> > > > >> > > to > > >>> > >>> > > > >> > > > topic" issue. > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) ) > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > 1. It seems though that flushing the flushing > the > > >>> > >>> producer and > > >>> > >>> > > > >> setting > > >>> > >>> > > > >> > > the > > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic OR > > >>> do we > > >>> > >>> use > > >>> > >>> > > > >> > > > transactions here ? > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > 2. I see > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorHeartbeatConnector-0} > > >>> > flushing > > >>> > >>> > 956435 > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} > > >>> flushing > > >>> > >>> 356251 > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > WorkerSourceTask{id=MirrorCheckpointConnector-2} > > >>> > >>> flushing 0 > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > WorkerSourceTask{id=MirrorCheckpointConnector-3} > > >>> > >>> flushing 0 > > >>> > >>> > > > >> > outstanding > > >>> > >>> > > > >> > > > messages > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 > ), > > >>> and I > > >>> > >>> have > > >>> > >>> > 2 > > >>> > >>> > > > >> topics > > >>> > >>> > > > >> > > with > > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in > > >>> there > > >>> > are > > >>> > >>> 4 > > >>> > >>> > > > consumer > > >>> > >>> > > > >> > > groups > > >>> > >>> > > > >> > > > ( on CG per thread ) ... 
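A back-of-the-envelope version of the counting rules Ryanne lays out above (variable names are mine, not anything in MM2):

    int tasksMax        = 4;   // tasks.max from mm2.properties
    int topicPartitions = 2;   // 2 topics x 1 partition each
    int consumerGroups  = 1;

    int sourceTasks     = Math.min(tasksMax, topicPartitions); // MirrorSourceConnector     -> 2
    int checkpointTasks = Math.min(tasksMax, consumerGroups);  // MirrorCheckpointConnector -> 1
    int heartbeatTasks  = 1;                                   // MirrorHeartbeatConnector always runs 1
    int totalPerFlow    = sourceTasks + checkpointTasks + heartbeatTasks; // 4 tasks per source->target flow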
> > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > THANKS A LOT > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > Vishal. > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan < > > >>> > >>> > > > ryannedo...@gmail.com > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > > wrote: > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > > timed out > > >>> > >>> > > > >> > > > > while waiting for producer to flush > outstanding > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect > > was > > >>> > >>> unable to > > >>> > >>> > > > send > > >>> > >>> > > > >> > > records > > >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if > > >>> > >>> > min.in-sync.replicas > > >>> > >>> > > > were > > >>> > >>> > > > >> > > > > misconfigured. Given some data seems to > arrive, > > >>> it's > > >>> > >>> > possible > > >>> > >>> > > > that > > >>> > >>> > > > >> > > > > everything is configured correctly but with > too > > >>> much > > >>> > >>> latency > > >>> > >>> > > to > > >>> > >>> > > > >> > > > > successfully commit within the default > timeouts. > > >>> You > > >>> > may > > >>> > >>> > want > > >>> > >>> > > to > > >>> > >>> > > > >> > > increase > > >>> > >>> > > > >> > > > > the number of tasks substantially to achieve > > more > > >>> > >>> > parallelism > > >>> > >>> > > > and > > >>> > >>> > > > >> > > > > throughput. > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > Ryanne > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi > < > > >>> > >>> > > > >> > > vishal.santo...@gmail.com > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > wrote: > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > Aah no.. this is more to it. Note sure if > > >>> related > > >>> > to > > >>> > >>> the > > >>> > >>> > > > above. 
> > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114 > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > Is timing out based on > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133 > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR > > >>> > >>> > > > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0} > > >>> Failed > > >>> > to > > >>> > >>> > > flush, > > >>> > >>> > > > >> timed > > >>> > >>> > > > >> > > out > > >>> > >>> > > > >> > > > > > while waiting for producer to flush > > outstanding > > >>> > 36478 > > >>> > >>> > > messages > > >>> > >>> > > > >> > > > > > > > >>> > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423) > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal > > Santoshi > > >>> < > > >>> > >>> > > > >> > > > > vishal.santo...@gmail.com > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > wrote: > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > I think this might be it.. Could you > > confirm. > > >>> It > > >>> > >>> seems > > >>> > >>> > to > > >>> > >>> > > be > > >>> > >>> > > > >> on > > >>> > >>> > > > >> > the > > >>> > >>> > > > >> > > > > path > > >>> > >>> > > > >> > > > > > > to commit the offsets.. but not sure... 
> > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler > > for > > >>> > >>> > > > >> > MirrorSourceConnector > > >>> > >>> > > > >> > > > > > caught > > >>> > >>> > > > >> > > > > > > exception in scheduled task: syncing topic > > >>> ACLs > > >>> > >>> > > > >> > > > > > > > > >>> (org.apache.kafka.connect.mirror.Scheduler:102) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException: > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > org.apache.kafka.common.errors.SecurityDisabledException: > > >>> > >>> > > No > > >>> > >>> > > > >> > > > Authorizer > > >>> > >>> > > > >> > > > > > is > > >>> > >>> > > > >> > > > > > > configured on the broker > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214) > > >>> > >>> > > > >> > 
> > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > >>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > >>> > >>> > > > > >>> > >>> > > >>> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > at > > >>> java.lang.Thread.run(Thread.java:748) > > >>> > >>> 
> > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > Caused by: > > >>> > >>> > > > >> > > > > >>> org.apache.kafka.common.errors.SecurityDisabledException: > > >>> > >>> > > > >> > > > No > > >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne > > Dolan > > >>> < > > >>> > >>> > > > >> > > ryannedo...@gmail.com > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > > wrote: > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > >> > I do not have a single record in the > > >>> offsets > > >>> > >>> topic > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are > > correct > > >>> > that > > >>> > >>> > > without > > >>> > >>> > > > >> > records > > >>> > >>> > > > >> > > > in > > >>> > >>> > > > >> > > > > > that > > >>> > >>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. > The > > >>> > offsets > > >>> > >>> > should > > >>> > >>> > > > be > > >>> > >>> > > > >> > > stored > > >>> > >>> > > > >> > > > > > >> periodically and whenever the connectors > > >>> > gracefully > > >>> > >>> > > > shutdown > > >>> > >>> > > > >> or > > >>> > >>> > > > >> > > > > restart. > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > >> Is it possible the topics don't have > > required > > >>> > ACLs > > >>> > >>> or > > >>> > >>> > > > >> something? > > >>> > >>> > > > >> > > > Also > > >>> > >>> > > > >> > > > > > >> note: > > >>> > >>> > > > >> > > > > > >> Connect wants the offsets topic to have a > > >>> large > > >>> > >>> number > > >>> > >>> > of > > >>> > >>> > > > >> > > partitions > > >>> > >>> > > > >> > > > > and > > >>> > >>> > > > >> > > > > > >> to > > >>> > >>> > > > >> > > > > > >> be compacted. Though I can't imagine > either > > >>> would > > >>> > >>> > prevent > > >>> > >>> > > > >> > commits > > >>> > >>> > > > >> > > > from > > >>> > >>> > > > >> > > > > > >> being sent. > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > >> Ryanne > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal > > >>> Santoshi > > >>> > < > > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com> > > >>> > >>> > > > >> > > > > > >> wrote: > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > >> > 2nd/restore issue ( I think I need to > > >>> solve > > >>> > the > > >>> > >>> > > offsets > > >>> > >>> > > > >> topic > > >>> > >>> > > > >> > > > issue > > >>> > >>> > > > >> > > > > > >> > before I go with the scale up and down > > >>> issue ) > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead and > > >>> created > > >>> > >>> the > > >>> > >>> > > > offsets > > >>> > >>> > > > >> > > topic. 
> > >>> > >>> > > > >> > > > > The > > >>> > >>> > > > >> > > > > > >> > status of the cluster ( destination ) > is > > >>> thus > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers Spread > > >>> > %Brokers > > >>> > >>> Skew > > >>> > >>> > > > >> %Brokers > > >>> > >>> > > > >> > > > > Leader > > >>> > >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated > %Leader > > >>> > >>> SizeProducer > > >>> > >>> > > > >> > > > > > Message/SecSummed > > >>> > >>> > > > >> > > > > > >> > Recent Offsets > > >>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > > >>> > >>> > > > >> > > > > > >> > s8k.act_search_page > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842 > > >>> > >>> > > > >> > > > > > >> > s8k.act_reach > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529 > > >>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10 > > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > > >>> > >>> > > > >> > > > > > >> > 
mm2-offsets.s8k.internal > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > > >>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal > > >>> > >>> > > > >> > > > > > >> > < > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13 > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > You can see . that we have the 5 ( I > > >>> created > > >>> > >>> bot the > > >>> > >>> > > > >> offsets, > > >>> > >>> > > > >> > > to > > >>> > >>> > > > >> > > > be > > >>> > >>> > > > >> > > > > > >> safe > > >>> > >>> > > > >> > > > > > >> > for the below ) > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics = > > >>> > >>> act_search_page|act_reach* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled > = > > >>> false* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled > = > > >>> false* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor = > 3* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *replication.factor = 3* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *replication.policy.separator = .* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > *tasks.max = 4* > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> 
> > > >>> > >>> > > > >> > > > > > >> > What seems strange is that I do not > have > > a > > >>> > single > > >>> > >>> > > record > > >>> > >>> > > > in > > >>> > >>> > > > >> > the > > >>> > >>> > > > >> > > > > > offsets > > >>> > >>> > > > >> > > > > > >> > topic.. Is that normal ? I would > > imagine > > >>> that > > >>> > >>> > > without a > > >>> > >>> > > > >> > > record, > > >>> > >>> > > > >> > > > > > there > > >>> > >>> > > > >> > > > > > >> is > > >>> > >>> > > > >> > > > > > >> > no way that a restore would happen.... > > And > > >>> that > > >>> > >>> is > > >>> > >>> > > > obvious > > >>> > >>> > > > >> > when > > >>> > >>> > > > >> > > I > > >>> > >>> > > > >> > > > > > >> restart > > >>> > >>> > > > >> > > > > > >> > the mm2 instance... Find the screenshot > > >>> > >>> attached. In > > >>> > >>> > > > >> essence > > >>> > >>> > > > >> > the > > >>> > >>> > > > >> > > > > > latency > > >>> > >>> > > > >> > > > > > >> > avg lag is reset \when the mm2 instance > > is > > >>> > reset > > >>> > >>> > > > >> indicating no > > >>> > >>> > > > >> > > > > restore > > >>> > >>> > > > >> > > > > > >> but > > >>> > >>> > > > >> > > > > > >> > restart from EARLIEST... I must be > > missing > > >>> some > > >>> > >>> thing > > >>> > >>> > > > >> simple > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne > > >>> Dolan < > > >>> > >>> > > > >> > > > ryannedo...@gmail.com > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > >> > wrote: > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you > > must > > >>> set > > >>> > >>> > > tasks.max > > >>> > >>> > > > to > > >>> > >>> > > > >> > > > > something > > >>> > >>> > > > >> > > > > > >> above > > >>> > >>> > > > >> > > > > > >> >> 1 (the default) in order to achieve > any > > >>> > >>> parallelism. > > >>> > >>> > > > This > > >>> > >>> > > > >> > > > property > > >>> > >>> > > > >> > > > > is > > >>> > >>> > > > >> > > > > > >> >> passed along to the internal Connect > > >>> workers. > > >>> > >>> It's > > >>> > >>> > > > >> > unfortunate > > >>> > >>> > > > >> > > > that > > >>> > >>> > > > >> > > > > > >> >> Connect > > >>> > >>> > > > >> > > > > > >> >> is not smart enough to default this > > >>> property > > >>> > to > > >>> > >>> the > > >>> > >>> > > > >> number of > > >>> > >>> > > > >> > > > > > workers. > > >>> > >>> > > > >> > > > > > >> I > > >>> > >>> > > > >> > > > > > >> >> suspect that will improve before long. > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> >> For the second issue, is it possible > you > > >>> are > > >>> > >>> missing > > >>> > >>> > > the > > >>> > >>> > > > >> > > offsets > > >>> > >>> > > > >> > > > > > >> topic? It > > >>> > >>> > > > >> > > > > > >> >> should exist alongside the config and > > >>> status > > >>> > >>> topics. 
> > >>> > >>> > > > >> Connect > > >>> > >>> > > > >> > > > should > > >>> > >>> > > > >> > > > > > >> create > > >>> > >>> > > > >> > > > > > >> >> this topic, but there are various > > reasons > > >>> this > > >>> > >>> can > > >>> > >>> > > fail, > > >>> > >>> > > > >> e.g. > > >>> > >>> > > > >> > > if > > >>> > >>> > > > >> > > > > the > > >>> > >>> > > > >> > > > > > >> >> replication factor is misconfigured. > You > > >>> can > > >>> > try > > >>> > >>> > > > creating > > >>> > >>> > > > >> > this > > >>> > >>> > > > >> > > > > topic > > >>> > >>> > > > >> > > > > > >> >> manually or changing > > >>> > >>> > > offsets.storage.replication.factor. > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> >> Ryanne > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal > > >>> Santoshi > > >>> > < > > >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com> > > >>> > >>> > > > >> > > > > > >> >> wrote: > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> >> > Using > > >>> > >>> > > > >> > > > > > >>> > https://github.com/apache/kafka/tree/trunk/connect/mirror > > >>> > >>> > > > >> > > > > > as a > > >>> > >>> > > > >> > > > > > >> >> > guide, > > >>> > >>> > > > >> > > > > > >> >> > I have build from source the > > >>> origin/KIP-382 > > >>> > of > > >>> > >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git > . > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2 > > >>> different > > >>> > >>> nodes ( > > >>> > >>> > > they > > >>> > >>> > > > >> are > > >>> > >>> > > > >> > > > > actually > > >>> > >>> > > > >> > > > > > >> >> pods on > > >>> > >>> > > > >> > > > > > >> >> > k8s but that should not matter ). > They > > >>> share > > >>> > >>> the > > >>> > >>> > > > >> > > mm2.properties > > >>> > >>> > > > >> > > > > > file > > >>> > >>> > > > >> > > > > > >> and > > >>> > >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics > > with > > >>> 8 > > >>> > >>> > partitions > > >>> > >>> > > > in > > >>> > >>> > > > >> > > total. > > >>> > >>> > > > >> > > > > > That > > >>> > >>> > > > >> > > > > > >> >> seems > > >>> > >>> > > > >> > > > > > >> >> > to be the way to create a standalone > > mm2 > > >>> > >>> cluster. > > >>> > >>> > I > > >>> > >>> > > do > > >>> > >>> > > > >> not > > >>> > >>> > > > >> > > > > however > > >>> > >>> > > > >> > > > > > >> see( > > >>> > >>> > > > >> > > > > > >> >> at > > >>> > >>> > > > >> > > > > > >> >> > least the mbeans do not show ) any > > >>> attempt > > >>> > to > > >>> > >>> > > > rebalance. 
> > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > > > >>> > > > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process > > >>> > >>> > > > >> > > > > > >> >> > mbeans > > >>> > >>> > > > >> > > > > > >> >> > are all on a single node > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2 > > >>> nodes ( > > >>> > >>> hard > > >>> > >>> > stop > > >>> > >>> > > > ans > > >>> > >>> > > > >> > > start > > >>> > >>> > > > >> > > > ). > > >>> > >>> > > > >> > > > > > The > > >>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to be > > >>> reset to > > >>> > >>> the > > >>> > >>> > > > >> earliest, > > >>> > >>> > > > >> > as > > >>> > >>> > > > >> > > if > > >>> > >>> > > > >> > > > > it > > >>> > >>> > > > >> > > > > > >> is a > > >>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also > > obvious > > >>> from > > >>> > >>> the > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > "record-age-ms-avg|replication-latency-ms-avg" > > >>> > >>> > > > >> > > > > > >> >> > which I track through the restart. > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > This implies that > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is > > not > > >>> > >>> working. I > > >>> > >>> > > > >> cannot > > >>> > >>> > > > >> > > scale > > >>> > >>> > > > >> > > > > up > > >>> > >>> > > > >> > > > > > or > > >>> > >>> > > > >> > > > > > >> >> down > > >>> > >>> > > > >> > > > > > >> >> > by adding nodes to the mm2 cluster > or > > >>> > removing > > >>> > >>> > them. > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not > working. > > >>> If > > >>> > the > > >>> > >>> MM2 > > >>> > >>> > > > >> cluster > > >>> > >>> > > > >> > is > > >>> > >>> > > > >> > > > > > brought > > >>> > >>> > > > >> > > > > > >> >> down, > > >>> > >>> > > > >> > > > > > >> >> > it does not start mirroring from the > > >>> last > > >>> > >>> known > > >>> > >>> > > > state. I > > >>> > >>> > > > >> > see > > >>> > >>> > > > >> > > > the, > > >>> > >>> > > > >> > > > > > >> >> > state/config topics etc created as > > >>> > expected.. 
> > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty mimimal > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *clusters = a , b* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> > dr2* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = > false* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = > false* > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ? > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > Thanks > > >>> > >>> > > > >> > > > > > >> >> > > > >>> > >>> > > > >> > > > > > >> >> > > >>> > >>> > > > >> > > > > > >> > > > >>> > >>> > > > >> > > > > > >> > > >>> > >>> > > > >> > > > > > > > > >>> > >>> > > > >> > > > > > > > >>> > >>> > > > >> > > > > > > >>> > >>> > > > >> > > > > > >>> > >>> > > > >> > > > > >>> > >>> > > > >> > > > >>> > >>> > > > >> > > >>> > >>> > > > > > > >>> > >>> > > > > > >>> > >>> > > > > >>> > >>> > > > >>> > >>> > > >>> > >> > > >>> > > > >>> > > >> > > >