For one of my clients, I ended up not using the splitter in Camel and instead
used a custom processor that creates an Iterator<byte[]>. This works
with the updates to camel-kafka that are included in 2.17.3. In my tests, using
the Camel splitter the way you have it got about 5K-10K msg/sec. With this, I
get about 200K. However, within Camel it stays a single message, so
anything in the route that needs to look at each line wouldn’t really
work.

from("file://…….")
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            InputStream ins = exchange.getIn().getBody(InputStream.class);
            exchange.getIn().setBody(new SplitterIterator(ins));
        }
    })
    .to("kafka:brokerAddr?topic=messages"
        + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
        + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
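
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Walks a stream one '\n'-delimited line at a time, handing out each line as a byte[].
class SplitterIterator implements Iterator<byte[]> {
    final InputStream stream;
    byte[] next;

    SplitterIterator(InputStream i) {
        stream = i;
        next = readNext();
    }

    // Reads up to the next '\n' or end of stream; returns null once nothing is left.
    private byte[] readNext() {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try {
            int v = stream.read();
            while (v != -1 && v != '\n') {
                bout.write(v);
                v = stream.read();
            }
            // Stop only at end of stream. Checking bout.size() == 0 alone would
            // also stop at the first empty line and drop the rest of the file.
            if (v == -1 && bout.size() == 0) {
                return null;
            }
            return bout.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public byte[] next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        byte[] tmp = next;
        next = readNext();
        return tmp;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}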
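
To check the splitting behaviour without Camel or Kafka in the picture, here is a minimal stand-alone sketch of the same idea. The names LineSplitDemo, LineIterator and split are hypothetical and only for illustration. It makes two assumptions that go beyond the code above: the input is wrapped in a BufferedInputStream so the byte-at-a-time reads do not each hit the underlying file, and an empty line yields an empty record rather than ending the iteration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class LineSplitDemo {

    // Hypothetical stand-in for SplitterIterator: one byte[] per '\n'-delimited line.
    static final class LineIterator implements Iterator<byte[]> {
        private final InputStream in;
        private byte[] next;

        LineIterator(InputStream raw) {
            // Assumption: buffer the stream so single-byte read() calls stay in memory.
            this.in = new BufferedInputStream(raw);
            this.next = readNext();
        }

        private byte[] readNext() {
            ByteArrayOutputStream line = new ByteArrayOutputStream();
            try {
                int b = in.read();
                while (b != -1 && b != '\n') {
                    line.write(b);
                    b = in.read();
                }
                // End of stream with nothing pending means there are no more lines.
                if (b == -1 && line.size() == 0) {
                    return null;
                }
                return line.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public byte[] next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            byte[] current = next;
            next = readNext();
            return current;
        }
    }

    // Collects every line of the given text as a String so the result is easy to inspect.
    static List<String> split(String text) {
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        Iterator<byte[]> it = new LineIterator(in);
        while (it.hasNext()) {
            lines.add(new String(it.next(), StandardCharsets.UTF_8));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Four records: "a", "b", an empty one, and "c".
        System.out.println(split("a\nb\n\nc"));
    }
}
```

Whether empty lines should become empty Kafka messages or be skipped depends on the data; filtering them out in next() would be an equally reasonable choice.
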
> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <[email protected]> wrote:
>
> Hi Camel Gurus,
>
> I've run into some performance issues with the camel-kafka component while
> migrating from 2.17.0 to 2.17.1 and then to 2.17.2.
>
> The camel route is pretty simple and looks like this:
>
> from("file:/var/lib/app/input")
> .split().simple("\n").streaming()
> .to("direct:kafka");
> from("direct:kafka")
> .to("kafka:brokerAddr?topic=messages");
>
> The first issue with camel 2.17.0 was the possibility of losing messages
> <https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
> Kafka's native producer buffers the messages
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>
> and if the kafka broker is unavailable, the messages can be lost when the
> route is restarted. Although messages can be lost, the performance was
> pretty good (~10K rps) thanks to the kafka producer's buffering.
>
> The second issue, with camel 2.17.1, was that the performance of the kafka
> producer degraded tremendously (up to 100 times) because of blocking on
> every message
> <https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
> (although in that case no messages are lost).
>
> The third issue with camel 2.17.2 (although camel started using async
> callbacks
> <https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>)
> was that the performance was still pretty poor because kafka's native
> producer was not able to buffer more than a single message (because the
> direct endpoint is synchronous).
>
> I was able to figure out two solutions for these issues:
>
> - using a seda endpoint instead of the direct one (then kafka's native
> producer is able to buffer the messages, but it is still possible to lose
> messages because of the nature of seda)
>
> - using an aggregator with the direct endpoint (then the route becomes more
> complicated than it should be, the aggregator adds unnecessary delays,
> and why do we need an additional aggregator for batching at all
> if kafka's native producer already does buffering/batching?)
>
> So the question is - is there any way to let kafka's native producer
> buffer more than a single message without using the aggregator eip and
> without losing messages, as can happen with an intermediate seda endpoint?
>
> Kind Regards,
> Sergey
--
Daniel Kulp
[email protected] - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com