imaffe commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r792420132
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java ##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSink} to make it easier for the users to construct a {@link
+ * PulsarSink}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSink that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *     .setServiceUrl(operator().serviceUrl())
+ *     .setAdminUrl(operator().adminUrl())
+ *     .setTopics(topic)
+ *     .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, and the record serializer are required fields that must be set. If
+ * you don't set the topics, make sure you have provided a custom {@link TopicRouter}. Otherwise,
+ * you must provide the topics to produce.
+ *
+ * <p>To specify the delivery guarantees of PulsarSink, one can call {@link
+ * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the delivery guarantee is {@link
+ * DeliveryGuarantee#EXACTLY_ONCE}, and it requires the Pulsar broker to turn on transaction
+ * support.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *     .setServiceUrl(operator().serviceUrl())
+ *     .setAdminUrl(operator().adminUrl())
+ *     .setTopics(topic)
+ *     .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *     .setDeliveryGuarantee(deliveryGuarantee)
+ *     .build();
+ * }</pre>
+ *
+ * @see PulsarSink for a more detailed explanation of the different guarantees.
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSinkBuilder<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+    private final PulsarConfigBuilder configBuilder;
+
+    private DeliveryGuarantee deliveryGuarantee;
+    private PulsarSerializationSchema<IN> serializationSchema;
+    private TopicMetadataListener metadataListener;
+    private TopicRoutingMode topicRoutingMode;
+    private TopicRouter<IN> topicRouter;
+
+    // private builder constructor.
+    PulsarSinkBuilder() {
+        this.configBuilder = new PulsarConfigBuilder();
+        this.deliveryGuarantee = DeliveryGuarantee.NONE;
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
+        return setConfig(PULSAR_ADMIN_URL, adminUrl);
+    }
+
+    /**
+     * Sets the server's link for the PulsarProducer of the PulsarSink.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setServiceUrl(String serviceUrl) {
+        return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+    }
+
+    public PulsarSinkBuilder<IN> setProducerName(String producerName) {
+        return setConfig(PULSAR_PRODUCER_NAME, producerName);
+    }
+
+    /**
+     * Set a pulsar topic list for flink source. Some topic may not exist currently, write to this
+     * non-existed topic wouldn't throw any exception.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this

Review comment:
   This comment should be sink I guess?

########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarCommittable, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(SerializationSchema.InitializationContext, PulsarSinkContext,
+     * SinkConfiguration)} fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param deliveryGuarantee the Sink's delivery guarantee.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            DeliveryGuarantee deliveryGuarantee,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        PulsarSerializationSchemaInitializationContext initializationContext =
+                new PulsarSerializationSchemaInitializationContext(initContext);
+        try {
+            serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema);
+        this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // Serialize the incoming element.
+        sinkContextAdapter.updateTimestamp(context);
+        RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter);
+
+        // Choose the right topic to send.
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter);
+
+        // Create message builder for sending message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee);
+        if (sinkConfiguration.isEnableSchemaEvolution()) {
+            ((TypedMessageBuilder<IN>) builder).value(element);
+        } else {
+            ((TypedMessageBuilder<byte[]>) builder).value(message.getValue());
+        }
+        message.supplement(builder);
+
+        // Perform message sending.
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+            // We would just ignore the sending exception. This may cause data loss.
+            builder.sendAsync();
+        } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) {
+            // Waiting for permits to write message.
+            pendingMessages.acquire();
+            CompletableFuture<MessageId> sender = builder.sendAsync();
+            sender.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.release();
+                        if (ex != null) {
+                            mailboxExecutor.execute(
+                                    () -> {
+                                        throw new FlinkRuntimeException(

Review comment:
   Wondering how this Flink runtime exception will propagate? It's thrown inside the mailbox executor, so I think the job will just keep running rather than fail?
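   For context, here is a minimal, self-contained sketch of how I read the mailbox pattern (plain Java, not Flink internals; `MailboxModel`, `executeInMailbox`, and `runMailboxLoop` are made-up stand-ins for `MailboxExecutor` and the task's mailbox loop). The async send callback never throws on the Pulsar client thread; it only enqueues a throwing action, and the exception surfaces later on the single thread that drains the mailbox. My understanding is that Flink's task thread behaves like this loop, so the `FlinkRuntimeException` would fail the task and trigger recovery rather than being swallowed (similar to how the Kafka sink reports async send errors), but that is worth confirming.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.LinkedBlockingQueue;

   /**
    * Toy model of the mailbox pattern: async callbacks enqueue a throwing action
    * ("mail") instead of throwing on the callback thread, so the failure surfaces
    * on the single task thread that drains the mailbox.
    */
   public class MailboxModel {

       /** Actions ("mails") to be run by the task thread. */
       private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

       /** Called from an async completion callback, analogous to mailboxExecutor.execute(...). */
       public void executeInMailbox(Runnable mail) {
           mailbox.add(mail);
       }

       /** Simplified stand-in for the task's mailbox loop. */
       public void runMailboxLoop() {
           Runnable mail;
           while ((mail = mailbox.poll()) != null) {
               // Anything thrown here propagates out of the loop and fails the "task".
               mail.run();
           }
       }

       public static void main(String[] args) {
           MailboxModel model = new MailboxModel();

           // Simulate an async send whose callback reports a failure.
           CompletableFuture<Void> send =
                   CompletableFuture.failedFuture(new RuntimeException("broker unavailable"));
           send.whenComplete((ignored, ex) -> {
               if (ex != null) {
                   // Defer the failure to the task thread instead of throwing in the callback.
                   model.executeInMailbox(() -> {
                       throw new IllegalStateException("One message send failed", ex);
                   });
               }
           });

           // The exception surfaces here, on the thread draining the mailbox.
           model.runMailboxLoop();
       }
   }
   ```

   If the real mailbox loop rethrows like this sketch, the exception does stop the job (or at least the task, with the usual failover), which would answer the "continue but not stop the job" concern.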