Guozhang Wang created KAFKA-6989:

             Summary: Support Async Processing in Streams
                 Key: KAFKA-6989
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang

Today Kafka Streams uses a single-thread-per-task architecture to achieve 
embarrassingly parallel processing and good isolation. However, there are a 
couple of scenarios where async processing may be preferable:

1) External resource access or heavy IO with high latency. Suppose you need to 
access a remote REST API, read from / write to an external store, or do a heavy 
disk IO operation that may result in high latency. The current threading model 
would block all subsequent records until this record is done, waiting on the 
remote call / IO to finish.

2) Robust failure handling with retries. Imagine the app-level processing of a 
(non-corrupted) record fails (e.g. the user attempted to make an RPC to an 
external system, and this call failed), and failed records are moved into a 
separate "retry" topic. How can you process such failed records in a scalable 
way? For example, imagine you need to implement a retry policy such as "retry 
with exponential backoff". Here, you have the problem that 1. you can't really 
pause processing a single record because this will pause the processing of the 
full stream (bottleneck!) and 2. there is no straightforward way to "sort" 
failed records based on their "next retry time" (think: priority queue).

3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
re-processing this event for 5 minutes") as mentioned in 2); another is 
implementing a scheduler, e.g. doing some additional operations later based on 
this processed record. Zalando Dublin, for example, are implementing a 
distributed web crawler. Note that although this feature can be handled in 
punctuation, it is not well aligned with our current offset committing 
behavior, which always advances the offset once the record has finished 
traversing the topology.
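
To make the "priority queue" idea from scenario 2) concrete, here is a minimal 
sketch in plain Java. It is only an illustration: RetryQueue, backoffMs, 
scheduleRetry and pollDue are hypothetical names, not Kafka Streams APIs, and 
the sketch ignores persistence and fault tolerance entirely.

```java
import java.util.PriorityQueue;

// Hypothetical helper (not a Kafka Streams API): keeps failed records
// ordered by the time at which they should next be retried.
final class RetryQueue<T> {
    private static final class Entry<T> {
        final T record;
        final long nextRetryAtMs;

        Entry(T record, long nextRetryAtMs) {
            this.record = record;
            this.nextRetryAtMs = nextRetryAtMs;
        }
    }

    private final long baseDelayMs;
    private final long maxDelayMs;
    // The head of the queue is always the entry with the earliest retry time.
    private final PriorityQueue<Entry<T>> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.nextRetryAtMs, b.nextRetryAtMs));

    RetryQueue(long baseDelayMs, long maxDelayMs) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Exponential backoff: the delay doubles with each attempt
    // (attempt 0 -> base, 1 -> 2 * base, ...) and is capped at maxDelayMs.
    long backoffMs(int attempt) {
        long delay = baseDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    // Registers a failed record for a retry after the backoff for this attempt.
    void scheduleRetry(T record, int attempt, long nowMs) {
        queue.add(new Entry<>(record, nowMs + backoffMs(attempt)));
    }

    // Returns the next record whose retry time has arrived, or null if none is due.
    T pollDue(long nowMs) {
        Entry<T> head = queue.peek();
        if (head == null || head.nextRetryAtMs > nowMs) {
            return null;
        }
        return queue.poll().record;
    }
}
```

A caller would poll pollDue(now) periodically and retry whatever it returns, 
without ever blocking the records that are not yet due.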

I'm thinking of two options to support this feature:

1. Make the commit() mechanism more customizable so that users can implement 
multi-threaded processing themselves: users can always do async processing in 
the Processor API, e.g. by spawning a thread pool, but the key is that the 
offset to be committed should only be advanced when such async processing is 
done. This is a light-weight approach: we provide all the pieces and tools, and 
users stack them up to build their own LEGOs.

2. Provide a general API to do async processing in the Processor API, and take 
care of the offset committing internally. This is a heavy-weight approach: the 
API may not cover all async scenarios, but it is an easy way to cover the 
majority of scenarios, and users do not need to worry about internal 
implementation details such as offsets and fault tolerance.
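
Either option needs some bookkeeping so that the committed offset only moves 
past records whose async work has finished, even when completions arrive out 
of order. A rough sketch of that bookkeeping in plain Java follows. 
AsyncOffsetTracker and its methods are hypothetical names used for 
illustration, not an existing or proposed Kafka Streams API; the sketch covers 
a single partition and assumes records are registered in offset order.

```java
import java.util.TreeMap;

// Hypothetical helper (not a Kafka Streams API): tracks which offsets of one
// partition are still being processed asynchronously.
final class AsyncOffsetTracker {
    // offset -> whether its async processing has completed
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    // Next offset that is safe to commit; -1 means nothing is committable yet.
    private long committable = -1L;

    // Call when a record is handed off to async processing.
    synchronized void started(long offset) {
        inFlight.put(offset, Boolean.FALSE);
    }

    // Call when the async processing of a record has finished.
    synchronized void completed(long offset) {
        if (inFlight.containsKey(offset)) {
            inFlight.put(offset, Boolean.TRUE);
        }
        // Advance only across the leading run of completed offsets, so an
        // unfinished earlier record holds the committable offset back.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            // Kafka's convention: the committed offset is last processed + 1.
            committable = inFlight.pollFirstEntry().getKey() + 1;
        }
    }

    synchronized long committableOffset() {
        return committable;
    }
}
```

For example, with offsets 5, 6 and 7 in flight, completing 6 first leaves 
nothing committable; completing 5 afterwards makes 7 the committable offset.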

This message was sent by Atlassian JIRA
