> I use https://github.com/akka/reactive-kafka. This library has just been taken
> over by the Akka team and will be incorporated into a future version of Akka.

Thanks Dave for the pointer. I downloaded the reactive-kafka 0.8 branch since we're using Kafka 0.8.2.2. We need at-most-once semantics and control over individual offset commits, so I was looking at the example below from the branch page:

https://github.com/akka/reactive-kafka/tree/0.8

Could this example be tweaked to provide at-most-once semantics? And there's mention in the doc of configuring the concurrency level of internal thread pools, so I assume that would be applicable to this example?

--
Nick

Manual Commit (version 0.8 and above)

In order to be able to achieve "at-least-once" delivery, you can use the following API to obtain an additional Sink, to which you can stream back the messages that you have processed. An underlying actor will periodically flush the offsets of these messages as committed. Reactive Kafka supports manual commit both to Zookeeper (legacy) and Kafka storage. Dual commit is not supported. In order to commit manually to Zookeeper, you have to add an optional module to your dependencies.

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
    import kafka.serializer.StringDecoder // needed for the decoder below

    implicit val actorSystem = ActorSystem("ReactiveKafka")
    implicit val materializer = ActorMaterializer()

    val kafka = new ReactiveKafka()
    val consumerProperties = ConsumerProperties(
      brokerList = "localhost:9092",
      zooKeeperHost = "localhost:2181",
      topic = "lowercaseStrings",
      groupId = "groupName",
      decoder = new StringDecoder())
      .commitInterval(5.seconds) // flush interval

    val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
    Source.fromPublisher(consumerWithOffsetSink.publisher)
      .map(processMessage(_)) // your message processing
      .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
      .run()

________________________________________
From: David Buschman <david.busch...@timeli.io>
Sent: Wednesday, April 6, 2016 5:01 AM
To: users@kafka.apache.org
Subject: Re: Low-level Consumer Example (Scala)

I use https://github.com/akka/reactive-kafka. This library has just been taken over by the Akka team and will be incorporated into a future version of Akka.

It allows for at-least-once semantics as well as at-most-once semantics.

WARNING: The new API (v0.11.X) is unproven; you might start with 0.10.X first, which is the version I am using now.

Thanks,
DaVe.

David Buschman
d...@timeli.io

> On Apr 5, 2016, at 1:58 PM, Afshartous, Nick <nafshart...@turbine.com> wrote:
>
> Hi,
>
> I'm looking for a complete low-level consumer example. Ideally one in Scala
> that continuously consumes from a topic and commits offsets.
>
> Thanks for any pointers,
>
> --
> Nick
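
For reference, the difference between the two delivery guarantees discussed in this thread comes down to whether the offset is committed before or after a message is processed. The sketch below illustrates that ordering with a purely in-memory model. It does not use the reactive-kafka or Kafka API, and every name in it (Record, OffsetStore, SimulatedCrash, atMostOnce, atLeastOnce, runWithRestart) is hypothetical and chosen only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory model; NOT the reactive-kafka or Kafka API.
object DeliverySemanticsSketch {
  final case class Record(offset: Long, value: String)

  /** Stand-in for committed-offset storage; holds the next offset to read. */
  final class OffsetStore { var next: Long = 0L }

  /** Simulates the consumer dying while handling a given offset. */
  final class SimulatedCrash(offset: Long)
      extends RuntimeException(s"crash at offset $offset")

  /** At-most-once: commit first, then process. A crash loses the in-flight record. */
  def atMostOnce(log: Vector[Record], store: OffsetStore,
                 crashAt: Option[Long], process: Record => Unit): Unit =
    log.drop(store.next.toInt).foreach { r =>
      store.next = r.offset + 1 // commit before processing
      if (crashAt.contains(r.offset)) throw new SimulatedCrash(r.offset)
      process(r)
    }

  /** At-least-once: process first, then commit. A crash replays the in-flight record. */
  def atLeastOnce(log: Vector[Record], store: OffsetStore,
                  crashAt: Option[Long], process: Record => Unit): Unit =
    log.drop(store.next.toInt).foreach { r =>
      process(r)
      if (crashAt.contains(r.offset)) throw new SimulatedCrash(r.offset)
      store.next = r.offset + 1 // commit after processing
    }

  /** Consumes a three-record log, crashing at offset 1, then restarts without a crash. */
  def runWithRestart(
      consume: (Vector[Record], OffsetStore, Option[Long], Record => Unit) => Unit
  ): Vector[String] = {
    val log = Vector(Record(0, "a"), Record(1, "b"), Record(2, "c"))
    val store = new OffsetStore
    val seen = ArrayBuffer.empty[String]
    val record: Record => Unit = r => { seen += r.value; () }
    try consume(log, store, Some(1L), record)
    catch { case _: SimulatedCrash => () } // the consumer "restarts" below
    consume(log, store, None, record)      // resume from the committed offset
    seen.toVector
  }
}
```

In this model, runWithRestart(atMostOnce) yields Vector("a", "c"): the record in flight during the crash is skipped. runWithRestart(atLeastOnce) yields Vector("a", "b", "b", "c"): the same record is processed twice. Whether the 0.8 offsetCommitSink can be arranged to commit ahead of processing is not something this sketch establishes.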