[ https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Gupta updated KAFKA-7143:
-------------------------------
    Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
support a style of async programming that avoids the need for callbacks 
(existing callback-based APIs can usually be adapted to this style with a simple 
wrapper). With coroutines, continuations are used instead of callbacks: a call 
that would normally take a callback suspends the coroutine, which is resumed 
once the call completes. As a result, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT necessarily happen from a single thread -- a 
different underlying thread may execute the code after the suspension point.

However, the KafkaConsumer includes an additional check that verifies not only 
the thread safety of the client but also that the *same thread* is being used; 
if the same thread (by ID) is not being used, the consumer throws an exception 
like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}
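To make the failure mode concrete, here is a simplified model of the kind of thread-ownership guard that produces this exception. It is modeled loosely on the consumer's internal {{acquire()}}/{{release()}} pair, but it is an illustration, not the Kafka source; {{ThreadOwnershipGuard}} is a hypothetical name. The guard records the ID of the thread currently inside a consumer call and throws when a different thread enters before that call has returned.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the consumer's thread-ownership check.
// Illustration only; not the actual KafkaConsumer implementation.
final class ThreadOwnershipGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    // ID of the thread currently inside a guarded call, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Re-entrancy count, so the owning thread may nest guarded calls.
    private final AtomicInteger refCount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Fail fast if a *different* thread is still inside a guarded call.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    void release() {
        // Give up ownership once the outermost guarded call returns.
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
{code}

Under this model, the exception requires a call from one thread to overlap a call still in progress on another thread; calls that merely switch threads between one call returning and the next call starting pass the check.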

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable it so that the consumer can be used effectively 
with libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer.
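A minimal Java sketch of that single-thread workaround, assuming the goal is simply to confine every consumer call to one owning thread: {{ConfinedClient}} is a hypothetical wrapper (not a Kafka API) that runs each operation on a dedicated single-thread executor and hands back a {{CompletableFuture}}. It works for any non-thread-safe client, such as a {{KafkaConsumer}}, and the dedicated thread it creates is exactly the cost objected to above.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical wrapper: confines all calls on a non-thread-safe client
// (e.g. a KafkaConsumer) to one dedicated thread.
final class ConfinedClient<T> implements AutoCloseable {
    private final T client;
    // The single thread that owns the client.
    private final ExecutorService owner;

    ConfinedClient(T client, String threadName) {
        this.client = client;
        this.owner = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // do not keep the JVM alive on its own
            return thread;
        });
    }

    // Runs op on the owning thread; operations execute one at a time,
    // in submission order, so the client never sees two threads.
    <R> CompletableFuture<R> call(Function<T, R> op) {
        return CompletableFuture.supplyAsync(() -> op.apply(client), owner);
    }

    @Override
    public void close() {
        owner.shutdown();
    }
}
{code}

For example, {{client.call(c -> { c.commitSync(); return null; })}} would run the commit on the owning thread. From Kotlin, {{Executors.newSingleThreadExecutor().asCoroutineDispatcher()}} (kotlinx.coroutines) plays the same role as a single-thread coroutine context.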

However, further problems await -- the `commitAsync` method also cannot be used 
with coroutines, because the callback is never executed and therefore the 
coroutine is never resumed past the suspension point. Upon investigation, it 
seems the callback is only executed during later calls to poll, which in a 
regular polling loop with coroutines will never happen because the loop is 
suspended on `commitAsync`, so we have a deadlock. I guess the idea behind this 
Kafka consumer API design is that consuming new messages may continue even 
though commits of previous offsets (which may have been issued an arbitrarily 
long time in the past) have not necessarily been processed. However, with a 
coroutine-based API, commitAsync could be sequential before the next poll, like 
commitSync, but still happen asynchronously without tying up a client 
application thread.
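The deadlock can be reproduced without Kafka. In this sketch, {{FakeConsumer}} and {{CommitCallback}} are hypothetical stand-ins (the real {{OffsetCommitCallback}} also receives the committed offsets) that mimic the behavior described above: a completed commit's callback is deferred until the next {{poll()}}. {{Commits.commitAsyncFuture}} is the kind of simple wrapper a coroutine would suspend on.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the consumer's commit callback.
interface CommitCallback {
    void onComplete(Exception exception);
}

// Hypothetical stand-in mimicking the behavior described in this issue:
// completed commit callbacks only run from inside a later poll().
final class FakeConsumer {
    private final Queue<CommitCallback> completedCommits = new ArrayDeque<>();

    void commitAsync(CommitCallback callback) {
        // Pretend the commit succeeded immediately; the callback is still deferred.
        completedCommits.add(callback);
    }

    void poll() {
        // Deferred callbacks are invoked here, on the polling thread.
        CommitCallback callback;
        while ((callback = completedCommits.poll()) != null) {
            callback.onComplete(null);
        }
    }
}

final class Commits {
    // Adapts the callback API to a future that a caller can wait on.
    static CompletableFuture<Void> commitAsyncFuture(FakeConsumer consumer) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        consumer.commitAsync(exception -> {
            if (exception == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(exception);
            }
        });
        return result;
    }
}
{code}

The future stays incomplete until {{poll()}} runs, so a loop that waits on it (by blocking, or by suspending a coroutine) before polling again never reaches the poll that would complete it.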

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
support a style of async programming that avoids the need for callbacks 
(existing callback-based APIs can usually be adapted to this style with a simple 
wrapper). With coroutines, methods with callbacks are suspended, and resumed 
once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT necessarily happen from 
a single thread -- a different underlying thread may actually execute the code 
after the suspension point.

However, the KafkaConsumer includes an additional check that verifies not only 
the thread safety of the client but also that the *same thread* is being used; 
if the same thread (by ID) is not being used, the consumer throws an exception 
like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable it so that the consumer can be used effectively 
with libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines, because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed during later calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this Kafka consumer API design is that consuming new messages may 
continue even though commits of previous offsets (which may have been issued 
an arbitrarily long time in the past) have not necessarily been processed. 
However, with a coroutine-based API, commitAsync could be sequential before 
the next poll, like commitSync, but still happen asynchronously without tying 
up a client application thread.


> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
