[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749171#comment-16749171 ]
Del Bao commented on KAFKA-7432:
--------------------------------

A real-world scenario to back this use case: an event logging topic has 60 partitions. One of its many downstream consumers is a data materialization service. Say it consumes an event message, makes RPC calls in parallel to several external data sources to fetch signals related to the event, and then writes the result to a downstream sink (data store). Processing one message takes about 4 ms, i.e., 250 messages per second per partition, or about 15k messages per second across all 60 partitions. The producing rate is far higher than that, which causes huge consumer lag.

One solution is to repartition the upstream topic, but in a corporate setting the topic has too many dependencies, so that is not easy. Micro-batching could work better: process a batch of events and make a single RPC call for the whole batch. We can certainly handle the micro-batching at the application level (a rough sketch follows the quoted issue below), but it would be better to have this in the Kafka Streams API itself.

> API Method on Kafka Streams for processing chunks/batches of data
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7432
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7432
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: sam
>            Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}.
> In Spark Structured Streaming we can do
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams
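For illustration, here is a minimal sketch of that application-level workaround, assuming Kafka Streams 2.1+ (for the {{Duration}}-based {{schedule}} overload) and the Scala of the snippets above. It buffers records in a {{Transformer}} and flushes either when the buffer reaches {{chunkSize}} or when a wall-clock punctuation fires. {{BatchingTransformer}} is a hypothetical name, and {{externalBatchedApiCall}} stands in for the bulk RPC from the issue description; neither is part of the Kafka Streams API.

{code:scala}
import java.time.Duration
import scala.collection.mutable.ArrayBuffer

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType}

// Hypothetical application-level micro-batcher: buffers up to chunkSize
// records per task and flushes them with one bulk RPC, either when the
// buffer fills or when the wall-clock punctuation fires.
class BatchingTransformer(chunkSize: Int, flushInterval: Duration)
    extends Transformer[String, String, KeyValue[String, String]] {

  private val buffer = ArrayBuffer.empty[KeyValue[String, String]]
  private var context: ProcessorContext = _

  override def init(ctx: ProcessorContext): Unit = {
    context = ctx
    // Also flush on a timer, so a slow partition cannot hold a partial
    // batch forever.
    ctx.schedule(flushInterval, PunctuationType.WALL_CLOCK_TIME, (_: Long) => flush())
  }

  override def transform(key: String, value: String): KeyValue[String, String] = {
    buffer += KeyValue.pair(key, value)
    if (buffer.size >= chunkSize) flush()
    null // results are emitted via context.forward() inside flush()
  }

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      // One bulk call covers the whole batch instead of one RPC per record.
      externalBatchedApiCall(buffer.toList).foreach(kv => context.forward(kv.key, kv.value))
      buffer.clear()
    }

  override def close(): Unit = flush()

  // Stand-in for the external bulk service call; replace with a real client.
  private def externalBatchedApiCall(
      batch: List[KeyValue[String, String]]): List[KeyValue[String, String]] = batch
}
{code}

Wiring it up would look roughly like {{builder.stream[String, String]("my-input-topic").transform(() => new BatchingTransformer(2000, Duration.ofSeconds(1))).to("my-output-topic")}}. Note the buffer is in memory only, so records sitting in a partial batch can be lost or reprocessed across a crash or rebalance; handling that properly (e.g., backing the buffer with a state store) is part of why a first-class batching API would be valuable.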