[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984657#comment-16984657
 ] 

Werner Donné commented on KAFKA-6989:
-------------------------------------

Hello,

Since Kafka consumers receive messages in batches, the processor classes and 
interfaces could be extended to also forward lists of messages. The stream DSL 
could then allow mappers like {{ValueMapper<Stream<? super V>,Stream<? extends 
VR>>}} or {{KeyValueMapper<Stream<KeyValue<? super K, ? super 
V>>,Stream<KeyValue<? extends KR, ? extends VR>>>}}. Implementations could then 
create chains of completion stages and have everything executed in a fork-join 
pool. This would already reduce the relative number of blocking waits.
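
A minimal sketch of what such a batch mapper could look like, using only the 
JDK. The class and method names ({{BatchMapperSketch}}, {{mapBatch}}) are 
hypothetical and not part of Kafka Streams; the per-value function stands in 
for whatever blocking call the mapper would make.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: none of these names exist in Kafka Streams.
class BatchMapperSketch {

    // Applies a (possibly slow) per-value function to a whole batch.
    // Every value is submitted to the pool first; the results are joined
    // only afterwards, so the individual waits overlap instead of adding up.
    static <V, VR> Stream<VR> mapBatch(Stream<? extends V> batch,
                                       Function<? super V, ? extends VR> fn,
                                       ForkJoinPool pool) {
        List<CompletableFuture<VR>> pending = batch
            .map(v -> CompletableFuture.<VR>supplyAsync(() -> fn.apply(v), pool))
            .collect(Collectors.toList()); // forces submission of all tasks
        // Joining in list order keeps the output in the input order.
        return pending.stream().map(CompletableFuture::join);
    }

    public static void main(String[] args) {
        List<String> out = BatchMapperSketch.<String, String>mapBatch(
                Stream.of("a", "b", "c"), String::toUpperCase, ForkJoinPool.commonPool())
            .collect(Collectors.toList());
        System.out.println(out); // prints [A, B, C]
    }
}
```

Note that tasks which block inside a fork-join pool occupy its worker 
threads, so a real implementation would probably need a suitably sized pool 
or non-blocking clients.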

If the result is forwarded to a topic, this can also be done internally with a 
chained sequence of completion stages using the callback variant of the 
producer send method. This can be wrapped in a producer transaction.
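
As a rough illustration, a callback-style send can be adapted to a completion 
stage as follows. To keep the snippet runnable without the Kafka client, 
{{CallbackSender}} is a hypothetical stand-in whose shape mirrors 
{{KafkaProducer.send(ProducerRecord, Callback)}}, where the callback receives 
either metadata or an exception. {{SendStageSketch}}, {{sendAsync}} and 
{{sendAll}} are illustrative names as well.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Hypothetical stand-in for a producer with a callback-style send method.
interface CallbackSender<R, M> {
    void send(R record, BiConsumer<M, Exception> callback);
}

class SendStageSketch {

    // Turns one callback-style send into a completion stage.
    static <R, M> CompletionStage<M> sendAsync(CallbackSender<R, M> sender, R record) {
        CompletableFuture<M> result = new CompletableFuture<>();
        sender.send(record, (metadata, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(metadata);
            }
        });
        return result;
    }

    // Sends a whole batch and completes when every send has been acknowledged.
    // With a real transactional producer, beginTransaction() would come before
    // this call and commitTransaction() or abortTransaction() after it.
    static <R, M> CompletionStage<List<M>> sendAll(CallbackSender<R, M> sender, List<R> records) {
        List<CompletableFuture<M>> stages = records.stream()
            .map(r -> sendAsync(sender, r).toCompletableFuture())
            .collect(Collectors.toList());
        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> stages.stream()
                .map(CompletableFuture::join) // already complete, does not block
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Fake sender that acknowledges each record with its length.
        CallbackSender<String, Integer> fake = (record, cb) -> cb.accept(record.length(), null);
        List<Integer> acks = sendAll(fake, List.of("a", "bb", "ccc")).toCompletableFuture().join();
        System.out.println(acks); // prints [1, 2, 3]
    }
}
```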

A further step could be to allow completion stages in the mapper interfaces. A 
task could then build one completion stage chain containing all steps and wait 
for it to complete. That's only one blocking wait per message batch. It would 
not require a modification of the current thread model.
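
To make the idea concrete, here is a sketch under my own assumptions: 
{{AsyncValueMapper}} is a hypothetical interface (Kafka Streams has no such 
type), and {{processBatch}} plays the role of the task, which builds one 
chain per message and blocks once for the whole batch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Hypothetical interface: a mapper that returns a completion stage.
interface AsyncValueMapper<V, VR> {
    CompletionStage<VR> apply(V value);
}

class BatchChainSketch {

    // Composes two asynchronous steps into a single asynchronous step.
    static <A, B, C> AsyncValueMapper<A, C> andThen(AsyncValueMapper<A, B> first,
                                                    AsyncValueMapper<B, C> second) {
        return value -> first.apply(value).thenCompose(second::apply);
    }

    // Starts the chain for every message, then waits once for all of them.
    static <V, VR> List<VR> processBatch(List<V> batch, AsyncValueMapper<V, VR> steps) {
        List<CompletableFuture<VR>> stages = batch.stream()
            .map(v -> steps.apply(v).toCompletableFuture())
            .collect(Collectors.toList());
        // The only blocking wait for this batch.
        CompletableFuture.allOf(stages.toArray(new CompletableFuture[0])).join();
        return stages.stream()
            .map(CompletableFuture::join) // already complete, does not block
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AsyncValueMapper<String, Integer> length =
            s -> CompletableFuture.supplyAsync(s::length);
        AsyncValueMapper<Integer, String> describe =
            n -> CompletableFuture.supplyAsync(() -> "len=" + n);
        System.out.println(processBatch(List.of("a", "bb"), andThen(length, describe)));
        // prints [len=1, len=2]
    }
}
```

The calling thread stays the same single task thread; only the work inside 
the stages runs elsewhere, which is why the thread model itself would not 
have to change.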

Best regards,

Werner.
 
 

> Support Async Processing in Streams
> -----------------------------------
>
>                 Key: KAFKA-6989
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6989
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams]
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy I/O with high latency. Suppose you need 
> to access a remote REST API, read from or write to an external store, or do a 
> heavy disk I/O operation that may result in high latency. The current 
> threading model would block all other records until this record is done, 
> waiting on the remote call / I/O to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do an RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straightforward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based on 
> this processed record. Zalando Dublin, for example, are implementing 
> a distributed web crawler. Note that although this feature can be handled in 
> punctuation, it is not well aligned with our current offset committing 
> behavior, which always advances the offset once the record has finished 
> traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable so that users can 
> implement multi-threaded processing themselves: users can always do async 
> processing in the Processor API, e.g. by spawning a thread pool, but the key 
> is that the offset to be committed should only be advanced when such async 
> processing is done. This is a light-weight approach: we provide all the 
> pieces and tools, and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and take 
> care of offset committing internally. This is a heavy-weight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover the 
> majority of scenarios, and users do not need to worry about internal 
> implementation details such as offsets and fault tolerance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
