klion26 commented on a change in pull request #13410: URL: https://github.com/apache/flink/pull/13410#discussion_r496433356
########## File path: docs/dev/connectors/kafka.zh.md ########## @@ -431,39 +371,46 @@ stream.addSink(myProducer); {% highlight scala %} val stream: DataStream[String] = ... +Properties properties = new Properties +properties.setProperty("bootstrap.servers", "localhost:9092") + val myProducer = new FlinkKafkaProducer[String]( - "localhost:9092", // broker 列表 "my-topic", // 目标 topic - new SimpleStringSchema) // 序列化 schema - -// 0.10+ 版本的 Kafka 允许在将记录写入 Kafka 时附加记录的事件时间戳; -// 此方法不适用于早期版本的 Kafka -myProducer.setWriteTimestampToKafka(true) + new SimpleStringSchema(), // 序列化 schema + properties, // producer 配置 + FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错 stream.addSink(myProducer) {% endhighlight %} </div> </div> -上面的例子演示了创建 Flink Kafka Producer 来将流消息写入单个 Kafka 目标 topic 的基本用法。 -对于更高级的用法,这还有其他构造函数变体允许提供以下内容: +<a name="the-serializationschema"></a> + +## `SerializationSchema` - * *提供自定义属性*:producer 允许为内部 `KafkaProducer` 提供自定义属性配置。有关如何配置 Kafka Producer 的详细信息,请参阅 [Apache Kafka 文档](https://kafka.apache.org/documentation.html)。 - * *自定义分区器*:要将消息分配给特定的分区,可以向构造函数提供一个 `FlinkKafkaPartitioner` 的实现。这个分区器将被流中的每条记录调用,以确定消息应该发送到目标 topic 的哪个具体分区里。有关详细信息,请参阅 [Kafka Producer 分区方案](#kafka-producer-分区方案)。 - * *高级的序列化 schema*:与 consumer 类似,producer 还允许使用名为 `KeyedSerializationSchema` 的高级序列化 schema,该 schema 允许单独序列化 key 和 value。它还允许覆盖目标 topic,以便 producer 实例可以将数据发送到多个 topic。 +Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。 -### Kafka Producer 分区方案 +`KafkaSerializationSchema` 允许用户指定这样的 schema。它会为每个记录调用 `ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)` 方法,产生一个写入到 Kafka 的 `ProducerRecord`。 -默认情况下,如果没有为 Flink Kafka Producer 指定自定义分区程序,则 producer 将使用 `FlinkFixedPartitioner` 为每个 Flink Kafka Producer 并行子任务映射到单个 Kafka 分区(即,接收子任务接收到的所有消息都将位于同一个 Kafka 分区中)。 +用户可以对如何将数据写到 Kafka 进行细粒度的控制。你可以通过 producer record: -可以通过扩展 `FlinkKafkaPartitioner` 类来实现自定义分区程序。所有 Kafka 版本的构造函数都允许在实例化 producer 时提供自定义分区程序。 -注意:分区器实现必须是可序列化的,因为它们将在 Flink 节点之间传输。此外,请记住分区器中的任何状态都将在作业失败时丢失,因为分区器不是 producer 的 checkpoint 状态的一部分。 +* 设置 header 值 +* 为每个 record 定义 key +* 指定数据的自定义分区 -也可以完全避免使用分区器,并简单地让 Kafka 通过其附加 key 写入的消息进行分区(使用提供的序列化 schema 为每条记录确定分区)。 -为此,在实例化 producer 时提供 `null` 自定义分区程序,提供 `null` 作为自定义分区器是很重要的; 如上所述,如果未指定自定义分区程序,则默认使用 `FlinkFixedPartitioner`。 +<a name="kafka-producers-and-fault-tolerance"></a> ### Kafka Producer 和容错 +启用 Flink 的 checkpointing 后,`FlinkKafkaProducer` 可以提供精确一次的语义保证。 + +除了启用 Flink 的 checkpointing,你也可以通过将适当的 `semantic` 参数传递给 `FlinkKafkaProducer` 来选择三种不同的操作模式: + +* `Semantic.NONE`:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。 +* `Semantic.AT_LEAST_ONCE`(默认设置):可以保证不会丢失任何记录(但是记录可能会重复) +* `Semantic.EXACTLY_ONCE`:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 `isolation.level`(`read_committed` 或 `read_uncommitted` - 后者是默认值)。 + ##### 注意事项 Review comment: 这里是不是漏了 <a> 标签 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org