> I use https://github.com/akka/reactive-kafka. This library has just been taken
> over by the Akka team and will be incorporated into a future version of Akka.

Thanks Dave for the pointer.  

I downloaded the reactive-kafka 0.8 branch since we're using Kafka 0.8.2.2.  We 
need at-most-once semantics and control over individual offset commits, so I 
was looking at the example below from the branch page:

   https://github.com/akka/reactive-kafka/tree/0.8

Could this example be tweaked to provide at-most-once semantics?  The docs 
also mention configuring the concurrency level of the internal thread pools, 
so I assume that would apply to this example as well?
--
     Nick

Manual Commit (version 0.8 and above)

In order to achieve "at-least-once" delivery, you can use the following API to 
obtain an additional Sink, where you can stream back the messages that you 
have processed. An underlying actor will periodically flush the offsets of 
these messages as committed. Reactive Kafka supports manual commit both to 
ZooKeeper (legacy) and to Kafka storage. Dual commit is not supported. In order 
to commit manually to ZooKeeper, you have to add an optional module to your 
dependencies:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import kafka.serializer.StringDecoder // needed for the decoder below

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "lowercaseStrings",
  groupId = "groupName",
  decoder = new StringDecoder()
).commitInterval(5.seconds) // flush interval

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing (placeholder function)
  .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
  .run()
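
To make the at-most-once question concrete, here is a minimal sketch of the 
ordering difference between the two semantics. It deliberately does not use 
the Reactive Kafka API: Msg, OffsetStore and DeliverySemantics are 
hypothetical names made up for illustration, and the offset store is just an 
in-memory variable. Whether the 0.8 stream above can guarantee that a commit 
is flushed before processing starts is not something this sketch settles.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Hypothetical stand-in for a consumed message: an offset plus a payload.
final case class Msg(offset: Long, payload: String)

// Hypothetical in-memory offset store. A real consumer would commit to
// Kafka or ZooKeeper instead.
final class OffsetStore {
  private var committed: Long = -1L // -1 means nothing committed yet
  def commit(offset: Long): Unit = committed = offset
  def lastCommitted: Long = committed
}

object DeliverySemantics {
  // At-most-once: commit first, then process. A crash during processing
  // loses that message, but it is never processed twice.
  def atMostOnce(msgs: Seq[Msg], store: OffsetStore)(process: Msg => Unit): Unit =
    msgs.foreach { m =>
      store.commit(m.offset)
      process(m)
    }

  // At-least-once: process first, then commit. A crash during processing
  // leaves the offset uncommitted, so the message is redelivered on restart.
  def atLeastOnce(msgs: Seq[Msg], store: OffsetStore)(process: Msg => Unit): Unit =
    msgs.foreach { m =>
      process(m)
      store.commit(m.offset)
    }

  // Consumes offsets 0..2 with a processor that throws at `crashAt`.
  // Returns (offsets fully processed, last committed offset).
  def simulate(commitFirst: Boolean, crashAt: Long): (List[Long], Long) = {
    val msgs = (0L to 2L).map(o => Msg(o, s"payload-$o"))
    val store = new OffsetStore
    val processed = ArrayBuffer.empty[Long]
    val process: Msg => Unit = { m =>
      if (m.offset == crashAt) throw new RuntimeException(s"crash at ${m.offset}")
      processed += m.offset
      ()
    }
    // Try swallows the simulated crash so the state can be inspected.
    Try {
      if (commitFirst) atMostOnce(msgs, store)(process)
      else atLeastOnce(msgs, store)(process)
    }
    (processed.toList, store.lastCommitted)
  }
}
```

With a crash at offset 1, simulate(commitFirst = true, crashAt = 1L) leaves 
offset 1 committed although only offset 0 was processed, so a restart would 
skip message 1. With commitFirst = false the last committed offset stays at 
0, so a restart would see message 1 again.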


________________________________________
From: David Buschman <david.busch...@timeli.io>
Sent: Wednesday, April 6, 2016 5:01 AM
To: users@kafka.apache.org
Subject: Re: Low-level Consumer Example (Scala)

I use https://github.com/akka/reactive-kafka. This library has just been taken 
over by the Akka team and will be incorporated into a future version of Akka.

It allows for at-least-once semantics as well as at-most-once semantics.

WARNING:  The new API (v0.11.X) is unproven, so you might start with 0.10.X 
first, which is the version I am using now.

Thanks,
    DaVe.

David Buschman
d...@timeli.io



> On Apr 5, 2016, at 1:58 PM, Afshartous, Nick <nafshart...@turbine.com> wrote:
>
>
> Hi,
>
> I'm looking for a complete low-level consumer example.  Ideally one in Scala 
> that continuously consumes from a topic and commits offsets.
>
> Thanks for any pointers,
>
> --
>
>     Nick
