[ https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045979#comment-17045979 ]
Randall Hauch commented on KAFKA-9216:
--------------------------------------

Thanks for volunteering to fix this, [~EeveeB], and for identifying three potential approaches to fixing this! Before I discuss your approaches, I want to first confirm something. If the Connect worker creates the config topic, it always does so with a single partition. That means we're only concerned with scenarios where the topic was manually created (or modified) before the worker was started. Is that correct?

Okay, now to your approaches. Just to clarify, the call path is basically:
# `DistributedHerder.run()` calls `startServices()`
# `DistributedHerder.startServices()` calls `configBackingStore.start()`
# `KafkaConfigBackingStore.start()` calls `configLog.start()`
# `KafkaBasedLog.start()` calls `initializer.run()`

We can see that the `KafkaBasedLog.start()` method already has code that throws a ConnectException, so we know that this already stops the herder from running (which is the desired behavior). So, as long as the `KafkaConfigBackingStore.start()` method (or anything called within it, including `initializer.run()`) throws a ConnectException with the appropriate error, the herder will stop.

h3. Option 1

This option would work, but it seems to be a fair amount of work compared to the others.

h3. Option 2

IIUC, your second option is to modify the *initializer* function defined in `KafkaConfigBackingStore` to also get and check the number of partitions, throwing a ConnectException if the topic already exists and has more than one partition. This would require modifying the `TopicAdmin` to fetch the metadata for the existing topic and return it. While that's probably doable, it's more complicated than your next option.

h3. Option 3

This is a good idea, too, especially because the `KafkaBasedLog.start()` method already gets the partition information from the consumer in the form of a `List<PartitionInfo>` for the one topic (or, a bit later, the `List<TopicPartition>` for the topic). If it stored *that* in a field and returned an immutable version of it via a getter method, the `KafkaConfigBackingStore.start()` method could use that method and fail if there is more than one partition. The great thing about this approach is that we don't have to modify the `TopicAdmin` utility or the initializer. The changes to `KafkaBasedLog` are minimal: we just need the getter method to return an immutable list of immutable `TopicPartition` objects. (Note that we could return `PartitionInfo`, but it's not immutable and we don't know how our new getter method might be used; returning an immutable `List<TopicPartition>` is much safer.) We do have to modify the `KafkaConfigBackingStore.start()` method to use this new method, but that logic would be very simple. *Personally, I think this is a great approach: it's simple and localizes the changes well.*

h3. Option 4

A slight variation of Option 3 is to not introduce a new field and getter in `KafkaBasedLog` that returns the partition information, but to instead pass a "partition validation" function into the `KafkaBasedLog` constructor and use it in the `start()` method. The benefit is that we don't have to expose any new methods on `KafkaBasedLog`, but we do have to change the constructor. This has all the same benefits as Option 3, but the logic is a little harder to follow, so I don't like it quite as much as Option 3.
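For concreteness, here is a minimal, self-contained sketch of the Option 3 check. This is *not* the actual Connect code: the `ConfigTopicCheck` class, the `verifySinglePartition` helper, the local `ConnectException` stand-in, and the error message are all illustrative. In the real change, `KafkaConfigBackingStore.start()` would call the new `KafkaBasedLog` getter to obtain the immutable `List<TopicPartition>` and apply this same check to its size:

```java
// Hypothetical sketch of the Option 3 fail-fast check. In Connect itself,
// the partition count would come from the immutable List<TopicPartition>
// exposed by the proposed KafkaBasedLog getter, and ConnectException would
// be org.apache.kafka.connect.errors.ConnectException.
public class ConfigTopicCheck {

    // Stand-in for Connect's ConnectException, just so this sketch compiles
    // on its own.
    public static class ConnectException extends RuntimeException {
        public ConnectException(String message) {
            super(message);
        }
    }

    // The check KafkaConfigBackingStore.start() would run once the log's
    // partition metadata is available: reject anything other than exactly
    // one partition so the worker stops before reading configs.
    public static void verifySinglePartition(String topic, int partitionCount) {
        if (partitionCount != 1) {
            throw new ConnectException(
                "Config topic '" + topic + "' is required to have exactly 1 "
                + "partition but was found to have " + partitionCount);
        }
    }

    public static void main(String[] args) {
        // A correctly created config topic passes silently.
        verifySinglePartition("connect-configs", 1);

        // A manually created topic with several partitions is rejected.
        try {
            verifySinglePartition("connect-configs", 3);
        } catch (ConnectException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check runs inside `start()`, the thrown exception propagates up through `KafkaConfigBackingStore.start()` to the herder, which is exactly the existing shutdown path described above.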
> Enforce connect internal topic configuration at startup
> -------------------------------------------------------
>
>                 Key: KAFKA-9216
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9216
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with
> more than one partition. One partition is expected, however, and using more
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1,
> groupId=td-connect-server] Current config state offset 284 does not match
> group assignment 274. Forcing rebalance.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and
> prevent the worker from starting if connect config partition count != 1?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)