[ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
-------------------------------
    Summary: Cannot use KafkaConsumer with Kotlin coroutines due to various 
issues  (was: Cannot use KafkaConsumer with Kotlin coroutines due to Thread id 
check)

> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
> [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
> supports a style of async programming that avoids the need for callbacks (and 
> existing callback-based API's such as Kafka's can easily be adapted to this). 
> With coroutines, methods with callbacks are suspended, and resumed once the 
> call is complete. With this approach, while access to the KafkaConsumer is 
> done in a thread-safe way, it does NOT happen from a single thread -- a 
> different underlying thread may actually execute the code after the 
> suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the 
> thread safety of the client, but that the *same thread* is being used -- if 
> the same thread (by id) is not being used the consumer throws an exception 
> like:
> {code}
> Exception in thread "ForkJoinPool.commonPool-worker-25" 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
> {code}
> I understand this check is present to protect people from themselves, but I'd 
> like the ability to disable this check so that this code can be used 
> effectively by libraries such as Kotlin coroutines.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to