[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045979#comment-17045979
 ] 

Randall Hauch commented on KAFKA-9216:
--------------------------------------

Thanks for volunteering to fix this, [~EeveeB], and for identifying three 
potential approaches to fixing this!

Before I discuss your approaches, I do want to first confirm something. If the 
Connect worker creates the config topic, it does so always with a single 
partition. That means that we're only concerned with scenarios where the topic 
was manually created (or modified) before the worker was started. Is that 
correct?

Okay, now to your approaches. Just to clarify, the call path is basically:
 # `DistributedHerder.run()` calls `startServices()`
 # `DistributedHerder.startServices()` calls `configBackingStore.start()`
 # `KafkaConfigBackingStore.start()` calls `configLog.start()`
 # `KafkaBasedLog.start()` calls `initializer.run()`

We can see that the `KafkaBasedLog.start()` method already has code that throws 
a ConnectException, and we know that throwing one there already stops the herder 
from running (which is the desired behavior). So, as long as the 
`KafkaConfigBackingStore.start()` method (or anything called within it, 
including `initializer.run()`) throws a ConnectException with an appropriate 
error message, the herder will stop.
h3. Option 1

This option would work, but it seems to be a fair amount of work compared to 
the others.
h3. Option 2

IIUC, your second option is to modify the *initializer* function defined in 
`KafkaConfigBackingStore` to also get/check the number of partitions and to 
throw a ConnectException if the topic already exists and has more than one 
partition.
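
Getting the partition count from the broker might look roughly like this (a 
sketch only; how `TopicAdmin` would expose this, and the surrounding error 
handling, are assumptions):

{code:java}
// Sketch, assuming TopicAdmin gives us access to an Admin client.
// describeTopics() is the standard AdminClient call; futures and
// retriable-error handling are omitted for brevity.
TopicDescription desc = admin.describeTopics(Collections.singleton(topic))
                             .values().get(topic).get();
if (desc.partitions().size() > 1) {
    throw new ConnectException("Config topic '" + topic
            + "' must have exactly one partition");
}
{code}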

This would require modifying the TopicAdmin to get the metadata for the 
existing topic and return it. While that's probably doable, it's more 
complicated than your next option.
h3. Option 3

This is a good idea, too, especially because the `KafkaBasedLog.start()` method 
already gets the partition information from the consumer in the form of a 
`List<PartitionInfo>` for the one topic (or, a bit later, a 
`List<TopicPartition>` for the topic). If it stored *that* in a field and 
returned an immutable version of that list via a method, the 
`KafkaConfigBackingStore.start()` method could call this method and fail if 
there is more than one partition.

The great thing about this approach is that we don't have to modify the 
`TopicAdmin` utility or the initializer. The changes to `KafkaBasedLog` are 
minimal – we just need the getter method to return an immutable list of 
immutable `TopicPartition` objects. (Note that we could return `PartitionInfo`, 
but it's not immutable and we don't know how our new getter method might be 
used. Returning an immutable `List<TopicPartition>` is much safer.)

We do have to modify the `KafkaConfigBackingStore.start()` method to use this 
new method, but that would be super simple logic.
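
A rough sketch of what this could look like (the field and method names here 
are placeholders, not actual code):

{code:java}
// In KafkaBasedLog: remember the partitions discovered during start()
// and expose them through an immutable view.
private List<TopicPartition> partitions;  // assigned once in start()

public List<TopicPartition> partitions() {
    return Collections.unmodifiableList(partitions);
}

// In KafkaConfigBackingStore.start(), after the log has started:
if (configLog.partitions().size() > 1) {
    throw new ConnectException("Config topic must have exactly one partition");
}
{code}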

*Personally, I think this is a great approach: it's simple and localizes the 
changes pretty well.*
h3. Option 4

A slight variation of Option 3 is to not introduce a new field and getter in 
`KafkaBasedLog` that returns the partition information, but to instead pass a 
"partition validation" function into the `KafkaBasedLog` constructor and then 
to use this in the `start()` method. The benefit is that we don't have to 
expose any new methods on `KafkaBasedLog`, but we have to change the 
constructor.
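
Sketched out (the parameter name, its type, and the wiring are all assumed):

{code:java}
// New constructor parameter held by KafkaBasedLog:
private final java.util.function.Consumer<List<TopicPartition>> partitionValidator;

// In KafkaBasedLog.start(), once the partitions are known:
if (partitionValidator != null)
    partitionValidator.accept(partitions);

// KafkaConfigBackingStore would then construct its log with a validator:
partitions -> {
    if (partitions.size() > 1)
        throw new ConnectException("Config topic must have exactly one partition");
}
{code}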

This really has all the same benefits as option 3, but it's a little harder to 
follow the logic. So I don't like this quite as much as option 3.

> Enforce connect internal topic configuration at startup
> -------------------------------------------------------
>
>                 Key: KAFKA-9216
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9216
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
