Hello,

Apologies if this is not the right forum for this question. With the
Akka consumer code below, does the stream run as many consumers in
parallel as the argument passed to the mapAsync call specifies?

def consume() = {
  val consumer = Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics("wuhu.settlement.request.queue"))
  val pl = consumer.mapAsync(WORKERS) { msg =>
    settlementSink
      .settle(msg.record.value().parseJson.convertTo[SettlementRequest])
      .map(_ => msg)
  }.mapAsync(WORKERS) { msg =>
    msg.committableOffset.commitScaladsl()
  }
  pl.runWith(Sink.ignore)
}


-- 
Kind regards
*Josh Meraj Maidana*
