dianfu commented on code in PR #19682: URL: https://github.com/apache/flink/pull/19682#discussion_r876564037
########## flink-python/pyflink/datastream/connectors.py: ########## @@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource': return PulsarSource(self._j_pulsar_source_builder.build()) +class DeliveryGuarantee(Enum): + """ + DeliveryGuarantees that can be chosen. In general, your pipeline can only offer the lowest + delivery guarantee which is supported by your sources and sinks. + + :data: `EXACTLY_ONCE`: + Records are delivered exactly once, even under failover scenarios. To build a complete + exactly-once pipeline, the source and sink must support exactly-once and be + properly configured. + + :data: `AT_LEAST_ONCE`: + Records are ensured to be delivered but it may happen that the same record is delivered + multiple times. Usually, this guarantee is faster than the exactly-once delivery. + + :data: `NONE`: + Records are delivered on a best effort basis. It is often the fastest way to process records + but it may happen that records are lost or duplicated. + """ + + EXACTLY_ONCE = 0, + AT_LEAST_ONCE = 1, + NONE = 2 + + def _to_j_delivery_guarantee(self): + JDeliveryGuarantee = get_gateway().jvm \ + .org.apache.flink.connector.base.DeliveryGuarantee + return getattr(JDeliveryGuarantee, self.name) + + +class PulsarSerializationSchema(object): + """ + The serialization schema for how to serialize records into Pulsar. + """ + + def __init__(self, _j_pulsar_serialization_schema): + self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema + + @staticmethod + def flink_schema(serialization_schema: SerializationSchema) \ Review Comment: Thanks for the explanation. Could we create a ticket for this? ########## flink-python/pyflink/datastream/connectors/pulsar.py: ########## @@ -387,3 +409,297 @@ def build(self) -> 'PulsarSource': Build the PulsarSource.
""" return PulsarSource(self._j_pulsar_source_builder.build()) + + +# ---- PulsarSink ---- + + +class PulsarSerializationSchema(object): + """ + The serialization schema for how to serialize records into Pulsar. + """ + + def __init__(self, _j_pulsar_serialization_schema): + self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema + + @staticmethod + def flink_schema(serialization_schema: SerializationSchema) \ + -> 'PulsarSerializationSchema': + """ + Create a PulsarSerializationSchema by using the flink's SerializationSchema. It would + serialize the message into byte array and send it to Pulsar with Schema#BYTES. + """ + JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \ + .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema + _j_pulsar_serialization_schema = JPulsarSerializationSchema.flinkSchema( + serialization_schema._j_serialization_schema) + return PulsarSerializationSchema(_j_pulsar_serialization_schema) + + +class TopicRoutingMode(Enum): + """ + The routing policy for choosing the desired topic by the given message. + + :data: `ROUND_ROBIN`: + + The producer will publish messages across all partitions in a round-robin fashion to achieve + maximum throughput. Please note that round-robin is not done per individual message but + rather it's set to the same boundary of batching delay, to ensure batching is effective. + + :data: `MESSAGE_KEY_HASH`: + + If no key is provided, The partitioned producer will randomly pick one single topic partition + and publish all the messages into that partition. If a key is provided on the message, the + partitioned producer will hash the key and assign the message to a particular partition. + + :data: `CUSTOM`: + + Use custom topic router implementation that will be called to determine the partition for a + particular message. 
+ """ + + ROUND_ROBIN = 0 + MESSAGE_KEY_HASH = 1 + CUSTOM = 2 + + def _to_j_topic_routing_mode(self): + JTopicRoutingMode = get_gateway().jvm \ + .org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode + return getattr(JTopicRoutingMode, self.name) + + +class MessageDelayer(object): + """ + A delayer for Pulsar broker passing the sent message to the downstream consumer. This is only + works in :data:`SubscriptionType.Shared` subscription. + + Read delayed message delivery + https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery for better + understanding this feature. + """ + def __init__(self, _j_message_delayer): + self._j_message_delayer = _j_message_delayer + + @staticmethod + def never() -> 'MessageDelayer': + """ + All the messages should be consumed immediately. + """ + JMessageDelayer = get_gateway().jvm \ + .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer + return MessageDelayer(JMessageDelayer.never()) + + @staticmethod + def fixed(duration: Duration) -> 'MessageDelayer': + """ + All the messages should be consumed in a fixed duration. + """ + JMessageDelayer = get_gateway().jvm \ + .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer + return MessageDelayer(JMessageDelayer.fixed(duration._j_duration)) + + +class PulsarSink(Sink): + """ + The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a + PulsarSink. The following example shows how to create a PulsarSink receiving records of + String type. + + Example: + :: + + >>> sink = PulsarSink() \\ Review Comment: ```suggestion >>> sink = PulsarSink \\ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org