First thing I noticed: you should be using a singleton Kafka producer, not recreating one for every partition. The producer is thread-safe, so a single instance per executor JVM can be shared across all tasks and batches.
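Something like the minimal sketch below is the usual pattern. To be clear about assumptions: it uses the new producer API from the kafka-clients 0.9 jar (org.apache.kafka.clients.producer) rather than the old kafka.producer API in your snippet, and the KafkaSink / sendProps names are made up for illustration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// One lazily created producer per executor JVM, shared by all tasks.
object KafkaSink {
  @volatile private var producer: KafkaProducer[String, String] = null

  def get(props: Properties): KafkaProducer[String, String] = {
    if (producer == null) synchronized {
      if (producer == null) {
        producer = new KafkaProducer[String, String](props)
        // flush in-flight records and release sockets when the JVM exits
        sys.addShutdownHook(producer.close())
      }
    }
    producer
  }
}

// Note the new API takes bootstrap.servers and explicit serializer
// classes instead of metadata.broker.list / serializer.class.
val sendProps = new Properties()
sendProps.put("bootstrap.servers", properties.getProperty("metadataBrokerList"))
sendProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
sendProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
sendProps.put("acks", "all")
sendProps.put("retries", "5")
sendProps.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))

schemaRdd2.foreachPartition { partition =>
  // reused across partitions and batches, never recreated
  val producer = KafkaSink.get(sendProps)
  partition.foreach { row =>
    producer.send(new ProducerRecord[String, String](
      sendProps.getProperty("outTopicHarmonized"), row))
  }
  // no Thread.sleep or close() here; the producer outlives the partition
}

The sleep/close hack goes away entirely, and you keep one set of broker connections per executor instead of per partition.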
On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek <vikash.par...@infoobjects.com> wrote:
> I am facing an issue with Spark Streaming and Kafka. My use case is as
> follows:
> 1. A Spark Streaming (DirectStream) application reads messages from a
> Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to one of two
> output topics: if the message is harmonized it goes to the harmonized
> topic, otherwise to the unharmonized topic.
>
> The problem is that we are losing some messages during streaming, i.e.
> not all incoming messages get written to the harmonized or unharmonized
> topic. For example, if the app receives 30 messages in one batch, it
> sometimes writes all of them to the output topics (the expected
> behaviour), but sometimes only 27 (3 messages are lost; this number
> varies).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> These are the properties we are using for Kafka:
>
> val props = new Properties()
> props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
> props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
> props.put("group.id", properties.getProperty("group.id"))
> props.put("serializer.class", "kafka.serializer.StringEncoder")
> props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
> props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
> props.put("acks", "all")
> props.put("retries", "5")
> props.put("request.required.acks", "-1")
>
> This is the piece of code where we write processed messages to Kafka:
>
> val schemaRdd2 = finalHarmonizedDF.toJSON
>
> schemaRdd2.foreachPartition { partition =>
>   val producerConfig = new ProducerConfig(props)
>   val producer = new Producer[String, String](producerConfig)
>
>   partition.foreach { row =>
>     if (debug) println(row.mkString)
>     val keyedMessage = new KeyedMessage[String, String](
>       props.getProperty("outTopicHarmonized"), null, row.toString())
>     producer.send(keyedMessage)
>   }
>   // hack, should be done with a flush
>   Thread.sleep(1000)
>   producer.close()
> }
>
> We explicitly added sleep(1000) for testing purposes, but it does not
> solve the problem :(
>
> Any suggestion would be appreciated.