First thing I noticed: you should be using a singleton Kafka producer, not
recreating one for every partition. KafkaProducer is thread-safe, so a single
instance per executor JVM can be shared by all tasks; see the sketch below.
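
Roughly what I mean, as an untested sketch. It assumes you also switch to the
new producer API (org.apache.kafka.clients.producer, shipped with 0.9), which
gives you send() callbacks and flush(). Note that your props currently mix the
two APIs: metadata.broker.list, serializer.class and request.required.acks
belong to the old Scala producer, while acks and retries are new-producer
keys. The new producer wants bootstrap.servers plus key.serializer and
value.serializer instead.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

    // One producer per executor JVM, shared across all tasks and batches.
    // KafkaProducer is thread-safe, so a single shared instance is fine.
    object ProducerHolder {
      private var producer: KafkaProducer[String, String] = null

      def get(props: Properties): KafkaProducer[String, String] = synchronized {
        if (producer == null) producer = new KafkaProducer[String, String](props)
        producer
      }
    }

    schemaRdd2.foreachPartition { partition =>
      // props must use new-producer keys here: bootstrap.servers,
      // key.serializer, value.serializer.
      val producer = ProducerHolder.get(props)

      partition.foreach { row =>
        val record = new ProducerRecord[String, String](
          props.getProperty("outTopicHarmonized"), null, row.toString)
        // The callback surfaces per-record failures instead of dropping
        // them silently; log or collect them for retry.
        producer.send(record, new Callback {
          override def onCompletion(md: RecordMetadata, e: Exception): Unit =
            if (e != null) e.printStackTrace()
        })
      }

      // flush() blocks until every buffered record has been acked.
      // Don't close() the shared producer here; close it on JVM shutdown.
      producer.flush()
    }

With flush() you can drop the Thread.sleep(1000) hack entirely.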

On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek
<vikash.par...@infoobjects.com> wrote:
> I am facing an issue with Spark Streaming and Kafka. My use case is as
> follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. Based on the processed message, the app writes it to one of several
> Kafka topics; e.g. if the message is harmonized it goes to the harmonized
> topic, otherwise to the unharmonized topic.
>
> The problem is that during streaming we are somehow losing messages, i.e.
> not all incoming messages end up in the harmonized or unharmonized topics.
> For example, if the app receives 30 messages in one batch, sometimes it
> writes all of them to the output topics (the expected behaviour), but
> sometimes only 27 (3 messages are lost; this number varies).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for kafka:
>   val props = new Properties()
>   props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all")
>   props.put("retries", "5")
>   props.put("request.required.acks", "-1")
> Following is the piece of code where we write the processed messages to
> Kafka:
>   val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
>     val producerConfig = new ProducerConfig(props)
>     val producer = new Producer[String, String](producerConfig)
>
>     partition.foreach { row =>
>       if (debug) println(row.mkString)
>       val keyedMessage = new KeyedMessage[String, String](
>         props.getProperty("outTopicHarmonized"), null, row.toString())
>       producer.send(keyedMessage)
>     }
>     // hack, should be done with flush
>     Thread.sleep(1000)
>     producer.close()
>   }
> We explicitly added the sleep(1000) for testing purposes, but it does not
> solve the problem :(
>
> Any suggestions would be appreciated.
