[
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601778#comment-15601778
]
zhangxinyu edited comment on SPARK-17935 at 10/26/16 3:26 AM:
--------------------------------------------------------------
h2. KafkaSink Design Doc
h4. Goal
Write results to a Kafka cluster (version 0.10.0.0) from the Structured
Streaming module.
h4. Implementation
Four classes are implemented to write data to a Kafka cluster from the
Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister*
and overrides the functions *shortName* and *createSink*. The *createSink*
function creates a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which creates
a *KafkaSinkRDD*.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and sends results to the Kafka cluster in a
distributed fashion. Its *compute* function calls *CachedKafkaProducer* to get
or create a producer, which then sends the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* caches producers on the executors so that they can be
reused.
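The get-or-create behaviour described for *CachedKafkaProducer* could be sketched roughly as follows. This is only an illustration, not the proposed implementation: the name *ProducerCache*, the type parameter *P* (standing in for a Kafka producer), and the choice to key the cache by the whole configuration map are all assumptions.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical sketch: one cached producer per distinct configuration.
// P stands in for a Kafka producer type; `create` builds a new instance.
final class ProducerCache[P](create: Map[String, String] => P) {
  // Scala immutable Maps compare by content, so equal configs share an entry.
  private val cache = new ConcurrentHashMap[Map[String, String], P]()

  // Returns the cached instance for `config`, creating it on first use.
  def getOrCreate(config: Map[String, String]): P =
    cache.computeIfAbsent(config, new JFunction[Map[String, String], P] {
      override def apply(key: Map[String, String]): P = create(key)
    })

  // Number of distinct configurations currently cached.
  def size: Int = cache.size()
}
```

Since a cache like this would live once per executor JVM, tasks that run on the same executor with the same configuration would share a single producer instead of opening a new connection for every partition.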
h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through "*.option()*" with keys that start
with "*kafka.*". For example, the producer setting *bootstrap.servers* is
configured with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through ".option()". The difference is that
their keys don't start with "kafka.".
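One way the sink could separate the two kinds of options is sketched below. The object name *SinkOptions*, the function *split*, and the decision to strip the "kafka." prefix before handing the settings to the producer are assumptions made for illustration; the design above only states that producer keys start with "kafka.".

```scala
// Hypothetical helper that separates Kafka producer settings from sink options.
object SinkOptions {
  private val KafkaPrefix = "kafka."

  // Returns (producer settings with the "kafka." prefix removed, remaining options).
  def split(options: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (kafka, other) = options.partition { case (key, _) => key.startsWith(KafkaPrefix) }
    val producerParams = kafka.map { case (key, value) => key.stripPrefix(KafkaPrefix) -> value }
    (producerParams, other)
  }
}
```

With the options from the usage example, "kafka.bootstrap.servers" would end up in the producer settings as "bootstrap.servers", while "topic" would stay with the sink's own options.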
h4. Usage
// kafkaServers and topic are String values supplied by the caller
val query = input.writeStream
  .format("kafka-sink-10")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()
was (Author: zhangxinyu):
h2. KafkaSink Design Doc
h4. Goal
Write results to a Kafka cluster (version 0.10.0.0) from the Structured
Streaming module.
h4. Implementation
Four classes are implemented to write data to a Kafka cluster from the
Structured Streaming module.
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister*
and overrides the functions *shortName* and *createSink*. The *createSink*
function creates a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which creates
a *KafkaSinkRDD*.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and sends results to the Kafka cluster in a
distributed fashion. Its *compute* function calls *CachedKafkaProducer* to get
or create a producer, which then sends the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* caches producers on the executors so that they can be
reused.
h4. Configuration
* *Kafka Producer Configuration*
Kafka producer settings are passed through "*.option()*" with keys that start
with "*kafka.*". For example, the producer setting *bootstrap.servers* is
configured with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other settings are also passed through ".option()". The difference is that
their keys don't start with "kafka.".
h4. Usage
// kafkaServers and topic are String values supplied by the caller
val query = input.writeStream
  .format("kafkaSink")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()
> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> ------------------------------------------------------------------------------
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
> Issue Type: Improvement
> Components: SQL, Streaming
> Affects Versions: 2.0.0
> Reporter: zhangxinyu
>
> Spark already supports kafkaInputStream. It would be useful to add
> `KafkaForeachWriter` to write results to Kafka from the Structured Streaming
> module.
> `KafkaForeachWriter.scala` is placed in external kafka-0.8.0.