This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch aggr-doc
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 9d4c47bc7d1b5eaae1855f542dd86005fce008b2
Author: Andrea Cosentino <[email protected]>
AuthorDate: Thu Jul 23 14:00:04 2020 +0200

    Added Aggregation documentation
---
 docs/modules/ROOT/pages/aggregation.adoc | 63 ++++++++++++++++++++++++++++++++
 docs/modules/ROOT/pages/index.adoc       |  1 +
 2 files changed, 64 insertions(+)

diff --git a/docs/modules/ROOT/pages/aggregation.adoc b/docs/modules/ROOT/pages/aggregation.adoc
new file mode 100644
index 0000000..443497a
--- /dev/null
+++ b/docs/modules/ROOT/pages/aggregation.adoc
@@ -0,0 +1,63 @@
+[[Aggregation-Aggregation]]
+= Aggregation
+
+In a Sink Connector scenario, there are sometimes use cases where an end user wants to aggregate Kafka records before sending them to an external system.
+Usually this can be done by defining a batch size or a timeout; once the aggregation has been completed, the aggregated collection of records is sent to the external system.
+
+Apache Camel provides the https://camel.apache.org/components/latest/eips/aggregate-eip.html[Aggregate EIP] implementation, and in Camel-Kafka-connector we wanted to leverage what we already have in the plain Apache Camel project.
+
+We therefore introduced the following options in the Sink Connector configuration:
+
+[source,bash]
+----
+camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
+camel.beans.aggregation.size=10
+camel.beans.aggregation.timeout=5000
+----
+
+You can define your own AggregationStrategy by implementing the Camel AggregationStrategy interface, or you can use one of the Aggregators provided by Camel.
+
+We are working on adding some Aggregators out of the box in camel-kafka-connector.
+
+[[HowDoesAnAggregatorLookLike-HowDoesAnAggregatorLookLike]]
+== What does an Aggregator look like?
+
+An Aggregator looks like the following:
+
+[source,java]
+----
+package org.apache.camel.kafkaconnector.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class StringAggregator implements AggregationStrategy {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // let's append the old body to the new body
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String body = oldExchange.getIn().getBody(String.class);
+        if (body != null) {
+            Message newIn = newExchange.getIn();
+            String newBody = newIn.getBody(String.class);
+            if (newBody != null) {
+                body += System.lineSeparator() + newBody;
+            }
+
+            newIn.setBody(body);
+        }
+        return newExchange;
+    }
+}
+----
+
+You can think of oldExchange and newExchange as records arriving at the Aggregator.
+
+In this case, each newExchange body will be concatenated with the oldExchange body, separated by the system line separator.
+
+This process continues until the batch size has been reached or the timeout has expired.

diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 5926f32..223aa68 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -3,6 +3,7 @@
 * xref:about.adoc[What is it?]
 ** xref:basic-concepts.adoc[Basic concepts]
+** xref:aggregation.adoc[Aggregations]
 * xref:getting-started.adoc[Getting started]
 ** xref:try-it-out-locally.adoc[Try it locally]
 ** xref:try-it-out-on-openshift-with-strimzi.adoc[Try it on OpenShift cluster]
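The pairwise aggregation that the patch's StringAggregator performs on Exchanges can be sketched in plain Java, without the Camel API. The `StringAggregatorSketch` class and its `aggregate` helper below are hypothetical names used only for illustration; the sketch assumes the same null handling and line-separator concatenation as the strategy in the patch.

```java
import java.util.List;

// Hypothetical sketch (not part of the patch): mirrors StringAggregator's
// logic on plain strings instead of Camel Exchanges.
public class StringAggregatorSketch {

    // The first record is kept as-is; each later record body is appended
    // to the accumulated body, separated by the system line separator.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        if (newBody == null) {
            return oldBody;
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    public static void main(String[] args) {
        String aggregated = null;
        // Simulate three Kafka record bodies arriving at the aggregator
        // before the batch size (or timeout) flushes the batch.
        for (String body : List.of("record-1", "record-2", "record-3")) {
            aggregated = aggregate(aggregated, body);
        }
        System.out.println(aggregated);
    }
}
```

In the connector, the equivalent accumulation runs until `camel.beans.aggregation.size` records have been folded together or `camel.beans.aggregation.timeout` expires, at which point the aggregated body is delivered to the external system.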
