Kafka users,

Below is the code I use to write to a topic:

def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
  Future {
    logger.info(s"Persisting ${tsDataPoints.length} data-points in Kafka topic ${producerConfig.topic}")
    val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
    val jsonMessage = Json.toJson(dataPoints).toString()
    val recordMetaDataF = producer.send(
      new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
    )
    // if we don't make it to Kafka within 3 seconds, we timeout
    val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
    logger.info(
      s"persisted ${tsDataPoints.length} data-points to kafka topic: " +
        s"${recordMetaData.topic()} partition: ${recordMetaData.partition()} " +
        s"offset: ${recordMetaData.offset()}"
    )
    ()
  }
}


How can I make this code write to a specific partition? Currently my
topic has only a single partition, so by default this code writes to
partition 0 of the topic.

I'm using Kafka 0.9.0.0! Any suggestions?

Regards,
Joe
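
For reference, one possible approach is sketched below. It assumes the
ProducerRecord constructor that takes (topic, partition, key, value), which
the Java producer client provides. The object name PartitionedRecordSketch
and the helper recordFor are hypothetical names made up for illustration;
they are not part of the code above or of Kafka. The topic must actually
have the requested partition, so with a single-partition topic only
partition 0 is valid.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

object PartitionedRecordSketch {

  // Hypothetical helper (not from the original post): builds a record that is
  // pinned to one partition instead of leaving the choice to the partitioner.
  def recordFor(
      topic: String,
      partition: Int,
      jsonMessage: String
  ): ProducerRecord[String, String] =
    new ProducerRecord[String, String](
      topic,
      Int.box(partition), // explicit target partition (java.lang.Integer)
      null: String,       // no key, as in the original send call
      jsonMessage
    )
}
```

Inside publishMessage, the send call could then become
producer.send(PartitionedRecordSketch.recordFor(producerConfig.topic, 0, jsonMessage)).
An alternative is to pass a key and no partition, in which case the default
partitioner picks the partition by hashing the key.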
