This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch aggr-doc
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9d4c47bc7d1b5eaae1855f542dd86005fce008b2
Author: Andrea Cosentino <[email protected]>
AuthorDate: Thu Jul 23 14:00:04 2020 +0200

    Added Aggregation documentation
---
 docs/modules/ROOT/pages/aggregation.adoc | 63 ++++++++++++++++++++++++++++++++
 docs/modules/ROOT/pages/index.adoc       |  1 +
 2 files changed, 64 insertions(+)

diff --git a/docs/modules/ROOT/pages/aggregation.adoc 
b/docs/modules/ROOT/pages/aggregation.adoc
new file mode 100644
index 0000000..443497a
--- /dev/null
+++ b/docs/modules/ROOT/pages/aggregation.adoc
@@ -0,0 +1,63 @@
+[[Aggregation-Aggregation]]
+= Aggregation
+
+In a Sink Connector scenario, there are sometimes use cases where an end user wants to aggregate Kafka records before sending them to an external system.
+Usually this is done by defining a batch size or a timeout; once the aggregation has completed, the aggregated record collection is sent to the external system.
+
+Apache Camel provides an implementation of the https://camel.apache.org/components/latest/eips/aggregate-eip.html[Aggregate EIP], and in camel-kafka-connector we wanted to leverage what already exists in the plain Apache Camel project.
+
+We therefore introduced the following options in the Sink Connector configuration:
+
+[source,bash]
+----
+camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
+camel.beans.aggregation.size=10
+camel.beans.aggregation.timeout=5000
+----
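+For context, these options sit alongside the usual sink connector properties. A hypothetical fragment of a sink connector configuration might look like the following (the connector name and topic are illustrative, not part of camel-kafka-connector):
+
+[source,bash]
+----
+name=my-sink-connector
+topics=mytopic
+camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
+camel.beans.aggregation.size=10
+camel.beans.aggregation.timeout=5000
+----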
+
+So you'll be able to define your own aggregation strategy by implementing Camel's AggregationStrategy interface, or you can use one of the aggregators provided by Camel.
+
+We are working on adding some aggregators out of the box in camel-kafka-connector.
+
+[[HowDoesAnAggregatorLookLike-HowDoesAnAggregatorLookLike]]
+== What does an aggregator look like?
+
+An aggregator looks something like the following:
+
+[source,java]
+----
+package org.apache.camel.kafkaconnector.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class StringAggregator implements AggregationStrategy {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // lets append the old body to the new body
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String body = oldExchange.getIn().getBody(String.class);
+        if (body != null) {
+            Message newIn = newExchange.getIn();
+            String newBody = newIn.getBody(String.class);
+            if (newBody != null) {
+                body += System.lineSeparator() + newBody;
+            }
+
+            newIn.setBody(body);
+        }
+        return newExchange;
+    }
+}
+----
+
+You can think of oldExchange and newExchange as records arriving at the aggregator.
+
+So in this case each newExchange body is concatenated with the oldExchange body, separated by the system line separator.
+
+This process continues until the batch size is reached or the timeout expires.
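To illustrate the fold performed by StringAggregator without a Camel runtime, here is a hypothetical standalone sketch that applies the same concatenation logic to a batch of record bodies (the class and method names are illustrative, not part of camel-kafka-connector):

```java
import java.util.List;

public class AggregationSketch {

    // Mimics StringAggregator.aggregate: the first record starts the batch,
    // each subsequent record body is appended after the line separator.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    public static void main(String[] args) {
        String batch = null;
        // Fold a batch of record bodies, as the aggregator does exchange by exchange
        for (String record : List.of("record-1", "record-2", "record-3")) {
            batch = aggregate(batch, record);
        }
        // Prints record-1, record-2 and record-3 on separate lines
        System.out.println(batch);
    }
}
```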
diff --git a/docs/modules/ROOT/pages/index.adoc 
b/docs/modules/ROOT/pages/index.adoc
index 5926f32..223aa68 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -3,6 +3,7 @@
 
 * xref:about.adoc[What is it?]
 ** xref:basic-concepts.adoc[Basic concepts]
+** xref:aggregation.adoc[Aggregations]
 * xref:getting-started.adoc[Getting started]
 ** xref:try-it-out-locally.adoc[Try it locally]
 ** xref:try-it-out-on-openshift-with-strimzi.adoc[Try it on OpenShift cluster]
