imaffe commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827566838



##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by `setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,

Review comment:
       This is my mistake, though this refers to a concept that is pretty stable. I'll change it to use the latest stable version.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
